gyub's 공부일기/RxJava

[RxJava] RxJava의 핵심 스케줄러 - 2 :트램펄린 스케줄러, 싱글 스레드 스케줄러, Executor 변환 스케줄러

gyu__b 2020. 11. 14. 18:15

1. 트램펄린 스케줄러

트램펄린 스케줄러는 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬(Queue)을 생성하는 스케줄러입니다. 새로운 스레드를 생성하지 않는다는 것과 대기 행렬을 자동으로 만들어 주는 것이 뉴 스레드 스케줄러, 계산 스케줄러, IO 스케줄러와 다른 부분입니다!

 

val orgs = arrayOf("1","3","5")

Observable.fromArray(orgs).subscribeOn(Schedulers.trampoline())
.map{it-> "<<$it>>"}
.subscribe(Log::i)

Observable.fromArray(orgs).subscribeOn(Schedulers.trampoline())
.map{it-> "##$it##"}
.subscribe(Log::i)

//result
//main: <<1>>
//main: <<2>>
//main: <<3>>
//main: ##1##
//main: ##2##
//main: ##3##

새로운 스레드를 생성하지 않고 main 스레드에서 모든 작업을 실행합니다. 큐에 작업을 넣은 후 1개씩 꺼내어 동작하므로 첫 번째 구독과 두 번째 구독의 실행 순서가 바뀌는 경우는 발생하지 않죠.

 

 

2. 싱글 스레드 스케줄러

싱글 스레드 스케줄러는 RxJava 내부에서 단일 스레드를 별도로 생성하여 구독 작업을 처리합니다. 단, 생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용합니다.

이러한 이유로 비동기 프로그래밍을 지향하는 리액티브 프로그래밍은 싱글 스레드 스케줄러를 활용할 확률은 낮습니다.

 

val numbers:Observable<Int> = Observable.range(100,5)
val strings:Observable<String> = Observable.range(0,5).toAlphabet

numbers.subscribeOn(Schedulers.single())
.subscribe(Log::i)
strings.subscribeOn(Schedulers.single())
.subscribe(Log::i)

sleep(500)

// RxSingleScheduler-1 : value = 100
// RxSingleScheduler-1 : value = 101
// RxSingleScheduler-1 : value = 102
// RxSingleScheduler-1 : value = 103
// RxSingleScheduler-1 : value = 104
// RxSingleScheduler-1 : value = A
// RxSingleScheduler-1 : value = B
// RxSingleScheduler-1 : value = C
// RxSingleScheduler-1 : value = D
// RxSingleScheduler-1 : value = E

첫 번째 Observable은 100에서 104까지 5개 숫자를 발행합니다.

두 번째 Observable은 0에서 4까지 숫자를 바탕으로 알파벳을 발행합니다.

 

싱글 스레드 스케줄러에서 실행하면 비록 여러 개 Observable이 있어도 별도 마련해놓은 단일 스레드에서 차례로 실행합니다!!

 

3. Executor 변환 스케줄러

자바에서는 java.util.current 패지에서 제공하는 실행자를 변환하여 스케줄러를 생성 할 수 있어요.

하지만 Executor 클래스와 스케줄러의 동작 방시과 다르므로 추천 방법은 아닙니다. 기존에 사용하던 Executor 클래스를 재사용할 때만 한정적으로 활용합니다.

 

Executor 클래스를 활용하여 스케줄러를 지정하는 방법은 다음과 같습니다.

const val THREAD_NUM = 10

val data = arrayOf("1","3","5")
val source = Observable.fromArray(data)
Executor executor = Executors.newFixedThreadPool(THREAD_NUM)

source.subscribeOn(Schedulers.from(executor))
.subscribe(Log::i)
source.subscribeOn(Schedulers.from(executor))
.subscribe(Log::i)

sleep(500)

//result
//pool-1-thread : value = 1
//pool-1-thread : value = 2
//pool-1-thread : value = 3
//pool-2-thread : value = 1
//pool-2-thread : value = 2
//pool-2-thread : value = 3

 

executor 변수는 고정 개수(10개)의 스레드 풀을 생성합니다. 그리고 첫 번째 Observable과 두 번째 Observable에 subscribeOn() 함수를 호출하여 Executor 변환 스케줄러를 지정했습니다.

 

만약 Executors.newSingleThreadExecutor()로 Executors를 생성했다면 앞 실행 결과도 2개의 슬데드가 아니라 1개의 스레드에서 모두 실행하게 됩니다.

반응형