본문 바로가기

gyub's 공부일기/RxJava

[RxJava] RxJava의 핵심 스케줄러 - 2 :트램펄린 스케줄러, 싱글 스레드 스케줄러, Executor 변환 스케줄러

1. 트램펄린 스케줄러

트램펄린 스케줄러는 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬(Queue)을 생성하는 스케줄러입니다. 새로운 스레드를 생성하지 않는다는 것과 대기 행렬을 자동으로 만들어 주는 것이 뉴 스레드 스케줄러, 계산 스케줄러, IO 스케줄러와 다른 부분입니다!

 

val orgs = arrayOf("1","3","5")

Observable.fromArray(orgs).subscribeOn(Schedulers.trampoline())
.map{it-> "<<$it>>"}
.subscribe(Log::i)

Observable.fromArray(orgs).subscribeOn(Schedulers.trampoline())
.map{it-> "##$it##"}
.subscribe(Log::i)

//result
//main: <<1>>
//main: <<2>>
//main: <<3>>
//main: ##1##
//main: ##2##
//main: ##3##

새로운 스레드를 생성하지 않고 main 스레드에서 모든 작업을 실행합니다. 큐에 작업을 넣은 후 1개씩 꺼내어 동작하므로 첫 번째 구독과 두 번째 구독의 실행 순서가 바뀌는 경우는 발생하지 않죠.

 

 

2. 싱글 스레드 스케줄러

싱글 스레드 스케줄러는 RxJava 내부에서 단일 스레드를 별도로 생성하여 구독 작업을 처리합니다. 단, 생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용합니다.

이러한 이유로 비동기 프로그래밍을 지향하는 리액티브 프로그래밍은 싱글 스레드 스케줄러를 활용할 확률은 낮습니다.

 

val numbers:Observable<Int> = Observable.range(100,5)
val strings:Observable<String> = Observable.range(0,5).toAlphabet

numbers.subscribeOn(Schedulers.single())
.subscribe(Log::i)
strings.subscribeOn(Schedulers.single())
.subscribe(Log::i)

sleep(500)

// RxSingleScheduler-1 : value = 100
// RxSingleScheduler-1 : value = 101
// RxSingleScheduler-1 : value = 102
// RxSingleScheduler-1 : value = 103
// RxSingleScheduler-1 : value = 104
// RxSingleScheduler-1 : value = A
// RxSingleScheduler-1 : value = B
// RxSingleScheduler-1 : value = C
// RxSingleScheduler-1 : value = D
// RxSingleScheduler-1 : value = E

첫 번째 Observable은 100에서 104까지 5개 숫자를 발행합니다.

두 번째 Observable은 0에서 4까지 숫자를 바탕으로 알파벳을 발행합니다.

 

싱글 스레드 스케줄러에서 실행하면 비록 여러 개 Observable이 있어도 별도 마련해놓은 단일 스레드에서 차례로 실행합니다!!

 

3. Executor 변환 스케줄러

자바에서는 java.util.current 패지에서 제공하는 실행자를 변환하여 스케줄러를 생성 할 수 있어요.

하지만 Executor 클래스와 스케줄러의 동작 방시과 다르므로 추천 방법은 아닙니다. 기존에 사용하던 Executor 클래스를 재사용할 때만 한정적으로 활용합니다.

 

Executor 클래스를 활용하여 스케줄러를 지정하는 방법은 다음과 같습니다.

const val THREAD_NUM = 10

val data = arrayOf("1","3","5")
val source = Observable.fromArray(data)
Executor executor = Executors.newFixedThreadPool(THREAD_NUM)

source.subscribeOn(Schedulers.from(executor))
.subscribe(Log::i)
source.subscribeOn(Schedulers.from(executor))
.subscribe(Log::i)

sleep(500)

//result
//pool-1-thread : value = 1
//pool-1-thread : value = 2
//pool-1-thread : value = 3
//pool-2-thread : value = 1
//pool-2-thread : value = 2
//pool-2-thread : value = 3

 

executor 변수는 고정 개수(10개)의 스레드 풀을 생성합니다. 그리고 첫 번째 Observable과 두 번째 Observable에 subscribeOn() 함수를 호출하여 Executor 변환 스케줄러를 지정했습니다.

 

만약 Executors.newSingleThreadExecutor()로 Executors를 생성했다면 앞 실행 결과도 2개의 슬데드가 아니라 1개의 스레드에서 모두 실행하게 됩니다.

반응형