1. Gambaran keseluruhan
Dalam artikel ini, kita akan melihat salah satu konstruksi java.util.concurrent yang paling berguna untuk menyelesaikan masalah pengeluar-pengguna serentak. Kami akan melihat API antara muka BlockingQueue dan bagaimana kaedah dari antara muka itu menjadikan penulisan program serentak lebih mudah.
Kemudian dalam artikel, kami akan menunjukkan contoh program sederhana yang mempunyai banyak utas pengeluar dan banyak utas pengguna.
2. Jenis BlockingQueue
Kita dapat membezakan dua jenis BlockingQueue :
- barisan tanpa had - boleh berkembang hampir selama-lamanya
- barisan berhad - dengan kapasiti maksimum ditentukan
2.1. Baris Tidak Terikat
Membuat giliran tanpa had adalah mudah:
BlockingQueue blockingQueue = new LinkedBlockingDeque();
Kapasiti blockingQueue akan ditetapkan ke Integer.MAX_VALUE. Semua operasi yang menambahkan elemen ke dalam barisan tanpa batas tidak akan dapat disekat, sehingga dapat berkembang menjadi ukuran yang sangat besar.
Perkara yang paling penting semasa merancang program pengeluar-pengguna menggunakan BlockingQueue tanpa had adalah pengguna harus dapat menggunakan mesej secepat pengeluar menambahkan mesej ke barisan. Jika tidak, memori boleh diisi dan kami akan mendapat pengecualian OutOfMemory .
2.2. Beratur Terikat
Jenis giliran kedua adalah barisan yang dibatasi. Kita boleh membuat barisan seperti itu dengan menyampaikan kapasiti sebagai hujah kepada pembina:
BlockingQueue blockingQueue = new LinkedBlockingDeque(10);
Di sini kita mempunyai blockingQueue yang memiliki kapasitas sama dengan 10. Ini bermaksud apabila pengeluar berusaha menambahkan elemen ke dalam antrian yang sudah penuh, bergantung pada kaedah yang digunakan untuk menambahkannya ( offer () , add () atau put () ), ia akan menyekat sehingga ruang untuk memasukkan objek menjadi tersedia. Jika tidak, operasi akan gagal.
Menggunakan antrean terikat adalah cara yang baik untuk merancang program serentak kerana ketika kita memasukkan elemen ke dalam antrian yang sudah penuh, operasi itu harus menunggu hingga pengguna mengejar dan menyediakan sedikit ruang dalam barisan. Ini memberi kita pendikit tanpa usaha dari pihak kita.
3. API BlockingQueue
Terdapat dua jenis kaedah dalam antara muka BlockingQueue - kaedah yang bertanggungjawab untuk menambahkan elemen ke dalam barisan dan kaedah yang mengambil elemen tersebut. Setiap kaedah dari kedua kumpulan tersebut berkelakuan berbeza sekiranya barisan penuh / kosong.
3.1. Menambah Elemen
- tambah () - kembali benar jika penyisipan berjaya, jika tidak, lemparkan IllegalStateException
- put () - memasukkan elemen yang ditentukan ke dalam barisan, menunggu slot percuma jika perlu
- offer () - mengembalikan true jika penyisipan berjaya, jika tidak palsu
- tawaran (E, jangka masa panjang, unit TimeUnit) - cuba memasukkan elemen ke dalam barisan dan menunggu slot yang tersedia dalam jangka masa yang ditentukan
3.2. Mengambil Elemen
- take () - menunggu elemen kepala beratur dan menghapusnya. Sekiranya barisan kosong, ia akan menyekat dan menunggu elemen tersedia
- poll (timeout yang panjang, unit TimeUnit) - mengambil dan mengeluarkan kepala barisan, menunggu masa menunggu yang ditentukan jika perlu untuk elemen tersedia. Mengembalikan nol setelah tamat masa
Kaedah ini adalah blok bangunan terpenting dari antara muka BlockingQueue ketika membina program pengeluar-pengguna.
4. Contoh Pengeluar-Pengguna Multithreaded
Mari buat program yang terdiri daripada dua bahagian - Pengeluar dan Pengguna.
Pengeluar akan menghasilkan nombor rawak dari 0 hingga 100 dan akan memasukkan nombor itu dalam BlockingQueue . Kami akan mempunyai 4 utas pengeluar dan menggunakan kaedah put () untuk menyekat sehingga ada ruang yang tersedia dalam barisan.
Perkara penting yang perlu diingat ialah kita perlu menghentikan urutan pengguna daripada menunggu elemen muncul dalam barisan tanpa had.
Teknik yang baik untuk memberi isyarat dari pengeluar kepada pengguna bahawa tidak ada lagi pesan untuk diproses adalah dengan menghantar mesej khas yang disebut pil racun. Kita perlu menghantar sebilangan pil racun yang kita ada pada pengguna. Kemudian apabila pengguna mengambil pesanan pil racun khas itu dari barisan, pengguna akan menyelesaikan pelaksanaannya dengan anggun.
Mari lihat kelas pengeluar:
public class NumbersProducer implements Runnable { private BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); } } }
Pembuat pengeluar kami mengambil hujah BlockingQueue yang digunakan untuk menyelaraskan pemprosesan antara pengeluar dan pengguna. Kami melihat kaedah menghasilkanNumbers () akan meletakkan 100 elemen dalam barisan. Diperlukan juga pesan pil racun, untuk mengetahui jenis pesan apa yang harus dimasukkan ke dalam antrian ketika pelaksanaannya akan selesai. Mesej itu perlu meletakkan kali poisonPillPerProducer ke dalam barisan.
Setiap pengguna akan mengambil elemen dari kaedah BlockingQueue menggunakan kaedah take () sehingga akan menyekat sehingga ada elemen dalam barisan. Setelah mengambil Integer dari barisan, ia memeriksa apakah mesej itu adalah pil racun, jika ya maka pelaksanaan utas selesai. Jika tidak, ia akan mencetak hasilnya pada output standard bersama dengan nama utas semasa.
Ini akan memberi kita gambaran mengenai cara kerja pengguna:
public class NumbersConsumer implements Runnable { private BlockingQueue queue; private final int poisonPill; public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { return; } System.out.println(Thread.currentThread().getName() + " result: " + number); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
Perkara penting yang perlu diperhatikan adalah penggunaan giliran. Sama seperti dalam pengeluar pengeluar, barisan diserahkan sebagai hujah. Kita dapat melakukannya kerana BlockingQueue dapat dikongsi antara utas tanpa penyegerakan yang jelas.
Now that we have our producer and consumer, we can start our program. We need to define the queue's capacity, and we set it to 100 elements.
We want to have 4 producer threads and a number of consumers threads will be equal to the number of available processors:
int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue(BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
BlockingQueue is created using construct with a capacity. We're creating 4 producers and N consumers. We specify our poison pill message to be an Integer.MAX_VALUE because such value will never be sent by our producer under normal working conditions. The most important thing to notice here is that BlockingQueue is used to coordinate work between them.
Semasa kita menjalankan program, 4 utas pengeluar akan memasukkan Integer rawak dalam BlockingQueue dan pengguna akan mengambil elemen-elemen tersebut dari barisan. Setiap utas akan mencetak ke output standard nama utas berserta hasilnya.
5. Kesimpulan
Artikel ini menunjukkan penggunaan praktikal BlockingQueue dan menerangkan kaedah yang digunakan untuk menambah dan mengambil elemen darinya. Juga, kami telah menunjukkan cara membina program pengeluar-pengguna multithreaded menggunakan BlockingQueue untuk menyelaraskan kerja antara pengeluar dan pengguna.
Pelaksanaan semua contoh dan coretan kod ini boleh didapati di projek GitHub - ini adalah projek berasaskan Maven, jadi mudah diimport dan dijalankan sebagaimana adanya.