Penjadual di RxJava

1. Gambaran keseluruhan

Dalam artikel ini, kami akan memberi tumpuan kepada pelbagai jenis Penjadualan yang akan kami gunakan dalam menulis program multithreading berdasarkan kaedah subscribeOn dan observOn RxJava Observable .

Penjadual memberi peluang untuk menentukan di mana dan kemungkinan kapan untuk melaksanakan tugas-tugas yang berkaitan dengan operasi rantai yang Dapat Diperhatikan .

Kita boleh mendapatkan Penjadual dari kaedah kilang yang dijelaskan dalam Penjadual kelas .

2. Kelakuan Threading Lalai

Secara lalai, Rx adalah satu-utas yang menunjukkan bahawa suatu yang boleh diamati dan rangkaian pengendali yang dapat kita terapkan padanya akan memberitahu para pengamatnya pada utas yang sama di mana kaedah berlangganan () dipanggil.

Kaedah observOn dan subscribeOn mengambil sebagai argumen Penjadual, yang, seperti namanya, adalah alat yang dapat kita gunakan untuk menjadwalkan tindakan individu.

Kami akan membuat pelaksanaan Penjadual kami dengan menggunakan kaedah buat Pekerja , yang mengembalikan Penjadual. Seorang pekerja menerima tindakan dan melaksanakannya secara berurutan pada satu utas.

Dengan cara tertentu, pekerja adalah cheduler S itu sendiri, tetapi kita tidak akan menyebutnya sebagai Penjadual untuk mengelakkan kekeliruan.

2.1. Menjadualkan Tindakan

Kita boleh menjadualkan kerja di mana-mana Scheduler dengan membuat pekerja dan penjadualan beberapa tindakan:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> result += "action"); Assert.assertTrue(result.equals("action"));

Tindakan itu kemudian diatur pada utas yang ditugaskan oleh pekerja.

2.2. Membatalkan Tindakan

Penjadual.Worker meluaskan Langganan . Memanggil kaedah berhenti berlangganan pekerja akan menyebabkan barisan dikosongkan dan semua tugas yang belum selesai dibatalkan. Kita dapat melihatnya dengan contoh:

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += "First_Action"; worker.unsubscribe(); }); worker.schedule(() -> result += "Second_Action"); Assert.assertTrue(result.equals("First_Action"));

Tugas kedua tidak pernah dilaksanakan kerana yang sebelumnya membatalkan keseluruhan operasi. Tindakan yang dalam proses dilaksanakan akan terganggu.

3. Penjadual.newThread

Penjadual ini hanya memulakan utas baru setiap kali diminta melalui subscribeOn () atau observOn () .

Hampir tidak pernah menjadi pilihan yang baik, bukan hanya kerana kependaman semasa memulakan utas tetapi juga kerana utas ini tidak digunakan semula:

Observable.just("Hello") .observeOn(Schedulers.newThread()) .doOnNext(s -> result2 += Thread.currentThread().getName() ) .observeOn(Schedulers.newThread()) .subscribe(s -> result1 += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result1.equals("RxNewThreadScheduler-1")); Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

Apabila Pekerja selesai, utasnya berhenti. Ini Scheduler boleh digunakan hanya apabila tugas-tugas yang kasar secara terperinci: ia mengambil banyak masa untuk selesai, tetapi terdapat sangat sedikit daripada mereka supaya benang tidak mungkin untuk digunakan semula pada semua.

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(3000); Assert.assertTrue(result.equals( "RxNewThreadScheduler-1_Start_End_worker_"));

Semasa kami menjadualkan pekerja pada NewThreadScheduler, kami melihat bahawa pekerja itu terikat pada utas tertentu.

4. Penjadual . Sekejap

Schedulers.immediate adalah penjadual khas yang meminta tugas dalam urutan klien dengan cara menyekat, dan bukannya secara asinkron dan kembali apabila tindakan selesai:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(500); Assert.assertTrue(result.equals( "main_Start_worker__End"));

Sebenarnya, melanggan Observable melalui Penjadual segera biasanya mempunyai kesan yang sama seperti tidak berlangganan dengan cheduler S tertentu :

Observable.just("Hello") .subscribeOn(Schedulers.immediate()) .subscribe(s -> result += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result.equals("main"));

5. Penjadual.trampolin

The trampolin Scheduler adalah sangat serupa dengan serta-merta kerana ia juga jadual tugas dalam thread yang sama, berkesan menyekat.

Walau bagaimanapun, tugas yang akan datang dilaksanakan apabila semua tugas yang dijadualkan sebelumnya selesai:

Observable.just(2, 4, 6, 8) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Observable.just(1, 3, 5, 7, 9) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Thread.sleep(500); Assert.assertTrue(result.equals("246813579"));

Segera memanggil tugas yang diberikan dengan segera, sementara trampolin menunggu tugas yang sekarang selesai.

The trampolin 's pekerja melaksanakan setiap tugas kepada benang yang dijadualkan tugas pertama. Panggilan pertama yang dijadualkan disekat sehingga barisan dikosongkan:

