Pengenalan kepada RxJava

1. Gambaran keseluruhan

Dalam artikel ini, kita akan fokus menggunakan Reactive Extensions (Rx) di Java untuk menyusun dan menggunakan urutan data.

Sepintas lalu, API mungkin mirip dengan Java 8 Streams, tetapi sebenarnya, jauh lebih fleksibel dan lancar, menjadikannya paradigma pengaturcaraan yang kuat.

Sekiranya anda ingin membaca lebih lanjut mengenai RxJava, lihat tulisan ini.

2. Persediaan

Untuk menggunakan RxJava dalam projek Maven kami, kami perlu menambahkan kebergantungan berikut ke pom.xml kami :

 io.reactivex rxjava ${rx.java.version} 

Atau, untuk projek Gradle:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3. Konsep Reaktif Berfungsi

Di satu sisi, pengaturcaraan fungsional adalah proses membina perisian dengan menyusun fungsi murni, menghindari keadaan bersama, data yang dapat diubah, dan kesan sampingan.

Di sisi lain, pengaturcaraan reaktif adalah paradigma pengaturcaraan asinkron yang berkaitan dengan aliran data dan penyebaran perubahan.

Bersama-sama, pengaturcaraan reaktif fungsional membentuk gabungan teknik fungsional dan reaktif yang dapat mewakili pendekatan elegan untuk pengaturcaraan berdasarkan acara - dengan nilai-nilai yang berubah dari masa ke masa dan di mana pengguna bertindak balas terhadap data ketika masuk.

Teknologi ini menyatukan pelaksanaan yang berbeza dari prinsip utamanya, beberapa pengarang membuat dokumen yang menentukan kosa kata umum untuk menerangkan jenis aplikasi baru.

3.1. Manifesto Reaktif

Manifesto Reaktif adalah dokumen dalam talian yang menetapkan standard tinggi untuk aplikasi dalam industri pengembangan perisian. Ringkasnya, sistem reaktif adalah:

  • Responsif - sistem harus bertindak balas tepat pada masanya
  • Didorong oleh Mesej - sistem harus menggunakan penghantaran mesej tidak segerak antara komponen untuk memastikan gandingan longgar
  • Elastik - sistem harus sentiasa responsif di bawah beban tinggi
  • Berdaya tahan - sistem harus sentiasa responsif apabila beberapa komponen gagal

4. Boleh dilihat

Terdapat dua jenis kunci yang perlu difahami ketika bekerja dengan Rx:

  • Observable mewakili objek yang dapat memperoleh data dari sumber data dan yang keadaannya mungkin menarik dengan cara objek lain dapat mendaftarkan minat
  • An pemerhati adalah apa-apa objek yang ingin diberitahu apabila keadaan lain perubahan objek

Seorang pemerhati melanggan urutan yang Dapat Diperhatikan . Urutan menghantar item kepada pemerhati satu demi satu.

The pemerhati mengendalikan masing-masing sebelum memproses satu depan. Sekiranya banyak acara masuk secara tidak segerak, peristiwa tersebut mesti disimpan dalam barisan atau dijatuhkan.

Dalam Rx , pemerhati tidak akan dipanggil dengan item yang tidak teratur atau dipanggil sebelum panggilan balik telah kembali untuk item sebelumnya.

4.1. Jenis-Jenis yang Dapat Diperhatikan

Terdapat dua jenis:

  • Tidak Menyekat - pelaksanaan tidak segerak disokong dan dibenarkan berhenti berlangganan pada bila-bila masa dalam aliran acara. Pada artikel ini, kita akan menumpukan perhatian pada jenis ini
  • Sekat - semua panggilan pemerhati onNext akan segerak, dan tidak mungkin berhenti berlangganan di tengah-tengah aliran acara. Kita selalu boleh menukar Observable menjadi Blocking Observable , menggunakan kaedah untuk Blocking:
BlockingObservable blockingObservable = observable.toBlocking();

