관찰 가능한 스트림(Observable streams)을 사용하는 비동기 프로그래밍 API이다.

Publisher(데이터를 만들어 통지하는 생산자)와 Subscriber로 구성되어있으며, Subscriber가 Publisher를 구독하면 Publisher가 통보하는 데이터를 받을 수 있다.

Observable

RxJava에서는 Observable을 구독하는 Observer가 존재한다. Observable은 데이터 흐름에 맞게 알림을 보내 Observer가 데이터를 사용할 수 있도록 한다.

Observable의 특징


Observable 생성

Observable을 생성할 때에는 직접 인스턴스를 만들지 않고 정적 팩토리 함수(생성 연산자)를 호출한다.

Observable은 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.

  1. just() 인자로 넣은 데이터를 있는 그대로 차례로 발행한다. (데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생) 한 개의 값을 넣을 수도 있고 인자로 여러 개의 값(동일한 타입, 최대 10개)을 넣을 수도 있다. 데이터를 다르게 변경하고 싶으면 map()과 같은 연산자를 통해 변환해야한다.

  2. create() Emitter에 포함되어 있는 onNext(), onComplete(), onError() 같은 알림을 개발자가 직접 호출해야 한다.

    ** Observable은 항상 onComplete() 혹은 onError() 둘 중 하나로만 데이터 발행이 종료되어야 한다.

    **create()사용시 주의점

  3. fromXXX() 특정 타입의 데이터를 Observable로 바꿔주는 메소드이다

    3-1. fromArray() 배열에 들어있는 데이터를 처리할때사용한다.

    Integer [] array = {1, 2, 3, 4, 5}
    Observable.fromArray(array)
    .subscribe(System.out::println)
    

    3-2. fromCallable() Callable 을 Observable 로 변환하여 비동기적으로 아이템을 발행할 수 있다.

    fun main() {
        val callable: Callable<String> = Callable<String> { "Hello" }
        val source: Observable<String> = Observable.fromCallable(callable)
        source.subscribe(System.out::println)
    }
    

    3-3. fromFuture() Future 객체에서 fromFuture() 함수를 사용해 Observable를 생성 Future 인터페이스를 지원하는 모든 객체를 Observable Source 로 변환하고, Future.get() 메소드를 통해 호출한 값을 반환

    Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
        Thread.sleep(1000)
    		return "Hello Future" 
    })
    
    source: Observable<String> = Observable.fromFuture(future)
    source.subscribe { it -> println(it) }
    
  4. Interval() 시간 간격을 두고 데이터를 방출하는 Observable 생성