RxJava2, RxAndroid2

[RxJava2] 리액티브 생성 연산자 : interval(), timer(), range(), intervalRange(), defer(), repeat()

알통몬_ 2018. 9. 28. 11:58
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 


이번 포스팅에서는 생성 연산자에 대해 공부합니다.


생성 연산자

: 데이터의 흐름을 만드는 역할을 합니다

=> Observable을 만든다.


interval()

일정 시간 간격으로 데이터 흐름을 생성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package rx.java.chapter04.constructor;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class IntervalEx {
 
    public static void main(String[] args) {
        
        Observable<Long> src = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(data -> (data + 1* 100)
                .take(5);
        
        src.subscribe(System.out::println);
        
    //sleep을 호출하는 이유
    //interval은 main 스레드가 아닌 별도의 스레드에서 동작하기 때문에
    //작업이 완료될 때까지 main 스레드가 기다려주도록 하기 위함.
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs

interval(long period, TimeUnit unit) 함수의 인자는 두 개입니다.

period 동안 쉬었다가, 데이터를 발행합니다.

interval(long initialDelay, long period, TimeUnit unit) 도 있는데,

첫 번째 인자인 initialDelay는 초기 지연시간을 줄 수 있습니다.


interval()은 영원히 지속되기 때문에 폴링 용도로 많이 씁니다.

폴링 : 동기화를 목적으로 하여 상태를 주기적으로 검사해서 일정 조건이 만족하면

처리하는 방법입니다.


TimeUnit은 시간단위입니다.

단위는 총 7개 사용가능합니다.

1
2
3
4
5
6
7
    NANOSECONDS
    MICROSECONDS
    MILLISECONDS
    SECONDS
    MINUTES
    HOURS
    DAYS
cs



timer()

한번만 실행하는 함수.

일정 시간이 지난 후에 한 개의 데이터를 발행하고 onComplete() 이벤트 발생.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package rx.java.chapter04.constructor;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class TImerEx {
 
    public static void main(String[] args) {
 
        Observable<String> oneTime = Observable.timer(200L, TimeUnit.MILLISECONDS)
                .map(none -> "한 번만 실행합니다.");
        
        oneTime.subscribe(System.out::println);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
 
}
 
cs



range()

for문 대신에 사용할 수 있습니다.

range(int start, int end) 두 개의 인자를 가지는데 

start : 초기 값

end : 갯수

즉, start 가 5이고 end가 10이면 5 ~ 14까지의 정수임.

Integer 객체를 발행합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package rx.java.chapter04.constructor;
 
import io.reactivex.Observable;
 
public class RangeEx {
 
    public static void main(String[] args) {
        // for 루프 대신에 사용
        Observable<Integer> src = Observable.range(110)
                .filter(in -> in % == 1);
        src.subscribe(System.out::println);
        
    }
 
}
 
cs

위 예제는 1~10까지의 정수 중 홀수만 출력하는 예제입니다.

[ 광고 보고 가시죠! ]



[ 감사합니다! ]


intervalRange()

interval() + range() 함수

일정시간 간격으로 값을 출력하고, range() 특성을 그대로 가집니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package rx.java.chapter04.constructor;
 
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
 
public class IntervalRangeEx {
 
    public static void main(String[] args) {
 
        Observable<Long> src = Observable.intervalRange(
                1// start
                5// count
                100L, // initialDelay
                100L, // period
                TimeUnit.MILLISECONDS
        ).map(cnt -> cnt * 100);
        src.subscribe(System.out::println);
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        
    }
}
 
cs


defer()

timer() 함수와 유사하지만, 데이터 흐름 생성을 구독자가 subscribe()를

호출할 때까지 미룰 수 있음.

인자는 Callable을 받습니다.

defer() 활용과 미활용 예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package rx.java.chapter04.constructor;
 
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Callable;
 
import io.reactivex.Observable;
 
public class DeferEx {
    Iterator<String> iterator = Arrays.asList("AAA""BBB""CCC""DDD").iterator();
 
    public static void main(String[] args) {
        
        DeferEx ex = new DeferEx();
        
        
        Callable<Observable<String>> supp = () -> ex.observable();
        
        Observable<String> src = Observable.defer(supp);
 
        src.subscribe(value -> System.out.println("구독자 #1 =>" + value));
        src.subscribe(value -> System.out.println("구독자 #2 =>" + value));
        System.out.println("----------------------------------------");
        
        Observable<String> none = ex.observable();
        none.subscribe(value -> System.out.println("구독자 #1 =>" + value));
        none.subscribe(value -> System.out.println("구독자 #2 =>" + value));
        
    }
    
    private Observable<String> observable() {
        if(iterator.hasNext()) {
            String iter = iterator.next();
            return Observable.just(iter + " Good"
                    , iter + " Soso"
                    , iter + " Bad");
        }
        return Observable.empty();
    }
    
}
 
cs


defer()를 썻을 때와 안썼을 때가 결과가 다르게 나오죠?

Observable을 생성할 때에 입력값이 결정되고 구독자가 subscribe()를 호출하면

그 때 해당 데이터 흐름을 발행하기 때문입니다.

defer()를 쓰게 되면 subscribe()를 호출할 때의 상황을 반영해서

데이트 흐름의 생성을 지연하는 효과를 볼 수 있습니다.



repeat()

이름 그대로 반복하는 함수입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package rx.java.chapter04.constructor;
 
import io.reactivex.Observable;
 
public class RepeatEx {
 
    public static void main(String[] args) {
 
        String[] names = {"AAA""BBB""CCC"};
        
        Observable<String> src = Observable.fromArray(names)
                .repeat(2);
        
        src.doOnComplete(() -> System.out.println("finished")).subscribe(System.out::println);
        
        
    }
 
}
 
cs

어렵지 않으니 예제만 보고 넘어갑니다.


이상입니다.

다음 포스팅에서는 변환 연산자에 대해 공부합니다.

반응형