티스토리 뷰

Sping Framework

Mono [1]

kkoon9 2022. 2. 4. 22:53

현재 진행중인 사이드 프로젝트에서 사용하는 WebFlux를 분석하면서 정리한 글입니다.

 

Mono 내에는 많은 메서드가 존재하므로 사용했던 메서드 위주로 정리할 예정입니다.

찾으시는 메서드가 없다면 아래 적어논 docs를 참고해주시면 됩니다.

reactor.core.publisher 패키지에 존재하는 추상 클래스입니다.

기존 개발 형태는 데이터를 파라미터로 한번에 던지고 결과를 한 번에 받습니다.

결과가 다수면 Collection으로 받습니다.

Mono를 이용하여 위와 유사하게 작성할 수 있습니다.

List와 같은 container라고 생각하면 됩니다.

OnNext 신호를 통해 최대 한 개의 항목을 방출하는 기본 rx 연산자를 가진 Reactive Streams Publisher는 onComplete 신호(값 포함 또는 없음)로 종료하거나 단일 onError 신호(실패한 Mono)만 방출합니다.

대부분의 Mono 구현체는 Subscriber#onNext(T)를 호출한 후 Subscriber.onComplete()를 즉시 호출해야 합니다.

Mono.never()는 outlier입니다

어떠한 신호도 방출하지 않으며, 이는 기술적으로 금지되지는 않지만 테스트 밖에서는 그리 유용하지 않습니다.

반면에 onNext와 onError의 조합은 명시적으로 금지됩니다.

rx 연산자는 결과 mono의 "최대 한개" 속성을 보존하기 위해 입력 Mono 타입에 대한 별칭을 제공합니다.

예를 들어 flatMap은 Mono를 반환하지만 1개 이상의 방출을 가진 flatMapMany 별칭이 있습니다.

리턴값이 없는 Publisher에는 Mono<Void>를 사용해야 합니다.

implements 또는 return type에서 사용하도록 제작되었으며 입력 매개 변수는 가능한 한 raw Publisher 사용해야 합니다.

Mono Operator 내에서 사용되는 java.util.function나 lamda에서 상태를 사용하는 것은 여러 subscriber 간에 공유될 수 있으므로 피해야 합니다.

Mono 내 Static Generators

1. create

콜백 기반 API와 함께 사용하여 하나의 값(전체 또는 오류 신호)으로 신호를 보낼 수 있는 deferred emitter를 만듭니다.

callback 방식과 listener 방식이 있습니다.

🐻 이제 소개할 두 가지 방식은 현재 주석처리된 상태입니다.
  1. addListener(), removeListener() pairs

listener를 인스턴스화하는 API와 같은 작업하려면 listener에서 sink를 호출한 다음 소스에 등록해야 한다.

Mono.<String>create(sink -> {
     HttpListener listener = event -> {
         if (event.getResponseCode() >= 400) {
             sink.error(new RuntimeException("Failed"));
         } else {
             String body = event.getBody();
             if (body.isEmpty()) {
                 sink.success();
             } else {
                 sink.success(body.toLowerCase());
             }
         }
     };

     client.addListener(listener);

     sink.onDispose(() -> client.removeListener(listener));
 });

이 기능은 single-value emitting listeners에서만 작동합니다.

그렇지 않으면 이후의 모든 신호가 삭제됩니다.

다음 코드를 listener 본문에 추가해야 할 수 있습니다.

client.removeListener(this);
  1. callback handler

이는 위와 같은 유사한 인스턴스화 패턴을 요구하지만 대개 성공적인 완료와 오류는 다른 방법으로 분리됩니다.

게다가 레거시 API는 일부 취소 메커니즘을 지원할 수도 있고 지원하지 않을 수도 있다.

Mono.<String>create(sink -> {
     Callback<String> callback = new Callback<String>() {
         @Override
         public void onResult(String data) {
             sink.success(data.toLowerCase());
         }

         @Override
         public void onError(Exception e) {
             sink.error(e);
         }
     }

     // without cancellation support:

     client.call("query", callback);

     // with cancellation support:

     AutoCloseable cancel = client.call("query", callback);
     sink.onDispose(() -> {
         try {
             cancel.close();
         } catch (Exception ex) {
             Exceptions.onErrorDropped(ex);
         }
     });
 });
🐻 이제 소개할 방식이 현재 Mono에서 사용하는 create입니다.
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
	    return onAssembly(new MonoCreate<>(callback));
	}

create를 설명하기 전에 Consumer와 MonoSink, MonoCreate를 먼저 살펴보는 게 좋을 것 같습니다.

2022.02.04 - [Computer Science/JAVA] - Consumer

 

Consumer

