RxJava notes (flatMap, concatMap, concatMapEager)
Notes on how flatMap, concatMap, and concatMapEager differ when the mapped function does its work on another thread, as the getItems() method below does.
plugins {
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    annotationProcessor 'org.projectlombok:lombok:1.18.16'
    compileOnly 'org.projectlombok:lombok:1.18.16'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.20'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
}
Flowable<Integer> getItems(int id) {
    return Flowable.just(id, id * 10, id * 100)
            .subscribeOn(Schedulers.newThread())
            .doOnNext(item -> log.info("getItems: {}", item));
}
flatMap
The getItems() calls run in no fixed order, and subscribe() receives the items in no fixed order either.
Flowable.range(1, 3)
        .flatMap(this::getItems)
        .subscribe(item -> log.info("subscribe: {}", item));
16:50:50.090 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
16:50:50.090 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
16:50:50.090 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
16:50:50.097 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 1
16:50:50.097 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 10
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 100
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
16:50:50.098 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
16:50:50.098 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
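The ordering claim for flatMap can also be checked without reading timestamps. The following is a minimal sketch, not part of the original notes: the class name FlatMapOrderCheck and the helper collect() are made up for illustration, and logging is replaced with System.out so the example needs only RxJava 2 on the classpath. It collects everything into a list and prints it; the order across ids may change between runs, while the three items of each id should keep their relative order.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

// Hypothetical helper class, not from the original notes.
class FlatMapOrderCheck {

    // Same shape as getItems() in the notes, minus the logging.
    static Flowable<Integer> getItems(int id) {
        return Flowable.just(id, id * 10, id * 100)
                .subscribeOn(Schedulers.newThread());
    }

    // Runs the flatMap pipeline and blocks until all items have arrived.
    static List<Integer> collect() {
        return Flowable.range(1, 3)
                .flatMap(FlatMapOrderCheck::getItems)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // The order across ids is not fixed, so no expected output is given here.
        System.out.println(collect());
    }
}
```

toList().blockingGet() blocks the calling thread until the stream completes. The subscribe() calls in these notes return immediately, so a short-lived main method may need to wait in some way before the log lines appear.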
maxConcurrency = 1
With maxConcurrency set to 1, getItems() runs one at a time in order, and subscribe() also receives the items in order.
Flowable.range(1, 3)
        .flatMap(this::getItems, false, 1)
        .subscribe(item -> log.info("subscribe: {}", item));
17:04:41.265 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:04:41.270 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:04:41.270 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
17:04:41.270 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:04:41.271 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
17:04:41.271 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:04:41.271 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
maxConcurrency = 2
17:14:12.448 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:14:12.448 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:14:12.454 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:14:12.454 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 2
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
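The notes show only the log for maxConcurrency = 2, not the call that produced it. Presumably it was the maxConcurrency = 1 call with the last argument changed to 2; that is an assumption, and the sketch below is a reconstruction rather than the author's code. The class name FlatMapMaxConcurrencyTwo and the helper collect() are hypothetical. In the log above, getItems(3) starts only after the items for id 1 have been delivered, which fits a limit of two inner sources being subscribed at once.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

// Hypothetical helper class, not from the original notes.
class FlatMapMaxConcurrencyTwo {

    // Same shape as getItems() in the notes, minus the logging.
    static Flowable<Integer> getItems(int id) {
        return Flowable.just(id, id * 10, id * 100)
                .subscribeOn(Schedulers.newThread());
    }

    // flatMap(mapper, delayErrors, maxConcurrency): at most two inner
    // Flowables are subscribed at the same time.
    static List<Integer> collect() {
        return Flowable.range(1, 3)
                .flatMap(FlatMapMaxConcurrencyTwo::getItems, false, 2)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // The order across ids can still vary between runs.
        System.out.println(collect());
    }
}
```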
concatMap
getItems() runs one at a time in order, and subscribe() also receives the items in order.
Flowable.range(1, 3)
        .concatMap(this::getItems)
        .subscribe(item -> log.info("subscribe: {}", item));
16:54:14.618 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
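The concatMap log has the same item order as the flatMap log with maxConcurrency = 1. As a rough check of that observation, the sketch below runs both variants and compares the collected lists. The class name ConcatMapOrderCheck and the two helper methods are hypothetical names introduced for this example, and logging is left out to keep it self-contained.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

// Hypothetical helper class, not from the original notes.
class ConcatMapOrderCheck {

    // Same shape as getItems() in the notes, minus the logging.
    static Flowable<Integer> getItems(int id) {
        return Flowable.just(id, id * 10, id * 100)
                .subscribeOn(Schedulers.newThread());
    }

    // concatMap subscribes to the next inner Flowable only after the
    // previous one has completed.
    static List<Integer> withConcatMap() {
        return Flowable.range(1, 3)
                .concatMap(ConcatMapOrderCheck::getItems)
                .toList()
                .blockingGet();
    }

    // flatMap limited to a single concurrent inner Flowable.
    static List<Integer> withFlatMapOneAtATime() {
        return Flowable.range(1, 3)
                .flatMap(ConcatMapOrderCheck::getItems, false, 1)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(withConcatMap());         // [1, 10, 100, 2, 20, 200, 3, 30, 300]
        System.out.println(withFlatMapOneAtATime()); // [1, 10, 100, 2, 20, 200, 3, 30, 300]
    }
}
```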
concatMapEager
getItems() runs in no fixed order, but subscribe() receives the items in order.
Flowable.range(1, 3)
        .concatMapEager(this::getItems)
        .subscribe(item -> log.info("subscribe: {}", item));
16:55:41.393 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
16:55:41.393 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
16:55:41.398 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
16:55:41.393 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
16:55:41.398 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
16:55:41.398 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
16:55:41.398 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 2
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 20
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 200
16:55:41.399 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 3
16:55:41.399 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 30
16:55:41.399 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 300
maxConcurrency = 1, prefetch = 1
Flowable.range(1, 3)
        .concatMapEager(this::getItems, 1, 1)
        .subscribe(item -> log.info("subscribe: {}", item));
17:11:06.375 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
maxConcurrency = 2, prefetch = 2
17:12:52.364 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:12:52.364 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:12:52.370 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 2
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 20
17:12:52.370 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:12:52.370 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 3
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 30
17:12:52.371 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:12:52.371 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
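As with flatMap, the notes give only the log for maxConcurrency = 2, prefetch = 2. The sketch below assumes the call was concatMapEager(this::getItems, 2, 2), mirroring the (1, 1) call shown earlier; that is a guess, not something the notes confirm. The class name ConcatMapEagerTwo and the helper collect() are hypothetical. Even though the inner sources may run concurrently, the collected list should come out in source order, which matches the subscribe lines in the log above.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

// Hypothetical helper class, not from the original notes.
class ConcatMapEagerTwo {

    // Same shape as getItems() in the notes, minus the logging.
    static Flowable<Integer> getItems(int id) {
        return Flowable.just(id, id * 10, id * 100)
                .subscribeOn(Schedulers.newThread());
    }

    // concatMapEager(mapper, maxConcurrency, prefetch): up to two inner
    // Flowables run at once, and their items are buffered so that they
    // are delivered downstream in source order.
    static List<Integer> collect() {
        return Flowable.range(1, 3)
                .concatMapEager(ConcatMapEagerTwo::getItems, 2, 2)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [1, 10, 100, 2, 20, 200, 3, 30, 300]
    }
}
```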
References
http://reactivex.io/documentation/operators/flatmap.html
http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables