반응형
공감 및 댓글은 포스팅 하는데 아주아주 큰 힘이 됩니다!! 포스팅 내용이 찾아주신 분들께 도움이 되길 바라며 더 깔끔하고 좋은 포스팅을 만들어 나가겠습니다^^
|
이번 포스팅에서는 RxJava2 의 흐름 제어 함수에 대해서 공부합니다.
지난 포스팅 마지막에 쓴 것 처럼 Observable이 데이터를 발행하는 속도와
옵저버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때
사용하는 함수가 흐름 제어 함수입니다.
RxJava2 는 푸시 방식으로 동작하므로 이런 문제를 해결할 수 있어야 합니다.
sample()
특정 시간동안 가장 최근에 발행된 데이터만 걸러내줍니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | package rx.java.chapter07.control; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; public class SampleEx { public static void main(String[] args) { String[] strs = {"1", "10", "100", "10000", "100000"}; Observable<String> src1 = Observable.fromArray(strs) .take(4) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> src2 = Observable.fromArray(strs[4]) .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> src3 = Observable.concat(src1, src2) .sample(300L, TimeUnit.MILLISECONDS); //.sample(300L, TimeUnit.MILLISECONDS, true); // sample작업이 끝나지 않았는데 Observable이 종료되는 경우 // 마지막 값을 발행하려면 3번 째 인자값 true로 설정하면 됨. src3.subscribe(System.out::println); try { Thread.sleep(1000); } catch (Exception e) { // TODO: handle exception } } } | cs |
buffer()
일정 시간동안 데이터를 모았다가 한 번에 발행.
인자는 정수를 받음.
ex) buffer(3) 일 경우 데이터를 3개 단위로 모아서 발행.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | package rx.java.chapter07.control; import java.util.List; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; public class BufferEx { public static void main(String[] args) { String[] strs = {"0", "1", "10", "100", "10000", "100000"}; Observable<String> src1 = Observable.fromArray(strs) .take(3) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> src2 = Observable.fromArray(strs[3]) .zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> src3 = Observable.fromArray(strs[4], strs[5]) .take(3) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<List<String>> src = Observable.concat(src1, src2, src3) .buffer(3);//[0, 1, 10], [100, 10000, 100000] // .buffer(2, 3) 2개 씩 데이터를 모으고 3번 째마다 스킵, // 첫 번째 인자보다 두 번째 인자 값이 커야함. // 즉 [0, 1], [100, 10000] 발행 src.subscribe(System.out::println); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } | cs |
throttleFirst(), throttleLast()
각각 주어진 조건의 가장 먼저 입력된 값, 가장 마지막에 입력된 값 발행.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | package rx.java.chapter07.control; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; //ThrottleFirst and ThrottleLast public class ThrottleFNLEx { private static String[] strs = {"0", "1", "10", "100", "10000", "100000"}; public static void main(String[] args) { throttleFirst(); System.out.println("==================="); throttleLast(); } //주어진 조건에서 가장 먼저 입력된 값 발행. //어떤 데이터를 발행하면 지정된 시간동안 다른데이터 발행되지 않도록 막음. // 계산 스케줄러에서 실행 => 비동기로 동작하도록 설계 됨. public static void throttleFirst() { Observable<String> src = concatObs() .throttleFirst(200L, TimeUnit.MILLISECONDS); src.subscribe(System.out::println); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //주어진 조건에서 가장 마지막에 입력된 값 발행. public static void throttleLast() { Observable<String> src = concatObs() .throttleLast(200L, TimeUnit.MILLISECONDS); src.subscribe(System.out::println); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static Observable<String> concatObs() { Observable<String> firstSrc = Observable.just(strs[0]) .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> secondSrc = Observable.just(strs[1]) .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> thirdSrc = Observable.just(strs[2], strs[3], strs[4], strs[5]) .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a) .doOnNext(str -> System.out.println("Debug" + str)); return Observable.concat(firstSrc, secondSrc, thirdSrc); } } | cs |
window()
개념적으로 groupBy() 와 비슷한 함수.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | package rx.java.chapter07.control; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; public class WindowEx { private static String[] strs = {"0", "1", "10", "100", "10000", "100000"}; public static void main(String[] args) { window(); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //groupBy() 와 비슷함. // groupBy() 는 조건에 맞는 입력값들을 그룹화해서 별도의 Observable을 병렬로 만든다면, // window()는 throttleFirst() 나 sample() 처럼 처리할 수 있는 일부 값들만 // 받아들일 수 있음. // => 흐름제어 기능 + 별도의 Observable 분리 기능. public static void window() { //window()는 인자로 count. 정수를 입력받습니다. // count만큼 데이터가 발행될 때마다 새로운 Observable 생성. Observable<Observable<String>> src = concatObs().window(3); src.subscribe(ob -> ob.subscribe(data -> System.out.println(Thread.currentThread().getName() + "| value=" + data))); } public static Observable<String> concatObs() { Observable<String> firstSrc = Observable.fromArray(strs) .take(3) .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> secondSrc = Observable.just(strs[3]) .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> thirdSrc = Observable.just(strs[4], strs[5]) .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a, b) -> a) .doOnNext(str -> System.out.println("Debug" + str)); return Observable.concat(firstSrc, secondSrc, thirdSrc); } } | cs |
debounce()
빠르게 연속 이벤트를 처리하는 흐름제어 함수.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | package rx.java.chapter07.control; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; public class DebounceEx { private static String[] strs = {"0", "1", "10", "100", "10000", "100000"}; public static void main(String[] args) { // TODO Auto-generated method stub debounce(); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //빠르게 연속 이벤트를 처리하는 흐름제어 함수. // 계산 스케줄러에서 동작함. // 이떤 이벤트 입력 후 timeout에서 지정한 시간 동안 // 추가 이벤트 발생 없을 시 마지막 이벤트를 발행. public static void debounce() { Observable<String> src = Observable.concat( Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> strs[0]), Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> strs[1]), Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> strs[2]), Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> strs[3])) .debounce(200L, TimeUnit.MILLISECONDS); src.subscribe(System.out::println); } } | cs |
이상입니다.
다음포스팅에서는 RxJava를 활용한 테스트에 대해 공부합니다.
반응형