Aliran Reaktif Java 9

1. Gambaran keseluruhan

Dalam artikel ini, kita akan melihat Aliran Reaktif Java 9. Ringkasnya, kita akan dapat menggunakan kelas Flow , yang merangkumi blok bangunan utama untuk membina logik pemprosesan aliran reaktif.

Aliran Reaktif adalah standard untuk pemprosesan aliran tak segerak dengan tekanan belakang yang tidak menyekat. Spesifikasi ini didefinisikan dalam Manifesto Reaktif, dan terdapat pelbagai pelaksanaannya, misalnya, RxJava atau Akka-Streams.

2. Tinjauan API Reaktif

Untuk membina Aliran , kita dapat menggunakan tiga abstraksi utama dan menyusunnya menjadi logik pemprosesan tak segerak.

Setiap Aliran perlu memproses peristiwa yang diterbitkan kepadanya oleh contoh Penerbit ; yang Penerbit mempunyai satu kaedah - melanggan ().

Sekiranya ada pelanggan yang ingin menerima acara yang diterbitkan olehnya, mereka perlu melanggan Penerbit yang diberikan .

Penerima mesej perlu melaksanakan antara muka Pelanggan . Biasanya ini adalah akhir untuk setiap pemprosesan Aliran kerana contohnya tidak menghantar mesej lebih jauh.

Kita boleh memikirkan Pelanggan sebagai Tenggelam. Ini mempunyai empat kaedah yang perlu diganti - onSubscribe (), onNext (), onError (), dan onComplete (). Kami akan melihat mereka di bahagian seterusnya.

Sekiranya kita ingin mengubah mesej masuk dan menyebarkannya lebih jauh ke Pelanggan seterusnya , kita perlu menerapkan antara muka Pemproses . Ini bertindak sebagai Pelanggan kerana menerima mesej, dan sebagai Penerbit kerana memproses mesej tersebut dan menghantarnya untuk diproses lebih lanjut.

3. Penerbitan dan Penggunaan Mesej

Katakanlah kita ingin membuat Aliran sederhana , di mana kita mempunyai mesej Penerbit yang menerbitkan, dan Pelanggan yang mudah menggunakan pesanan semasa mereka sampai - satu demi satu.

Mari buat kelas EndSubscriber . Kita perlu melaksanakan antara muka Pelanggan . Seterusnya, kami akan mengatasi kaedah yang diperlukan.

Kaedah onSubscribe () dipanggil sebelum pemprosesan dimulakan. Contoh Langganan diteruskan sebagai argumen. Ini adalah kelas yang digunakan untuk mengawal aliran mesej antara Pelanggan dan Penerbit:

public class EndSubscriber implements Subscriber { private Subscription subscription; public List consumedElements = new LinkedList(); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

Kami juga dimulakan kosong List of consumedElements yang akan digunakan dalam ujian.

Sekarang, kita perlu menerapkan kaedah yang tersisa dari antara muka Pelanggan . Kaedah utama di sini adalah onNext () - ini dipanggil setiap kali Penerbit menerbitkan mesej baru:

@Override public void onNext(T item) { System.out.println("Got : " + item); subscription.request(1); }

Perhatikan bahawa ketika kita memulakan langganan dalam metode onSubscribe () dan ketika kita memproses pesan, kita perlu memanggil kaedah permintaan () pada Langganan untuk memberi isyarat bahawa Pelanggan saat ini siap untuk menggunakan lebih banyak pesan.

Terakhir, kita perlu menerapkan onError () - yang dipanggil setiap kali ada pengecualian dalam pemprosesan, dan juga onComplete () - dipanggil ketika Penerbit ditutup:

@Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); }

Mari tulis ujian untuk Aliran Pemprosesan . Kami akan menggunakan kelas SubmissionPublisher - konstruk dari java.util.concurrent - yang menerapkan antara muka Penerbit .

Kami akan menyerahkan elemen N kepada Penerbit - yang akan diterima oleh EndSubscriber kami :

@Test public void whenSubscribeToIt_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until( () -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(items) ); }

