RxJava 2 - Alir

1. Pengenalan

RxJava adalah implementasi Reactive Extensions Java yang memungkinkan kita menulis aplikasi berdasarkan peristiwa, dan tidak segerak. Maklumat lanjut mengenai cara menggunakan RxJava boleh didapati dalam artikel intro kami di sini.

RxJava 2 ditulis semula dari awal, yang membawa pelbagai ciri baru; beberapa di antaranya diciptakan sebagai tindak balas terhadap masalah yang ada dalam versi kerangka sebelumnya.

Salah satu ciri tersebut ialah io.reactivex.Flowable .

2. diperhatikan vs . Alir

Dalam versi RxJava sebelumnya, hanya ada satu kelas asas untuk menangani sumber yang menyedari tekanan dan tidak sedar tekanan balik - Dapat dilihat.

RxJava 2 memperkenalkan perbezaan yang jelas antara kedua-dua jenis sumber ini - sumber yang menyedari tekanan belakang kini diwakili menggunakan kelas khusus - Flowable.

Diperhatikan sumber tidak menyokong tekanan belakang. Oleh kerana itu, kita harus menggunakannya untuk sumber yang hanya kita konsumsi dan tidak dapat mempengaruhi.

Juga, jika kita berhadapan dengan sebilangan besar elemen, dua senario yang mungkin berkaitan dengan tekanan balik boleh berlaku bergantung pada jenis yang Dapat Diperhatikan .

Sekiranya menggunakan apa yang disebut " sejuk Observable ", peristiwa dipancarkan dengan malas, jadi kami selamat dari meluap-luap pemerhati.

Namun, ketika menggunakan " Teramati Panas " , ini akan terus memancarkan peristiwa, walaupun pengguna tidak dapat mengikuti.

3. Membuat Flowable

Terdapat pelbagai cara untuk membuat Flowable . Ketika bagi kita, orang-orang kaedah kelihatan sama dengan kaedah diperhatikan dalam versi pertama RxJava.

3.1. Mudah Alir

Kita boleh membuat Flowable menggunakan kaedah adil () sama seperti yang kita dapat dengan Observable:

Flowable integerFlowable = Flowable.just(1, 2, 3, 4);

Walaupun menggunakan just () cukup sederhana, tidak biasa membuat Flowable dari data statik, dan digunakan untuk tujuan pengujian.

3.2. Aliran dari Boleh Diperhatikan

Apabila kita mempunyai diperhatikan kita boleh mengubahnya untuk flowable menggunakan () toFlowable kaedah :

Observable integerObservable = Observable.just(1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable(BackpressureStrategy.BUFFER);

Perhatikan bahawa untuk dapat melakukan penukaran, kita perlu memperkaya Observable dengan BackpressureStrategy. Kami akan menerangkan strategi yang tersedia di bahagian seterusnya.

3.3. Flowable dari FlowableOnSubscribe

RxJava 2 memperkenalkan antara muka fungsional FlowableOnSubscribe , yang mewakili Flowable yang mula memancarkan peristiwa setelah pengguna melanggannya.

Oleh kerana itu, semua pelanggan akan menerima rangkaian acara yang sama, yang menjadikan tekanan balik FlowableOnSubscribe selamat.

Apabila kita mempunyai FlowableOnSubscribe kita dapat menggunakannya untuk membuat Flowable :

FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext(1); Flowable integerFlowable = Flowable .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

Dokumentasi menerangkan lebih banyak kaedah untuk membuat Flowable.

4. flowable BackpressureStrategy

Beberapa kaedah seperti toFlowable () atau create () mengambil BackpressureStrategy sebagai hujah.

The BackpressureStrategy adalah penghitungan, yang mentakrifkan tingkah laku tekanan belakang yang kita akan memohon kepada kami flowable.

Ia dapat menyembunyikan atau menjatuhkan peristiwa atau sama sekali tidak melaksanakan tingkah laku, dalam kes terakhir, kita akan bertanggung jawab untuk menentukannya, menggunakan pengendali tekanan balik.

BackpressureStrategy mirip dengan BackpressureMode yang terdapat pada versi RxJava sebelumnya.

Terdapat lima strategi berbeza yang terdapat di RxJava 2.

4.1. Penyangga

Sekiranya kita menggunakan BackpressureStrategy.BUFFER , sumber akan menyekat semua peristiwa sehingga pelanggan dapat menggunakannya :

public void thenAllValuesAreBufferedAndReceived() { List testList = IntStream.range(0, 100000) .boxed() .collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.BUFFER) .observeOn(Schedulers.computation()).test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertEquals(testList, receivedInts); }

Ini serupa dengan menggunakan kaedah onBackpressureBuffer () pada Flowable, tetapi tidak membenarkan untuk menentukan ukuran penyangga atau tindakan onOverflow secara eksplisit.

4.2. Jatuhkan

Kita boleh menggunakan BackpressureStrategy.DROP untuk membuang peristiwa yang tidak dapat dimakan dan bukannya menyangga.

Sekali lagi ini serupa dengan menggunakan onBackpressureDrop () pada Flowable :

public void whenDropStrategyUsed_thenOnBackpressureDropped() { Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.DROP) .observeOn(Schedulers.computation()) .test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertThat(receivedInts.size() < testList.size()); assertThat(!receivedInts.contains(100000)); }

4.3. Terkini

Menggunakan BackpressureStrategy.LATEST akan memaksa sumber untuk menyimpan hanya peristiwa terkini, sehingga menimpa nilai sebelumnya jika pengguna tidak dapat mengikuti:

public void whenLatestStrategyUsed_thenTheLastElementReceived() { Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.LATEST) .observeOn(Schedulers.computation()) .test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertThat(receivedInts.size() < testList.size()); assertThat(receivedInts.contains(100000)); }

BackpressureStrategy.LATEST dan BackpressureStrategy.DROP kelihatan sangat serupa ketika kita melihat kodnya.

Walau bagaimanapun, BackpressureStrategy.LATEST akan menimpa elemen yang tidak dapat ditangani oleh pelanggan kami dan menyimpan hanya yang terbaru, maka namanya.

BackpressureStrategy.DROP, on the other hand, will discard elements that can't be handled. This means that newest elements won't necessarily be emitted.

4.4. Error

When we're using the BackpressureStrategy.ERROR, we're simply saying that we don't expect backpressure to occur. Consequently, a MissingBackpressureException should be thrown if the consumer can't keep up with the source:

public void whenErrorStrategyUsed_thenExceptionIsThrown() { Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable .toFlowable(BackpressureStrategy.ERROR) .observeOn(Schedulers.computation()) .test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); }

4.5. Missing

If we use the BackpressureStrategy.MISSING, the source will push elements without discarding or buffering.

The downstream will have to deal with overflows in this case:

public void whenMissingStrategyUsed_thenException() { Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable .toFlowable(BackpressureStrategy.MISSING) .observeOn(Schedulers.computation()) .test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); }

In our tests, we're excepting MissingbackpressureException for both ERROR and MISSING strategies. As both of them will throw such exception when the source's internal buffer is overflown.

However, it's worth to note that both of them have a different purpose.

We should use the former one when we don't expect backpressure at all, and we want the source to throw an exception in case if it occurs.

The latter one could be used if we don't want to specify a default behavior on the creation of the Flowable. And we're going to use backpressure operators to define it later on.

5. Summary

In this tutorial, we've presented the new class introduced in RxJava 2 called Flowable.

Untuk mendapatkan lebih banyak maklumat mengenai Flowable itu sendiri dan APInya, kita boleh merujuk kepada dokumentasi.

Seperti biasa, semua contoh kod boleh didapati di GitHub.