Pengenalan Teras Reaktor

1. Pengenalan

Reactor Core adalah perpustakaan Java 8 yang menerapkan model pengaturcaraan reaktif. Ia dibina di atas Spesifikasi Aliran Reaktif, standard untuk membina aplikasi reaktif.

Dari latar belakang pengembangan Java yang tidak reaktif, berjalan reaktif dapat menjadi kurva pembelajaran yang cukup curam. Ini menjadi lebih mencabar ketika membandingkannya dengan Java 8 Stream API, kerana mereka boleh disalah anggap sebagai abstraksi tahap tinggi yang sama.

Dalam artikel ini, kami akan cuba mengungkap paradigma ini. Kami akan mengambil langkah-langkah kecil melalui Reactor sehingga kami membina gambaran tentang bagaimana menyusun kod reaktif, meletakkan asas untuk artikel yang lebih maju untuk datang dalam siri yang kemudian.

2. Spesifikasi Aliran Reaktif

Sebelum kita melihat Reactor, kita harus melihat Spesifikasi Aliran Reaktif. Inilah yang dilaksanakan oleh Reactor, dan ini meletakkan asas bagi perpustakaan.

Pada dasarnya, Aliran Reaktif adalah spesifikasi untuk pemprosesan aliran tak segerak.

Dengan kata lain, sistem di mana banyak acara dihasilkan dan dimakan secara serentak. Fikirkan tentang aliran ribuan kemas kini stok sesaat yang masuk ke dalam aplikasi kewangan, dan untuk itu perlu bertindak balas terhadap kemas kini tersebut tepat pada masanya.

Salah satu tujuan utama ini adalah untuk mengatasi masalah tekanan balik. Sekiranya kita mempunyai pengeluar yang memancarkan acara kepada pengguna lebih cepat daripada yang dapat memprosesnya, akhirnya pengguna akan dibanjiri dengan peristiwa, kehabisan sumber sistem.

Tekanan balik bermaksud bahawa pengguna kita harus dapat memberitahu pengeluar berapa banyak data yang akan dihantar untuk mencegahnya, dan inilah yang dinyatakan dalam spesifikasi.

3. Pergantungan Maven

Sebelum memulakan, mari tambahkan kebergantungan Maven kami:

 io.projectreactor reactor-core 3.3.9.RELEASE   ch.qos.logback logback-classic 1.1.3 

Kami juga menambah Logback sebagai pergantungan. Ini kerana kami akan mencatat output Reactor untuk lebih memahami aliran data.

4. Menghasilkan Aliran Data

Agar aplikasi menjadi reaktif, perkara pertama yang mesti dilakukan adalah menghasilkan aliran data.

Ini mungkin seperti contoh kemas kini saham yang kami berikan sebelumnya. Tanpa data ini, kami tidak akan bertindak balas, sebab itulah langkah pertama yang logik.

Reactive Core memberi kita dua jenis data yang membolehkan kita melakukan ini.

4.1. Selsema

Cara pertama untuk melakukan ini adalah dengan Flux. Ia adalah aliran yang dapat memancarkan elemen 0..n . Mari cuba buat yang mudah:

Flux just = Flux.just(1, 2, 3, 4);

Dalam kes ini, kita mempunyai aliran empat elemen statik.

4.2. Mono

Cara kedua untuk melakukan ini adalah dengan Mono, yang merupakan aliran elemen 0..1 . Mari cuba buat contoh:

Mono just = Mono.just(1);

Ini kelihatan dan berkelakuan hampir sama dengan Flux , hanya kali ini kita terhad kepada tidak lebih dari satu elemen.

4.3. Mengapa Tidak Hanya Flux?

Sebelum bereksperimen lebih jauh, perlu dinyatakan mengapa kita mempunyai dua jenis data ini.

Pertama, harus diperhatikan bahawa kedua-dua Flux dan Mono adalah implementasi antara muka Reactive Streams Publisher . Kedua-dua kelas mematuhi spesifikasi, dan kami dapat menggunakan antara muka ini di tempat mereka:

Publisher just = Mono.just("foo");

Tetapi sebenarnya, mengetahui kardinaliti ini berguna. Ini kerana beberapa operasi hanya masuk akal untuk salah satu daripada dua jenis itu, dan kerana ia dapat lebih ekspresif (bayangkan findOne () di repositori).

5. Melanggan Stream

Sekarang kita mempunyai gambaran umum tahap tinggi tentang bagaimana menghasilkan aliran data, kita perlu melanggannya agar dapat memancarkan unsur-unsurnya.

5.1. Mengumpulkan Elemen

Mari gunakan kaedah subscribe () untuk mengumpulkan semua elemen dalam aliran:

List elements = new ArrayList(); Flux.just(1, 2, 3, 4) .log() .subscribe(elements::add); assertThat(elements).containsExactly(1, 2, 3, 4);

Data tidak akan mula mengalir sehingga kami melanggan. Perhatikan bahawa kami telah menambahkan beberapa pembalakan juga, ini akan sangat berguna apabila kita melihat apa yang berlaku di sebalik tabir.

