reactive streams 표준 라이브 러리에 대한 이해와 실습
reactive streams은 데이터 스트림을 Non-Blocking하면서, 비동기적 방식으로 처리하기 위한 리액티브 라이브러리 표준 사양이다.
즉, 리액티브한 코드를 작성하기 위한 라이브러리로 개발자는 보다 쉽게 리액티브 어플리케이션을 구성할 수 있게 만든다.
이를 실습하기 위해서 우선, java 프로젝트를 생성한 후에 해당 라이브러리를 사용하기 위한 빌드를 해준다. Gradle을 기준으로 아래와 같은 의존성을 주입해준다.
// https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams
implementation group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.0'
이후 빌드를 진행하고 외부 라이브러리에 추가된 모습을 확인할 수 있다.
각 인터페이스들을 사용하여 리액티브 프로그래밍을 진행할 수 있다.
이제 각 인터페이스를 구현하여 간단한 리액티브 프로그램을 구현해보자.
Publisher 인터페이스
먼저, Publisher 인터페이스이다.
구독 - 발행의 관계로 바라보았을 때, Publisher는 출판사에 해당한다고 고려하면 편하다.
해당 인터페이스를 implements 하면 subscribe 메소드를 오버라이딩을 통한 구현이 필요하다.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.Arrays;
public class MyPub implements Publisher<Integer> {
Iterable <Integer> data = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
System.out.println("구독자 : 당신의 서비스를 구독할게요.");
System.out.println("발행처 : 구독 정보를 만들어 주겠습니다.");
// 구독 정보 생성
MySubscription subscription = new MySubscription(subscriber, data);
System.out.println("발행처 : 구독 정보 생성 완료했습니다. 받으세요.");
// 구독자에게 구독 정보 주기
subscriber.onSubscribe(subscription);
}
}
그리고 해당 메소드를 호출하기 위해, Subscriber 타입의 객체를 파라미터로 받는다. 해당 클래스에 대한 구현을 진행해보자.
Subscriber는 구독자에 해당하는 기능과 행위들을 구현하는 인터페이스에 해당한다.
그리고, Subscription은 사용하여 구독 정보를 다루는 인터페이스로 위의 코드에서는 MySubscription 타입에 해당하여 뒤에 따로 관련 내용과 코드들을 보며 추가적으로 이해해보자.
다만 현재 구독 정보를 생성함에 있어( Publisher ), 파라미터로 구독자 정보와 오버라이딩한 메소드 내부에서 선언한 data변수를 넘겨 정보를 만든다는 점만 이해하자.
요약해보자면, 발행처인 Publisher 인터페이스는 Interger 정보( 제네릭 )를 다루며,
subscribe 메소드로 구독자의 정보를 받아, 구독 정보를 생성하고 구독자에게 구독 정보를 주는 역할과 기능을 수행한다.
Subscriber 인터페이스
한편, Subscriber 인터페이스는 구독자의 역할과 기능을 정의하는 인터페이스에 해당한다.
해당 인터페이스를 implements하면 4개의 메소드를 오버라이딩으로 구현하여 구독자에 해당하는 객체의 기능과 역할을 정의해야한다.
이에 구현한 아래의 코드를 보면서 이해해보자.
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class MySub implements Subscriber<Integer> {
private Subscription subscription;
private int bufferSize =3;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("구독자 : 구독 정보 잘 받음");
this.subscription = subscription;
System.out.println("구독자 : 매일 3번 상품을 줘");
subscription.request(bufferSize); // (백프레셔) 구독자(=소비자)가 한번에 처리할 수 있는 개수를 요청하는 것
}
@Override
public void onNext(Integer o) {
System.out.println("구독 데이터 전달");
System.out.println("onNext : " + o);
bufferSize--;
if(bufferSize == 0){
System.out.println("----다음날----");
bufferSize = 3;
// 메모리에 request()에 대한 데이터를 처리한 후에 다시 받아서 소비
subscription.request(bufferSize);
}
}
@Override
public void onError(Throwable throwable) {
System.out.println("구독 중 에러 발생");
}
@Override
public void onComplete() {
System.out.println("구독 완료");
}
}
일단, 각 메소드 마다의 구현하는 기능이 무엇인지 인지하고 이해해보자.
먼저, onSubscribe 메서드는 앞서 Publisher 인터페이스에서 구독자에게 구독 정보를 주기위해 사용했던 메소드이다.
따라서 구독자 객체인 Subscriber는 해당 메소드를 가지고 Publisher에서 보낸 정보 받는 것이다.
그리고 그 정보에 해당하는 객체의 타입은 역시나 Publisher에서 생성한 Subscription 객체이다.
( Publisher의 코드에서 MySubscription에 해당 )
이어서 Subscription 인스턴스의 request 메서드를 호출하는 데 이는 백프레셔 기능에 해당한다.
'백프래셔'
는 구독자가 요구하는 요청량의 데이터를 받아서 처리할 수 있도록 도와주는 메소드이다.
만약, 구독만으로 모든 데이터를 한번에 구독자가 받아야하는 경우에 그 데이터의 양이 너무 많다면,
처리할 수 없어 OOM이 발생할 수도 있는 등의 다양한 고려 사항들이 있을 수 있다.
따라서 백프래셔는 구독자가 수용할 수 있는 만큼만 데이터를 요청하는 방식이다.
위의 코드에서는 임의의 bufferSize 만큼 데이터를 처리하고 있다.
이어서 onNext 메서드의 역할은 Publisher에서 넘겨준 데이터를 처리하는 로직을 구현하는 기능을 담당한다.
위의 코드에서는 bufferSize만큼의 데이터를 읽고난 처리한 뒤에 다시 bufferSize만큼 데이터를 처리하도록 구성되었다.
( subscription 인스턴스의 requeset 호출이 데이터를 소비 )
나머지 onError 메서드는 구독 중에 에러가 발생하면 처리하는 기능을 담당하며,
onComplete 메서드는 Publisher가 데이터 통지를 완료했을 때 후처리 역할을 한다.
Subscription 인터페이스
이제 구독 정보를 생성과 관리의 기능을 담당하는 Subscription 인터페이스에 대해서 알아보자.
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.Iterator;
// 구독 정보 (구독자, 어떤 데이터를 구독할지)
public class MySubscription implements Subscription {
// 넘겨받은 구독자 정보와 데이터 정보
private Subscriber s;
private Iterator it;
public MySubscription(Subscriber<? super Integer> subscriber, Iterable<Integer> data) {
s = subscriber;
it = data.iterator();
System.out.println("발행처 : 구독 정보 생성 완료");
}
@Override
public void request(long l) {
while( l > 0) {
if(it.hasNext()) {
s.onNext(it.next());
} else {
s.onComplete();
break;
}
l--; // 파라미터로 넘어온 수가 0과 같아지면 break
}
}
@Override
public void cancel() {
}
}
먼저 생성자 주입을 통해서 Publisher객체가 정보를 생성할 때, 구독자 정보와 데이터를 담도록 구현되었다.
( 앞서 Publisher 인터페이스에서 해당 타입의 인스턴스를 생성할 때, 넘겨준 파라미터에 해당 )
이어서 데이터 소비를 하는 request 메서드에서 호출 시 받은 파라미터와 Publisher가 넘겨준 데이터를 활용하여 파리미터를 조건으로 데이터를 순회하며 처리하고 있다.
그리고 순회하면서 순회한 값을 Subscriber 객체의 onNext 메서드의 파라미터로 넣어 호출한다.
그럼, onNext 메서드가 호출되어 데이터를 처리하여 구독자가 이를 소비할 수 있도록 기능하는 것이다.
현재, bufferSize를 3으로 설정하였기에 구독자는 3개의 단위로 데이터를 소비하는 동작을 수행할 것이다.
해당 기능을 확인하기 위해, main 메서드에 아래와 같이 코드를 작성한 뒤에 실행하면, 3개 단위로 데이터를 Streaming하여 처리하는 출력문을 확인할 수 있다.
public class Main {
public static void main(String[] args) {
MyPub pub = new MyPub();
MySub sub = new MySub();
pub.subscribe(sub);
}
}
이처럼 일반적으로 collection에 데이터를 담아서 한번에 처리하는 개발 양식과 달리 데이터를 흘리듯
일정량으로 끊어서 처리하는 Streaming / Publisher-Subscriber 형식의 라이브러리에 대한 간단한 실습을 진행했다.
그리고 Spring에서 지원하는 WebFlux 역시도 이와같은 개념을 기반으로 제공하는 기술이다.
Reactive Streams API 를 기반으로 만든 기술이 곧 WevFlux라고 생각하면 되겠다.
마치 우리가 Spring 프레임 워크를 이해하기 전에 순차적으로 JAVA를 학습하는 것처럼,
Reactive Streams 역시 비동기, Non-Blocking을 위한 소프트웨어를 구현하기에 Webflux의 사용에 앞서 중요한 하나의 필수 사항이다.
'Spring > WebFlux' 카테고리의 다른 글
리액티브 프로그래밍( text/event-streaming MIME TPYE ) (0) | 2024.12.26 |
---|---|
리액티브 프로그래밍 (0) | 2024.12.26 |
댓글