Pengenalan Thread Pools di Jawa

1. Pengenalan

Artikel ini adalah melihat kumpulan thread di Java - dimulai dengan implementasi yang berbeza di perpustakaan Java standard dan kemudian melihat perpustakaan Jambu Google.

2. Kolam Thread

Di Java, utas dipetakan ke utas tingkat sistem yang merupakan sumber sistem operasi. Sekiranya anda membuat utas dengan tidak terkawal, anda mungkin kehabisan sumber ini dengan cepat.

Peralihan konteks antara utas dilakukan oleh sistem operasi juga - untuk meniru paralelisme. Pandangan ringkas adalah bahawa - semakin banyak utas yang anda hasilkan, semakin sedikit masa yang dihabiskan oleh setiap utas untuk melakukan kerja sebenar.

Corak Thread Pool membantu menjimatkan sumber dalam aplikasi multithreaded, dan juga untuk menahan paralelisme dalam had tertentu yang telah ditentukan.

Apabila anda menggunakan kumpulan utas, anda menulis kod serentak anda dalam bentuk tugas selari dan menyerahkannya untuk pelaksanaan ke kumpulan kumpulan utas . Contoh ini mengawal beberapa utas yang digunakan semula untuk melaksanakan tugas-tugas ini.

Corak ini membolehkan anda mengawal jumlah utas yang dibuat aplikasi , kitaran hidupnya, serta menjadwalkan pelaksanaan tugas dan menyimpan tugas masuk dalam barisan.

3. Kolam Benang di Jawa

3.1. Pegawai Pelaksana , Pelaksana dan Pelaksana

The Pelaku kelas penolong mengandungi beberapa kaedah untuk mewujudkan keadaan pra-konfigurasi thread kolam untuk anda. Kelas-kelas tersebut adalah tempat yang baik untuk memulakannya - gunakannya jika anda tidak perlu menggunakan penyesuaian yang khusus.

The Wasi dan ExecutorService antara muka yang digunakan untuk kerja-kerja dengan berbeza pelaksanaan thread kolam di Jawa. Biasanya, anda harus memastikan kod anda dipisahkan dari pelaksanaan sebenar kumpulan utas dan menggunakan antara muka ini sepanjang aplikasi anda.

The Wasi antara muka mempunyai satu melaksanakan kaedah untuk mengemukakan Runnable contoh bagi pelaksanaan.

Berikut adalah contoh ringkas bagaimana anda boleh menggunakan API Pelaksana untuk memperoleh contoh Pelaksana yang disokong oleh kumpulan utas tunggal dan barisan tanpa had untuk melaksanakan tugas secara berurutan. Di sini, kami melaksanakan satu tugas yang hanya mencetak " Hello World " di skrin. Tugas diserahkan sebagai lambda (fitur Java 8) yang disimpulkan sebagai Runnable .

Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));

The ExecutorService muka mengandungi sebilangan besar kaedah untuk mengawal kemajuan tugas dan menguruskan penamatan perkhidmatan . Dengan menggunakan antara muka ini, Anda dapat menyerahkan tugas untuk pelaksanaan dan juga mengendalikan pelaksanaannya menggunakan instance Future yang dikembalikan .

Dalam contoh berikut , kami menghasilkan ExecutorService , mengemukakan tugas dan kemudian gunakan kembali Future 's get kaedah untuk menunggu sehingga tugas yang dikemukakan adalah siap dan nilai dikembalikan:

ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();

Sudah tentu, dalam senario kehidupan sebenar, anda biasanya tidak mahu memanggil masa depan.get () tetapi menangguhkannya sehingga anda benar-benar memerlukan nilai pengiraan.

Yang mengemukakan kaedah terbeban untuk mengambil sama ada Runnable atau Panggil kedua-duanya berada antara muka berfungsi dan boleh diluluskan sebagai lambdas (bermula dengan Java 8).

Kaedah tunggal Runnable tidak membuang pengecualian dan tidak mengembalikan nilai. Antara muka yang Boleh Dipanggil mungkin lebih mudah, kerana ia membolehkan kita membuang pengecualian dan mengembalikan nilai.

