Penghantaran Mesej RabbitMQ dengan Spring AMQP

1. Pengenalan

Dalam tutorial ini, kita akan meneroka konsep pertukaran peminat dan topik dengan Spring AMQP dan RabbitMQ.

Pada tahap tinggi, pertukaran fanout akan menyiarkan pesan yang sama ke semua antrian terikat , sementara pertukaran topik menggunakan kunci routing untuk menyampaikan pesan ke antrian atau antrian terikat tertentu .

Sebelum membaca Permesejan Dengan Musim Semi AMQP disyorkan untuk tutorial ini.

2. Menubuhkan Fanout Exchange

Mari buat satu pertukaran fanout dengan dua barisan terikat dengannya. Apabila kami menghantar mesej ke pertukaran ini, kedua-dua barisan akan menerima mesej tersebut. Pertukaran fanout kami mengabaikan sebarang kunci penghala yang disertakan dengan mesej.

Spring AMQP membolehkan kami mengumpulkan semua deklarasi antrean, pertukaran, dan pengikatan dalam objek yang dapat Diumumkan :

@Bean public Declarables fanoutBindings() { Queue fanoutQueue1 = new Queue("fanout.queue1", false); Queue fanoutQueue2 = new Queue("fanout.queue2", false); FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); return new Declarables( fanoutQueue1, fanoutQueue2, fanoutExchange, bind(fanoutQueue1).to(fanoutExchange), BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)); }

3. Menetapkan Pertukaran Topik

Sekarang, kami juga akan mengatur pertukaran topik dengan dua barisan, masing-masing dengan corak pengikat yang berbeza:

@Bean public Declarables topicBindings() { Queue topicQueue1 = new Queue(topicQueue1Name, false); Queue topicQueue2 = new Queue(topicQueue2Name, false); TopicExchange topicExchange = new TopicExchange(topicExchangeName); return new Declarables( topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind(topicQueue1) .to(topicExchange).with("*.important.*"), BindingBuilder .bind(topicQueue2) .to(topicExchange).with("#.error")); }

Pertukaran topik membolehkan kita mengikat barisan padanya dengan corak utama yang berbeza. Ini sangat fleksibel dan membolehkan kita mengikat beberapa barisan dengan corak yang sama atau bahkan beberapa corak dengan barisan yang sama.

Apabila kunci penghalaan mesej sesuai dengan corak, ia akan diletakkan dalam barisan. Sekiranya barisan mempunyai beberapa ikatan yang sepadan dengan kunci penghalaan mesej, hanya satu salinan mesej yang diletakkan di barisan.

Corak pengikatan kami dapat menggunakan tanda bintang (“*”) untuk mencocokkan kata dalam posisi tertentu atau tanda pound (“#”) untuk mencocokkan kata sifar atau lebih.

Oleh itu, topicQueue1 kami akan menerima mesej yang mempunyai kunci penghalaan yang mempunyai corak tiga perkataan dengan kata tengah menjadi "penting" - contohnya: "user.important.error" atau "blog.important.notification".

Dan, topicQueue2 kami akan menerima mesej yang mempunyai kunci penghalaan yang berakhir dengan kesalahan kata; contoh yang sepadan adalah "ralat" , "user.important.error" atau "blog.post.save.error".

4. Menetapkan Pengeluar

Kami akan menggunakan convertAndSend kaedah yang RabbitTemplate untuk menghantar mesej sampel kami:

 String message = " payload is broadcast"; return args -> { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); };

The RabbitTemplate menyediakan banyak terlebih beban convertAndSend () kaedah untuk jenis pertukaran yang berbeza.

Ketika kami mengirim pesan ke pertukaran fanout, kunci perutean diabaikan, dan pesan dihantar ke semua antrian terikat.

Semasa kami menghantar mesej ke pertukaran topik, kami perlu memberikan kunci penghalaan. Berdasarkan kunci penghalaan ini, mesej akan dihantar ke barisan tertentu.

5. Mengkonfigurasi Pengguna

Akhirnya, mari kita tentukan empat pengguna - satu untuk setiap barisan - untuk mengambil mesej yang dihasilkan:

 @RabbitListener(queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1(String message) { System.out.println("Received fanout 1 message: " + message); } @RabbitListener(queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2(String message) { System.out.println("Received fanout 2 message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1(String message) { System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2(String message) { System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); }

Kami mengkonfigurasi pengguna menggunakan anotasi @RabbitListener . Satu-satunya hujah yang dilontarkan di sini adalah nama barisan. Pengguna tidak mengetahui pertukaran atau kunci penghalaan di sini.

6. Menjalankan Contoh

Projek contoh kami adalah aplikasi Spring Boot, dan oleh itu ia akan menginisialisasi aplikasi bersama-sama dengan sambungan ke RabbitMQ dan mengatur semua barisan, pertukaran, dan ikatan.

Secara lalai, aplikasi kami menjangkakan contoh RabbitMQ berjalan di localhost di port 5672. Kami boleh mengubahsuai ini dan lalai lain dalam application.yaml .

Projek kami memperlihatkan titik akhir HTTP pada URI - / siaran - yang menerima POST dengan mesej di badan permintaan.

Semasa kami menghantar permintaan ke URI ini dengan isi "Uji", kami akan melihat sesuatu yang serupa dengan ini dalam output:

Received fanout 1 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important warn payload is broadcast Received topic 2 (#.error) message: topic important error payload is broadcast Received fanout 2 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important error payload is broadcast

Urutan di mana kita akan melihat mesej ini tentu saja tidak dijamin.

7. Kesimpulannya

Dalam tutorial ringkas ini, kami membahas pertukaran fanout dan topik dengan Spring AMQP dan RabbitMQ.

Kod sumber yang lengkap dan semua coretan kod untuk tutorial ini terdapat di repositori GitHub.