Project Reactor: About Fuseable interface ASYNC mode under the same thread

RMAG news

Last time I introduce how to use the SYNC mode of Fuseable interface to retrieve signal from pipeline. The disadvantage of SYNC mode is the singal sequence must exist in source (For example the source from Flux.just() or Flux.generate() function). In order to retrieve signal which is generated in indefinite time. ASYNC mode must be used. Below I will introduce the ASYNC mode which receives action that is triggered by the thread of the upstream operator.

Example of ASYNC mode implementation under the same thread.

Below is an example to display the signal received by implements both Fuseable ASYNC mode (which signal is triggered by thread of parent operator) and traditional Reactive Streams’s request and onNext cycle. Project Reactor 3.6.3 is used.

package example;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncDisplaySubscriber<T> implements CoreSubscriber<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncDisplaySubscriber.class);
private Subscription subscription;

private final AtomicInteger called = new AtomicInteger(0);

@Override
public void onSubscribe(Subscription s) {
LOGGER.info(“onSubscribe(): s={}”, s);
subscription = s;
if (s instanceof Fuseable.QueueSubscription) { // (1)
@SuppressWarnings(“unchecked”)
Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) s;
int mode = qs.requestFusion(Fuseable.ASYNC); // (2)
LOGGER.info(“onSubscribe(): requestFusion()={}”, mode);
if (mode == Fuseable.ASYNC) { // (3)
return;
}
}
s.request(1L);
}

@Override
public void onNext(T t) {
if (Objects.nonNull(t)) {
LOGGER.info(“onNext(): t={}”, t);
subscription.request(1L);
} else {
LOGGER.info(“onNext(): can poll”); // (4)
drain();
}
}

@Override
public void onError(Throwable t) { // (10)
LOGGER.info(“onError()”, t);
}

@Override
public void onComplete() { // (10)
LOGGER.info(“onComplete()”);
}

// only one thread can run it in a time
private void drain() {
LOGGER.info(“drain(): start drain”);
if (called.getAndIncrement() > 0) { // (5)
LOGGER.info(“drain(): not 1st thread detected, end drain”);
return;
}

@SuppressWarnings(“unchecked”)
Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) subscription;
T next;
do {
do {
next = qs.poll(); // (6)
if (Objects.nonNull(next)) {
LOGGER.info(“drain(): next={}”, next); // (7)
}
} while (Objects.nonNull(next)); // (8)
} while (called.decrementAndGet() > 0); // (9)
LOGGER.info(“drain(): end drain”);
}

}

(1): The Subscription from upstream operator must implement Fuseable.QueueSubscription, otherwise Fuseable is not supported.

(2): Using Fuseable.QueueSubscription.requestFusion(Fuseable.ASYNC) method to check if ASYNC mode is supported or not.

(3): ASYNC mode is supported only if the return is Fuseable.ASYNC. In ASYNC mode it is not necessarily to call Subscription.request(long). If Fuseable.NONE is returned, the subscriber will get data by Reactive Streams’s request and onNext cycle.

(4): If onNext(null) is received, values of onNext signal can be collected by calling Fuseable.QueueSubscription.poll(). Please note that only one thread can call this at a time.

(5): Lock for checking if some thread is running or not. And record the time of this function being called.

(6): Get value.

(7): Non null value return of poll() means receives onNext signal of that value.

(8): Null means that no signal is available. Should break the loop and wait for the next onNext(null) event.

(9): The time of running the loop above is equal to the time of onNext(null) event received.

(10): onComplete and onError is received at these calls.

Below is the code to test the subscriber. The main() method is to simulate the blocking operation of generating signals.

package example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class FuseableAsyncModeTest {

private static final Logger LOGGER = LoggerFactory.getLogger(FuseableAsyncModeTest.class);

public static void main(String[] args) throws InterruptedException {
Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> flux = sink.asFlux()
.publishOn(Schedulers.parallel());
flux.subscribe(new AsyncDisplaySubscriber<>());

LOGGER.info(“main(): sleep”);
Thread.sleep(1000L);
LOGGER.info(“main(): wake up”);
sink.tryEmitNext(1);
sink.tryEmitNext(2);
sink.tryEmitNext(3);
sink.tryEmitNext(4);
sink.tryEmitNext(5);
LOGGER.info(“main(): sleep”);
Thread.sleep(1000L);
LOGGER.info(“main(): wake up”);
sink.tryEmitNext(6);
sink.tryEmitNext(7);
sink.tryEmitNext(8);
sink.tryEmitNext(9);
sink.tryEmitNext(10);
LOGGER.info(“main(): sleep”);
Thread.sleep(1000L);
LOGGER.info(“main(): wake up”);
sink.tryEmitComplete();
}

}

The output is

13:30:16.031 [main] INFO example.AsyncDisplaySubscriber — onSubscribe(): s=reactor.core.publisher.FluxPublishOn$PublishOnSubscriber@11c9af63
13:30:16.032 [main] INFO example.AsyncDisplaySubscriber — onSubscribe(): requestFusion()=2
13:30:16.034 [main] INFO example.FuseableAsyncModeTest — main(): sleep
13:30:16.035 [parallel-1] INFO example.AsyncDisplaySubscriber — onNext(): can poll
13:30:16.035 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): start drain
13:30:16.035 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): end drain
13:30:17.035 [main] INFO example.FuseableAsyncModeTest — main(): wake up
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — onNext(): can poll
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): start drain
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=1
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): end drain
13:30:17.036 [main] INFO example.FuseableAsyncModeTest — main(): sleep
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — onNext(): can poll
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): start drain
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=2
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=3
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=4
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=5
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): end drain
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — onNext(): can poll
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): start drain
13:30:17.036 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): end drain
13:30:18.036 [main] INFO example.FuseableAsyncModeTest — main(): wake up
13:30:18.036 [main] INFO example.FuseableAsyncModeTest — main(): sleep
13:30:18.036 [parallel-1] INFO example.AsyncDisplaySubscriber — onNext(): can poll
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): start drain
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=6
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=7
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=8
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=9
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): next=10
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): end drain
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — onNext(): can poll
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): start drain
13:30:18.037 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): end drain
13:30:19.038 [main] INFO example.FuseableAsyncModeTest — main(): wake up
13:30:19.038 [parallel-1] INFO example.AsyncDisplaySubscriber — onNext(): can poll
13:30:19.038 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): start drain
13:30:19.038 [parallel-1] INFO example.AsyncDisplaySubscriber — drain(): end drain
13:30:19.038 [parallel-1] INFO example.AsyncDisplaySubscriber — onComplete()

From the output above, The “parallel-1” thread is created by the publishOn() operator, and it will call the method of the subscriber in the below way repeatedly until the terminal signal is received.

onNext(): can poll
drain(): start drain
(If singal exists) drain(): next=N
drain(): end

Leave a Reply

Your email address will not be published. Required fields are marked *