きっかけ
Java9のリリースが迫っているのと、ITproの記事見て試したくなったから。
あとは、Typescriptで非同期の処理を書くことが非常に多いので、Javaでもやりたくなった。
Reactive Streams
非同期処理を実現するための仕様。
非同期処理を採用しているライブラリとして、RxJava/Akka Streamsが有名。
データの流れに着目したプログラミングスタイルで、Publish-Subscribeモデルの実装ができる。
※Publish/Subscribeは、Observerパターンの発展系みたいなもんだという認識でいる。
非同期にしなければいけないってものでもないが、データを作る側と使う側が疎結合になるので、非同期処理に移行しやすくなる。
java.util.stream.Streamインタフェースとは別ものってことには、要注意。
Publish-Subscribeモデルの処理の流れ
このフローは、RxJavaも同じ。
※Akka Streamsは知らない。。。
そして、定義されているインタフェースは、下記の通り。
- Flow.Publisher
- Flow.Subscriber
- Flow.Subscription
- Flow.Processor
実装
とりあえずITproの奴を真似して作った。
import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; public class SampleSubscriber<T> implements Subscriber<T> { private Subscription subscription; private final String name; public SampleSubscriber(String name) { this.name = name; } @Override public void onComplete() { System.out.println(name + ": " + Thread.currentThread().getName() + ": Complete."); } @Override public void onError(Throwable arg0) { } @Override public void onNext(T item) { // 配布されたデータを出力する System.out.println(name + ": " + Thread.currentThread().getName() + ": " + item); // データの要求 subscription.request(1); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }
import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.SubmissionPublisher; import java.util.stream.IntStream; public class Sample { public static void main(String[] args) { SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); // Subscriberの生成と登録 Subscriber<Integer> subscriber1 = new SampleSubscriber<>("Sub1"); publisher.subscribe(subscriber1); Subscriber<Integer> subscriber2 = new SampleSubscriber<>("Sub2"); publisher.subscribe(subscriber2); Subscriber<Integer> subscriber3 = new SampleSubscriber<>("Sub3"); publisher.subscribe(subscriber3); // データの登録 IntStream.range(0, 5).forEach(publisher::submit); // 非同期で処理が終わってしまうので、実行終了するまで一時的に待つ。 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } publisher.close(); } }
実行結果
Sub2: ForkJoinPool.commonPool-worker-1: 0 Sub2: ForkJoinPool.commonPool-worker-1: 1 Sub2: ForkJoinPool.commonPool-worker-1: 2 Sub2: ForkJoinPool.commonPool-worker-1: 3 Sub2: ForkJoinPool.commonPool-worker-1: 4 Sub3: ForkJoinPool.commonPool-worker-2: 0 Sub3: ForkJoinPool.commonPool-worker-2: 1 Sub3: ForkJoinPool.commonPool-worker-2: 2 Sub3: ForkJoinPool.commonPool-worker-2: 3 Sub3: ForkJoinPool.commonPool-worker-2: 4 Sub1: ForkJoinPool.commonPool-worker-3: 0 Sub1: ForkJoinPool.commonPool-worker-3: 1 Sub1: ForkJoinPool.commonPool-worker-3: 2 Sub1: ForkJoinPool.commonPool-worker-3: 3 Sub1: ForkJoinPool.commonPool-worker-3: 4 Sub2: ForkJoinPool.commonPool-worker-2: Complete. Sub1: ForkJoinPool.commonPool-worker-3: Complete. Sub3: ForkJoinPool.commonPool-worker-1: Complete.
最初、全部表示されへん!なんでや!って思ったら、非同期処理だから終わってるから表示されないやん!って事に気づいた。
Thread.sleep入れてるのは、そのため。
なんか全部見れる上手い手ってあるものかが気になる。
感想
Filterまで紹介されていたけど、今回は基本的な部分のタッチのみ。
非同期は覚えておいて損はないと思う。
そのうち、ビルドシステムもJavaで書くようになるだろうね。
Gradleは、ピュアJavaじゃないけど、非同期での実装がやりやすくなると、効率的なビルドが実現できるのではなかろうかと、ココロの中で思ってる。
※もしかしたら、知らないだけで、もう出来てる?
ForkJoinPoolって、たしかスレッド管理のやつだったよね。
あんまり使われてるイメージはないんだけど、裏の仕組みで必須なのかな?
どっかでキャッチアップしないとダメだな。
参考サイト
最新Java情報局 - Java SE 9の非同期処理ライブラリ、新概念の「Reactive Streams」を学ぶ:ITpro
タスク
ForkJoinについて理解しておく