RxJava2, RxAndroid2

[RxJava2] Observable 클래스와 팩토리 함수들, just(), subscribe(), create()

알통몬_ 2018. 9. 20. 12:19
반응형


공감 및 댓글은 포스팅 하는데

 아주아주 큰 힘이 됩니다!!

포스팅 내용이 찾아주신 분들께 

도움이 되길 바라며

더 깔끔하고 좋은 포스팅을 

만들어 나가겠습니다^^

 


RxJava2 의 핵심이라고 할 수 있는 Observable 클래스에 대해 공부합니다.


RxJava 2.2.1 버전을 사용합니다.


Observable 클래스

옵저버 패턴을 구현합니다.

옵저버 패턴이란?

객체의 상태 변화를 관찰하는 옵저버 목록을 객체에 등록 후

상태 변화가 있을 때마다 메서드를 호출해 객체가 직접 목록의

각 옵저버에게 변화를 알려줍니다. 라이프 사이클은 존재하지 않고

일반적으로 단일 함수를 통해 변화만 알려줍니다.

ex) 버튼을 누르면 미리 등록해 둔 onClick() 메서드를 호출하여 원하는

처리를 하는 것


Observable은 최대 세 가지의 알림을 구독자에게 전달합니다.

onNext()

Observable이 데이터 발행을 알림, 옵저버 패턴과 동일.


onError()

Observable 에서 어떠한 이유로 에러가 발생했는지 알려 줍니다.

onError()이 발생하면 이 후 onNext() 와 onComplete() 이벤트는

발생하지 않고, Observable이 실행을 종료합니다.


onComplete()

모든 데이터의 발행이 끝났음을 알림.

즉, onNext()가 끝날 때까지 onError 이벤트가 실행되지 않았다면

가장 마지막에 발생하는 이벤트.

=> onComplete 이후에는 onNext가 발생하면 안됨.


간단 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package rx.java.chapter01;
 
import io.reactivex.Observable;
 
public class RxJavaEx {
 
    public static void main(String args[]) {
 
        Observable.just("hello""world").subscribe(
                System.out::println// onNext()
                e -> e.getMessage(), // onError()
                () -> System.out.println("onCompleted"// onComplete()
        );
 
    }
 
}
cs

결과는 각자 실행해보시면 됩니다.

아래와 같은 경우 onError이 발생합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package rx.java.chapter01;
 
import io.reactivex.Observable;
 
public class RxJavaEx {
 
    public static void main(String args[]) {
 
        Observable.just("hello"0).subscribe(
                str -> System.out.println(Integer.parseInt(String.valueOf(str))), // onNext()
                e -> System.out.println(e.getMessage()), // onError()
                () -> System.out.println("onCompleted"// onComplete()
        );
 
    }
 
}
cs


Observable의 팩토리 함수들.

just(), create(), fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher(), interval(), range(), timer(), defer() 등등.


just()

데이터를 발행하는 가장 쉬운 방법 : 기존의 자료 구조를 사용하는 것.


just()는 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성합니다.

* 실제 데이터의 발행은 subscribe()를 호출해야 함.

just()의 인자는 1~10개 값까지 넣을 수 있고 인자들의 타입은 모두 동일해야 합니다.


just()는 들어온 인자를 그대로 발행합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
package rx.java.chapter01;
 
import io.reactivex.Observable;
 
public class RxJavaEx {
 
    public static void main(String args[]) {
 
        Observable.just(1,2,3,4,5,6,7,8,9,10).subscribe(System.out::println);
 
    }
 
}
cs



subscribe() 와 Disposable 객체

RxJava 에서는 내가 동작시키기 원하는 것을 사전에 정의해둔 후 실행되는 시점을

조절할 수 있습니다. 이때 subscribe() 를 사용합니다.

Observable 은 just() 등의 팩토리 함수로 데이터의 흐름을 정의한 후에

subscribe() 를 호출해야 실제로 데이터를 발행합니다.


subscribe()는 4개로 오버로딩되어 있습니다.

subscribe()

onError 이벤트가 발생했을 때만 OnErrorNotImplementedException을 던집니다.

=> Observable로 작성한 코드를 테스트, 디버깅할 때 활용.


subscribe(Consumer<? super T> onNext)

onNext 이벤트를 처리. noError 이벤트가 발생한다면 위와 마찬가지로

OnErrorNotImplementedException을 던집니다.


subscribe(Consumer<? super T> onNext, Cunsumer<? super Throwable> onError)

onNext, onError 이벤트 처리


subscribe(Consumer<? super T> onNext, Cunsumer<? super Throwable> onError, Action onComplete)

onNext, onError, onComplete 이벤트 처리


위 4개의 메서드는 Disposable 인터페이스의 객체를 리턴합니다.

Disposable

RxJava1.x의 구독 객체에 해당.

메서드는 아래 2개만 있습니다.

void dispose()

Observable이 더이상 데이터 발행을 하지 않도록 구독을 해지하는 메서드


boolean isDisposed()

이름처럼 Observable이 데이터를 발행하지 않는지 확인하는 메서드


예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package rx.java.chapter01;
 
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
 
public class RxJavaEx {
 
    public static void main(String args[]) {
 
        Disposable disposable = Observable.just("hello""world").subscribe(
                System.out::println// onNext()
                e -> System.out.println(e.getMessage()), // onError()
                () -> System.out.println("onCompleted"// onComplete()
        );
        
    
        System.out.println("Observable is disposed => " + disposable.isDisposed());
 
    }
 
}
cs



create()

just() 의 경우 데이터를 인자로 넣을 경우 자동으로 알림 이벤트가 발생합니다.

create()는 onNext, onError, onComplete를 개발자가 직접호출합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RxJavaEx {
 
    public static void main(String args[]) {
        
        Observable.create(new ObservableOnSubscribe<String>() {
 
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello");
                emitter.onNext("world");
                emitter.onComplete();
                emitter.onError(new Throwable());
            }
            
        }).subscribe(System.out::println);
    }
 
}
cs


위 예제들을 보면 전부 subscribe의 인자들을 람다식을 사용했는데, 그냥 일반적으로

사용할 수도 있습니다.

1
2
3
4
5
6
7
8
9
10
Observable.just("hello""world").subscribe(new Consumer<String>() {
    @Override
    public void accept(String t) throws Exception {
        // TODO Auto-generated method stub
        System.out.println(t);
    }
            
});
 
cs


이상입니다.

다음 포스팅에서는 fromXXX() 에 대해 공부합니다.


반응형