RxJavaの備忘録(flatMap, concatMap, concatMapEager)

このメソッドのように、別スレッドで動く処理を呼び出したときのflatMap, concatMap, concatMapEagerの違いについての備忘録。

plugins {
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    annotationProcessor 'org.projectlombok:lombok:1.18.16'
    compileOnly 'org.projectlombok:lombok:1.18.16'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.20'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
}
Flowable<Integer> getItems(int id) {
    return Flowable.just(id, id * 10, id * 100)
                   .subscribeOn(Schedulers.newThread())
                   .doOnNext(item -> log.info("getItems: {}", item));
}

flatMap

getItems()は異なる順番で呼ばれ、subscribe()も異なる順番で通知される

Flowable.range(1, 3)
        .flatMap(this::getItems)
        .subscribe(item -> log.info("subscribe: {}", item));
16:50:50.090 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
16:50:50.090 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
16:50:50.090 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
16:50:50.096 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
16:50:50.097 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 1
16:50:50.097 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 10
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 100
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
16:50:50.097 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
16:50:50.098 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
16:50:50.098 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200

maxConcurrency = 1

maxConcurrencyを1にした場合は、getItems()は1つずつ順番に呼ばれ、subscribe()も順番に通知される

Flowable.range(1, 3)
        .flatMap(this::getItems, false, 1)
        .subscribe(item -> log.info("subscribe: {}", item));
17:04:41.265 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:04:41.270 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:04:41.270 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:04:41.270 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
17:04:41.270 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:04:41.271 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
17:04:41.271 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:04:41.271 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:04:41.271 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300

maxConcurrency = 2

17:14:12.448 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:14:12.448 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:14:12.454 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:14:12.454 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:14:12.454 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300
17:14:12.455 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 2
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:14:12.455 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200

concatMap

getItems()は1つずつ順番に呼ばれ、subscribe()も順番に通知される

Flowable.range(1, 3)
        .concatMap(this::getItems)
        .subscribe(item -> log.info("subscribe: {}", item));
16:54:14.618 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
16:54:14.622 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
16:54:14.623 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
16:54:14.623 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300

concatMapEager

getItems()は異なる順番で呼ばれるが、subscribe()は順番に通知される

Flowable.range(1, 3)
        .concatMapEager(this::getItems)
        .subscribe(item -> log.info("subscribe: {}", item));
16:55:41.393 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
16:55:41.393 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
16:55:41.398 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
16:55:41.393 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
16:55:41.398 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
16:55:41.398 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
16:55:41.398 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 2
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 20
16:55:41.398 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 200
16:55:41.399 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 3
16:55:41.399 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 30
16:55:41.399 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 300

maxConcurrency = 1, prefetch = 1

Flowable.range(1, 3)
        .concatMapEager(this::getItems, 1, 1)
        .subscribe(item -> log.info("subscribe: {}", item));
17:11:06.375 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:11:06.380 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 2
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 20
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:11:06.381 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 3
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 30
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:11:06.381 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300

maxConcurrency = 2, prefetch = 2

17:12:52.364 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 1
17:12:52.364 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 2
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 1
17:12:52.370 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 20
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 10
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 10
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - getItems: 100
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 100
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 2
17:12:52.370 [RxNewThreadScheduler-1] INFO com.example.Main - subscribe: 20
17:12:52.370 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 3
17:12:52.370 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 30
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - getItems: 200
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 200
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 3
17:12:52.371 [RxNewThreadScheduler-2] INFO com.example.Main - subscribe: 30
17:12:52.371 [RxNewThreadScheduler-3] INFO com.example.Main - getItems: 300
17:12:52.371 [RxNewThreadScheduler-3] INFO com.example.Main - subscribe: 300

参考

http://reactivex.io/documentation/operators/flatmap.html
http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables