본문 바로가기

gyub's 공부일기/RxJava

[RxJava] RxAndroid 적용 예제 -1

원본 : https://www.raywenderlich.com/141980/rxandroid-tutorial

 

RxJava를 공부하며 스케줄러, 함수 등 기본을 익히고 나서 바로 앱에 적용하려니 역시나,, 쉽지 않더라구요

그래서 참고할만한 예제를 찾아보던 중 RxAndroid의 기본 개념은 물론이고 사용법도 쉽게 나와있는 곳이 있어서 번역 및 수정을 해봤습니다!

 

 

Reactive programming 은 그저 또하나의 API가 아니라, 완전히 새롭고 매우 유용한 패러다임이다. RxJava는 이 패러다임을 안드로이드에서 사용할 수 있게 구현한 것이다. 여기에 비동기 UI 이벤트를 RxJava 스럽게 구현한 RxAndroid를 이용하면 안드로이드는 reactive world를 시작하기에 더없이 완벽하다.

이 튜토리얼에서 배우게 될 것들이다.
- Reactive Programming이 무엇인가
- Observable은 무엇인가
- 버튼 클릭과 텍스트 필드 내용의 변화와 같은 이벤트를 observables로 바꾸는 방법
- Transform, Filter
- 코드가 실행될 쓰레드 지정하기
- 여러 observables를 합치기

 

[시작]

starter project를 다운로드 받아서 안드로이드 스튜디오에서 열기

우리는 CheeseActivity.java에서 코딩을 할 것이다. CheeseActivity는 BaseSearchActivity를 상속받았는데 여기에는 showProgressBar, hideProgressBar, showResult(List<String> result) 등의 기능이 구현되어 있고, 텍스트를 받아서 cheese 리스트를 리턴하는 CheeseSearchEngine 의 객체인 mCheeseSearchEngine을 멤버로 가지고 있다. 이 프로젝트를 빌드하고 실행하면 다음과 같은 비어 있는 검색 화면을 볼 수 있다.

[Reactive Programming은 무엇인가?]

imperative programming에서는, 표현식이 한 번 수행되고, 그 값은 변수에 저장된다.

int a = 2;
int b = 3;
int c = a * b; // c is 6
a = 10;
// c is still 6

반면에 reactive programming 은 값의 변화에 대한 반응에 대한 모든 것이다.
아마도 여러분은 모르는 사이에 reactive programming을 했었을 것이다.
.스프레드시트에서 셀에 값을 설정하는 것은 imperative programming에서 값을 설정하는 것과 비슷하다.
.스프레드시트에서 표현식을 설정하는 것은 reactive programming에서 observables에 오퍼레이션을 정의하는 것과 비슷하다.
위의 내용을 스프레드시트에 구현한 화면이다.

스프레드시트 셀 B1에 2를 지정하고 B2에 3을 지정하고, B3에는 B1과 B2의 곱을 지정하였다. 둘중의 하나의 값이 변하면, 그 변화가 인지(observe)되고 수식이 다시 계산되어 자동으로 B3 값이 바뀐다.

[RxJava Observable 규약]

RxJava는 Observer 패턴을 이용한다.

Observer 패턴에서는, RxJava에 두가지 핵심 인터페이스가 있다. Observable과 Observer이다. Observable이 변화하면 거기에 subscribe하는 모든 Observer가 노티를 받는다. Observable 인터페이스의 메소드 중에, Observer가 subscription을 시작하면 호출하는 subscribe()가 있다. 이 시점부터 Observer 인터페이스는 Observable이 호출할 수 있는 세가지 메소드를 제공한다.

.onNext(T value) : Observable이 새로운 아이템을 Observer에 전달한다
.onComplete : Observable이 모든 아이템 전달이 끝났음을 Observer에게 알린다.
.onError(Throwable e) : Observable이 아이템 전달 중에 에러가 발생했음을 Observer에게 알린다.

잘 동작하는 Observable은 0개 이상의 아이템을 전달하고 마지막에 onComplete나 onError로 끝을 낸다.
복잡하게 들리 수 있지만 예를 보면 쉬워진다.
특정한 네트워크 요청 observable은 보통 하나의 아이템을 전달하고 즉시 complete된다.

원은 Observable이 전달하는 아이템을 의미하고 , 검정선은 complete나 error를 의미한다. 마우스 움직임에 대한 Observable은 마우스 좌표를 전달하지만 complete되지 않는다.

여기서 여러 아이템이 전달되지만 complete나 error 표시는 없다.
Observable 규약에서는 observable이 아이템 전달을 complete한 이후로는 새로운 아이템이 전달되서는 안된다. 아래는 잘못된 예이다.

[Observable은 어떻게 생성하는가]

아래와 같이 Observable.create 를 이용해서 Observable을 생성할 수 있다.

Observable<T> create(ObservableOnSubscribe<T> source)

source가 무엇인가? 이 표현을 이해하려면 ObservableOnSubscribe가 무엇인지 알아야 하는데. 이런 인터페이스이다.

public interface ObservableOnSubscribe<T> {
  void subscribe(ObservableEmitter<T> e) throws Exception;
}

그러니까 Observable을 생성하기 위해서 source가 필요한데 이 source는 subscribe()를 구현해야 하고, subscribe()는 파라미터로 emitter가 필요하다. emitter는 무엇인가?
RxJava의 Emitter는 Observer와 비슷한 인터페이스이다.

public interface Emitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

ObservableEmitter는 subscription을 취소할 수 있는 방법도 제공한다.

물을 흘려내리는 수도를 떠올려보자. 물이 흐르는 수도는 Observable이고 여기에 연결하는 파이프에 물을 내보낸다. 열고 잠글 수 있는 수도꼭지를 달면 이것이 ObservableEmitter이다. 여기에 파이프를 연결하는 것이 Observable.create()이다.
이제 구체적인 코딩으로 알아보자. 첫번째 observable을 만들어보자

 

[Observe Button Clicks]

아래 코드를 CheeseActivity에 작성하자

 

// 1
private fun createButtonClickObservable(): Observable<String> {
  // 2
  return Observable.create { emitter ->
    // 3
    searchButton.setOnClickListener {
      // 4
      emitter.onNext(queryEditText.text.toString())
    }

    // 5
    emitter.setCancellable {
      // 6
      searchButton.setOnClickListener(null)
    }
  }
}

 

1. strings를 전달하는 observable을 리턴하는 메소드를 선언한다
2. new ObservableOnSubscribe 를 매개변수로 Observable.create()을 호출해서 observable을 생성한다
3. subscribe()를 오버라이딩해서 ObservableOnSubscribe를 정의한다.
4. 버튼에 OnClickListener 를 달고
5. 클릭 이벤트가 발생하면, emitter.onNext를 호출하고 mQueryEditText의 텍스트를 전달한다.
6. ObservableEmitter의 setCancellable 의 cancle()을 구현해서 Observable이 제거될 때 버튼의 clickListener를 없애주는 것이 메모리 릭 방지를 위한 좋은 습관이다. Observable은 completed되거나 Observer가 unsubscribe할 때 제거된다.
7. setOnClickListener(null)로 clickListener를 없앤다

Observable을 정의했으므로, 여기에 subscription을 설정해야 한다. 이걸 하려면 Consumer 인터페이스를 알아야 한다. emitter에서 전달되는 값을 받아들이는 간단한 방법이다.

public interface Consumer<T> {
  void accept(T t) throws Exception;
}

Observable 인터페이스가 동작하기 위해 subscribe()가 필요한데 파라미터가 다른 몇가지 버전이 있다. 예를들면 모든 필요한 메소드를 구현한 완전한 Observer 를 전달할 수도 있는데, 그저 onNext에서 전달되는 값을 받기 위한 것이라면 하나의 Consumer를 가지는 subscribe()를 이용하면 된다. CheeseActivity.java의 onStart에 바로 이 경우를 추가하자.

 

override fun onStart() {
  super.onStart()
  // 1
  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
      // 2
      .subscribe { query ->
        // 3
        showResult(cheeseSearchEngine.search(query))
      }
}

1. 먼저 위에서 만든 메소드를 호출해서 observable을 만든다
2. subscribe()를 호출해서 Subscribe하고 그 안에 간단한 Consumer를 넣는다
3. observable이 아이템을 전달할 때 호출되는 accept()를 구현한다.
4. 검색을 수행하고 결과를 표시한다. 앱을 실행하면 단어 입력후 치즈 검색 결과가 표시된다.

[RxJava Threading Model]

reactive programming 으로 앱을 만들었다!. 하지만 버튼을 누르면 몇 초간 UI가 얼어버리는 문제가 있다. 그래서 아래와 같은 내용이 android monitor에 표시될 것이다.

> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.

search 함수가 메인 쓰레드에서 동작하기 때문이다. 이제 고쳐보자

