이번 글에서는 여러개의 Observable를 한개의 Observable로 조합하는 Operator에 대해 이야기 합니다. Operator는 아래와 같이 있습니다.

and/then/when

Not supported

combineLatest

각각 Observable에 데이터가 생성 될 때 데이터를 조합해서 전달하는 operator 입니다.

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/209730b2-4abe-4626-a19f-20e450fb1767/Untitled.png

fun main() {
    val observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
    val observable2 = Observable.interval(750, TimeUnit.MILLISECONDS).map { Random.nextInt(10) }
    val observable = Observable.combineLatest(observable1, observable2, BiFunction<Long, Int, String> { t1, t2 ->
        "$t1 $t2"
    })
    observable.subscribe({
        // onNext로 전달 된 데이터 출력
        println(System.currentTimeMillis())
        println(it)
    }, {
        // 에러 처리
        it.printStackTrace()
    }, {
        // 완료 이벤트
        println("onComplete")
    })

    Thread.sleep(3000)

}

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/8fe5ae86-ac70-4513-bdb7-702beb4a082d/Untitled.png

observable1은 0,1,2를 observable2는 1,7,9,7를 생성하고 각각 데이터가 생성 된 시점에 조합에서 전달 합니다.

join

TBD

merge

여러 개의 Observable을 합쳐서 한 개의 Observable로 만드는 operator 입니다.

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/aae1d6a3-8634-49e9-bb7f-5cac5811b90e/Untitled.png

fun main() {
    val observableSource1 = Observable.interval(0,1000, TimeUnit.MILLISECONDS).map { "1:$it" }
    val observableSource2 = Observable.interval(0,500, TimeUnit.MILLISECONDS).map { "2:$it" }
    val observable = Observable.merge(observableSource1, observableSource2)

    observable.subscribe({
        // onNext로 전달 된 데이터 출력
        println(it)
    }, {
        // 에러 처리
        it.printStackTrace()
    }, {
        // 완료 이벤트
        println("onComplete")
    })

    Thread.sleep(3000)
}

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/27be8eb7-e040-401f-8c69-83d18d27f426/Untitled.png

startWith

Observable에 첫 번째에 데이터를 추가하는 operator 입니다.

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/8459f1d1-00d7-4488-8f34-c3c368285a70/Untitled.png

fun main() {
    val observable = Observable.just(1,2,3,4,5)
        .startWith(100)

    observable.subscribe({
        // onNext로 전달 된 데이터 출력
        println(it)
    }, {
        // 에러 처리
        it.printStackTrace()
    }, {
        // 완료 이벤트
        println("onComplete")
    })
}