4.2. Pengendali

An pengendali adalah fungsi yang mengambil masa satu O bservable (sumber) sebagai hujah pertama dan mengembalikan lain diperhatikan (destinasi). Kemudian untuk setiap item yang sumbernya dapat dilihat, ia akan menerapkan fungsi pada item tersebut, dan kemudian memancarkan hasilnya di tempat tujuan yang Dapat Diperhatikan .

Pengendali dapat dirantai bersama untuk membuat aliran data kompleks yang menyaring peristiwa berdasarkan kriteria tertentu. Pelbagai pengendali boleh digunakan untuk pemerhatian yang sama .

Tidak sukar untuk masuk ke dalam situasi di mana Observable memancarkan barang lebih cepat daripada yang boleh dimakan oleh operator atau pemerhati . Anda boleh membaca lebih lanjut mengenai tekanan belakang di sini.

4.3. Buat Pemerhatian

Pengendali asas hanya menghasilkan Observable yang memancarkan satu contoh generik sebelum menyelesaikannya, String "Hello". Apabila kami ingin mendapatkan maklumat dari Observable , kami melaksanakan antara muka pemerhati dan kemudian memanggil langganan di Observable yang dikehendaki :

Observable observable = Observable.just("Hello"); observable.subscribe(s -> result = s); assertTrue(result.equals("Hello"));

4.4. OnNext, OnError, dan OnCompleted

Terdapat tiga kaedah pada antara muka pemerhati yang ingin kami ketahui:

  1. OnNext dipanggil kepada pemerhati kami setiap kali acara baru diterbitkan ke Observable terlampir . Ini adalah kaedah di mana kita akan melakukan beberapa tindakan pada setiap acara
  2. OnCompleted dipanggil apabila urutan peristiwa yang berkaitan dengan Observable selesai, menunjukkan bahawa kita tidak seharusnya mengharapkan lagi onNext panggilan seterusnya pada pemerhati kita
  3. OnError dipanggil apabila pengecualian yang tidak ditangani dilemparkan semasa kod rangka RxJava atau kod pengendalian acara kami

Nilai kembali untuk kaedah langganan Observables adalah antara muka langganan :

String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( i -> result += i, //OnNext Throwable::printStackTrace, //OnError () -> result += "_Completed" //OnCompleted ); assertTrue(result.equals("abcdefg_Completed"));

5. Pengubah Transformasi dan Pengendali Bersyarat

5.1. Peta

Operator m ap mengubah item yang dikeluarkan oleh Observable dengan menerapkan fungsi pada setiap item.

Mari kita anggap ada sebilangan rentetan yang dinyatakan yang mengandungi beberapa huruf dari abjad dan kami ingin mencetaknya dalam mod huruf besar:

Observable.from(letters) .map(String::toUpperCase) .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG"));

FlatMap dapat digunakan untuk meratakan Observables setiap kali kita berakhir dengan Observable bersarang .

Maklumat lebih lanjut mengenai perbezaan antara peta dan peta datar boleh didapati di sini.

Dengan andaian kita mempunyai kaedah yang mengembalikan Observable dari senarai rentetan. Sekarang kita akan mencetak untuk setiap rentetan dari senarai tajuk yang Dapat Diperhatikan yang baru berdasarkan apa yang dilihat oleh Pelanggan :

Observable getTitle() { return Observable.from(titleList); } Observable.just("book1", "book2") .flatMap(s -> getTitle()) .subscribe(l -> result += l); assertTrue(result.equals("titletitle"));

5.2. Imbas

The scan operator applies a function to each item emitted by an Observable sequentially and emits each successive value.

It allows us to carry forward state from event to event:

String[] letters = {"a", "b", "c"}; Observable.from(letters) .scan(new StringBuilder(), StringBuilder::append) .subscribe(total -> result += total.toString()); assertTrue(result.equals("aababc"));

5.3. GroupBy

Group by operator allows us to classify the events in the input Observable into output categories.

Let's assume that we created an array of integers from 0 to 10, then apply group by that will divide them into the categories even and odd:

Observable.from(numbers) .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") .subscribe(group -> group.subscribe((number) -> { if (group.getKey().toString().equals("EVEN")) { EVEN[0] += number; } else { ODD[0] += number; } }) ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579"));