5.2. Aliran Unsur

Dengan masuk, kami dapat menggunakannya untuk memvisualisasikan bagaimana data mengalir melalui aliran kami:

20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()

Pertama sekali, semuanya berjalan di utas utama. Jangan sampai ke perincian mengenai perkara ini, kerana kita akan melihat lebih lanjut mengenai persetujuan di kemudian hari dalam artikel ini. Ia menjadikan semuanya mudah, kerana kita dapat menangani semuanya dengan teratur.

Sekarang mari kita lihat urutan yang telah kita log satu demi satu:

  1. onSubscribe () - Ini dipanggil semasa kami melanggan aliran kami
  2. request(unbounded) – When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
  3. onNext() – This is called on every single element
  4. onComplete() – This is called last, after receiving the last element. There's actually a onError() as well, which would be called if there is an exception, but in this case, there isn't

This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that's what's been instantiated behind the scenes in our call to onSubscribe(). It's a useful method, but to better understand what's happening let's provide a Subscriber interface directly:

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { elements.add(integer); } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that the Flux has provided us with a helper method to reduce this verbosity.

5.3. Comparison to Java 8 Streams

It still might appear that we have something synonymous to a Java 8 Stream doing collect:

List collected = Stream.of(1, 2, 3, 4) .collect(toList());

Only we don't.

The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

The next thing to notice is a Streams terminal operator is just that, terminal, pulling all the data and returning a result. With Reactive we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams and apply backpressure, which we will cover next.

6. Backpressure

The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.

Backpressure is when a downstream can tell an upstream to send it fewer data in order to prevent it from being overwhelmed.

We can modify our Subscriber implementation to apply backpressure. Let's tell the upstream to only send two elements at a time by using request():

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { private Subscription s; int onNextAmount; @Override public void onSubscribe(Subscription s) { this.s = s; s.request(2); } @Override public void onNext(Integer integer) { elements.add(integer); onNextAmount++; if (onNextAmount % 2 == 0) { s.request(2); } } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

Now if we run our code again, we'll see the request(2) is called, followed by two onNext() calls, then request(2) again.

23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain amount of elements, and only when we are ready.

If we imagine we were being streamed tweets from twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there are no requests from the downstream, then the upstream could drop items, store them in a buffer, or some other strategy.

7. Operating on a Stream

We can also perform operations on the data in our stream, responding to events as we see fit.

7.1. Mapping Data in a Stream

A simple operation that we can perform is applying a transformation. In this case, let's just double all the numbers in our stream:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribe(elements::add);

map() will be applied when onNext() is called.

7.2. Combining Two Streams

We can then make things more interesting by combining another stream with this one. Let's try this by using zip() function:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .zipWith(Flux.range(0, Integer.MAX_VALUE), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two)) .subscribe(elements::add); assertThat(elements).containsExactly( "First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3");

Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:

20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete() 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel() 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()

Note how we now have one subscription per Flux. The onNext() calls are also alternated, so the index of each element in the stream will match when we apply the zip() function.

8. Hot Streams

Currently, we've focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.

For example, we could have a stream of mouse movements that constantly needs to be reacted to or a twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.

8.1. Creating a ConnectableFlux

One way to create a hot stream is by converting a cold stream into one. Let's create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .publish();

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won't cause it to start emitting, allowing us to add multiple subscriptions:

publish.subscribe(System.out::println); publish.subscribe(System.out::println);

If we try running this code, nothing will happen. It's not until we call connect(), that the Flux will start emitting:

publish.connect();

8.2. Throttling

If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let's try getting around this with throttling:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .sample(ofSeconds(2)) .publish();

Here, we've introduced a sample() method with an interval of two seconds. Now values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.

Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.

9. Concurrency

All of our above examples have currently run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, for which many implementations are provided for us. Let's try subscribing to a different thread to main:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribeOn(Schedulers.parallel()) .subscribe(elements::add);

The Parallel scheduler will cause our subscription to be run on a different thread, which we can prove by looking at the logs. We see the first entry comes from the main thread and the Flux is running in another thread called parallel-1.

20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()

Concurrency get's more interesting than this, and it will be worth us exploring it in another article.

10. Conclusion

Dalam artikel ini, kami telah memberi gambaran keseluruhan mengenai Reactive Core tahap tinggi hingga akhir. Kami telah menjelaskan bagaimana kami dapat menerbitkan dan melanggan aliran, menerapkan tekanan balik, beroperasi pada aliran dan juga menangani data secara tidak segerak. Ini semoga menjadi asas bagi kita untuk menulis aplikasi reaktif.

Artikel kemudian dalam siri ini akan merangkumi konsep yang lebih maju dan konsep reaktif yang lain. Terdapat juga artikel lain yang merangkumi Reactor with Spring.

Kod sumber untuk aplikasi kami boleh didapati di GitHub; ini adalah projek Maven yang seharusnya dapat dijalankan seperti sedia kala.