RxJava가 기본적으로 멀티쓰레드로 동작한다고 생각하는 경우가 있는데 이것은 오해고, 별도의 지정을 하지 않는다면 호출된 쓰레드에서 동작한다.
subscribeOn과 observeOn 을 이용해서 이 동작을 변경할 수 있다.
observable에 연결된 오퍼레이터들의 동작 과정에서 subscribeOn은 한 번만 호출된다. subscribeOn은 observable이 subscribe되는(또는 생성되는) 쓰레드를 지정한다. 그래서 만일 안드로이드뷰의 이벤트를 전달하는 observable을 이용한다면 subscription은 UI 쓰레드에서 수행되도록 지정해야 한다.
반면에 observeOn은 여러번 호출해도 괜찮다. observeOn은 그 다음 오퍼레이터들이 실행될 쓰레드를 지정한다. 예를 보자.

myObservable // observable will be subscribed on i/o thread
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(/* this will be called on main thread... */)
.doOnNext(/* ...and everything below until next observeOn */)
.observeOn(Schedulers.io())
.subscribe(/* this will be called on i/o thread */);

myObservable // observable will be subscribed on i/o thread
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .map { //메인 스레드에서 호출됩니다. }
      .doOnNext{ // 다음 observeOn까지 아래의 모든 것 }
      .observeOn(Schedulers.io())
      .subscribe {// io스레드에서 호출됩니다. }

유용한 스케줄러

Schedulers.io() : IO 관련된 동작을 수행하기에 적합하다 (네트워크 요청이나 디크스 오퍼레이션)
Schedulers.computation() : 이벤트 루프나 콜백을 처리하는 등의 동작을 수행하기에 적합
AndroidSchedulers.mainThread() : UI쓰레드에서 실행된다.

 

[The Map Operator]

map 오퍼레이터는 observable이 전달하는 아이템들 각각에 적용되고, 그 오퍼레이션으로 변형된 다른 형태의 아이템을 전달하는 observable을 리턴한다.
숫자들을 전달하는 아래와 같은 numbers라는 observable이 있다

 

여기에 map을 이렇게 적용할 수 있다.

numbers.map {num-> num * num}

결과는 이렇게 된다. 우리 예제에 적용해보자

CheeseActivity onStart를 아래와 같이 수정하자

override  fun  onStart () {
   super .onStart () val searchTextObservable = createButtonClickObservable () 
  searchTextObservable // 1 
      .subscribeOn (AndroidSchedulers.mainThread ()) // 2 
      .observeOn (Schedulers.io ()) // 3 
      .map {cheeseSearchEngine. search (it)} // 4 
      .observeOn (AndroidSchedulers.mainThread ()) 
      .subscribe { 
        showResult (it) 
      } 
}

1.먼저 다음 오퍼레이터가 io 쓰레드에서 호출되도록 지정한다.
2.매번 검색어에 대해서 검색결과를 리턴한다
3.마지막으로 다음 오퍼레이터가 메인쓰레드에서 실행되도록 지정한다.
빌드해서 실행해보면 UI 가 멈추는 현상이 없을 것이다.

 

[Show Progress Bar with doOnNext]

프로그레스 바를 표시해보자.
그러려면 doOnNext 오퍼레이터가 필요하다. doOnNext 는 파라미터로 Consumer가 있어야 하고, observable에서 아이템이 전달될 때마다 특정 동작을 수행할 수 있게 해준다. CheeseActivity onStart()를 아래처럼 수정하자

override  fun  onStart () {
   super .onStart () val searchTextObservable = createButtonClickObservable () 
  searchTextObservable // 1 
      .observeOn (AndroidSchedulers.mainThread ()) // 2 
      .doOnNext {showProgress ()} 
      .observeOn (Schedulers.io ()) 
      . map {cheeseSearchEngine.search (it)} 
      .observeOn (AndroidSchedulers.mainThread ()) 
      .subscribe { // 3 
        hideProgress () 
        showResult (it) 
      } 
}

  

      
      
        

1. 다음 오퍼레이터들이 메인 쓰레드에서 수행되도록 시정한다
2. doOnNext를 추가해서 showProgressBar()를 넣으면 매번 아이템이 전달될 때마다 호출된다.
3. 결과를 표시하고 나면 hideProgressBar()를 호출한다.
빌드해서 실행하면 검색을 실행하기 전에 프로그레스 바가 표시된다.

 

 

[검색 입력 텍스트 변화 감지]

구글처럼 사용자가 텍스트를 하나씩 입력할 때마다 자동으로 검색을 실행하려면 어떻게 하면될까? 먼저 TextView의 text changes 에 subscribe해야 한다. 다음 코드를 CheeseActivity에 추가하자

// 1
private fun createTextChangeObservable(): Observable<String> {
  // 2
  val textChangeObservable = Observable.create<String> { emitter ->
    // 3
    val textWatcher = object : TextWatcher {

      override fun afterTextChanged(s: Editable?) = Unit

      override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit

      // 4
      override fun onTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
        s?.toString()?.let { emitter.onNext(it) }
      }
    }

    // 5
    queryEditText.addTextChangedListener(textWatcher)

    // 6
    emitter.setCancellable {
      queryEditText.removeTextChangedListener(textWatcher)
    }
  }

  // 7
  return textChangeObservable
}



1. text change observable을 리턴하는 메소드를 선언한다
2. create()로 textChangeObservable을 만든다.
3. 먼저 TextWatcher를 생성한다
4. 텍스트 수정이 발행해서 onTextChanged()가 호출되면 그 텍스트를 전달한다
5. TextView에 watcher를 붙인다.
6. emitter.setCancellable()의 cance()을 오버라이드해서 text watcher가 제거될 수 있도록 한다.
7. 마지막으로 생성한 observable을 리턴한다.

CheeseActivity의 onStart()에서 searchTextObservable 지정하는 코드를 수정하면 결과를 볼 수 있다.

Observable<String> searchTextObservable = createTextChangeObservable();

앱을 실행하고 글자를 입력하면 바로 검색이 실행되는 것을 볼 수 있다.

 

 

[Filter Query by Length]

검색어 한글자로 검색하는 경우는 유효하지 않을 것이다. 이걸 막기 위해서는 filter 오퍼레이터를 써보자
filter 는 특정한 조건을 만족하는 아이템들만 전달한다. filter 는 Predicate를 매개변수로 받는데, Predicate는 입력받은 값을 검사하여 boolean을 리턴하는 조건을 정의하는 interface이다. 우리의 경우에 Predicate는 String을 받아서 그 길이가 두자 이상인 경우에 true를 리턴한다.

return textChangeObservable.filter { it.length >= 2 }

입력텍스트가 2글자 미만인 경우에는 동작하지 않는 것만 빼면 이전과 동일하게 동작한다. 앱을 실행하면 두번째 글자를 입력할 때 검색이 실행되는 것을 볼 수 있다.

 

[Debounce operator]

글자가 바뀔 때마다 매번 서버에 새로운 요청을 전송하기를 원하지는 않을 것이다.
debounce는 reactive 패러다임의 진짜 효용성을 보여주는 오퍼레이터 중의 하나이다. filter 오퍼레이터처럼 debounce도 observable이 전달하는 아이템들을 필터링한다. 다만 아이템들 중에 어떤 아이템이 필터를 통과할지는 아이템이 무엇인지가 아니라, 언제 아이템이 전달되느냐에 따라 결정된다.
debounce는 다음 아이템을 전달할 때까지 특정 시간동안 기다린다. 기다리는 시간동안 아무런 아이템이 전달되지 않았다면, 마지막 아이템이 전달된다.

 

 

createTextChangeObservable() 안에서 filter바로 아래에 다음과 같이 debounce 오퍼레이터를 추가한다.

return textChangeObservable
      .filter { it.length >= 2 }
      .debounce(1000, TimeUnit.MILLISECONDS) // add this line

앱을 실행해보면, 타이핑을 계속하다가 멈추어야 검색이 시작되는 것을 볼 수 있다. debounce는 1000 밀리세컨드를 기다린 후에 마지막 검색어를 전달한다.

 

[Merge Operator]

버튼 클릭에 반응하는 observable을 만들었고, 텍스트 필드 변화에 반응하는 observable도 만들었는데, 어떻게하면 둘다에 반응할 수 있을까? 이런 경우 observable들을 합칠 수 있는 오퍼레이터들이 많이 있다. 가장 간단하고 유용한 오퍼레이터가 merge이다. merge는 두개 이상의 observable에서 아이템을 받아서 하나의 observable로 전달한다.

onStart()를 아래처럼 수정하자.

val buttonClickStream = createButtonClickObservable()
val textChangeStream = createTextChangeObservable()

val searchTextObservable = Observable.merge<String>(buttonClickStream, textChangeStream)

Observable<String> searchTextObservable = Observable.merge(textChangeStream, buttonClickStream);

앱을 실행하면 텍스트 필드와 버튼 모두 동작하는 것을 확인할 수 있다.

반응형