RxJava2, RxAndroid2

[RxJava2] 흐름제어 함수 : sample(), buffer(), throttleFirst(), throttleLast(), window(), debounce()

알통몬_ 2018. 10. 11. 16:33
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 




이번 포스팅에서는 RxJava2 의 흐름 제어 함수에 대해서 공부합니다.

지난 포스팅 마지막에 쓴 것 처럼 Observable이 데이터를 발행하는 속도와

옵저버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때

사용하는 함수가 흐름 제어 함수입니다.


RxJava2 는 푸시 방식으로 동작하므로 이런 문제를 해결할 수 있어야 합니다.


sample()

특정 시간동안 가장 최근에 발행된 데이터만 걸러내줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package rx.java.chapter07.control;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class SampleEx {
 
    public static void main(String[] args) {
 
        String[] strs = {"1""10""100""10000""100000"};
        
        Observable<String> src1 = Observable.fromArray(strs)
                .take(4)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> src2 = Observable.fromArray(strs[4])
                .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> src3 = Observable.concat(src1, src2)
                .sample(300L, TimeUnit.MILLISECONDS);
        //.sample(300L, TimeUnit.MILLISECONDS, true); 
// sample작업이 끝나지 않았는데 Observable이 종료되는 경우
// 마지막 값을 발행하려면 3번 째 인자값 true로 설정하면 됨.
        
        src3.subscribe(System.out::println);
        
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs


buffer()

일정 시간동안 데이터를 모았다가 한 번에 발행.

인자는 정수를 받음.

ex)  buffer(3) 일 경우 데이터를 3개 단위로 모아서 발행.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package rx.java.chapter07.control;
 
import java.util.List;
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class BufferEx {
 
    public static void main(String[] args) {
 
        String[] strs = {"0""1""10""100""10000""100000"};
        
        Observable<String> src1 = Observable.fromArray(strs)
                .take(3)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> src2 = Observable.fromArray(strs[3])
                .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> src3 = Observable.fromArray(strs[4], strs[5])
                .take(3)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        
        Observable<List<String>> src = Observable.concat(src1, src2, src3)
                .buffer(3);//[0, 1, 10], [100, 10000, 100000]
           // .buffer(2, 3) 2개 씩 데이터를 모으고 3번 째마다 스킵, 
// 첫 번째 인자보다 두 번째 인자 값이 커야함.
           // 즉 [0, 1], [100, 10000] 발행
        
        src.subscribe(System.out::println);
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
}
 
cs


throttleFirst(), throttleLast()

각각 주어진 조건의 가장 먼저 입력된 값, 가장 마지막에 입력된 값 발행.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package rx.java.chapter07.control;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
//ThrottleFirst and ThrottleLast
public class ThrottleFNLEx {
 
    private static String[] strs = {"0""1""10""100""10000""100000"};
    
    public static void main(String[] args) {
        throttleFirst();
        System.out.println("===================");
        throttleLast();
 
    }
    
    //주어진 조건에서 가장 먼저 입력된 값 발행.
    //어떤 데이터를 발행하면 지정된 시간동안 다른데이터 발행되지 않도록 막음.
    // 계산 스케줄러에서 실행 => 비동기로 동작하도록 설계 됨.
    public static void throttleFirst() {
        Observable<String> src = concatObs()
                .throttleFirst(200L, TimeUnit.MILLISECONDS);
        
        src.subscribe(System.out::println);
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    //주어진 조건에서 가장 마지막에 입력된 값 발행.
    public static void throttleLast() {
        Observable<String> src = concatObs()
                .throttleLast(200L, TimeUnit.MILLISECONDS);
        
        src.subscribe(System.out::println);
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    
    public static Observable<String> concatObs() {
        Observable<String> firstSrc = Observable.just(strs[0])
                .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> secondSrc = Observable.just(strs[1])
                .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> thirdSrc = Observable.just(strs[2], strs[3], strs[4], strs[5])
                .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
                .doOnNext(str -> System.out.println("Debug" + str));
        return Observable.concat(firstSrc, secondSrc, thirdSrc);
    }
 
}
 
cs


window()

개념적으로 groupBy() 와 비슷한 함수.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package rx.java.chapter07.control;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class WindowEx {
    
    private static String[] strs = {"0""1""10""100""10000""100000"};
    
    public static void main(String[] args) {
 
        window();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
 
    
    //groupBy() 와 비슷함.
    // groupBy() 는 조건에 맞는 입력값들을 그룹화해서 별도의 Observable을 병렬로 만든다면,
    // window()는 throttleFirst() 나 sample() 처럼 처리할 수 있는 일부 값들만
    // 받아들일 수 있음.
    // => 흐름제어 기능 + 별도의 Observable 분리 기능.
    public static void window() {
        //window()는 인자로 count. 정수를 입력받습니다.
        // count만큼 데이터가 발행될 때마다 새로운 Observable 생성.
        Observable<Observable<String>> src = concatObs().window(3);
        src.subscribe(ob -> ob.subscribe(data -> 
                System.out.println(Thread.currentThread().getName() + "| value=" + data)));
    }
    
    public static Observable<String> concatObs() {
        Observable<String> firstSrc = Observable.fromArray(strs)
                .take(3)
                .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> secondSrc = Observable.just(strs[3])
                .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a);
        
        Observable<String> thirdSrc = Observable.just(strs[4], strs[5])
                .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
                .doOnNext(str -> System.out.println("Debug" + str));
        return Observable.concat(firstSrc, secondSrc, thirdSrc);
    }
 
}
 
cs


debounce()

빠르게 연속 이벤트를 처리하는 흐름제어 함수.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package rx.java.chapter07.control;
 
import java.util.concurrent.TimeUnit;
 
 
import io.reactivex.Observable;
 
public class DebounceEx {
    
    private static String[] strs = {"0""1""10""100""10000""100000"};
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        debounce();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    //빠르게 연속 이벤트를 처리하는 흐름제어 함수.
    // 계산 스케줄러에서 동작함.
    // 이떤 이벤트 입력 후 timeout에서 지정한 시간 동안
    // 추가 이벤트 발생 없을 시 마지막 이벤트를 발행.
    public static void debounce() {
        Observable<String> src = Observable.concat(
                Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> strs[0]),
                Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> strs[1]),
                Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> strs[2]),
                Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> strs[3]))
                .debounce(200L, TimeUnit.MILLISECONDS);
        src.subscribe(System.out::println);
                
    }
 
}
 
cs


이상입니다.

다음포스팅에서는 RxJava를 활용한 테스트에 대해 공부합니다.


반응형