Reactive programming
Request
Request is the volume control. In Reactive Streams terms this is called backpressure.
The Subscriber sends a signal to the Publisher saying how much data it wants to receive.
The Publisher adjusts the amount of data it emits accordingly.
This control of the demand is done at the Subscription level: a Subscription is created for each subscribe() call and it can be manipulated to either cancel() the flow of data or tune demand with request(long).
- When subscribe() is called on the Publisher
- onSubscribe is called on the Subscriber
- A Subscription object is passed in; the Subscriber keeps the Subscription as a member variable
- onNext delivers the data
( https://grokonez.com/java/java-9/java-9-flow-api-reactive-streams )
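The sequence above (subscribe, onSubscribe, stored Subscription, onNext) can be sketched with the JDK's own java.util.concurrent.Flow API, which the linked article covers. This is only an illustrative sketch: the class names OneAtATimeDemo and OneAtATimeSubscriber are made up for this note, and SubmissionPublisher is used simply because it ships with the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class OneAtATimeDemo {

    /** Hypothetical subscriber: keeps its Subscription in a field and pulls one item at a time. */
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        private Flow.Subscription subscription;          // handed over in onSubscribe
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;            // keep it as a member variable
            subscription.request(1);                     // initial demand: one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1);                     // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);             // triggers onSubscribe
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);                     // delivered through onNext as demand allows
            }
        }                                                // close() signals onComplete
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Replacing request(1) with a larger number, or calling subscription.cancel(), is how the subscriber would tune or stop the flow.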
- Create a StepVerifier that initially requests all values and expect 4 values to be received
StepVerifier requestAllExpectFour(Flux<User> flux) {
    return StepVerifier.create(flux)
            .expectNextCount(4)
            .expectComplete();
}
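For contrast with requesting everything up front, here is a rough sketch of a StepVerifier that starts with a demand of 1 and asks for more step by step. It assumes reactor-core and reactor-test are on the classpath; the class name, the method name, and the String values stand in for the exercise's User type and are not from the original notes.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class StepwiseRequestDemo {

    // Hypothetical helper: request one value, check it, request one more, check it, then cancel.
    static StepVerifier requestOneByOne(Flux<String> flux) {
        return StepVerifier.create(flux, 1)   // initial demand of 1 instead of unbounded
                .expectNext("a")
                .thenRequest(1)               // raise demand by one more element
                .expectNext("b")
                .thenCancel();                // stop without consuming the rest
    }

    public static void main(String[] args) {
        // verify() subscribes and throws an AssertionError if an expectation fails
        requestOneByOne(Flux.just("a", "b", "c", "d")).verify();
    }
}
```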
Controlling request
https://www.youtube.com/watch?v=8hB1C4OCbz0&list=PLfI752FpVCS9hh_FE8uDuRVgPPnAivZTY&index=8 This section is based on 백기선's lecture.
request(unbounded)
Flux.range(1, 100)
Log: request(unbounded)
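The unbounded request in that log can be observed directly. The sketch below assumes reactor-core is on the classpath and uses doOnRequest to capture the demand; the class and method names are made up for illustration. A plain subscribe() requests Long.MAX_VALUE, which log() prints as request(unbounded).

```java
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

class UnboundedRequestDemo {

    /** Returns the demand that a plain subscribe() signals upstream. */
    static long firstRequest() {
        AtomicLong requested = new AtomicLong();
        Flux.range(1, 100)
            .doOnRequest(requested::set)   // records the demand travelling upstream
            .subscribe();                  // no explicit demand: everything is requested at once
        return requested.get();
    }

    public static void main(String[] args) {
        System.out.println(firstRequest() == Long.MAX_VALUE);
    }
}
```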
request(10): 10 items at a time
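One way to get request(10) is a custom BaseSubscriber that asks for ten elements, then asks for ten more each time a batch has been consumed. This is a sketch under the assumption that reactor-core is available, and it is not necessarily the code used in the lecture; BatchedRequestDemo and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class BatchedRequestDemo {

    static final int BATCH = 10;

    /** Returns every demand value seen upstream while consuming 1..100 in batches. */
    static List<Long> requestsSeen() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 100)
            .doOnRequest(requests::add)                  // record each request(n)
            .subscribe(new BaseSubscriber<Integer>() {
                private int inBatch = 0;

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(BATCH);                      // initial demand: 10 instead of unbounded
                }

                @Override
                protected void hookOnNext(Integer value) {
                    if (++inBatch == BATCH) {            // batch fully consumed
                        inBatch = 0;
                        request(BATCH);                  // ask for the next 10
                    }
                }
            });
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(requestsSeen());
    }
}
```

Adding .log() before subscribe would show the same thing as request(10) lines in the log.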
Error
onErrorResume
Return a Mono containing User.SAUL when an error occurs in the input Mono, else do not change the input Mono.
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
    return mono.onErrorResume(e -> Mono.just(User.SAUL));
}
Return a Flux containing User.SAUL and User.JESSE when an error occurs in the input Flux, else do not change the input Flux.
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
    return flux.onErrorResume(e -> Flux.just(User.SAUL, User.JESSE));
}
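To see what onErrorResume does at runtime, the sketch below feeds failing and healthy sources through the same fallback. It assumes reactor-core is on the classpath; plain strings stand in for the exercise's User constants, and the class and method names are invented for this note.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FallbackDemo {

    /** A failing Mono is replaced by the fallback Mono. */
    static String monoFallback() {
        return Mono.<String>error(new IllegalStateException("boom"))
                .onErrorResume(e -> Mono.just("SAUL"))
                .block();
    }

    /** Elements emitted before the error are kept; the fallback Flux continues after them. */
    static List<String> fluxFallback() {
        return Flux.just("WALTER")
                .concatWith(Flux.<String>error(new IllegalStateException("boom")))
                .onErrorResume(e -> Flux.just("SAUL", "JESSE"))
                .collectList()
                .block();
    }

    /** Without an error the input passes through unchanged. */
    static String untouched() {
        return Mono.just("SKYLER")
                .onErrorResume(e -> Mono.just("SAUL"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(monoFallback());   // SAUL
        System.out.println(fluxFallback());   // [WALTER, SAUL, JESSE]
        System.out.println(untouched());      // SKYLER
    }
}
```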
Error propagate
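- Exceptions.propagate: converts the error, wrapping a checked exception in an unchecked one so it can be thrown from inside an operator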
Mono.just("hello")
    .log()
    .map(s -> {
        try {
            return Integer.parseInt(s);
        } catch (Exception e) {
            // rethrow as an unchecked exception so it travels downstream as onError
            throw Exceptions.propagate(e);
        }
    })
    .onErrorReturn(200)              // fall back to 200 when parsing fails
    .doOnNext(System.out::println)
    .subscribe();
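Exceptions.propagate matters most when the code inside map throws a checked exception, which a lambda cannot throw directly. The sketch below assumes reactor-core is on the classpath; parseStrict is a hypothetical helper invented to produce a checked IOException, and Exceptions.unwrap is used downstream to get back to the original cause.

```java
import java.io.IOException;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

class PropagateDemo {

    /** Hypothetical helper that reports bad input with a checked exception. */
    static int parseStrict(String s) throws IOException {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) {
            throw new IOException("not a number: " + s, e);
        }
    }

    static String convert(String input) {
        return Mono.just(input)
                .map(s -> {
                    try {
                        return parseStrict(s);
                    } catch (IOException e) {
                        throw Exceptions.propagate(e);   // checked -> unchecked
                    }
                })
                .map(i -> "parsed " + i)
                // unwrap returns the original exception if it was wrapped, else the error itself
                .onErrorResume(e -> Exceptions.unwrap(e) instanceof IOException
                        ? Mono.just("I/O problem")
                        : Mono.<String>error(e))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(convert("42"));      // parsed 42
        System.out.println(convert("hello"));   // I/O problem
    }
}
```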