함수형 인터페이스입니다. 단일 입력 인수를 수락하고 결과를 반환하지 않는 작업을 나타냅니다. 대부분의 다른 함수형 인터페이스와 달리 Consumer는 사이드 이팩트을 통해 작동할 것으로 예상

kkoon9.tistory.com

2022.02.04 - [Sping Framework] - MonoSink

 

MonoSink

현재 진행중인 사이드 프로젝트에서 사용하는 WebFlux를 분석하면서 정리한 글입니다. 아무 값도 없거나 단일 값 또는 오류를 내보내지 않도록 실제 다운스트림 subscriber 주위에 Wrapper 인터페이스

kkoon9.tistory.com

그리고 onAssembly를 리턴하는데 onAssembly를 먼저 설명하겠습니다.

protected static <T> Mono<T> onAssembly(Mono<T> source) {
		Function<Publisher, Publisher> hook = Hooks.onEachOperatorHook;
		if(hook != null) {
			source = (Mono<T>) hook.apply(source);
		}
		if (Hooks.GLOBAL_TRACE) {
			AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
			source = (Mono<T>) Hooks.addAssemblyInfo(source, stacktrace);
		}
		return source;
	}

커스텀 연산자가 사용하는 방법: Mono가 지정된 어셈블리 Hooks pointcut을 호출하여 새 Mono를 반환할 수 있습니다.

예를 들어 이것은 조립 시 cross-cutting concern를 활성화하는 데 유용합니다(예: checkpoint()).

Parameters:

source - assembly hook를 부착할 소스

Returns:

Hooks라는 추상 클래스의 addAssemblyInfo 메서드 결과를 Mono로 감싼 source를 리턴합니다.

MonoCreate는 Mono 추상 클래스를 상속받고 SourceProducer 인터페이스를 구현한 상수 클래스입니다.

reactor.core.publisher 패키지 내에 있습니다.

다운스트림 Subscriber를 단일 방출 개체로 래핑하고 지정된 콜백을 호출하여 동기적 또는 비동기적으로 신호를 생성합니다.

SourceProducer는 Scannable, Publisher 인터페이스를 구현한 인터페이스입니다.

SourceProducer는 일련의 연산자에 다시 연결될 목적으로 스캔 가능한 Publisher입니다.

소스 참조에 의해서만 다운스트림에서 업스트림으로 연결될 수 있기 때문에 그 자체로는 연산자의 계층 구조를 이동시킬 수 없습니다.

public static <T> Mono<T> just(T data) {
	return onAssembly(new MonoJust<>(data));
}

지정된 항목을 내보내는 새 Mono를 만듭니다.

이 항목은 인스턴스화할 때 캡처됩니다.

public static <T> Mono<T> error(Throwable error) {
	return onAssembly(new MonoError<>(error));
}

MonoError 생성자를 source 파라미터로 가지는 onAssembly 메서드를 리턴합니다.

public static <T> Mono<T> empty() {
	return MonoEmpty.instance();
}

onSubscribe과 onComplete만 호출하는 빈 publisher를 리턴합니다.

이 publisher는 효과적으로 상태 비저장 상태이며 단일 인스턴스만 있습니다.

public final <T2> Mono<Tuple2<T, T2>> zipWith(Mono<? extends T2> other) {
	return zipWith(other, Flux.tuple2Function());
}

이 모노와 다른 모노의 결과를 Tuple2로 결합합니다.

소스의 오류 또는 빈 완료로 인해 다른 소스가 취소되고 그 결과 모노가 각각 즉시 오류 또는 완료됩니다.

public final <T2, O> Mono<O> zipWith(Mono<? extends T2> other,
		BiFunction<? super T, ? super T2, ? extends O> combinator) {
	if (this instanceof MonoZip) {
		@SuppressWarnings("unchecked") MonoZip<T, O> o = (MonoZip<T, O>) this;
		Mono<O> result = o.zipAdditionalSource(other, combinator);
		if (result != null) {
			return result;
		}
	}

	return zip(this, other, combinator);
}

제공된 조합 함수에 의해 정의된 대로 이 모노와 다른 모노의 결과를 임의의 O 객체로 결합합니다.

references

Mono (reactor-core 3.4.14)

 

Mono (reactor-core 3.4.14)

static  Mono using(Callable  resourceSupplier, Function > sourceSupplier, Consumer  resourceCleanup, boolean eager) Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same re

projectreactor.io

'Sping Framework' 카테고리의 다른 글

Spring Boot에 Sentry 끼얹기 [2]  (1) 2022.04.03
Spring Boot에 Sentry 끼얹기 [1]  (0) 2022.03.26
MonoSink  (0) 2022.02.04
webflux - RequestPredicate(interface)  (0) 2022.02.01
[Spring boot] 테스트 코드 작성  (0) 2020.02.14
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/01   »
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30 31
글 보관함