[모던 자바] 리액티브 스트림과 Flow API란?
리액티브 스트림이란?
리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다.
리액티브 스트림은 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블록 하지 않는 역압력을 전재해 처리하는 표준 기술이다.
역압력은 발행-구독 프로토콜에서 이벤트 스트림의 구독자가 이벤트를 소비하는 시간이 발행자가 이벤트를 제공하는 속도보다 느릴 때 문제가 발생하지 않도록 보장하는 장치다.
이런 상황이 발생하였을 때 부하가 발생한 컴포넌트가 완전 불능이 되거나 예기치 않는 방식으로 이벤트를 잃어버리는 등의 문제가 발생하지 않는다. 부하가 발생했을 때 발행자가 충분한 알림을 받을 수 있어야 한다.
실제 비동기 작업이 실행되는 동안 시스템에는 암묵적으로 블록 API로 인해 역압력이 제공되는 것이다.
안타깝게도 비동기 작업을 실행하는 동안에는 그 작업이 완료될 때까지 다른 유용한 작업을 실행할 수 없으므로 기다리면서 많은 자원을 낭비하게 된다.
반면 비동기 API를 이용하면 하드웨어 사용률을 극대화할 수 있지만 다른 느린 다운스트림 컴포넌트에 너무 큰 부하를 줄 가능성도 생긴다. 따라서 이런 상황을 방지할 수 있도록 역압력이나 제어 프름 기법이 필요하다.
이들 기법은 데이터 수신자가 스레드를 블록 하지 않고도 데이터 수신자가 처리할 수 없을 만큼 많은 데이터를 받는 일을 방지하는 프로토콜을 제공한다.
넥플릭스, 레드햇, 트위터, 라이트 밴드 및 기타 회사들이 참여한 리액티브 스트림 프로젝트에서는 모든 리액티브 스트림 구현이 제공해야 하는 최소 기능 집합을 네 개의 관련 인터페이스로 정의했다.
Java9에 새로운 java.util.concurrent.Flow 클래스뿐 아니라 Akka 스트림(라이트 밴드), 리액터(피보탈), RxJava(넷플릭스), Vert.x(레드햇) 등 많은 서드 파티 라이브러리에서 이들 인터페이스를 구현한다.
Flow 클래스 소개
Java9에서는 리액티브 프로그래밍을 제공하는 클래스 java.util.concurrent.Flow를 추가했다.
이 클래스는 정적 컴포넌트 하나를 포함하고 있으며 인스턴스 화할 수 없다.
리액티브 스트림 프로젝트의 표준에 따라 프로그래밍 발행-구독 모델을 지원할 수 있도록 Flow 클래스는 중첩된 인터페이스 네 개를 포함한다.
- Publisher
- Subscriber
- Subscription
- Processor
Publisher가 항목을 발행하면 Subscriber가 한 개씩 또는 한 번에 여러 항목을 소비하는데 Subscription이 이 과정을 관리할 수 있도록 Flow 클래스는 관련된 인터페이스와 정적 메서드를 제공한다.
Publisher는 수많은 일련의 이벤트를 제공할 수 있지만 Subscriber의 요구사항에 따라 역압력 기법에 의해 이벤트 제공 속도가 제한되고, Subscription은 Publisher와 Subscriber 사이의 제어 흐름, 역압력을 관리한다.
Publisher는 자바의 함수형 인터페이스로, Subscriber는 Publisher가 발행한 이벤트로 리스너로 자신을 등록할 수 있다.
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
Subscriber 인터페이스는 Publisher가 관련 이벤트를 발행할 때 호출할 수 있도록 콜백 메서드 네 개를 정의한다.
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
이들 이벤트는 다음 프로토콜에서 정의한 순서로 지정된 메서드 호출을 통해 발행되어야 한다.
Subscriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달하고 이어서 onNext가 여러 번 호출될 수 있다.
이벤트 스트림은 계속 지속되거나 onComplete 콜백을 통해 더 이상의 데이터가 없고 종료됨을 알릴 수 있으며, Pushlisher 장애가 발생했을 때는 onError를 호출할 수 있다.
Subscription 인터페이스는 두 개의 메서드를 정의한다.
public static interface Subscription {
public void request(long n);
public void cancel();
}
Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 request 메서드를 통해 알릴 수 있고, cancel 메서드는 Subscription을 취소, 즉 Publisher에게 더 이상 이벤트를 받지 않음을 통지한다.
마지막 Processor 인터페이스는 단지 Publisher와 Subscriber를 상속받을 뿐 아무 메서드도 추가하지 않는다.
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
실제 이 인터페이스는 리액티브 스트림에서 처리하는 이벤트의 변환 단계를 나타낸다.
Processor가 에러를 수신하면 이로부터 회복하거나(Subscription은 취소로 간주) 즉시 onError 신호로 모든 Subscriber에 에러를 전파할 수 있다.
마지막 Subscriber가 Subscription을 최소 하면 Processor는 자신의 업스트림 Subscription도 취소함으로 취소 신호를 전파해야 한다.
Flow 명세서
Java9 Flow 명세서에서는 이들 인터페이스 구현이 어떻게 서로 협력해야 하는지를 설명하는 규칙 집합을 정의한다.
- Publisher는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에 전달해야 한다.
- Publisher는 지정된 개수보다 적은 수의 요소를 onNext로 전달할 수 있으며 동작이 성공적으로 끝났으면 onComplete를 호출하고 문제가 발생하면 onError를 호출해 Subscription을 종료할 수 있다.
- Subscriber는 요소를 받아 처리할 수 있음을 Publisher에 알려야 한다. 이런 방식으로 Subscriber는 Publisher에 역압력을 행사항 수 있고 Subscriber가 관리할 수 없이 너무 많은 요소를 받는 일을 피할 수 있다.
- onComplete나 onError 신호를 처리하는 상황에서 Subscriber는 Publisher나 Subscription의 어떤 메서드도 호출할 수 없으며, Subscription이 최소 되었다고 가정해야 한다.
- Subscriber는 Subscription.request() 메서드 호출이 없이도 언제든 종료 시그널을 받을 준비가 되어있어야 하며 Subscription.cancel()이 호출된 이후에라도 한 개 이상의 onNext를 받을 준비가 되어있어야 한다.
- Publisher와 Subscriber는 정확하게 Subscription을 공유해야 하며 각각이 고유한 역할을 수행해야 한다. 그러려면 onSubscribe와 onNext 메서드에서 Subscriber는 request 메서드를 동기적으로 호출할 수 있어야 한다.
- 표준에서는 Subscription.cancel() 메서드는 몇 번을 호출해도 한 번 호출한 것과 같은 효과를 가져야 하며, 여러 번 이 메서드를 호출해도 다른 추가 호출에 영향이 없도록 ThreadSafe 해야 한다고 명시한다.
- 같은 Subscriber 객체에 다시 가입하는 것은 권장하지 않지만 이런 상황에서 예외가 발생해야 한다고 명세서가 강제하진 않는다. 이전에 취소된 가입이 영구적으로 적용되었다면 이후의 기능에 영향을 주지 않을 가능성도 있기 때문이다.
Java 9 플로 API/리액티브 스트림 API에서는 Subscriber 인터페이스의 모든 메서드 구현이 Publisher를 블록 하지 않도록 강제하지만 이들 메서드가 이벤트를 동기적으로 처리해야 하는지 아니면 비동기적으로 처리해야 하는지는 지정하지 않는다.
하지만 이들 인터페이스에 정의된 모든 메서드는 void를 반환하므로 온전히 비동기 방식으로 이들 메서드를 구현할 수 있다.