Reactive programming

Request

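  • Request acts as the volume control of a stream. In Reactive Streams terms this is called backpressure.

  • The Subscriber sends a signal to the Publisher saying how much data it wants to receive.

  • The Publisher adjusts the amount of data it emits to match that demand.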

This control of the demand is done at the Subscription level: a Subscription is created for each subscribe() call and it can be manipulated to either cancel() the flow of data or tune demand with request(long).
  1. subscribe() is called on the Publisher
  2. The Publisher calls onSubscribe on the Subscriber
  3. A Subscription object is passed in, and the Subscriber keeps it as a member variable
  4. onNext delivers the data
    ( https://grokonez.com/java/java-9/java-9-flow-api-reactive-streams )
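
The four steps above can be sketched with the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which have the same shape as the org.reactivestreams interfaces that Reactor uses. This is only a minimal single-threaded sketch, not how Reactor implements it: RangePublisher, BatchingSubscriber and BackpressureDemo are hypothetical names made up for illustration, and overflow handling and thread safety are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureDemo {
    // Subscribes a batching subscriber and returns the signals it saw, in order.
    static List<String> run(int count, int batch, int cancelAfter) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batch, cancelAfter);
        new RangePublisher(count).subscribe(subscriber); // step 1
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2, 0)); // request 2 at a time until completion
        System.out.println(run(5, 2, 3)); // same, but cancel after the 3rd item
    }
}

// Hypothetical publisher: emits 1..count synchronously, never more than requested.
class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // Steps 2 and 3: a new Subscription is created per subscribe() call.
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand += n; // overflow handling omitted in this sketch
                if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                emitting = true;
                while (!done && demand > 0 && next <= count) {
                    demand--;
                    subscriber.onNext(next++); // step 4
                }
                emitting = false;
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}

// Hypothetical subscriber: stores the Subscription, requests in batches,
// and cancels after cancelAfter items (0 means never cancel).
class BatchingSubscriber implements Flow.Subscriber<Integer> {
    final List<String> events = new ArrayList<>();
    private final int batch;
    private final int cancelAfter;
    private Flow.Subscription subscription;
    private int received;

    BatchingSubscriber(int batch, int cancelAfter) {
        this.batch = batch;
        this.cancelAfter = cancelAfter;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        events.add("onSubscribe");
        requestBatch();
    }

    @Override
    public void onNext(Integer item) {
        events.add("onNext(" + item + ")");
        received++;
        if (received == cancelAfter) {
            events.add("cancel");
            subscription.cancel();
        } else if (received % batch == 0) {
            requestBatch();
        }
    }

    @Override public void onError(Throwable t) { events.add("onError"); }
    @Override public void onComplete() { events.add("onComplete"); }

    private void requestBatch() {
        events.add("request(" + batch + ")");
        subscription.request(batch);
    }
}
```

In the first run the publisher pauses after every second item until the next request(2) arrives, then completes. In the second run the flow stops after onNext(3) and no onComplete follows, because cancel() ended the subscription.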
  1. Create a StepVerifier that initially requests all values and expect 4 values to be received
    StepVerifier requestAllExpectFour(Flux<User> flux) {
        return StepVerifier.create(flux)
                .expectNextCount(4)
                .expectComplete();
    }

Controlling request

Based on Baek Ki-sun's lecture: https://www.youtube.com/watch?v=8hB1C4OCbz0&list=PLfI752FpVCS9hh_FE8uDuRVgPPnAivZTY&index=8

request(unbounded)

Flux.range(1, 100)
    .log()
    .doOnNext(System.out::println)
    .subscribe();

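Log: request(unbounded). subscribe() with no arguments requests Long.MAX_VALUE, which log() prints as unbounded.

request(10): 10 items at a time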

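import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

Flux.range(1, 100)
    .log()
    .doOnNext(System.out::println)
    .subscribe(new Subscriber<Integer>() {

        private Subscription subscription;
        private int cnt;

        @Override
        public void onSubscribe(Subscription subscription) {
            // Keep the Subscription and ask for the first 10 items.
            this.subscription = subscription;
            this.subscription.request(10);
        }

        @Override
        public void onNext(Integer integer) {
            // After every 10th item, ask for the next 10.
            cnt++;
            if (cnt % 10 == 0) {
                this.subscription.request(10);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            // Errors are ignored in this example.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    });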

Error

onErrorResume

  1. Return a Mono containing User.SAUL when an error occurs in the input Mono, else do not change the input Mono.

    Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
        return mono.onErrorResume(e -> Mono.just(User.SAUL));
    }
  2. Return a Flux containing User.SAUL and User.JESSE when an error occurs in the input Flux, else do not change the input Flux.

    Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
        return flux.onErrorResume(e -> Flux.just(User.SAUL, User.JESSE));
    }

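Error propagation

  1. Exceptions.propagate: responsible for converting the error. It wraps a checked exception in an unchecked one so that it can be thrown from inside an operator lambda.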
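    Mono.just("hello")
        .log()
        .map(s -> {
            try {
                return Integer.parseInt(s);
            } catch (Exception e) {
                // Rethrow as an unchecked exception so it travels as an onError signal.
                throw Exceptions.propagate(e);
            }
        })
        .onErrorReturn(200) // "hello" is not a number, so 200 is emitted instead
        .doOnNext(System.out::println)
        .subscribe();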