エンターテイメント!!

遊戯王好きのJavaエンジニアのブログ。バーニングソウルを会得する特訓中。

Java9 Reactive Streams 試し実装

きっかけ

Java9のリリースが迫っているのと、ITproの記事見て試したくなったから。
あとは、Typescriptで非同期の処理を書くことが非常に多いので、Javaでもやりたくなった。

Reactive Streams

非同期処理を実現するための仕様。
非同期処理を採用しているライブラリとして、RxJava/Akka Streamsが有名。

データの流れに着目したプログラミングスタイルで、Publish-Subscribeモデルの実装ができる。
※Publish/Subscribeは、Observerパターンの発展系みたいなもんだという認識でいる。

非同期にしなければいけないってものでもないが、データを作る側と使う側が疎結合になるので、非同期処理に移行しやすくなる。

java.util.stream.Streamインタフェースとは別ものってことには、要注意。

Publish-Subscribeモデルの処理の流れ

http://itpro.nikkeibp.co.jp/atcl/column/15/120700278/061900041/sequence01.png

このフローは、RxJavaも同じ。
※Akka Streamsは知らない。。。

そして、定義されているインタフェースは、下記の通り。

  • Flow.Publisher
  • Flow.Subscriber
  • Flow.Subscription
  • Flow.Processor

実装

とりあえずITproの奴を真似して作った。

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class SampleSubscriber<T> implements Subscriber<T> {

    private Subscription subscription;
    private final String name;

    public SampleSubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onComplete() {
        System.out.println(name + ": " + Thread.currentThread().getName() + ": Complete.");
    }

    @Override
    public void onError(Throwable arg0) {
    }

    @Override
    public void onNext(T item) {
        // 配布されたデータを出力する
        System.out.println(name + ": " + Thread.currentThread().getName() + ": " + item);

        // データの要求
        subscription.request(1);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;

        subscription.request(1);
    }

}
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

public class Sample {

    public static void main(String[] args) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // Subscriberの生成と登録
        Subscriber<Integer> subscriber1 = new SampleSubscriber<>("Sub1");
        publisher.subscribe(subscriber1);

        Subscriber<Integer> subscriber2 = new SampleSubscriber<>("Sub2");
        publisher.subscribe(subscriber2);

        Subscriber<Integer> subscriber3 = new SampleSubscriber<>("Sub3");
        publisher.subscribe(subscriber3);

        // データの登録
        IntStream.range(0, 5).forEach(publisher::submit);

        // 非同期で処理が終わってしまうので、実行終了するまで一時的に待つ。
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        publisher.close();
    }

}

実行結果

Sub2: ForkJoinPool.commonPool-worker-1: 0
Sub2: ForkJoinPool.commonPool-worker-1: 1
Sub2: ForkJoinPool.commonPool-worker-1: 2
Sub2: ForkJoinPool.commonPool-worker-1: 3
Sub2: ForkJoinPool.commonPool-worker-1: 4
Sub3: ForkJoinPool.commonPool-worker-2: 0
Sub3: ForkJoinPool.commonPool-worker-2: 1
Sub3: ForkJoinPool.commonPool-worker-2: 2
Sub3: ForkJoinPool.commonPool-worker-2: 3
Sub3: ForkJoinPool.commonPool-worker-2: 4
Sub1: ForkJoinPool.commonPool-worker-3: 0
Sub1: ForkJoinPool.commonPool-worker-3: 1
Sub1: ForkJoinPool.commonPool-worker-3: 2
Sub1: ForkJoinPool.commonPool-worker-3: 3
Sub1: ForkJoinPool.commonPool-worker-3: 4
Sub2: ForkJoinPool.commonPool-worker-2: Complete.
Sub1: ForkJoinPool.commonPool-worker-3: Complete.
Sub3: ForkJoinPool.commonPool-worker-1: Complete.

最初、全部表示されへん!なんでや!って思ったら、非同期処理だから終わってるから表示されないやん!って事に気づいた。
Thread.sleep入れてるのは、そのため。
なんか全部見れる上手い手ってあるものかが気になる。

感想

Filterまで紹介されていたけど、今回は基本的な部分のタッチのみ。
非同期は覚えておいて損はないと思う。
そのうち、ビルドシステムもJavaで書くようになるだろうね。
Gradleは、ピュアJavaじゃないけど、非同期での実装がやりやすくなると、効率的なビルドが実現できるのではなかろうかと、ココロの中で思ってる。
※もしかしたら、知らないだけで、もう出来てる?

ForkJoinPoolって、たしかスレッド管理のやつだったよね。
あんまり使われてるイメージはないんだけど、裏の仕組みで必須なのかな?
どっかでキャッチアップしないとダメだな。

参考サイト

最新Java情報局 - Java SE 9の非同期処理ライブラリ、新概念の「Reactive Streams」を学ぶ:ITpro

タスク

ForkJoinについて理解しておく