공감 및 댓글은 포스팅 하는데 아주아주 큰 힘이 됩니다!! 포스팅 내용이 찾아주신 분들께 도움이 되길 바라며 더 깔끔하고 좋은 포스팅을 만들어 나가겠습니다^^
|
이번 포스팅에서는 결합 연산자에 대해서 공부합니다.
결합 연산자
여러 개의 Observable을 조합하여 활용하는 연산자.
zip()
입력 Observable에서 데이터를 모두 새로 발행했을 경우 데이터들을 합해줍니다.
입력되는 모든 Observable을 활용해서 Observable을 결합합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | package rx.java.chapter04.combine; import io.reactivex.Observable; public class ZipNumberEx { public static void main(String[] args) { Observable<Integer> src = Observable.zip( Observable.just(1, 2, 3), Observable.just(10, 20, 30), Observable.just(100, 200, 300), Observable.just(1000, 2000, 3000), (w, x, y, z) -> w + x + y + z); src.subscribe(System.out::println); } } | cs |
위 예제는 4개의 Observable을 발행하고 발행된 Observable 들의 값을
모두 더해서 출력하는 예제입니다.
입력된 Observable이 4개이기 때문에
(w, x, y, z) -> w + x + y + z)
에서 (w, x, y, z) 입력인자를 4개를 썼습니다.
return 되는 값은 어떻게 해도 상관없지만, 입력된 Observable의 수만큼
입력 인자를 넣어줘야 합니다.
zip() 함수는 최대 9개의 Observable을 결합할 수 있습니다.
일반적으로는 2~3개 정도 결합합니다.
아래 예제처럼 just() 와 interval()을 결합할 수도 있습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | package rx.java.chapter04.combine; import io.reactivex.Observable; public class ZipNumberEx { public static void main(String[] args) { Observable<Integer> src = Observable.zip( Observable.just(1, 2, 3), Observable.just(10, 20, 30), Observable.just(100, 200, 300), Observable.just(1000, 2000, 3000), (w, x, y, z) -> w + x + y + z);apackage rx.java.chapter04.combine; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; public class ZipIntervalEx { public static void main(String[] args) { Observable<String> src = Observable.zip( Observable.just("AAA", "BBB","CCC"), Observable.interval(200L, TimeUnit.MILLISECONDS), (val, time) -> val ); src.subscribe(System.out::println); try { Thread.sleep(1000); } catch (Exception e) { // TODO: handle exception } } } src.subscribe(System.out::println); } } | cs |
zipWith()
zip() 함수와 동일합니다.
다른 점이라고 하면 Observable을 다양한 함수와 조합하며 중간 중간 조합할 수
있다는 장점이 있습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | package rx.java.chapter04.combine; import io.reactivex.Observable; public class ZipWithEx { public static void main(String[] args) { Observable<Integer> src = Observable.zip( Observable.just(10, 20, 30), Observable.just(10, 20, 30), (a, b) -> a+b) .zipWith(Observable.just(1, 2, 3), (c, d) -> c + d); src.subscribe(System.out::println); } } | cs |
combineLatest()
처음에 각 Observable에서 데이터를 발행한 후에는 어느 Observable에서 값을
발행하던지 최신 값으로 갱싱해줍니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | package rx.java.chapter04.combine; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; public class CombineLatestEx { public static void main(String[] args) { String[] names = {"Alpha", "Bravo", "Charlie"}; String[] ages = {"20", "24", "33"}; Observable<String> src = Observable.combineLatest( //200밀리초마다 이름데이터 발행하는 Observable Observable.fromArray(names) .zipWith(Observable.interval(200L, TimeUnit.MILLISECONDS), (name, i) -> name), //초기 150밀리초를 딜레이주고, 100밀리초 마다 나이 데이터를 발행하는 Observable Observable.fromArray(ages) .zipWith(Observable.interval(150L, 100L, TimeUnit.MILLISECONDS), (age, i) -> age), (val1, val2) -> val1 + val2); src.subscribe(System.out::println); try { Thread.sleep(1000); } catch (Exception e) { // TODO: handle exception } } } | cs |
merge()
최신 데이터 여부와 관계 없이 각 Observable에서 발행하는 데이터를 그대로 출력
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | package rx.java.chapter04.combine; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; public class MergeEx { public static void main(String[] args) { String[] str1 = {"1", "3"}; String[] str2 = {"2", "4", "6"}; Observable<String> src1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS) .map(Long::intValue) .map(idx -> str1[idx]) .take(str1.length); Observable<String> src2 = Observable.interval(50L, TimeUnit.MILLISECONDS) .map(Long::intValue) .map(idx -> str2[idx]) .take(str2.length); Observable<String> src3 = Observable.merge(src1, src2); src3.subscribe(System.out::println); try { Thread.sleep(1000); } catch (Exception e) { // TODO: handle exception } } } | cs |
concat()
입력된 Observable 을 Observable 단위로 이어 붙여 줍니다.
아래 예제처럼 Observable을 두 개를 이어 붙인다고 하면,
첫 번째 Observable이 onComplete 가 발생해야 두 번째 Observable을 구독합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | package rx.java.chapter04.combine; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.functions.Action; public class ConcatEx { public static void main(String[] args) { Action onCompleteAction = () -> System.out.println("onComplete()"); String[] str1 = {"AAA", "BBB", "CCC"}; String[] str2 = {"111", "222", "333"}; Observable<String> src1 = Observable.fromArray(str1) .doOnComplete(onCompleteAction); Observable<String> src2 = Observable.interval(100L, TimeUnit.MILLISECONDS) .map(Long::intValue) .map(idx -> str2[idx]) .take(str2.length) .doOnComplete(onCompleteAction); Observable<String> src = Observable.concat(src1, src2) .doOnComplete(onCompleteAction); src.subscribe(System.out::println); try { Thread.sleep(1000); } catch (Exception e) { // TODO: handle exception } } } | cs |