RxJava2, RxAndroid2

[RxJava2] Flowable 과 배압

알통몬_ 2018. 10. 12. 14:08
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 




이번 포스팅에서는 Flowable 과 배압에 대해 공부합니다.


Flowable은 Observable 사용 시 발생할 수 있는 BackPressure

배압을 해결하기 위해 존재하는 클래스입니다.

사용방법은 Observable과 동일합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package rx.java.chapter08.flowable;
 
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
 
public class FlowableEx {
 
    public static void main(String[] args) {
 
        // Observable 과 사용법은 동일하다.
        // 서로 변환도 간단하다
        // Observable -> Flowable : toFlowable()
        // Flowable -> Observable : toObsevable()
        Flowable.just("Hello Flowable").toObservable()
        .subscribe(System.out::println);
        Observable.just("Hello Flowable").toFlowable(BackpressureStrategy.BUFFER)
        .subscribe(System.out::println);
        
    }
 
}
 
cs

 

Flowable 과 Observable을 선택하는 기준입니다.

https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#observable-and-flowable

링크 들어가셔서 확인해 보시면 되겠습니다.

일반적으로 Observable로 최대한 해결해보고(sample(), throttle(), debounce() 등의

흐름 제어 함수를 활용해본다)

그래도 해결이 어렵다면 Flowable로 변경하면 됩니다.


Flowable 에서 제공하는 배압 문제에 대응하는 함수 3가지

- onBackPressureBuffer()

배압 문제 발생 시 별도의 버퍼에 저장, Flowable은 기본적으로 128개의

버퍼가 있음.

onBackPressureDrop()

배압 문제 발생 시 해당 데이터 무시.


onBackPressureLatest()

처리할 수 없어서 쌓이는 데이터를 무시하면서 최신 데이터만 유지.

아래 4개 메서드를 통해 배압 문제와 해결방법에 대해 보겠습니다.

backPressure()

배압 문제가 발생하는 경우입니다.

PublishSubject 객체는 뜨거운 observable입니다.

데이터를 발행하는 속도와 처리하는 속도의 차이가 발생하더라도

어떠한 처리도 할 수 없습니다. 때문에 배압 문제 발생.


useOnBackPressureBuffer()

128개의 버퍼 생성 후 버퍼 overflow가 발생하면 가장 오래된 데이터 버림.

onBackPressureBuffer() : 기본 128개의 버퍼가 존재

onBackPressureBuffer(boolean delayError) : delayError 여부 지정 가능

onBackPressureBuffer(int capacity, Action onOverflow) : capacity : 버퍼 수,

onOverflow : 버퍼가 넘쳤을 때 실행할 동작.

onBackPressureBuffer(int capacity, Action onOverflow, BackPressureOverflowStrategy overflowStrategy) : capacity : 버퍼 수,

onOverflow 여부 지정버퍼가 넘쳤을 때 실행할 동작

overflowStrategy : 버퍼가 가득찼을 때 추가로 실행할 동작

3 가지 지정가능

BackpressureOverflowStrategy.DROP_OLDEST, DROP_LATEST, ERROR


useOnBackPressureDrop()

onBackPressureDrop() :버퍼가 가득차면 이후의 데이터는 모두 무시.

* Thread.sleep(20000); 을 주었는데, 그 이유는 버퍼에 128개의 데이터가

가득찼을 때 데이터를 계산 스케줄러에서 출력하기도 전에 예제가

끝나기 때문입니다.


useOnBackPressureLatest()

onBackPressureLatest() = onBackPressureDrop() + 마지막 데이터 발행.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package rx.java.chapter08.flowable;
 
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
 
public class BackPressureEx {
 
    public static void main(String[] args) {
        
        useOnBackPressureLatest();
        
    }
    
    private static void backPressure() {
        // BackPressure issue
        PublishSubject<Integer> ps = PublishSubject.create();
        
        ps.observeOn(Schedulers.computation())
        .subscribe(data -> {
            Thread.sleep(100);
            System.out.println(data);
        }, error -> System.out.println(error.getMessage()));
        
        for(int i = 0; i < 50000000; i++) {
            ps.onNext(i);
        }
        ps.onComplete();
        
    }
    
    private static void useOnBackPressureBuffer() {
        Flowable.range(1, 50_000_000)
        .onBackpressureBuffer(128, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST)
        .observeOn(Schedulers.computation())
        .subscribe(data -> {
            Thread.sleep(100);
            System.out.println(data);
        }, error -> System.out.println(error.getMessage()));
        
    }
    
    //기본 버퍼 갯수 만큼만 버퍼에 저장하고 나머지는 무시하므로
    // 128까지만 출력됨.
    private static void useOnBackPressureDrop() {
        Flowable.range(1, 50_000_000)
        .onBackpressureDrop()
        .observeOn(Schedulers.computation())
        .subscribe(data -> {
            Thread.sleep(100);
            System.out.println(data);
        }, error -> System.out.println(error.getMessage()));
        
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
 
    // onBackPressureDrop() + 마지막 데이터까지 발행.
    private static void useOnBackPressureLatest() {
        Flowable.range(1, 50_000_000)
        .onBackpressureLatest()
        .observeOn(Schedulers.computation())
        .subscribe(data -> {
            Thread.sleep(100);
            System.out.println(data);
        }, error -> System.out.println(error.getMessage()));
        
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
    
}
 
cs


위 예제의 메서드들을 하나씩 번갈아가며 실행해보시면 어떤 결과가 나오는지,

각 함수들이 어떤 역할을 하는지 명확히 이해하실 수 있을거에요.


이상입니다.

감사합니다.



반응형