Akhirnya - untuk membiarkan penyusun menyimpulkan jenis yang Boleh Dipanggil , hanya mengembalikan nilai dari lambda.

Untuk lebih banyak contoh menggunakan antara muka dan niaga hadapan ExecutorService , lihat "Panduan untuk Java ExecutorService".

3.2. ThreadPoolExecutor

The ThreadPoolExecutor adalah thread extensible pelaksanaan kolam dengan banyak parameter dan cangkuk untuk penalaan halus.

Parameter konfigurasi utama yang akan kita bincangkan di sini adalah: corePoolSize , maksimumPoolSize , dan keepAliveTime .

Kolam ini terdiri daripada sejumlah utas inti yang tetap disimpan di dalam sepanjang masa, dan beberapa utas berlebihan yang mungkin dilahirkan dan kemudian ditamatkan apabila ia tidak diperlukan lagi. The corePoolSize parameter adalah bilangan benang teras yang akan instantiated dan disimpan di kolam renang. Apabila tugas baru masuk, jika semua utas inti sibuk dan barisan dalaman penuh, maka kolam dibiarkan berkembang hingga maksimumPoolSize .

The keepAliveTime parameter adalah selang masa di mana benang yang berlebihan (instantiated melebihi corePoolSize ) dibenarkan wujud di negeri ini terbiar. Secara lalai, ThreadPoolExecutor hanya mempertimbangkan utas bukan teras untuk dikeluarkan. Untuk menerapkan dasar penyingkiran yang sama pada utas inti, kita dapat menggunakan kaedah allowCoreThreadTimeOut (true) .

Parameter ini merangkumi pelbagai kes penggunaan, tetapi konfigurasi yang paling biasa ditentukan dalam kaedah statik Pelaksana .

Sebagai contoh , kaedah newFixedThreadPool membuat ThreadPoolExecutor dengan corePoolSize yang sama dan nilai parameter maksimumPoolSize dan zeroAliveTime zero . Ini bermaksud bahawa bilangan utas dalam kumpulan utas ini selalu sama:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());

Dalam contoh di atas, kita membuat ThreadPoolExecutor dengan jumlah utas tetap 2. Ini bermaksud bahawa jika jumlah tugas yang dijalankan secara serentak kurang atau sama dengan dua setiap saat, maka mereka akan segera dilaksanakan. Jika tidak, sebilangan tugas ini boleh dimasukkan ke dalam barisan untuk menunggu giliran mereka .

Kami membuat tiga tugas Callable yang meniru kerja berat dengan tidur selama 1000 milisaat. Dua tugas pertama akan dilaksanakan sekaligus, dan yang ketiga harus menunggu dalam barisan. Kami dapat mengesahkannya dengan memanggil kaedah getPoolSize () dan getQueue (). Size () segera setelah menyerahkan tugas.

ThreadPoolExecutor pra-konfigurasi lain boleh dibuat dengan kaedah Executors.newCachedThreadPool () . Kaedah ini sama sekali tidak menerima sebilangan utas. The corePoolSize sebenarnya ditetapkan kepada 0, dan maximumPoolSize ditetapkan kepada Integer.MAX_VALUE misalnya ini. The keepAliveTime adalah 60 saat untuk yang satu ini.

Nilai parameter ini bermaksud bahawa kumpulan utas cache dapat tumbuh tanpa batas untuk menampung sejumlah tugas yang dihantar . Tetapi apabila benang tidak diperlukan lagi, tali akan dilupuskan setelah 60 saat tidak aktif. Kes penggunaan biasa adalah apabila anda mempunyai banyak tugas jangka pendek dalam aplikasi anda.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(3, executor.getPoolSize()); assertEquals(0, executor.getQueue().size());

Saiz barisan dalam contoh di atas akan selalu sifar kerana secara dalaman contoh SynchronousQueue digunakan. Dalam SynchronousQueue , pasangan operasi memasukkan dan membuang selalu berlaku secara serentak, jadi barisan tidak pernah mengandungi apa-apa.

The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is zero.

Tasks in the above example will be executed sequentially, so the flag value will be 2 after the task's completion:

AtomicInteger counter = new AtomicInteger(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1); }); executor.submit(() -> { counter.compareAndSet(1, 2); });

Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it cannot be reconfigured after creation. Note that also this is the reason we cannot cast it to a ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:

  • schedule method allows to execute a task once after a specified delay;
  • scheduleAtFixedRate method allows to execute a task after a specified initial delay and then execute it repeatedly with a certain period; the period argument is the time measured between the starting times of the tasks, so the execution rate is fixed;
  • scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly executes the given task, but the specified delay is measured between the end of the previous task and the start of the next; the execution rate may vary depending on the time it takes to execute any given task.

The Executors.newScheduledThreadPool() method is typically used to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime. Here's how to schedule a task for execution in 500 milliseconds:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);

The following code shows how to execute a task after 500 milliseconds delay and then repeat it every 100 milliseconds. After scheduling the task, we wait until it fires three times using the CountDownLatch lock, then cancel it using the Future.cancel() method.

CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. Using a simple ThreadPoolExecutor, you will run out of threads quickly, as every task or subtask requires its own thread to run.

In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask, implementing the Work Stealing algorithm instead. This framework is thoroughly described in the article “Guide to the Fork/Join Framework in Java”

Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all leaf values. Here’s a simple implementation of a tree consisting of a node, an int value and a set of child nodes:

static class TreeNode { int value; Set children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); } }

Now if we want to sum all values in a tree in parallel, we need to implement a RecursiveTask interface. Each task receives its own node and adds its value to the sum of values of its children. To calculate the sum of children values, task implementation does the following:

  • streams the children set,
  • maps over this stream, creating a new CountingTask for each element,
  • executes each subtask by forking it,
  • collects the results by calling the join method on each forked task,
  • sums the results using the Collectors.summingInt collector.
public static class CountingTask extends RecursiveTask { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); } }

The code to run the calculation on an actual tree is very simple:

TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Thread Pool's Implementation in Guava

Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.

4.1. Adding Guava as a Maven Dependency

Add the following dependency to your Maven pom file to include the Guava library to your project. You can find the latest version of Guava library in the Maven Central repository:

 com.google.guava guava 19.0 

4.2. Direct Executor and Direct Executor Service

Sometimes you want to execute the task either in the current thread or in a thread pool, depending on some conditions. You would prefer to use a single Executor interface and just switch the implementation. Although it is not so hard to come up with an implementation of Executor or ExecutorService that executes the tasks in the current thread, it still requires writing some boilerplate code.

Gladly, Guava provides predefined instances for us.

Here's an example that demonstrates the execution of a task in the same thread. Although the provided task sleeps for 500 milliseconds, it blocks the current thread, and the result is available immediately after the execute call is finished:

Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true); }); assertTrue(executed.get());

The instance returned by the directExecutor() method is actually a static singleton, so using this method does not provide any overhead on object creation at all.

You should prefer this method to the MoreExecutors.newDirectExecutorService() because that API creates a full-fledged executor service implementation on every call.

4.3. Exiting Executor Services

Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause JVM to hang indefinitely while the tasks keep doing their work.

To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.

These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method and prevent the VM from terminating for a configured amount of time before giving up on hung tasks.

In the following example, we're submitting the task that contains an infinite loop, but we use an exiting executor service with a configured time of 100 milliseconds to wait for the tasks upon VM termination. Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true) { } });

4.4. Listening Decorators

Listening decorators allow you to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method addListener. This method allows adding a listener that is called upon future completion.

Anda mungkin jarang mahu menggunakan kaedah ListenableFuture.addListener () secara langsung, tetapi sangat penting bagi kebanyakan kaedah pembantu di kelas utiliti Futures . Contohnya, dengan kaedah Futures.allAsList () anda boleh menggabungkan beberapa contoh ListenableFuture dalam satu ListenableFuture tunggal yang selesai setelah berjaya menyelesaikan semua gabungan niaga hadapan:

ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit(() -> "World"); String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(" ")); assertEquals("Hello World", greeting);

5. Kesimpulan

Dalam artikel ini, kami telah membincangkan pola Thread Pool dan pelaksanaannya di perpustakaan Java standard dan di perpustakaan Jambu Google.

Kod sumber untuk artikel tersebut terdapat di GitHub.