1. Pengenalan
Tutorial ini adalah panduan untuk fungsi dan kes penggunaan kelas CompletableFuture yang diperkenalkan sebagai peningkatan Java 8 Concurrency API.
2. Pengiraan Asinkron di Jawa
Pengiraan tak segerak sukar difikirkan. Biasanya kita ingin memikirkan pengiraan apa pun sebagai rangkaian langkah, tetapi dalam kes pengiraan tidak segerak, tindakan yang ditunjukkan sebagai panggilan balik cenderung tersebar di seluruh kod atau bersarang dalam satu sama lain . Segala-galanya menjadi lebih teruk apabila kita perlu menangani kesilapan yang mungkin berlaku semasa salah satu langkah.
The Future antara muka telah ditambah di Jawa 5 untuk berkhidmat sebagai hasil daripada pengiraan tak segerak, tetapi ia tidak mempunyai apa-apa kaedah untuk menggabungkan pengiraan ini atau mengendalikan kesilapan yang mungkin.
Java 8 memperkenalkan kelas CompletableFuture . Bersama dengan antara muka Masa Depan , ia juga menerapkan antara muka PenyelesaianStage . Antaramuka ini menentukan kontrak untuk langkah pengiraan tak segerak yang boleh kita gabungkan dengan langkah lain.
CompletableFuture pada masa yang sama adalah blok bangunan dan kerangka kerja, dengan kira - kira 50 kaedah berbeza untuk menyusun, menggabungkan, dan melaksanakan langkah-langkah pengiraan tidak segerak dan menangani kesalahan .
API sebegitu besar boleh menjadi luar biasa, tetapi kebanyakannya terdapat dalam beberapa kes penggunaan yang jelas dan berbeza.
3. Menggunakan CompletableFuture sebagai Masa Depan yang Sederhana
Pertama sekali, kelas CompletableFuture mengimplementasikan antara muka Masa Depan , jadi kami dapat menggunakannya sebagai implementasi Masa Depan , tetapi dengan logik penyelesaian tambahan .
Sebagai contoh, kita dapat membuat contoh kelas ini dengan konstruktor tanpa arg untuk mewakili beberapa hasil masa depan, menyerahkannya kepada pengguna, dan menyelesaikannya pada masa akan datang menggunakan kaedah lengkap . Pengguna boleh menggunakan kaedah get untuk menyekat utas semasa sehingga hasil ini diberikan.
Dalam contoh di bawah, kami mempunyai kaedah yang membuat instance CompletableFuture , kemudian memutar beberapa perhitungan di thread lain dan mengembalikan Masa Depan dengan segera.
Apabila pengiraan selesai, kaedah melengkapkan Masa Depan dengan memberikan hasilnya ke metode lengkap :
public Future calculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }
Untuk melepaskan pengiraan, kami menggunakan API Pelaksana . Kaedah membuat dan menyelesaikan CompletableFuture ini dapat digunakan bersama dengan mekanisme atau API serentak apa pun, termasuk benang mentah.
Perhatikan bahawa yang calculateAsync Cara mengembalikan Future contoh .
Kami hanya memanggil kaedah, menerima contoh Masa Depan , dan memanggil kaedah get padanya ketika kami siap untuk memblokir hasilnya.
Perhatikan juga bahawa kaedah get melemparkan beberapa pengecualian yang diperiksa, yaitu ExecutionException (merangkumi pengecualian yang berlaku semasa pengiraan) dan InterruptException (pengecualian yang menunjukkan bahawa utas yang menjalankan kaedah terganggu):
Future completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);
Jika kita sudah tahu hasil daripada pengiraan yang , kita boleh menggunakan statik completedFuture kaedah dengan hujah yang mewakili hasil pengiraan ini. Akibatnya, kaedah get the Future tidak akan pernah menyekat, dengan segera mengembalikan hasil ini sebagai gantinya:
Future completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);
Sebagai senario alternatif, kami mungkin ingin membatalkan pelaksanaan Masa Depan .
4. CompletableFuture dengan Encapsulated Pengiraan Logik
Kod di atas membolehkan kita memilih mekanisme pelaksanaan serentak, tetapi bagaimana jika kita mahu melangkau plat boiler ini dan hanya melaksanakan beberapa kod secara tidak segerak?
Kaedah statik runAsync dan supplyAsync membolehkan kita membuat instance CompletableFuture daripada jenis fungsional Runnable dan Supplier .
Kedua-dua Runnable dan Pembekal adalah antara muka fungsional yang membolehkan melewati contohnya sebagai ungkapan lambda berkat ciri Java 8 yang baru.
Antara muka Runnable adalah antara muka lama yang sama yang digunakan dalam utas dan tidak membenarkan mengembalikan nilai.
The Supplier antara muka antara muka fungsi generik dengan kaedah tunggal yang tidak mempunyai hujah-hujah dan mengembalikan nilai sesuatu jenis parameterized.
Ini membolehkan kami memberikan contoh Pembekal sebagai ungkapan lambda yang melakukan pengiraan dan mengembalikan hasilnya . Ia semudah:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());
5. Memproses Hasil Pengiraan Asinkron
Cara yang paling umum untuk memproses hasil pengiraan adalah dengan memberi fungsi. The thenApply kaedah tidak tepat; ia menerima contoh Fungsi , menggunakannya untuk memproses hasilnya, dan mengembalikan Masa Depan yang menyimpan nilai yang dikembalikan oleh fungsi:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());
Sekiranya kita tidak perlu mengembalikan nilai ke rantai Masa Depan , kita dapat menggunakan contoh antara muka fungsi Pengguna . Kaedah tunggal mengambil parameter dan mengembalikan kekosongan .
Terdapat kaedah untuk kes penggunaan ini di CompletableFuture. The thenAccept kaedah menerima Pengguna dan pas ia hasil daripada pengiraan. Kemudian panggilan final.get () terakhir mengembalikan contoh jenis Void :
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
Akhirnya, jika kita tidak memerlukan nilai pengiraan, atau tidak mengembalikan beberapa nilai di hujung rantai, maka kita dapat meneruskan lambda Runnable ke kaedah thenRun . Dalam contoh berikut, kami hanya mencetak garis di konsol setelah memanggil masa depan.get ():
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();
6. Menggabungkan Niaga Hadapan
The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.
The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.
In the following example we use the thenCompose method to chain two Futures sequentially.
Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());
The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes also available in Java 8.
Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type. This functional structure allows composing the instances of these classes as building blocks.
If we want to execute two independent Futures and do something with their results, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());
A simpler case is when we want to do something with two Futures‘ results, but don't need to pass any resulting value down a Future chain. The thenAcceptBoth method is there to help:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));
7. Difference Between thenApply() and thenCompose()
In our previous sections, we've shown examples regarding thenApply() and thenCompose(). Both APIs help chain different CompletableFuture calls, but the usage of these 2 functions is different.
7.1. thenApply()
We can use this method to work with a result of the previous call. However, a key point to remember is that the return type will be combined of all calls.
So this method is useful when we want to transform the result of a CompletableFuture call:
CompletableFuture finalResult = compute().thenApply(s-> s + 1);
7.2. thenCompose()
The thenCompose() method is similar to thenApply() in that both return a new Completion Stage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():
CompletableFuture computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);
So if the idea is to chain CompletableFuture methods then it’s better to use thenCompose().
Also, note that the difference between these two methods is analogous to the difference between map() and flatMap().
8. Running Multiple Futures in Parallel
When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.
The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());
Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture. The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);
The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally. This makes it possible to use it as a method reference in the Stream.map() method.
9. Handling Errors
For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.
Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).
In the following example, we use the handle method to provide a default value when the asynchronous computation of a greeting was finished with an error because no name was provided:
String name = null; // ... CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());
As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also have the ability to complete it with an exception. The completeExceptionally method is intended for just that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:
CompletableFuture completableFuture = new CompletableFuture(); // ... completableFuture.completeExceptionally( new RuntimeException("Calculation failed!")); // ... completableFuture.get(); // ExecutionException
In the example above, we could have handled the exception with the handle method asynchronously, but with the get method we can use the more typical approach of a synchronous exception processing.
10. Async Methods
Most methods of the fluent API in CompletableFuture class have two additional variants with the Async postfix. These methods are usually intended for running a corresponding step of execution in another thread.
The methods without the Async postfix run the next execution stage using a calling thread. In contrast, the Async method without the Executor argument runs a step using the common fork/join pool implementation of Executor that is accessed with the ForkJoinPool.commonPool() method. Finally, the Async method with an Executor argument runs a step using the passed Executor.
Here's a modified example that processes the result of a computation with a Function instance. The only visible difference is the thenApplyAsync method, but under the hood the application of a function is wrapped into a ForkJoinTask instance (for more information on the fork/join framework, see the article “Guide to the Fork/Join Framework in Java”). This allows us to parallelize our computation even more and use system resources more efficiently:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());
11. JDK 9 CompletableFuture API
Java 9 enhances the CompletableFuture API with the following changes:
- New factory methods added
- Support for delays and timeouts
- Improved support for subclassing
and new instance APIs:
- Executor defaultExecutor()
- CompletableFuture newIncompleteFuture()
- CompletableFuture copy()
- CompletionStage minimalCompletionStage()
- CompletableFuture completeAsync(Supplier supplier, Executor executor)
- CompletableFuture completeAsync(Supplier supplier)
- CompletableFuture orTimeout(long timeout, TimeUnit unit)
- CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)
We also now have a few static utility methods:
- Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
- Executor delayedExecutor(long delay, TimeUnit unit)
- CompletionStage completedStage(U value)
- CompletionStage failedStage(Throwable ex)
- CompletableFuture failedFuture(Throwable ex)
Finally, to address timeout, Java 9 has introduced two more new functions:
- orTimeout()
- completeOnTimeout()
Here's the detailed article for further reading: Java 9 CompletableFuture API Improvements.
12. Conclusion
Dalam artikel ini, kami telah menerangkan kaedah dan kes penggunaan khas kelas CompletableFuture .
Kod sumber untuk artikel tersebut terdapat di GitHub.