RxJava2, RxAndroid2

[RxJava2] 스케줄러 종류와 사용 법 : newThread(), computation(), io(), trampoline(), single(), from(executor)

알통몬_ 2018. 10. 4. 10:54
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 



이번 포스팅에서는 스케줄러의 종류와 사용법에 대해 공부합니다.

여러 종류의 스케줄러가 있는데, 오늘 공부할 종류 중

RxJava 에서는 computation() 스케줄러, io() 스케줄러, trampoline() 스케줄러의

사용을 권장합니다.


첫 번째로 공부할 newThread() 스케줄러는 특수한 상황에서만 사용하시면

되겠습니다.


newThread()

새로운 스레드를 생성

새 스레드를 만들어 어떠한 동작을 실행하고 싶을 때

Schedulers.newThread() 를 인자로 넣어주면 됩니다.

예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package rx.java.chapter05.schedulers;
 
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
 
public class NewSchedulersEx {
 
    public static void main(String[] args) {
 
        try {
            String[] strs = { "1""5""9" };
            Observable.fromArray(strs)
                    .doOnNext(str -> System.out.println(Thread.currentThread().getName() + " =>" + str))
                    .map(str -> "{{" + str + "}}")
                    .subscribeOn(Schedulers.newThread()).subscribe(System.out::println);
 
            Thread.sleep(1000);
 
            Observable.fromArray(strs)
                    .doOnNext(str -> System.out.println(Thread.currentThread().getName() + " =>" + str))
                    .map(str -> "((" + str + "))")
                    .subscribeOn(Schedulers.newThread()).subscribe(System.out::println);
 
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
 
}
 
cs

예제를 돌려보면 

첫 번째 Observable과 두 번째 Observable이 다른 스레드에서 실행됨을

확인할 수 있습니다.


computation()

CPU에 대응하는 계산용 스케줄러입니다.

계산 작업을 할 때는 대기 시간이 없이 빠르게 결과를 도출하는 것이 중요합니다.

내부적으로 스레드풀을 생성하기 때문에 스레드 갯수는 기본적으로는

프로세서 갯수와 동입합니다.

예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package rx.java.chapter05.schedulers;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
 
public class ComputationEx {
 
    public static void main(String[] args) {
 
        // 첫 번째 구독과 두 번째 구독이 거의 동시에 이루어져서
        // 여러 번 실행해보면 같은 스레드에 두 가지를 모두 할 당하는 경우도 발생
        
        String[] strs = {"1""3""5""7""9"};
        
        Observable<String> src = Observable.fromArray(strs)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);
 
        src.map(str -> "!!" + str + "!!")
        .subscribeOn(Schedulers.computation())
        .subscribe(data -> System.out.println(Thread.currentThread() + " :" + data));
 
        src.map(str -> "@@" + str + "@@")
        .subscribeOn(Schedulers.computation())
        .subscribe(data -> System.out.println(Thread.currentThread() + " :" + data));
        
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
 
}
 
cs

100밀리초의 시간마다 데이터를 발행하는 observable을 만들고

2개의 구독을 만들었습니다.

그리고 실행해보면 스레드 3과 스레드 4에서 작업을 실행하는데

계속 반복해서 실행하다보면

이렇게 하나의 스레드에서 모든 작업을 처리하는 경우가 생깁니다.

그 이유는 첫 번째, 두 번째 구독이 거의 동시에 이루어져서 RxJava 내부에서

하나의 스레드에 모든 작업을 할당했기 때문입니다.


[ 광고 보고 가시죠! ]



[ 감사합니다! ]

io()

네트워크 상의 요청처리 or 각종 입,출력 작업 실행을 위한 스케줄러

계산 스케줄러와는 다르게 기본으로 생성되는 스레드 갯수가 다릅니다.

필요할 때마다 스레드를 계속 생성합니다.

입, 출력 작업의 경우 비동기로 실행되지만, 결과를 얻는데 까지는 대기 시간이 깁니다.

예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package rx.java.chapter05.schedulers;
 
import java.io.File;
 
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
 
public class IOEx {
 
    public static void main(String[] args) {
        
        // IO Scheduler
        // : 네트워크 상의 요청을 처리하거나,
        //  각종 입, 출력 작업을 위한 스케줄러.
        // 계산 스케줄러와 기본으로 생성되는 스레드 수가 다른다는 차이점이 있다.
        
        String path = "c:\\";
        File[] files = new File(path).listFiles();
        
        Observable<String> src = Observable.fromArray(files)
                .filter(file -> !file.isDirectory())
                .map(file -> file.getAbsolutePath())
                .subscribeOn(Schedulers.io());
        src.subscribe(System.out::println);
        
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
 
}
 
cs

C 루트 드라이브에 있는 파일들 중 디렉토리를 제외한 파일들만 필터링하여

출력해주는 예제입니다.


trampoline()

새로운 스레드를 생성하지 않고 현재 스레드의 무한한 크기의 대기 행렬 Queue를

생성하는 스케줄러

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package rx.java.chapter05.schedulers;
 
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
 
public class TrampolineEx {
 
    public static void main(String[] args) {
 
        // 새로운 스레드를 생성하지 않고,
        // 현재 스레드에 무한의 대기 행렬(Queue) 생성하는 스케줄러.
        // 하나의 스레드에서 실행되고 Queue에 작업을 넣어서
        // 실행하기 때문에 순서가 바뀌는 경우는 발생하지 않음.
        String[] strs = {"1""3""5""7""9"};
        
        Observable<String> src = Observable.fromArray(strs);
 
        src.map(str -> "!!" + str + "!!")
        .subscribeOn(Schedulers.trampoline())
        .subscribe(data -> System.out.println(Thread.currentThread() + " :" + data));
 
        src.map(str -> "@@" + str + "@@")
        .subscribeOn(Schedulers.trampoline())
        .subscribe(data -> System.out.println(Thread.currentThread() + " :" + data));
        
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs

새로운 스레드를 생성하지 않고 main 스레드에서 모든 작업을 처리함.

Queue에 작업을 넣은 후 1개 씩 꺼내서 동작하기 때문에

순서가 바뀌는 경우가 발생하지 않음.


single()

Rxjava 내부에서 단일 스레드를 별도로 생성하여 구독 작업 처리.

리액티브 프로그래밍에서는 비동기 프로그래밍을 지향하므로

사용할 확률이 낮음.

예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package rx.java.chapter05.schedulers;
 
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
 
public class SingleEx {
 
    public static void main(String[] args) {
 
        // RxJava 내부에서 단일 스레드를 별도로 생성해 구독작업을 처리.
        // * 생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용함.
 
        Observable<Integer> src1 =
                Observable.range(1005);
 
        Observable<String> src2 = 
                Observable.range(05).map(num -> String.valueOf(num));
 
        src1.subscribeOn(Schedulers.single())
        .subscribe(System.out::println);
 
        src2.subscribeOn(Schedulers.single())
        .subscribe(System.out::println);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
 
}
 
cs



from(executor)

java.util.current 에서 제공하는 Executor 를 변환해 스케줄러를 생성할 수 있습니다.

BUT Executor 클래스와 스케줄러의 동작방식이 다르기 때문에

기존에 사용하던 Executor 클래스를 재사용해야하는 경우에만 제한적으로

활용하면 되겠습니다.

예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package rx.java.chapter05.schedulers;
 
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
 
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
 
public class ExecutorEx {
 
    public static void main(String[] args) {
 
        final int THREADS = 10;
        
        String[] strs = {"1""2""3"};
        Observable<String> src = Observable.fromArray(strs);
        Executor executor = Executors.newFixedThreadPool(THREADS);
 
        src.subscribeOn(Schedulers.from(executor))
        .subscribe(System.out::println);
        
 
        src.subscribeOn(Schedulers.from(executor))
        .subscribe(System.out::println);
 
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
 
}
 
cs


이상입니다.


반응형