将代码重构为响应式风格



我正在开发的一个应用程序大量使用异步处理,我正在寻找一种更好的方法来组织代码。

在servlet上接收到系统的外部输入。这个servlet收集的原始数据被存储到一个队列中。线程池针对该队列运行,并将原始数据解析为结构化记录,然后将该记录存储到一组N个队列中的一个。选择队列时,同一类型的所有记录都将进入同一队列。这N个队列由单个线程提供服务,每个线程将同一类型的记录收集到一个集合中。每一分钟,一个定时任务会被唤醒,并将前一分钟收集到的所有记录写入一个文件。

目前,这段代码是使用一堆队列、线程池和不断运行的可运行程序来组织的,这使得逻辑很难理解。我想重构这段代码,使上面描述的数据流更加明确。我正在寻找实现这一目标的工具和方法。

    像RxJava这样的工具有帮助吗?如果有,怎么做?
  • 我应该考虑其他方法吗?

根据您的描述,这是一个RxJava的例子。希望对你有帮助。

public class TestServlet extends HttpServlet {
private static final PublishSubject<String> o = PublishSubject.<String>create();
public static Observable<String> getServletObservable() {
    return o.observeOn(Schedulers.computation());
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {
    synchronized (TestServlet.class) {
        o.onNext("value");
    }
}
}
class Foo {
public static void main(String[] args) {
    TestServlet.getServletObservable().map(new Func1<String, String>() {
        @Override
        public String call(String t1) {
            // do something
            return null;
        }
    }).groupBy(new Func1<String, String>() {
        @Override
        public String call(String t1) {
            // do something
            return null;
        }
    }).subscribe(new Observer<GroupedObservable<String, String>>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(GroupedObservable<String, String> group) {
            group.observeOn(Schedulers.io()).subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(String t) {
                    // store t
                }
            });
        }
    });
}
}

相关内容

  • 没有找到相关文章

最新更新