RxJava2, RxAndroid2

[RxJava2] 리액티브 결합 연산자 : zip(), zipWith(), combineLatest(), merge(), concat()

알통몬_ 2018. 10. 1. 12:15
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 



이번 포스팅에서는 결합 연산자에 대해서 공부합니다.


결합 연산자

여러 개의 Observable을 조합하여 활용하는 연산자.


zip()

입력 Observable에서 데이터를 모두 새로 발행했을 경우 데이터들을 합해줍니다.

입력되는 모든 Observable을 활용해서 Observable을 결합합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package rx.java.chapter04.combine;
 
import io.reactivex.Observable;
 
public class ZipNumberEx {
 
    public static void main(String[] args) {
 
        Observable<Integer> src = Observable.zip(
                Observable.just(123),
                Observable.just(102030),
                Observable.just(100200300),
                Observable.just(100020003000),
                (w, x, y, z) -> w + x + y + z);
            
        src.subscribe(System.out::println);
    }
 
}
 
cs

위 예제는 4개의 Observable을 발행하고 발행된 Observable 들의 값을

모두 더해서 출력하는 예제입니다.

입력된 Observable이 4개이기 때문에

(w, x, y, z) -> w + x + y + z)

에서 (w, x, y, z) 입력인자를 4개를 썼습니다.

return 되는 값은 어떻게 해도 상관없지만, 입력된 Observable의 수만큼

입력 인자를 넣어줘야 합니다.

zip() 함수는 최대 9개의 Observable을 결합할 수 있습니다.

일반적으로는 2~3개 정도 결합합니다.


아래 예제처럼 just() 와 interval()을 결합할 수도 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package rx.java.chapter04.combine;
 
import io.reactivex.Observable;
 
public class ZipNumberEx {
 
    public static void main(String[] args) {
 
        Observable<Integer> src = Observable.zip(
                Observable.just(123),
                Observable.just(102030),
                Observable.just(100200300),
                Observable.just(100020003000),
                (w, x, y, z) -> w + x + y + z);apackage rx.java.chapter04.combine;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class ZipIntervalEx {
 
    public static void main(String[] args) {
 
        Observable<String> src = Observable.zip(
                Observable.just("AAA""BBB","CCC"),
                Observable.interval(200L, TimeUnit.MILLISECONDS),
                (val, time) -> val
                );
        
        src.subscribe(System.out::println);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }
 
}
 
        src.subscribe(System.out::println);
    }
 
}
 
cs


zipWith()

 zip() 함수와 동일합니다.

다른 점이라고 하면 Observable을 다양한 함수와 조합하며 중간 중간 조합할 수

있다는 장점이 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package rx.java.chapter04.combine;
 
import io.reactivex.Observable;
 
public class ZipWithEx {
 
    public static void main(String[] args) {
        
        Observable<Integer> src = Observable.zip(
                Observable.just(102030),
                Observable.just(102030),
                (a, b) -> a+b)
                .zipWith(Observable.just(123), (c, d) -> c + d);
        src.subscribe(System.out::println);
        
    }
 
}
 
cs


combineLatest()

처음에 각 Observable에서 데이터를 발행한 후에는 어느 Observable에서 값을 

발행하던지 최신 값으로 갱싱해줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package rx.java.chapter04.combine;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class CombineLatestEx {
 
    public static void main(String[] args) {
 
        String[] names = {"Alpha""Bravo""Charlie"};
        String[] ages = {"20""24""33"};
        
        Observable<String> src = Observable.combineLatest(
                //200밀리초마다 이름데이터 발행하는 Observable
                Observable.fromArray(names)
                .zipWith(Observable.interval(200L, TimeUnit.MILLISECONDS), 
                        (name, i) -> name),
                //초기 150밀리초를 딜레이주고, 100밀리초 마다 나이 데이터를 발행하는 Observable
                Observable.fromArray(ages)
                .zipWith(Observable.interval(150L, 100L, TimeUnit.MILLISECONDS), 
                        (age, i) -> age),
                (val1, val2) -> val1 + val2);
        src.subscribe(System.out::println);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs


merge()

최신 데이터 여부와 관계 없이 각 Observable에서 발행하는 데이터를 그대로 출력

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package rx.java.chapter04.combine;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class MergeEx {
 
    public static void main(String[] args) {
 
        String[] str1 = {"1""3"};
        String[] str2 = {"2""4""6"};
        
        Observable<String> src1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> str1[idx])
                .take(str1.length);
        Observable<String> src2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> str2[idx])
                .take(str2.length);
        
        Observable<String> src3 = Observable.merge(src1, src2);
        
        src3.subscribe(System.out::println);
        
        
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs


concat()

입력된 Observable 을 Observable 단위로 이어 붙여 줍니다.

아래 예제처럼 Observable을 두 개를 이어 붙인다고 하면,

첫 번째 Observable이 onComplete 가 발생해야 두 번째 Observable을 구독합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package rx.java.chapter04.combine;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
import io.reactivex.functions.Action;
 
public class ConcatEx {
 
    public static void main(String[] args) {
 
        Action onCompleteAction = () -> System.out.println("onComplete()");
        
        String[] str1 = {"AAA""BBB""CCC"};
        String[] str2 = {"111""222""333"};
        
        Observable<String> src1 = Observable.fromArray(str1)
                .doOnComplete(onCompleteAction);
        
        Observable<String> src2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(idx -> str2[idx])
                .take(str2.length)
                .doOnComplete(onCompleteAction);
        
        Observable<String> src = Observable.concat(src1, src2)
                .doOnComplete(onCompleteAction);
        src.subscribe(System.out::println);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs


반응형