RxJava2, RxAndroid2

[RxJava2] 뜨거운 Observable, Subject 클래스

알통몬_ 2018. 9. 27. 11:51
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 


Observable에는 뜨거운 Observable과 차가운 Observable이 있습니다.


차가운 Observable

다른 말로 게으른 접근법이라고 합니다.

Observable을 선언한 후에 just(), fromIterable() 함수를 호출해도 

옵저버가 subscribe() 함수를 호출해 구독하지 않으면, 데이터를 발행하지 않습니다.


뜨거운 Observable

구독자가 존재 여부와 관계 없이  데이터를 발행하는 Observable입니다.

=> 여러 구독자를 고려할 수 있지만, 구독자로서는 observable 에서 발행하는

모든 데이터를 처음부터 수신하는 것을 보장할 수 없습니다.


==> 차가운 Observable은 구독자가 구독하면 준비된 데이터를 처음부터 발행

뜨거운 Observable은 구독한 시점부터 Observable에서 발행한 데이터를 받음.


* 뜨거운 Observable을 발행할 때에는 배압을 고려해야 하는데요.

배압은 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의

차이가 클 때 발생합니다. 

RxJava2 에서는 이 배압 처리를 위한 Flowable 이라는 특화된 클래스를 제공합니다.

이후에 공부하도록 하겠습니다.



Subject

Subject 클래스는 차가운 Observable -> 뜨거운 Observable 으로 바꿔줍니다.

특징 : Observable의 특성과 구독자의 특성을 모두 가집니다.

데이터를 발행할 수도 있고 발행된 데이터를 바로 처리할 수도 있습니다.


주요 클래스 : AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이

있습니다.


AsyncSubject

Observable에서 발행한 데이터 중 마지막 데이터를 얻어올 수 있는 Subject 클래스

완료되기 전 마지막 데이터에만 관심이 있고 이 전 데이터들은 모두 무시합니다.


AsyncSubject는 정적 함수인 create()로 생성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import io.reactivex.subjects.AsyncSubject;
 
public class AsyncSubjectEx {
 
    public static void main(String[] args) {
    
        AsyncSubject<String> subject = AsyncSubject.create();
        
        subject.subscribe(data -> System.out.println("구독자1 =>" + data));
        subject.onNext("hello1");
        subject.onNext("hello2");
        subject.subscribe(data -> System.out.println("구독자2 =>" + data));
        subject.onNext("hello3");
        subject.onComplete();
        
    }
    
}
 
cs

실행 결과를 보면 구독자 1과 구독자 2 둘다 hello만 출력합니다.

그리고  onComplete()을 호출하지 않으면 둘 다 아무것도 출력하지 않습니다.


구독자로 동작하는 AsyncSubject 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import io.reactivex.Observable;
import io.reactivex.subjects.AsyncSubject;
 
public class AsyncSubjectEx {
 
    public static void main(String[] args) {
    
        String[] greeting = {"Hi""Hello""What's up"};
        Observable<String> src = Observable.fromArray(greeting);
        
        AsyncSubject<String> subject = AsyncSubject.create();
        
        subject.subscribe(System.out::println);
        src.subscribe(subject);
    }
    
}
 
cs


onComplete() 이후 구독했을 때 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

import io.reactivex.subjects.AsyncSubject;
 
public class AsyncSubjectEx {
 
    public static void main(String[] args) {
    
        AsyncSubject<String> subject = AsyncSubject.create();
        
        subject.subscribe(data -> System.out.println("구독자1 =>" + data));
        subject.onNext("hello1");
        subject.onNext("hello2");
        subject.subscribe(data -> System.out.println("구독자2 =>" + data));
        subject.onNext("hello3");
        subject.onComplete();
        subject.onNext("hello4");
        
        subject.subscribe(data -> System.out.println("구독자3 =>" + data));
        
    }
    
}
 
cs

구독자1, 2, 3 전부 hello3을 출력합니다.

[ 광고 보고 가시죠! ]



[ 감사합니다! ]


BehaviorSubject

구독자가 구독을 하면 가장 최근 값이나 기본 값을 넘겨주는 클래스

구독자가 subscribe()를 호출했을 때 발행된 값이 없다면 기본 값을 대신 발행

그 이후에는 최근 값을 모두 가져옵니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import io.reactivex.subjects.BehaviorSubject;
 
public class BehaviorSubjectEx {
 
    public static void main(String[] args) {
        BehaviorSubject<String> subject = 
                BehaviorSubject.createDefault("hello0");
        subject.subscribe(data -> System.out.println("구독자1 =>" + data));
        subject.onNext("hello1");
        subject.onNext("hello2");
        subject.subscribe(data -> System.out.println("구독자2 =>" + data));
        subject.onNext("hello3");
        subject.onComplete();
    }
 
}
 
cs



PublishSubject

가장 평범한 Subject, 구독자가 subscribe()를 호출하면 값을 발행.

오직 해당 시간에 발생한 데이터를 그대로 구독자에게 전달받음.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import io.reactivex.subjects.PublishSubject;
 
public class PublishSubjectEx {
 
    public static void main(String[] args) {
 
        PublishSubject<String> subject =  PublishSubject.create();
        subject.subscribe(data -> System.out.println("구독자1 => " + data));
        subject.onNext("Hi1");
        subject.onNext("Hi2");
        subject.subscribe(data -> System.out.println("구독자2 => " + data));
        subject.onNext("Hi3");
        subject.onComplete();
        
    }
 
}
 
cs


ReplaySubject

* Subject  은 뜨거운 Observable을 활용하는 것인데, ReplaySubject는

차가운 Observable 처럼 동작합니다.

구독자가 새로 생기면 항상 데이터를 처음부터 끝까지 발행하는 것을 보장.

=> 모든 데이터를 저장하는 과정에서 메모리 누수가 발생할 가능성을

염두해두고 사용할 때 주의 해야함.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import io.reactivex.subjects.ReplaySubject;
 
public class PublishSubjectEx {
 
    public static void main(String[] args) {
 
        ReplaySubject<String> subject =  ReplaySubject.create();
        subject.subscribe(data -> System.out.println("구독자1 => " + data));
        subject.onNext("Hi1");
        subject.onNext("Hi2");
        subject.subscribe(data -> System.out.println("구독자2 => " + data));
        subject.onNext("Hi3");
        subject.onComplete();
        
    }
 
}
 
cs


ConnectableObservable

차가운 Observable -> 뜨거운 Observable

특이한 점은 subscribe() 함수를 호출해도 아무 동작도 일어나지 않음.

추가로 connect() 함수까지 호출해주어야 합니다.

예제)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.TimeUnit;
 
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
 
public class ConnectableObservableEx {
 
    public static void main(String[] args) {
        try {
        String[] greeting = {"Hi""Hello""What's up"};
        
        Observable<String> src = 
                         Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(i -> greeting[i])
                .take(greeting.length);
        
        ConnectableObservable<String> src2 = src.publish();
        src2.subscribe(data -> System.out.println("구독자1 => " + data));
        src2.subscribe(data -> System.out.println("구독자2 => " + data));
        src2.connect();
 
        Thread.sleep(250);
        src2.subscribe(data -> System.out.println("구독자3 => " + data));
        Thread.sleep(100);
        } catch (Exception e) {
 
        }
        
    }
 
}
 
cs

지금까지 위에서 사용한 예제들을 한 번씩 실행해보시면
각 클래스들에 대해서 이해하기 쉬우실 거에요.

다음 포스팅부터는 리액티브 연산자들에 대해 공부합니다.
이상입니다. 감사합니다.


반응형