RxJava는 ReactiveX (Reactive Extensions)의 Java VM 구현입니다. 관찰 가능한 시퀀스를 사용하여 비동기 및 이벤트 기반 프로그램을 구성하기 위한 라이브러리 입니다.
생산자: 데이터를 만들고 전달 ⇒ Flowable, Observable 소비자: 데이터를 받아 처리 ⇒ Subscriber, Observer
Observable에서 onSubscribe(구독시작), onNext(데이터 전달), onError(에러 전달), onComplete(전달 끝)를 전달하면 Observer가 이 전달을 받습니다.
생산자가 만든 데이터를 소비자에게 전달 할 때 데이터를 삭제, 변환, 조합을 해서 보낼 수 있습니다.
fun main() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter { it % 2 == 0 }
.map { it * 2 }
.subscribe { println(it) }
}
결과화면
filter: 데이터 분류
map: 데이터 변환
Method chain을 통해 Builder pattern과 유사하게 코딩을 할 수 있지만 연산자 순서에 따라 결과가 달라질 수 있어서 주의해야 합니다.
Scheduler를 지정하면 특정 Thread에서 작업을 할 수 있습니다.
Cold는 소비자가 구독 할 때 마다 새로운 데이터 스트림을 만들지만 Hot은 이미 생성된 데이터 스트림에 소비자가 참여하는 방식이고 조건에 따라서 소비자가 구독을 하더라고 데이터 스트림을 만들지 않을 수도 있습니다.
Rxjava는 기본으로 Cold이며 Hot을 사용하고 싶다면 refCount()
, autoConnect()
함수를 사용해서 만들거나 Processor/Subject
를 생성해야 합니다.
앞에 Connectable이 붙은 Flowable/Observable은 Hot Flowable/Observable입니다. subscribe를 하더라도 데이터를 받을 수 없고 특정 조건이 충족되면 그 때 데이터를 받아서 처리 할 수 있습니다.
Hot 생성자를 사용하려면 기존 Cold 생성자에서 변환하는 연산자를 사용해야 합니다.