본문 바로가기

gyub's 공부일기/RxJava

[RxJava] Observable 클래스

Observable의 역할은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 하는 것입니다!

 

책에 따르면 RxJava 프로그래밍은 Observable에서 시작해 Observable로 끝난다고 해도 과어인 아닐 정도로 중요한 개념이라고 하네요

 

따라서 이번 글에서는 Observable  클래스에 대해 알아보겠습니다!!

 

1. Observable 클래스

Observable은 옵저버 패턴을 구현합니다. 옵저버 패턴은 객체의 상태 변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록합니다. 그 후 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵저버에게 변화를 알려줍니다. 라이프 사이클은 존재하지 않으며 보통 단일 함수를 통해 변화만 알립니다.

 

여기서 잠깐! Observable은 무슨 의미일까요?????

직관적으로는 관찰자가(Observer)가 관찰하는 대상입니다.

좀 더 깊게 보자면 Observed라는 단어가 관찰을 통해 얻은 결과를 의미한다면 Observable은 현재는 관찰되지 않았지만 이론을 통해 앞으로 관찰할 가능성을 의미한다고 하네요

 

이제 글을 이어가보겠습니다.

사용자가 버튼을 누르면 버튼에 미리 등록해 둔 onClick()메서드가 호출 돼 처리를 하죠??? 이게 바로 옵저버 패턴의

대표적인 예입니다!

 

RxJava의 Observable은 세 가지의 알림을 구독자에게 전달합니다.

 

- onNest: Observable이 데이터의 발행을 아립니다. 기존의 옵저버 패턴과 같습니다.

- onComplete: 모든 데이터의 발행을 완료했음을 알립니다. onComplete 이벤트는 단 한 번만 발생하며, 발생한 후에는 더 이상 onNext 이벤트가 발생해선 안 됩니다.

- onError: Observable에서 어떤 이유로 에러가 발생했음을 알립니다. onError 이벤트가 발생하면 이후에 onNext 및 onComplete 이벤트가 발생하지 않습니다. 즉, Observable의실행을 종료합니다

 

Observable을 생성할 때는 직접 인스터는스를 만들지 않고 정적 팩토리 함수를 호출합니다

Observable.just("hi","gyubin")
.subscribe(System.out::println)

// 결과
// hi
// gyubin

이렇게 사용할 수 있는 것도 정적 팩토리 함수를 호출하기 때문이었네요!!!!

함수의 원형은 이렇게 돼 있네요

 

2. just() 함수

just() 함수는 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성합니다.(실제 데이터의 발행은 subscribe() 함수를 호출해야 시작합니다) 한 개의 값을 넣을 수 도 있고 인자로 여러 개의 값(최대 10개)을 넣을 수도 있습니다. 하지만 주의할 점은 타입은 모두 같아야 한다는 것이죠

 

just함수는 데이터의 내용을 변경하지 않고 그대로 발행합니다!

Observable.just(1,2,3,4,5,6)
.subscribe(System.out::println)

//result
//1
//2
//3
//4
//5
//6

요렇게 말이져

 이것을 굳이 말하는 이유는 ! 데이터의 내용을 변경하는 함수도 있다는 말이죠 kggk

 

3. subscribe() 함수와 Disposable 객체

1) subscribe()

RxJava는 내가 동작 시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절할 수 있습니다.

이때!!!! 사용하는 것이 subscribe()함수에요

Observable은 just() 등 의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행합니다.

흠... 제가 이해한 바로는 이렇게 이해하면 될 것 같군여

유튜브에 영상을 업로드한다고 했을 때 just()함수는 영상에 필요한 재료들을 모으는 작업이고,

subscribe()는 유튜브 플랫폼에 업로드 시켜 구독자들이 볼 수 있게 하는 작업이라고 생각할 수 있겠네요

 

자 이제 subscribe의 주요 함수 원형을 보자면

- 인자가 없을 때

: onNext와 onComplete 이베트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException을 던집니다(throw). 따라서 Observable로 작성한 코드를 테스트하거나 디버깅할 때 활용합니다

 

- 인자가 1개

: onNext 이벤틀를 처리합니다! 이때도 onError 이벤트가 발생하면 OnErrorNotImplementedException을 던지네여

 

subscribe({ Log.d(TAG, ": onNext $it" })

- 인자가 2개

: onNext와 onError 이벤트를 처리합니다

subscribe({ Log.d(TAG, ": onNext $it") },
          { error -> Log.e(TAG, "Unable to get username", error) })

 

- 인자가 3개

: onNext, onError, onComplete 이벤트를 모두 처리할 수 있습니다

 

subscribe({Log.d(TAG, ": onNext $it") },
                        { error -> Log.e(TAG, "Unable to get username", error) },
                        {
                            Log.d(TAG, ": completed")
                        })

 

2) Disposable

: Disposable은 RxJava 1.x의 Subscription 객체에 해당합니다. 그리고 2개의 함수를 가지고 있죠

 

- fun dispose()

: dispose()는 Observable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수입니다.

Observable이 onComplete 알림을 보냈을 때 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊습니다.

즉, onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose()를 호출할 필요가 없다는 거죠

 

- fun isDiposed(): Boolean

: isDisposed() 함수는 이름에서 알 수 있는 것처럼 Observable이 데이터를 발행하지 않는지( 구독을 해지했는지) 확인하는 함수입니다.

 

4. create()

just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만 create() 함수는 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 합니다😒

 

구독자에게 데이터를 발행하려면 onNext() 함수를 호출해야 하며 모든 데이터를 발행한 후에는 반드시 onComplete() 함수를 호출해야 합니다!!!!!

 

Observable<Integer> source = Observable.create(
it-> {
 it.onNext(100)
 it.onNext(200)
 it.onNext(300)
 it.onComplete()
})
source.subscribe(System.out::println)

//result
//100
//200
//300

이렇게요👍👍

 

여기서 잠깐!!! 지금 Observable<Integer> source처럼 변수를 분리했는데 source변수는 차가운 Observable입니다

(차가운과 뜨거운... 나중에 다시 설명할게요)

즉, 첫 번째 문장만으로는 실제로 데이터를 발행하지 않고 두 번째 문장에서 subscribe()함수를 호출했을 때 100,200,300 의 값을 발행합니다. -> subscribe() 함수를 호출하지 않으면 아무것도 출력되지 않는다는 뜻이죠

 

5. fromXXX()

지금까지 just()나 create()는 단일 데이터를 다뤘습니다. 그렇다면 단일 데이터가 아닐 때는 어떻게 해야 할까요??

fromArray(), fromIterable()등등의 함수를 사용하면 됩니다!

구조는 어려운 부분이 아니니 간단한 예시와 함께 넘어가도록 할게여

arr = intArrayOf(100,200,300)

source = Observable.fromArray(arr)

source.subscribe(System.out::println)

//result
//100
//200
//300
반응형