1. Gambaran keseluruhan
Rangka kerja garpu / bergabung disajikan di Java 7. Ini menyediakan alat untuk membantu mempercepat pemprosesan selari dengan berusaha menggunakan semua inti pemproses yang tersedia - yang dicapai melalui pendekatan pembahagi dan penaklukan .
Dalam praktiknya, ini bermaksud bahawa kerangka pertama "garpu" , secara berturut-turut memecah tugas menjadi subtugas bebas yang lebih kecil sehingga mereka cukup mudah untuk dilaksanakan secara tidak segerak.
Selepas itu, bahagian "gabung" dimulakan , di mana hasil dari semua subtugas digabungkan secara berulang ke dalam satu hasil, atau dalam hal tugas yang kembali terbatal, program hanya menunggu sehingga setiap subtugas dilaksanakan.
Untuk memberikan pelaksanaan selari yang berkesan, kerangka fork / join menggunakan kumpulan utas yang dipanggil ForkJoinPool , yang menguruskan urutan pekerja jenis ForkJoinWorkerThread .
2. ForkJoinPool
The ForkJoinPool adalah hati rangka kerja tersebut. Ini adalah pelaksanaan ExecutorService yang menguruskan utas pekerja dan memberi kami alat untuk mendapatkan maklumat mengenai keadaan dan prestasi kumpulan utas.
Benang pekerja hanya dapat menjalankan satu tugas pada satu masa, tetapi ForkJoinPool tidak membuat utas terpisah untuk setiap subtugas. Sebagai gantinya, setiap utas di kolam renang mempunyai barisan hujung dua sendiri (atau dek, diucapkan dek ) yang menyimpan tugas.
Senibina ini sangat penting untuk mengimbangi beban kerja utas dengan bantuan algoritma pencuri kerja.
2.1. Algoritma Mencuri Kerja
Ringkasnya - benang bebas cuba "mencuri" karya dari barang-barang benang yang sibuk.
Secara lalai, utas pekerja mendapat tugas dari ketua dequeanya sendiri. Apabila kosong, utas mengambil tugas dari ekor deque utas sibuk yang lain atau dari barisan kemasukan global, kerana di sinilah tempat kerja terbesar kemungkinan berada.
Pendekatan ini meminimumkan kemungkinan bahawa benang akan bersaing untuk tugas. Ini juga mengurangkan berapa kali utas perlu mencari kerja, kerana ia berfungsi pada bahagian kerja yang paling banyak tersedia terlebih dahulu.
2.2. Instantiasi ForkJoinPool
Di Java 8, cara paling mudah untuk mendapatkan akses ke contoh ForkJoinPool adalah menggunakan kaedah statik commonPool (). Seperti namanya, ini akan memberikan rujukan kepada kumpulan umum, yang merupakan kumpulan utas lalai untuk setiap ForkJoinTask .
Menurut dokumentasi Oracle, menggunakan kumpulan umum yang telah ditentukan akan mengurangkan penggunaan sumber daya, kerana ini tidak menggalakkan pembuatan kumpulan utas yang terpisah untuk setiap tugas.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
Tingkah laku yang sama dapat dicapai di Java 7 dengan membuat ForkJoinPool dan menetapkannya ke medan statik umum kelas utiliti:
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
Sekarang ia dapat diakses dengan mudah:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
Dengan konstruktor ForkJoinPool , adalah mungkin untuk membuat kumpulan benang khusus dengan tahap paralelisme tertentu, kilang benang, dan pengendali pengecualian. Dalam contoh di atas, kolam mempunyai tahap paralelisme 2. Ini bermaksud kolam akan menggunakan 2 teras pemproses.
3. ForkJoinTask
ForkJoinTask adalah jenis asas untuk tugas yang dilaksanakan di dalam ForkJoinPool. Dalam praktiknya, salah satu daripada dua subkelasnya harus diperluas: RecursiveAction untuk tugas yang tidak sah dan RecursiveTask untuk tugas yang mengembalikan nilai.Mereka berdua mempunyai kaedah abstrak menghitung () di mana logik tugas ditentukan.
3.1. RecursiveAction - Contoh
Dalam contoh di bawah, unit kerja yang akan diproses diwakili oleh String yang disebut beban kerja . Untuk tujuan demonstrasi, tugasnya adalah tidak masuk akal: ia hanya mementingkan input dan mencatatnya.
Untuk menunjukkan tingkah laku penempaan kerangka kerja, contohnya membagi tugas jika beban kerja. Panjang () lebih besar daripada ambang yang ditentukanmenggunakan kaedah createSubtask () .
String secara rekursif dibahagikan kepada substring, membuat instance CustomRecursiveTask yang berdasarkan substring ini.
Hasilnya, kaedah mengembalikan Senarai.
Senarai dihantar ke ForkJoinPool menggunakan kaedah invokeAll () :
public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List createSubtasks() { List subtasks = new ArrayList(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }
Corak ini boleh digunakan untuk mengembangkan kelas RecursiveAction anda sendiri . Untuk melakukan ini, buat objek yang mewakili jumlah keseluruhan kerja, pilih ambang yang sesuai, tentukan kaedah untuk membahagikan karya, dan tentukan kaedah untuk melakukan kerja.
3.2. Tugasan Rekursif
Untuk tugas yang mengembalikan nilai, logik di sini serupa, kecuali hasil untuk setiap subtugas disatukan dalam satu hasil:
public class CustomRecursiveTask extends RecursiveTask { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection createSubtasks() { List dividedTasks = new ArrayList(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a a * 10) .sum(); } }
Dalam contoh ini, karya diwakili oleh array yang disimpan di medan arr kelas CustomRecursiveTask . The createSubtasks () kaedah rekursif membahagikan tugas menjadi lebih kecil kerja sehingga setiap keping adalah lebih kecil daripada ambang . Kemudian, kaedah invokeAll () menyerahkan subtugas ke kumpulan umum dan mengembalikan senarai Masa Depan .
Untuk mencetuskan pelaksanaan, kaedah join () dipanggil untuk setiap subtugas.
Dalam contoh ini, ini dapat dicapai dengan menggunakan API Stream Java 8 ; kaedah sum () digunakan sebagai representasi penggabungan sub hasil ke hasil akhir.
4. Menyerahkan Tugas ke ForkJoinPool
Untuk menyerahkan tugas ke kumpulan utas, beberapa pendekatan dapat digunakan.
Kaedah submit () atau eksekusi () (kes penggunaannya sama):
forkJoinPool.execute(customRecursiveTask); int result = customRecursiveTask.join();
Kaedah invoke () membebankan tugas dan menunggu hasilnya, dan tidak memerlukan penyertaan manual:
int result = forkJoinPool.invoke(customRecursiveTask);
Kaedah invokeAll () adalah kaedah yang paling mudah untuk menghantar urutan ForkJoinTasks ke ForkJoinPool. Ia memerlukan tugas sebagai parameter (dua tugas, var args, atau koleksi), garpu kemudian mengembalikan koleksi objek Masa Depan dalam urutan yang dihasilkan.
Alternatively, you can use separate fork() and join() methods. The fork() method submits a task to a pool, but it doesn't trigger its execution. The join() method must be used for this purpose. In the case of RecursiveAction, the join() returns nothing but null; for RecursiveTask, it returns the result of the task's execution:
customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();
In our RecursiveTask example we used the invokeAll() method to submit a sequence of subtasks to the pool. The same job can be done with fork() and join(), though this has consequences for the ordering of the results.
Untuk mengelakkan kekeliruan, biasanya adalah idea yang baik untuk menggunakan kaedah invokeAll () untuk menyerahkan lebih dari satu tugas ke ForkJoinPool.
5. Kesimpulan
Menggunakan kerangka garpu / gabung dapat mempercepat pemprosesan tugas besar, tetapi untuk mencapai hasil ini, beberapa panduan harus diikuti:
- Gunakan sekumpulan benang sesedikit mungkin - dalam kebanyakan kes, keputusan terbaik adalah menggunakan satu kumpulan utas setiap aplikasi atau sistem
- Gunakan kumpulan utas umum lalai, jika tidak diperlukan penalaan khusus
- Gunakan ambang yang munasabah untuk membahagikan ForkJoinTask menjadi subtugas
- Elakkan penyekat dalam ForkJoinTasks anda
Contoh yang digunakan dalam artikel ini terdapat di repositori GitHub yang dipautkan.