Scheduler scheduler = Schedulers.trampoline(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "Start"; worker.schedule(() -> { result += "_middleStart"; worker.schedule(() -> result += "_worker_" ); result += "_middleEnd"; }); result += "_mainEnd"; }); Thread.sleep(500); Assert.assertTrue(result .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Penjadual. Dari

Penjadual secara dalaman lebih rumit daripada pelaksana dari java.util.concurrent - jadi diperlukan pengabstrakan berasingan.

Tetapi kerana konsepnya sangat mirip, tidak mengejutkan ada pembungkus yang dapat mengubah Executor menjadi Penjadual menggunakan kaedah dari kilang:

private ThreadFactory threadFactory(String pattern) { return new ThreadFactoryBuilder() .setNameFormat(pattern) .build(); } @Test public void givenExecutors_whenSchedulerFrom_thenReturnElements() throws InterruptedException { ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched-A-%d")); Scheduler schedulerA = Schedulers.from(poolA); ExecutorService poolB = newFixedThreadPool( 10, threadFactory("Sched-B-%d")); Scheduler schedulerB = Schedulers.from(poolB); Observable observable = Observable.create(subscriber -> { subscriber.onNext("Alfa"); subscriber.onNext("Beta"); subscriber.onCompleted(); });; observable .subscribeOn(schedulerA) .subscribeOn(schedulerB) .subscribe( x -> result += Thread.currentThread().getName() + x + "_", Throwable::printStackTrace, () -> result += "_Completed" ); Thread.sleep(2000); Assert.assertTrue(result.equals( "Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

SchedulerB digunakan untuk jangka waktu yang pendek, tetapi hampir tidak menjadwalkan tindakan baru pada schedulerA , yang melakukan semua pekerjaan. Oleh itu, beberapa kaedah subscribeOn tidak hanya diabaikan, tetapi juga memperkenalkan sedikit overhead.

7. Penjadual.io

This Scheduler is similar to the newThread except for the fact that already started threads are recycled and can possibly handle future requests.

This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new worker is requested, either a new thread is started (and later kept idle for some time) or the idle one is reused:

Observable.just("io") .subscribeOn(Schedulers.io()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxIoScheduler-2"));

We need to be careful with unbounded resources of any kind – in case of slow or unresponsive external dependencies like web services, ioscheduler might start an enormous number of threads, leading to our very own application becoming unresponsive.

In practice, following Schedulers.io is almost always a better choice.

8. Schedulers.computation

Computation Scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as found in the Runtime.getRuntime() utility class.

So we should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code.

It uses an unbounded queue in front of every thread, so if the task is scheduled, but all cores are occupied, it will be queued. However, the queue just before each thread will keep growing:

Observable.just("computation") .subscribeOn(Schedulers.computation()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxComputationScheduler-1"));

If for some reason, we need a different number of threads than the default, we can always use the rx.scheduler.max-computation-threads system property.

By taking fewer threads we can ensure that there is always one or more CPU cores idle, and even under heavy load, computation thread pool does not saturate the server. It's simply not possible to have more computation threads than cores.

9. Schedulers.test

This Scheduler is used only for testing purposes, and we'll never see it in production code. Its main advantage is the ability to advance the clock, simulating time passing by arbitrarily:

List letters = Arrays.asList("A", "B", "C"); TestScheduler scheduler = Schedulers.test(); TestSubscriber subscriber = new TestSubscriber(); Observable tick = Observable .interval(1, TimeUnit.SECONDS, scheduler); Observable.from(letters) .zipWith(tick, (string, index) -> index + "-" + string) .subscribeOn(scheduler) .subscribe(subscriber); subscriber.assertNoValues(); subscriber.assertNotCompleted(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); subscriber.assertNoErrors(); subscriber.assertValueCount(1); subscriber.assertValues("0-A"); scheduler.advanceTimeTo(3, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); subscriber.assertValueCount(3); assertThat( subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C"));

10. Default Schedulers

Some Observable operators in RxJava have alternate forms that allow us to set which Scheduler the operator will use for its operation. Others don't operate on any particular Scheduler or operate on a particular default Scheduler.

For example, the delay operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:

ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched1-")); Scheduler schedulerA = Schedulers.from(poolA); Observable.just('A', 'B') .delay(1, TimeUnit.SECONDS, schedulerA) .subscribe(i -> result+= Thread.currentThread().getName() + i + " "); Thread.sleep(2000); Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

Without supplying a custom schedulerA, all operators below delay would use the computation Scheduler.

Other important operators that support custom Schedulers are buffer, interval, range, timer, skip, take, timeout, and several others. If we don't provide a Scheduler to such operators, computation scheduler is utilized, which is a safe default in most cases.

11. Conclusion

In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed.

Menguasai penjadual sangat penting untuk menulis kod yang berskala dan selamat menggunakan RxJava. Perbezaan antara subscribeOn dan observOn sangat penting di bawah beban tinggi di mana setiap tugas mesti dilaksanakan tepat ketika kita harapkan.

Akhir sekali, kita perlu memastikan bahawa penjadual digunakan hiliran boleh bersaing dengan iklan lo yang dihasilkan oleh penjadual upstrea m. Untuk maklumat lebih lanjut, terdapat artikel ini mengenai tekanan balik.

Pelaksanaan semua contoh dan coretan kod ini terdapat dalam projek GitHub - ini adalah projek Maven, jadi mudah untuk diimport dan dijalankan sebagaimana adanya.