Perhatikan, bahawa kita memanggil kaedah tutup () pada contoh EndSubscriber. Ia akan memanggil balik (on ) Selesai () di bawah pada setiap Pelanggan Penerbit yang diberikan .

Menjalankan program itu akan menghasilkan output berikut:

Got : 1 Got : x Got : 2 Got : x Got : 3 Got : x Done

4. Transformasi Mesej

Katakan bahawa kita ingin membina logik yang serupa antara Penerbit dan Pelanggan , tetapi juga menerapkan beberapa transformasi.

Kami akan membuat kelas TransformProcessor yang menerapkan Processor dan memperluas SubmissionPublisher - kerana ini akan menjadi penerbit P dan juga pengguna.

Kami akan memasukkan Fungsi yang akan mengubah input menjadi output:

public class TransformProcessor extends SubmissionPublisher implements Flow.Processor { private Function function; private Flow.Subscription subscription; public TransformProcessor(Function function) { super(); this.function = function; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { submit(function.apply(item)); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { close(); } }

Mari sekarang tulis ujian pantas dengan aliran pemprosesan di mana Penerbit menerbitkan elemen String .

TransformProcessor kami akan menguraikan String sebagai Integer - yang bermaksud penukaran perlu berlaku di sini:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); TransformProcessor transformProcessor = new TransformProcessor(Integer::parseInt); EndSubscriber subscriber = new EndSubscriber(); List items = List.of("1", "2", "3"); List expectedResult = List.of(1, 2, 3); // when publisher.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expectedResult) ); }

Perhatikan, bahawa memanggil kaedah close () pada dasar Publisher akan menyebabkan kaedah onComplete () pada TransformProcessor dipanggil.

Perlu diingat bahawa semua penerbit dalam rantaian pemprosesan perlu ditutup dengan cara ini.

5. Mengawal Permintaan untuk Mesej Menggunakan Langganan

Katakan bahawa kita hanya mahu menggunakan elemen pertama dari Langganan, menerapkan logik dan menyelesaikan proses. Kita boleh menggunakan kaedah permintaan () untuk mencapainya.

Mari ubah suai EndSubscriber kami untuk menggunakan N jumlah mesej sahaja. Kami akan memberikan nombor itu sebagai argumen konstruktor howMuchMessagesConsume :

public class EndSubscriber implements Subscriber { private AtomicInteger howMuchMessagesConsume; private Subscription subscription; public List consumedElements = new LinkedList(); public EndSubscriber(Integer howMuchMessagesConsume) { this.howMuchMessagesConsume = new AtomicInteger(howMuchMessagesConsume); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { howMuchMessagesConsume.decrementAndGet(); System.out.println("Got : " + item); consumedElements.add(item); if (howMuchMessagesConsume.get() > 0) { subscription.request(1); } } //... }

Kita boleh meminta elemen selagi kita mahu.

Mari tulis ujian di mana kita hanya mahu menggunakan satu elemen dari Langganan yang diberikan :

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(1); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); List expected = List.of("1"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expected) ); }

Walaupun penerbit menerbitkan enam elemen, EndSubscriber kami akan menggunakan satu elemen sahaja kerana ini menunjukkan permintaan untuk memproses satu elemen sahaja.

Dengan menggunakan kaedah permintaan () pada Langganan, kita dapat menerapkan mekanisme tekanan balik yang lebih canggih untuk mengendalikan kecepatan penggunaan pesan.

6. Kesimpulannya

Dalam artikel ini, kami telah melihat Java 9 Reactive Streams.

Kami melihat bagaimana membuat Aliran pemprosesan yang terdiri daripada Penerbit dan Pelanggan. Kami membuat aliran pemprosesan yang lebih kompleks dengan transformasi elemen menggunakan Pemproses .

Akhirnya, kami menggunakan Langganan untuk mengawal permintaan elemen oleh Pelanggan.

Pelaksanaan semua contoh dan coretan kod ini terdapat dalam projek GitHub - ini adalah projek Maven, jadi mudah untuk diimport dan dijalankan sebagaimana adanya.