5.4. Filter

The operator filter emits only those items from an observable that pass a predicate test.

So let's filter in an integer array for the odd numbers:

Observable.from(numbers) .filter(i -> (i % 2 == 1)) .subscribe(i -> result += i); assertTrue(result.equals("13579"));

5.5. Conditional Operators

DefaultIfEmpty emits item from the source Observable, or a default item if the source Observable is empty:

Observable.empty() .defaultIfEmpty("Observable is empty") .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty"));

The following code emits the first letter of the alphabet ‘a' because the array letters is not empty and this is what it contains in the first position:

Observable.from(letters) .defaultIfEmpty("Observable is empty") .first() .subscribe(s -> result += s); assertTrue(result.equals("a"));

TakeWhile operator discards items emitted by an Observable after a specified condition becomes false:

Observable.from(numbers) .takeWhile(i -> i  sum[0] += s); assertTrue(sum[0] == 10);

Of course, there more others operators that could cover our needs like Contain, SkipWhile, SkipUntil, TakeUntil, etc.

6. Connectable Observables

A ConnectableObservable resembles an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to, but only when the connect operator is applied to it.

In this way, we can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items:

String[] result = {""}; ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); assertFalse(result[0].equals("01")); connectable.connect(); Thread.sleep(500); assertTrue(result[0].equals("01"));

7. Single

Single is like an Observable who, instead of emitting a series of values, emits one value or an error notification.

With this source of data, we can only use two methods to subscribe:

  • OnSuccess returns a Single that also calls a method we specify
  • OnError also returns a Single that immediately notifies subscribers of an error
String[] result = {""}; Single single = Observable.just("Hello") .toSingle() .doOnSuccess(i -> result[0] += i) .doOnError(error -> { throw new RuntimeException(error.getMessage()); }); single.subscribe(); assertTrue(result[0].equals("Hello"));

8. Subjects

A Subject is simultaneously two elements, a subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable.

And because it's also observable, the events from multiple subscribers can be reemitted as its events to anyone observing it.

In the next example, we'll look at how the observers will be able to see the events that occur after they subscribe:

Integer subscriber1 = 0; Integer subscriber2 = 0; Observer getFirstObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber1 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber1 completed"); } }; } Observer getSecondObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber2 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber2 completed"); } }; } PublishSubject subject = PublishSubject.create(); subject.subscribe(getFirstObserver()); subject.onNext(1); subject.onNext(2); subject.onNext(3); subject.subscribe(getSecondObserver()); subject.onNext(4); subject.onCompleted(); assertTrue(subscriber1 + subscriber2 == 14)

9. Resource Management

Using operation allows us to associate resources, such as a JDBC database connection, a network connection, or open files to our observables.

Di sini kami tunjukkan dalam ulasan langkah-langkah yang perlu kita lakukan untuk mencapai tujuan ini dan juga contoh pelaksanaannya:

String[] result = {""}; Observable values = Observable.using( () -> "MyResource", r -> { return Observable.create(o -> { for (Character c : r.toCharArray()) { o.onNext(c); } o.onCompleted(); }); }, r -> System.out.println("Disposed: " + r) ); values.subscribe( v -> result[0] += v, e -> result[0] += e ); assertTrue(result[0].equals("MyResource"));

10. Kesimpulannya

Dalam artikel ini, kami telah membincangkan cara menggunakan perpustakaan RxJava dan juga cara meneroka ciri-ciri terpentingnya.

Kod sumber penuh untuk projek termasuk semua contoh kod yang digunakan di sini boleh didapati di Github.