RxJava2, RxAndroid2

[RxJava2] 리액티브 조건 연산자 : amb(), takeUntil(), skipUntil(), all()

알통몬_ 2018. 10. 1. 14:18
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 



이번 포스팅에서는 조건(condition) 연산자에 대해 공부합니다.


조건 연산자

흐름을 제어하는 역할



amb()

Observable이 여러개 들어왔을 때 먼저 나오는 Observable을 선택해서 발행함.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package rx.java.chapter04.condition;
 
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class AmbEx {
 
    public static void main(String[] args) {
 
        String[] first = {"1""2""3"};
        String[] second = {"4""5""6"};
        
        List<Observable<String>> src = Arrays.asList(
                Observable.fromArray(first)
                .delay(101L, TimeUnit.MILLISECONDS)
                .doOnComplete(() -> System.out.println("onComplete()")),
                Observable.fromArray(second)
                .delay(100L, TimeUnit.MILLISECONDS)
                .doOnComplete(() -> System.out.println("onComplete()"))
                
                );
        
        Observable.amb(src)
        .doOnComplete(() -> System.out.println("Result => onComplete()"))
        .subscribe(System.out::println);
        
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs

이 후에 들어오는 Observable들은 무시합니다.


takeUntil()

take()에 조건을 추가할 수 있습니다.

인자로는 값을 발행할 수 있는 또 다른 Observable이 필요합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package rx.java.chapter04.condition;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class TakeUntilEx {
 
    public static void main(String[] args) {
 
        String[] numbers = {"1""2""3""4""5""6"};
        
        Observable<String> src = Observable.fromArray(numbers)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
                .takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
        
        src.subscribe(System.out::println);
        try {
            Thread.sleep(600);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
        
    }
 
}
 
cs


skipUntil()

takeUntil() 사용 방법은 동일하지만 기능은 정 반대 입니다.

Observable에서 데이터를 발행할 때까지 값을 건너 뜁니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package rx.java.chapter04.condition;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class SkipUntilEx {
 
    public static void main(String[] args) {
 
String[] numbers = {"1""2""3""4""5""6"};
        
        Observable<String> src = Observable.fromArray(numbers)
                .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
                .skipUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
        
        src.subscribe(System.out::println);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
        
    }
 
}
 
cs


all()

아래 주석처럼 주어진 조건을 100%만족할 때는 true를 반환하고

그 이외에는 false를 반환합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package rx.java.chapter04.condition;
 
import io.reactivex.Observable;
import io.reactivex.Single;
 
public class AllEx {
    //주어진 조건을 100% 만족할 때만 true 반환. 그 외에는 false 반환.
 
    public static void main(String[] args) {
 
        String[] numbers = { "1""2""3""4""5""6" };
        
        Single<Boolean> src = Observable.fromArray(numbers)
                .map(val -> {
                    if (Integer.parseInt(val) % == 1return "[" + val + "]";
                    else return val;
                    })
                .all(vals -> vals.contains("["));
        src.subscribe(System.out::println);
        
    }
 
}
 
cs


이상입니다.

다음 포스팅에서는 또 다른 연산자들에 대해 공부합니다.

반응형