注意前面的调度器例子,最终结果都是在一条子线程中按事件队列的顺序执行的,它实现的是异步序列请求。然而,经常有这样的需求:给出三个URL,并发进行IO处理,实际上RxJava也能实现异步并发请求,这就用到前一节讲到了flatmap()
变换操作。
IOModuleAsync.java
public class IOModuleAsync
{
private Callback callback;
public void readFile(String url)
{
new Thread(()->{
try
{
System.out.println("reading from " + url);
Thread.sleep(3000);
Event event = new Event("read finished");
this.callback.onCall(event);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}).start();
}
public void onFinish(Callback callback)
{
this.callback = callback;
}
}
主函数
IOModuleAsync ioModuleAsync = new IOModuleAsync();
ioModuleAsync.onFinish((e)->{
System.out.println(e.eventType);
});
String[] urls = {"http://aaa", "http://bbb", "http://ccc"};
for(String s : urls)
{
ioModuleAsync.readFile(s);
}
//This is UI Thread
while(true)
{
try
{
Thread.sleep(500);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
IOModuleAsync在异步调用时,实际上是创建了子线程,线程执行结束后,调用传入的回调函数。主函数中,先绑定回调函数,然后传入异步操作的参数,结果就可以并发执行了。
运行结果:
reading from http://aaa
reading from http://bbb
reading from http://ccc
read finished
read finished
read finished
IOModule.java
public class IOModule
{
public void readFile(String url)
{
try
{
System.out.println("reading from " + url);
Thread.sleep(3000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
主函数
String[] urls = {"http://aaa", "http://bbb", "http://ccc"};
Observable
.from(urls)
.flatMap((s) -> Observable.just(s)
.subscribeOn(Schedulers.io())
.map((url) ->
{
IOModule ioModule = new IOModule();
ioModule.readFile(url);
return null;
}))
.subscribe((result) ->
{
System.out.println("read finished");
});
// 假设这里是UI线程
while (true)
{
try
{
Thread.sleep(500);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
IOModule十分简单,只是一个同步操作,实际上大部分情况下,UI框架提供的API都是如此,如果想使用传统的实现方式,异步封装和事件也要自己写。 主函数中,使用flatMap创建子Observable,嵌套使用map获得IO结果,subscribeOn指定了子Observable在子线程上执行“订阅”,因此实现了并发。
运行结果:
reading from http://aaa
reading from http://bbb
reading from http://ccc
read finished
read finished
read finished
实际上可能RxJava更简单一些,我们所使用的框架可能只提供了最简单的同步式API,比如上面的IOModule这种:
如果使用传统实现,要自己编写IOModuleAsync,Event,Callback,在有性能需求的情况下,要实现线程池或按计算需求控制线程数,复杂的事件队列可能使发射事件的操作而不仅仅是一个for循环。
如果用RxJava,很多操作都可以直接拿来用了,Observable的链式调用省去了定义很多类,还可以直接使用调度器和事件队列变换的操作符,最终需要编写的就是几个lambda表达式而已。如果把ActionX和FuncX抽取出来,还可以进行复用,使用传统实现就需要更多代码了。