1. Gambaran keseluruhan
Dalam artikel ini, kita akan melihat cara perpustakaan RxJava membantu kita menangani tekanan balik.
Ringkasnya - RxJava menggunakan konsep aliran reaktif dengan memperkenalkan Observables, yang boleh dilanggan oleh satu atau banyak pemerhati . Berurusan dengan aliran yang mungkin tidak terbatas sangat mencabar, kerana kita perlu menghadapi masalah tekanan balik.
Tidak sukar untuk memasuki situasi di mana orang yang dapat diamati mengeluarkan barang dengan lebih cepat daripada pelanggan yang dapat menggunakannya. Kami akan melihat penyelesaian yang berbeza untuk masalah penambahan barang yang tidak habis.
2. Hot cerap Versus Cold cerap
Pertama, mari buat fungsi pengguna mudah yang akan digunakan sebagai pengguna elemen dari Observables yang akan kita tentukan kemudian:
public class ComputeFunction { public static void compute(Integer v) { try { System.out.println("compute integer v: " + v); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
Fungsi compute () kami hanya mencetak argumen. Perkara penting yang perlu diperhatikan di sini adalah permohonan kaedah Thread.sleep (1000) - kami melakukannya untuk meniru beberapa tugas yang telah lama berjalan yang akan menyebabkan Observable dapat mengisi barang dengan lebih cepat sehingga Observer dapat memakannya.
Kami mempunyai dua jenis yang Dapat Diperhatikan - Panas dan Dingin - yang sama sekali berbeza ketika menangani penekanan tekanan belakang.
2.1. Terlihat Sejuk
Observable yang sejuk mengeluarkan urutan item tertentu tetapi dapat mula memancarkan urutan ini apabila Observernya merasa senang, dan pada kadar apa pun yang dikehendaki oleh Observer , tanpa mengganggu integriti urutan. Cold Observable adalah menyediakan barang dengan cara yang malas.
The Observer mengambil elemen hanya apabila ia bersedia untuk memproses item itu dan barangan tidak perlu buffered dalam diperhatikan kerana mereka diminta dalam fesyen tarik.
Contohnya, jika anda membuat Observable berdasarkan rangkaian elemen statik dari satu hingga satu juta, Observable akan mengeluarkan urutan item yang sama tidak kira seberapa kerap item tersebut diperhatikan:
Observable.range(1, 1_000_000) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute);
Semasa kita memulakan program kita, item akan dikira oleh Observer dengan malas dan akan diminta secara menarik. Kaedah Schedulers.computation () bermaksud bahawa kami ingin menjalankan Observer kami dalam kumpulan thread pengiraan di RxJava.
Keluaran program akan terdiri daripada hasil kaedah komputasi () yang dipanggil untuk satu demi satu item dari Observable :
compute integer v: 1 compute integer v: 2 compute integer v: 3 compute integer v: 4 ...
Cold Observables tidak perlu mempunyai bentuk tekanan belakang kerana ia berfungsi secara menarik. Contoh item yang dikeluarkan oleh dingin yang dapat Diperhatikan mungkin termasuk hasil pertanyaan pangkalan data, pengambilan fail, atau permintaan web.
2.2. Terlihat Panas
Observable yang panas mula menghasilkan item dan mengeluarkannya dengan segera apabila ia dibuat. Ini bertentangan dengan model pemprosesan Cold Observables pull. Hot Observable memancarkan barang mengikut kadarnya sendiri, dan terserah kepada pemerhati untuk mengikuti.
Apabila Observer tidak dapat memakan barang secepat yang dihasilkan oleh Observable mereka perlu disangga atau ditangani dengan cara lain, kerana mereka akan mengisi memori, akhirnya menyebabkan OutOfMemoryException.
Mari kita perhatikan contoh Hot Observable, iaitu menghasilkan 1 juta item kepada pengguna akhir yang memproses item tersebut. Apabila kaedah compute () dalam Observer memerlukan sedikit masa untuk memproses setiap item, Observable mulai mengisi memori dengan item, menyebabkan program gagal:
PublishSubject source = PublishSubject.create(); source.observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); IntStream.range(1, 1_000_000).forEach(source::onNext);
Menjalankan program tersebut akan gagal dengan MissingBackpressureException kerana kami tidak menentukan cara menangani Observable yang terlalu banyak menghasilkan .
Contoh item yang dikeluarkan oleh Observable yang panas mungkin termasuk peristiwa tetikus & papan kekunci, peristiwa sistem, atau harga saham.
3. Penimbunan Lebih Berlebihan Dapat Diperhatikan
Cara pertama untuk menangani Observable yang terlalu banyak pengeluaran adalah dengan menentukan beberapa jenis buffer untuk elemen yang tidak dapat diproses oleh Observer.
Kita boleh melakukannya dengan memanggil kaedah penyangga () :
PublishSubject source = PublishSubject.create(); source.buffer(1024) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace);
Mendefinisikan penyangga dengan ukuran 1024 akan memberi Pemerhati sedikit masa untuk mengejar sumber yang terlalu banyak menghasilkan. Penyangga akan menyimpan item yang belum diproses.
Kita dapat meningkatkan ukuran penyangga untuk memiliki ruang yang cukup untuk nilai yang dihasilkan.
Namun, perhatikan bahawa secara amnya, ini mungkin hanya perbaikan sementara kerana limpahan masih dapat terjadi jika sumbernya menghasilkan ukuran penyangga yang diprediksi berlebihan.
4. Item Berkelompok Berkelompok
Kita dapat mengumpulkan lebih banyak item di tetingkap elemen N.
Apabila Observable menghasilkan elemen lebih cepat daripada Observer dapat memprosesnya, kita dapat meredakannya dengan mengelompokkan elemen yang dihasilkan bersama-sama dan mengirim sekumpulan elemen ke Observer yang dapat memproses kumpulan elemen dan bukannya elemen satu demi satu:
PublishSubject source = PublishSubject.create(); source.window(500) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace);
Menggunakan kaedah window () dengan argumen 500, akan memberitahu Observable untuk mengelompokkan elemen ke dalam kumpulan berukuran 500. Teknik ini dapat mengurangkan masalah pengeluaran berlebihan Observable apabila Observer dapat memproses sekumpulan elemen lebih cepat berbanding dengan memproses elemen satu persatu.
5. Unsur Melangkau
Sekiranya beberapa nilai yang dihasilkan oleh Observable dapat diabaikan dengan selamat, kita dapat menggunakan pensampelan dalam waktu tertentu dan pengendali pendikit.
Kaedah sampel () dan throttleFirst () mengambil masa sebagai parameter:
- Kaedah s ample () secara berkala meneliti urutan elemen dan mengeluarkan item terakhir yang dihasilkan dalam jangka masa yang ditentukan sebagai parameter
- Kaedah throttleFirst () mengeluarkan item pertama yang dihasilkan setelah tempoh yang ditentukan sebagai parameter
Tempohnya adalah masa selepas satu elemen tertentu dipilih dari urutan unsur yang dihasilkan. Kami dapat menentukan strategi untuk menangani tekanan belakang dengan melangkau elemen:
PublishSubject source = PublishSubject.create(); source.sample(100, TimeUnit.MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace);
We specified that strategy of skipping elements will be a sample() method. We want a sample of a sequence from 100 milliseconds duration. That element will be emitted to the Observer.
Remember, however, that these operators only reduce the rate of value reception by the downstream Observer and thus they may still lead to MissingBackpressureException.
6. Handling a Filling Observable Buffer
In case that our strategies of sampling or batching elements do not help with filling up a buffer, we need to implement a strategy of handling cases when a buffer is filling up.
We need to use an onBackpressureBuffer() method to prevent BufferOverflowException.
The onBackpressureBuffer() method takes three arguments: a capacity of an Observable buffer, a method that is invoked when a buffer is filling up, and a strategy for handling elements that need to be discarded from a buffer. Strategies for overflow are in a BackpressureOverflow class.
There are 4 types of actions that can be executed when the buffer fills up:
- ON_OVERFLOW_ERROR – this is the default behavior signaling a BufferOverflowException when the buffer is full
- ON_OVERFLOW_DEFAULT – currently it is the same as ON_OVERFLOW_ERROR
- ON_OVERFLOW_DROP_LATEST – if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream Observer requests
- ON_OVERFLOW_DROP_OLDEST – drops the oldest element in the buffer and adds the current value to it
Let's see how to specify that strategy:
Observable.range(1, 1_000_000) .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn(Schedulers.computation()) .subscribe(e -> {}, Throwable::printStackTrace);
Here our strategy for handling the overflowing buffer is dropping the oldest element in a buffer and adding newest item produced by an Observable.
Note that the last two strategies cause a discontinuity in the stream as they drop out elements. In addition, they won't signal BufferOverflowException.
7. Dropping All Overproduced Elements
Whenever the downstream Observer is not ready to receive an element, we can use an onBackpressureDrop() method to drop that element from the sequence.
We can think of that method as an onBackpressureBuffer() method with a capacity of a buffer set to zero with a strategy ON_OVERFLOW_DROP_LATEST.
This operator is useful when we can safely ignore values from a source Observable (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on:
Observable.range(1, 1_000_000) .onBackpressureDrop() .observeOn(Schedulers.computation()) .doOnNext(ComputeFunction::compute) .subscribe(v -> {}, Throwable::printStackTrace);
The method onBackpressureDrop() is eliminating a problem of overproducing Observable but needs to be used with caution.
8. Conclusion
Dalam artikel ini, kami melihat masalah pengeluaran berlebihan yang dapat Diperhatikan dan cara menangani tekanan balik. Kami melihat strategi penyanggaan, penumpahan dan pelepasan elemen apabila Pengamat tidak dapat menggunakan elemen secepat yang dihasilkan oleh Pemerhatian.
Pelaksanaan semua contoh dan coretan kod ini terdapat dalam projek GitHub - ini adalah projek Maven, jadi mudah untuk diimport dan dijalankan sebagaimana adanya.