간단한 리액티브 애플리케이션 만들기
Flow클래스에 정의된 인터페이스 대부분은 직접 구현하도록 의도된 것이 아니다. 그럼에도 Java9 라이브러리는 이들 인터페이스를 구현하는 클래스를 제공하지 않는다.
이전에 언급한 Akka, RxJava 등의 리액티브 라이브러리에서는 이들 인터페이스를 구현했다.
java.util.concurrency.Flow 명세는 이들 라이브러리가 준수해야 할 규칙과 다양한 리액티브 라이브러리를 제시한다.
그렇지만 Flow API를 직접 이용해보고 애플리케이션을 개발하면서 Flow 클래스안에 4개의 인터페이스가 어떻게 작동하는지 알아보자.
예를 들어 로또 번호를 발급한다고 가정해보자.
1부터 45까지에 랜덤한 숫자를 6번 발급하도록 한다면 7번째 발급 시에 RuntimeException이 발생하도록 설정하였다.
@Getter
@ToString
public class LottoNumber {
public static final Random random = new Random();
private long number; // 발급 번호
private long order; // 순서
public LottoNumber(long number, long order) {
this.number = number;
this.order = order;
}
public static LottoNumber issue(long order) { // 번호 발급
// 6번째까지만 발급
if (order > 6) throw new RuntimeException("6번째 까지만 발급이 가능합니다.");
int randomNumber = random.nextInt(45);
return new LottoNumber(randomNumber, order);
}
}
간단한 도메인 모델을 정의한 다음에는 다음 예제에서 보여주는 것처럼 Subscriber가 요청할때마다 로또 번호를 발급받도록 Subscription을 구현한다.
public static class LottoNumberSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super LottoNumber> subscriber;
public LottoNumberSubscription(Flow.Subscriber<? super LottoNumber> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) { // Subscriber가 만든 요청을 실행
try {
subscriber.onNext(LottoNumber.issue(n));
} catch (Exception e) {
// 로또 번호 발급 실패 시 Subscriber로 에러전달
subscriber.onError(e);
}
}
@Override
public void cancel() {
// 구독이 취소되면 완료(onComplete) 신호를 Subscriber로 전달
subscriber.onComplete();
}
}
이번에는 새 로또 번호를 얻을 때마다 Subscription이 발급받은 번호를 출력하고 새 레포트를 요청하는 Subscriber 클래스를 구현한다.
public class LottoNumberSubscriber implements Flow.Subscriber<LottoNumber> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) { // 구독을 저장하고 첫 번째 요청을 전달
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(LottoNumber lottoNumber) { // 번호를 발급받고 다음 번호 발급을 요청
long order = lottoNumber.getOrder();
long value = lottoNumber.getNumber();
System.out.println(order + "번째 번호 : " + value);
subscription.request(order + 1);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
다음 예제는 애플리케이션이 실제 동작할 수 있도록 Publisher를 만들고 LottoNumberSubscriber를 이용해 Publisher에 구독하도록 구현한 코드다.
private static Flow.Publisher<LottoNumber> getLottoNumbers() {
return subscriber -> subscriber.onSubscribe(new LottoNumberSubscription(subscriber));
}
getLottoNumbers().subscribe(new LottoNumberSubscriber());
여기서 getLottoNumbers 메서드는 Subscriber를 인수로 받아 Subscriber의 onSubscribe 메서드를 호출한다.
람다의 시그니처가 Publisher의 함수형 인터페이스의 유일한 메서드와 같은 시그니처를 가지므로 자바 컴파일러는 자동으로 람다를 Publisher로 바꿀 수 있다.
자세히 풀어보자면 아래와 같은 코드라고 할 수 있다.
private static Flow.Publisher<LottoNumber> getLottoNumbers() {
return new Flow.Publisher<LottoNumber>() {
@Override
public void subscribe(Flow.Subscriber<? super LottoNumber> subscriber) {
subscriber.onSubscribe(new LottoNumberSubscription(subscriber));
}
};
}
예제 코드를 실행해보면 다음과 같은 결과가 출력된다.
결과를 보면 로또 번호는 정상적으로 발급받다가 7번째 발급시에 설정한 RuntimeException이 발생해서 LottoNumberSubscriber의 onError 메서드가 실행되었다.
그런데 만약 7번째 발급에서 멈추기 않고 계속해서 번호를 발급한다면 어떻게 될까?
RuntimeException을 발생시키는 부분을 주석 처리하고 다시 실행해보자.
@Getter
@ToString
public static class LottoNumber {
...
public static LottoNumber issue(long order) {
// if (order > 6) throw new RuntimeException("6번째 까지만 발급이 가능합니다.");
int randomNumber = random.nextInt(45);
return new LottoNumber(randomNumber, order);
}
...
}
수정한 코드를 실행해보면 번호를 발급하다가 어느순간 StackOverflowError가 발생하게 된다.
이런 경우 Executor를 LottoNumberSubscription으로 추가한 다음 다른 스레드에서 LottoNumberSubscriber로 다음 요소를 전달하는 방법이 있다. 그러려면 다음 예제처럼 LottoNumberSubscription을 수정해야 한다.
public static class LottoNumberSubscription implements Flow.Subscription {
// ExxcutorService 생성
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Flow.Subscriber<? super LottoNumber> subscriber;
public LottoNumberSubscription(Flow.Subscriber<? super LottoNumber> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
executor.submit(() -> { // 다른 스레드에서 다음 요소를 구독자에게 보낸다.
try {
subscriber.onNext(LottoNumber.issue(n));
} catch (Exception e) {
subscriber.onError(e);
}
});
}
...
}
실행 결과를 확인해보면 멀티쓰레드에서 비동기 식으로 번호가 발급되는 것을 확인할 수 있다.
위 예제에서 Flow API의 4개중 3가지 인터페이스를 사용해보았다.
Flow API에 4번째 인터페이스 Processor는 Subscriber이며 동시에 Publisher다.
사실 Processor의 목적은 Publisher를 구독한 다음 데이터를 가공해 다시 제공하는 것이다.
Processor를 이용해 이를 발급받은 번호에 100을 더해 반환하도록 해보자.
public static class LottoNumberProcessor implements Flow.Processor<LottoNumber, LottoNumber> {
private Flow.Subscriber<? super LottoNumber> subscriber;
@Override
public void subscribe(Flow.Subscriber<? super LottoNumber> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(LottoNumber lottoNumber) {
subscriber.onNext(
new LottoNumber(lottoNumber.getNumber() + 100, lottoNumber.getOrder())
);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
}
onNext 메서드를 보면 발급된 LottoNumber의 number값에 100을 더해 반환하도록 오버라이드하였다.
다른 메서드들은 업스트림 구독자에게 그대로 전달하도록 하였다.
그럼 위에서 구현했던 getLootoNumbers 메서드에 생성한 Processor를 적용하고 실행해보자.
private static Flow.Publisher<LottoNumber> getLottoNumbers() {
return subscriber -> {
LottoNumberProcessor lottoNumberProcessor = new LottoNumberProcessor();
lottoNumberProcessor.subscribe(subscriber);
lottoNumberProcessor.onSubscribe(new LottoNumberSubscription(lottoNumberProcessor));
};
}
getLottoNumbers().subscribe(new LottoNumberSubscriber());
Java는 왜 Flow API 구현을 제공하지 않는가?
그런데 맨처음 Flow 클래스에 정의된 인터페이스 직접 구현하도록 의도된 것이 아니라고 했다.
그런데 Java9에서는 Flow API 구현을 제공하지 않는다.
보통 Java 라이브러리는 List와 ArrayList처럼 인터페이스와 구현체를 제공해주고 있다.
반면 Java9에서는 Publisher<T> 인터페이스만 선언하고 구현을 제공하지 않으므로 직접 인스턴스를 구현해야 한다.
인터페이스가 프로그래밍의 구조를 만드는데 도움이 될 순 있지만 프로그램을 더 빨리 구현하는 데는 도움이 되지 않는다.
그럼 왜 제공하지 않을까?
API를 만들 당시 Akka, RxJava 등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기 때문이다.
원래 발행-구독 사상에 기반해 리액티브 프로그래밍을 구현했지만, 이들 라이브러리는 독립적으로 개발되었고 서로 다른 이름 규칙과 API를 사용했다.
Java9의 표준화 과정에서 기존처럼 자신만의 방법이 아닌 이들 라이브러리는 공식적으로 java.util.concurrent.Flow의 인터페이스를 기반으로 리액티브 개념을 구현하도록 진화했다.
이 표준화 작업 덕분에 다양한 라이브러리가 쉽게 협력할 수 있게 되었다.
리액티브 라이브러리 RxJava 사용하기
RxJava는 Java로 리액티브 애플리케이션을 구현하는 데 사용하는 라이브러리다.
RxJava는 기반한 넷플릭스의 Reactive Extensions(Rx) 프로젝트의 일부로 시작되었다.
좋은 시스템 아키텍처 스타일을 유지하려면 시스템에서 오직 일부에 사용된 개념의 세부 사항을 전체 시스템에서 볼 수 있게 만들지 않아야 한다.
예를 들어 ArrayList 타입의 인수를 받을 것을 알지만 List 타입의 인수로 받음으로써 구현 세부사항을 밖에서 볼 수 없도록 구현하기도 하고, 나중에 ArrayList가 아닌 LinkedList로 바꾸더라도 기존 코드를 크게 바꿀 필요가 없게 된다.
따라서 Observable의 추가 구조가 필요한 상황에서만 Observable을 사용하고 그렇지 않으면 Publisher의 인터페이스를 사용하는 것이 좋다.
RxJava 문서를 읽다보면 Java9에서 리액티브 당김 기반 역압력 기능(request 메서드)이 있는 Flow를 초함하는 io.reactivex.Flowable 클래스를 확인할 수있다.
역압력은 Publisher가 너무 빠른 속도로 데이터를 발행하면서 Subscriber가 이를 감당할 수 없는 상황에 이르는 것을 방지하는 기능이다.
나머지 클래스는 역압력을 지원하지 않는 기존 버전의 RxJava에서 제공하던 Publisher io.reactivex.Observable 클래스다.
이 클래스는 단순한 프로그램, 마우스 움직임 같은 사용자 인터페이스 이벤트에 더 적합하다.
사용자 인터페이스 이벤트의 성능을 낮출 수 없기 때문에 역압력을 적용하는것이 어렵기 때문이다.
RxJava는 천 개 이하의 요소를 가진 스트림이나 GUI, 자주 발생하지 않는 이벤트에 역압력을 적용하지 말 것을 권장하고 있다.
모든 구독자는 request(Long.MAX_VALUE) 메서드를 이용해 역압력 기능을 끌 수 있지만 Subscriber가 정해진 시간 안에 수신한 모든 이벤트를 처리할 수 있다고 확신할 수 있는 상황이 아니라면 역압력 기능을 끄지 않는 것이 좋다.
Observable 만들고 사용하기
Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드를 제공한다.(Observable과 Flowable은 Publisher를 구현하므로 팩토리 메서드는 리액티브 스트림을 만든다.)
RxJava에서 Observable이 Flow API의 Publisher 역할을 하며 Observer는 Flow의 Subscriber 인터페이스 역활을 한다.
위에서 만든 예제를 RxJava API를 사용해서 구현해보자.
6번째 까지만 로또 번호가 발급되도록 위에서 주석 처리했던 부분을 다시 풀어준다.
@Getter
@ToString
public static class LottoNumber {
public static final Random random = new Random();
private long number;
private long order;
public LottoNumber(long number, long order) {
this.number = number;
this.order = order;
}
public static LottoNumber issue(long order) {
if (order > 6) throw new RuntimeException("6번째 까지만 발급이 가능합니다.");
int randomNumber = random.nextInt(45);
return new LottoNumber(randomNumber, order);
}
}
Observable의 팩토리 메서드를 사용해서 매 초마다 번호를 발급받도록 하는 Observable을 반환할 것이다.
private Observable<LottoNumber> getLottoNumbers() {
return Observable.create(
emitter -> Observable.interval(1, TimeUnit.SECONDS)
.subscribe(n -> {
try {
emitter.onNext(LottoNumber.issue(n + 1)); // 0 부터 시작하기 때문에 +1
} catch (Exception e) {
emitter.onError(e);
}
})
);
}
필요한 이벤트를 전송하는 ObservableEmitter 인터페이스는 RxJava의 기본 Emitter(onSubscribe 메서드가 빠진 Observer와 같음)를 상속한다.
Observable의 interval 팩토리 메서드를 사용해 1초마다 로또 번호를 발급받고 있다.
발급받은 번호를 출력하는 Observer도 구현해보자.
public class LottoNumberObserver implements Observer<LottoNumber> {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull LottoNumber lottoNumber) {
long order = lottoNumber.getOrder();
long value = lottoNumber.getNumber();
System.out.println(order + "번째 번호 : " + value);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
RxJava의 Observeable은 역압력을 지원하지 않으므로 전달된 요소를 처리한 다음 추가 요소를 요청하는 request 메서드가 필요 없기 때문에 Observer는 위에서 구현한 LottoNumberSubscriber 클래스와 비슷하지만 더 단순하다.
그럼 코드를 실행해 보자.
Observable<LottoNumber> observable = getLottoNumbers();
observable.blockingSubscribe(new LottoNumberObserver());
생성되는 Observable이 RxJava의 연산 스레드 풀(데몬 스레드)에서 실행되기 때문에 위 예제에서는 현재 우리가 사용한 스레드에서는 실행할 코드가 없어서 데몬 스레드에서 연산되기 전에 프로그램이 종료가 된다.
이를 막기위해 현재 스레드에서 콜백을 호출하는 blockingSubscribe 메서드를 사용했다.
마블 다이어그램(marble diagram)
RxJava나 기타 리액티브 라이브러리는 Java9 Flow API에 비해 스트림을 합치고, 만들고, 거르는 등의 풍부한 도구상자를 제공하는 것이 장점이다.
이미 위에 Processor를 구현해서 데이터를 변환할수도 있고, 스트림에서 관심있는 요소만 거른 다른 스트림을 만들거나 매핑 함수로 요소를 변환하거나 두 스트림을 다양한 방법으로 합치는 등의 작업을 할 수 있다.(이는 Processor만으로 구현하기는 어렵다.)
이런 변환, 합치기 함수는 상당히 복잡하므로 말로 설명하기가 상당히 어렵다.
리액티브 스트림 커뮤니티는 마블 다이어그램(marble diagram)이라는 시각적 방법을 이용해 이런 어려움을 해결하고자 노력한다.
마블 다이어그램은 수평선으로 표시된 리액티브 스트림에 임의의 순서로 구성된 요소가 기하하적 모형이 나타난다.
특수 기호는 에러나 완료 신호를 나타낸다. 박스는 해당 연산이 요소를 어떻게 변화하거나 여러 스트림을 어떻게 합치는지 보여준다.
이 표기법을 이용하면 모든 RxJava 라이브러리 함수를 시각적으로 표현할 수 있다.
아래는 map과 merge 함수의 마블 다이어그램이다.
Observable을 변환하고 합치기
위에 Processor을 사용해 발급받은 번호에 100을 더해 발급받은 것처럼 map을 사용해 구현해보자.
private static Observable<LottoNumber> getPlusLottoNumbers() {
return getLottoNumbers().map(lottoNumber ->
new LottoNumber(lottoNumber.getNumber() + 100, lottoNumber.getOrder())
);
}
간단한 위의 메서드는 getLottoNumbers 메서드가 반환하는 Observable을 받아 100을 더한 값으로 매 초 한개씩 번호를 발급하는 또 다른 Observable을 반환한다.
이와 마찬가지로 filter 함수를 사용하면 원하는 조건의 Observable만 필터링이 가능하다.
private static Observable<LottoNumber> getLowLottoNumbers() {
return getLottoNumbers().filter(lottoNumber -> lottoNumber.getNumber() < 20);
}
또한 merge 메서드를 사용해 여러개의 Observable 결과를 병합해 하나의 Observable처럼 동작할 수 도 있다.
위 예제에서 구현한 getLottoNumbers에 문자열 message를 인수로 받아 출력되도록 수정했다.
private Observable<LottoNumber> getLottoNumbers(String message) {
System.out.println(message);
return Observable.create(
emitter -> Observable.interval(1, TimeUnit.SECONDS)
.subscribe(n -> {
try {
emitter.onNext(LottoNumber.issue(n + 1));
} catch (Exception e) {
emitter.onError(e);
}
})
);
}
private static Observable<LottoNumber> getMergeLottoNumbers(String ... message) {
return Observable.merge(
Arrays.stream(message).map(StudyTest::getLottoNumbers).collect(Collectors.toList())
);
}
Observable의 스트림은 리스트로 모아지며 다시 리스트는 Observable 클래스가 제공하는 정적 팩토리 메서드 merge로 전달된다.
merge 메서드는 Observable의 Iterable을 인수로 받아 마치 한 개의 Observable처럼 동작하도록 결과를 합치고, 모든 Observable의 이벤트 발행물을 시간 순서대로 방출한다.
이제 위 코드를 실행해보자.
Observable<LottoNumber> observable = getMergeLottoNumbers(
"3월 1째주 로또 번호 발급",
"3월 2째주 로또 번호 발급",
"3월 3째주 로또 번호 발급"
);
observable.blockingSubscribe(new LottoNumberObserver());
'Java' 카테고리의 다른 글
[모던 자바] 재귀와 반복 (0) | 2022.04.04 |
---|---|
[모던 자바] 함수형 프로그래밍이란 무엇인가? (0) | 2022.04.04 |
[모던 자바] 리액티브 스트림과 Flow API란? (0) | 2022.03.30 |
[모던 자바] 리액티브 프로그래밍이란? (0) | 2022.03.30 |
[모던 자바] CompletableFuture를 활용한 비동기 작업 파이프라인 만들기 (0) | 2022.03.29 |