异步并发请求

注意前面的调度器例子,最终结果都是在一条子线程中按事件队列的顺序执行的,它实现的是异步序列请求。然而,经常有这样的需求:给出三个URL,并发进行IO处理,实际上RxJava也能实现异步并发请求,这就用到前一节讲到了flatmap()变换操作。

传统的方式

IOModuleAsync.java

public class IOModuleAsync
{
    private Callback callback;
    public void readFile(String url)
    {
        new Thread(()->{
            try
            {

                System.out.println("reading from " + url);
                Thread.sleep(3000);
                Event event = new Event("read finished");
                this.callback.onCall(event);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }).start();
    }
    public void onFinish(Callback callback)
    {
        this.callback = callback;
    }
}

主函数

IOModuleAsync ioModuleAsync = new IOModuleAsync();

ioModuleAsync.onFinish((e)->{
    System.out.println(e.eventType);
});

String[] urls = {"http://aaa", "http://bbb", "http://ccc"};
for(String s : urls)
{
    ioModuleAsync.readFile(s);
}

//This is UI Thread
while(true)
{
    try
    {
        Thread.sleep(500);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
}

代码分析

IOModuleAsync在异步调用时,实际上是创建了子线程,线程执行结束后,调用传入的回调函数。主函数中,先绑定回调函数,然后传入异步操作的参数,结果就可以并发执行了。

运行结果:

reading from http://aaa
reading from http://bbb
reading from http://ccc
read finished
read finished
read finished

RxJava的方式

IOModule.java

public class IOModule
{
    public void readFile(String url)
    {
        try
        {
            System.out.println("reading from " + url);
            Thread.sleep(3000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

主函数

String[] urls = {"http://aaa", "http://bbb", "http://ccc"};
Observable
        .from(urls)
        .flatMap((s) -> Observable.just(s)
                .subscribeOn(Schedulers.io())
                .map((url) ->
                {
                    IOModule ioModule = new IOModule();
                    ioModule.readFile(url);
                    return null;
                }))
        .subscribe((result) ->
        {
            System.out.println("read finished");
        });

// 假设这里是UI线程
while (true)
{
    try
    {
        Thread.sleep(500);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
}

代码分析

IOModule十分简单,只是一个同步操作,实际上大部分情况下,UI框架提供的API都是如此,如果想使用传统的实现方式,异步封装和事件也要自己写。 主函数中,使用flatMap创建子Observable,嵌套使用map获得IO结果,subscribeOn指定了子Observable在子线程上执行“订阅”,因此实现了并发。

运行结果:

reading from http://aaa
reading from http://bbb
reading from http://ccc
read finished
read finished
read finished

哪种简单?

实际上可能RxJava更简单一些,我们所使用的框架可能只提供了最简单的同步式API,比如上面的IOModule这种:

如果使用传统实现,要自己编写IOModuleAsync,Event,Callback,在有性能需求的情况下,要实现线程池或按计算需求控制线程数,复杂的事件队列可能使发射事件的操作而不仅仅是一个for循环。

如果用RxJava,很多操作都可以直接拿来用了,Observable的链式调用省去了定义很多类,还可以直接使用调度器和事件队列变换的操作符,最终需要编写的就是几个lambda表达式而已。如果把ActionX和FuncX抽取出来,还可以进行复用,使用传统实现就需要更多代码了。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap