Menguruskan Antrian SQS Amazon di Java

1. Gambaran keseluruhan

Dalam tutorial ini, kita akan meneroka cara menggunakan SQS Amazon (Simple Queue Service) menggunakan Java SDK .

2. Prasyarat

Pergantungan Maven, tetapan akaun AWS, dan sambungan pelanggan yang diperlukan untuk menggunakan Amazon AWS SDK untuk SQS adalah sama seperti dalam artikel ini di sini.

Dengan andaian kami telah membuat contoh AWSCredentials, seperti yang dijelaskan dalam artikel sebelumnya, kami dapat terus maju dan membuat klien SQS kami:

AmazonSQS sqs = AmazonSQSClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) .withRegion(Regions.US_EAST_1) .build(); 

3. Membuat Barisan

Sebaik sahaja kami menyediakan pelanggan SQS kami, membuat antrean cukup mudah.

3.1. Membuat Antrian Piawai

Mari lihat bagaimana kita dapat membuat Antrian Piawai. Untuk melakukan ini, kita perlu membuat contoh CreateQueueRequest:

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("baeldung-queue"); String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl(); 

3.2. Mencipta Antrian FIFO

Membuat FIFO adalah serupa dengan membuat Antrian Piawai. Kami masih akan menggunakan contoh CreateQueueRequest , seperti sebelumnya. Cuma kali ini, kita harus meneruskan atribut antrian, dan menetapkan atribut FifoQueue ke true :

Map queueAttributes = new HashMap(); queueAttributes.put("FifoQueue", "true"); queueAttributes.put("ContentBasedDeduplication", "true"); CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest( "baeldung-queue.fifo").withAttributes(queueAttributes); String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest) .getQueueUrl(); 

4. Menghantar Mesej ke Antrian

Setelah mengatur barisan, kami boleh mula menghantar mesej.

4.1. Menghantar Mesej ke Antrian Piawai

Untuk menghantar mesej ke barisan standard, kita harus membuat contoh SendMessageRequest.

Kemudian kami melampirkan peta atribut mesej untuk permintaan ini:

Map messageAttributes = new HashMap(); messageAttributes.put("AttributeOne", new MessageAttributeValue() .withStringValue("This is an attribute") .withDataType("String")); SendMessageRequest sendMessageStandardQueue = new SendMessageRequest() .withQueueUrl(standardQueueUrl) .withMessageBody("A simple message.") .withDelaySeconds(30) .withMessageAttributes(messageAttributes); sqs.sendMessage(sendMessageStandardQueue); 

The withDelaySeconds () Menentukan selepas berapa lama mesej hendaklah tiba pada barisan.

4.2. Menghantar Mesej ke Antrian FIFO

Satu-satunya perbezaan, dalam kes ini, kita mesti menentukan kumpulan yang menjadi milik mesej:

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest() .withQueueUrl(fifoQueueUrl) .withMessageBody("Another simple message.") .withMessageGroupId("baeldung-group-1") .withMessageAttributes(messageAttributes);

Seperti yang anda lihat dalam contoh kod di atas, kami menentukan kumpulan dengan menggunakan withMessageGroupId ().

4.3. Menyiarkan Beberapa Mesej ke Antrian

Kami juga dapat mengeposkan beberapa mesej ke barisan, menggunakan satu permintaan. Kami akan membuat senarai SendMessageBatchRequestEntry yang akan kami kirim menggunakan contoh SendMessageBatchRequest :

List  messageEntries = new ArrayList(); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-1") .withMessageBody("batch-1") .withMessageGroupId("baeldung-group-1")); messageEntries.add(new SendMessageBatchRequestEntry() .withId("id-2") .withMessageBody("batch-2") .withMessageGroupId("baeldung-group-1")); SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest(fifoQueueUrl, messageEntries); sqs.sendMessageBatch(sendMessageBatchRequest);

5. Membaca Mesej dari Baris

Kami dapat menerima mesej dari barisan kami dengan menggunakan kaedah acceptMessage () pada contoh ReceiveMessageRequest:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl) .withWaitTimeSeconds(10) .withMaxNumberOfMessages(10); List sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages(); 

Menggunakan denganMaxNumberOfMessages (), kami menentukan berapa banyak mesej yang akan diterima dari barisan - walaupun perlu diperhatikan bahawa maksimum adalah 10 .

Kaedah denganWaitTimeSeconds () membolehkan pengundian panjang. Pengundian yang panjang adalah cara untuk membatasi jumlah permintaan mesej terima yang kami kirim ke SQS.

Ringkasnya, ini bermaksud bahawa kita akan menunggu hingga beberapa saat yang ditentukan untuk mengambil mesej. Sekiranya tidak ada mesej dalam barisan untuk jangka masa tersebut, permintaan akan kembali kosong. Sekiranya mesej tiba di barisan semasa itu, ia akan dikembalikan.

Kita boleh mendapatkan atribut dan isi mesej yang diberikan:

sqsMessages.get(0).getAttributes(); sqsMessages.get(0).getBody();

6. Memadamkan Mesej dari Antrian

Untuk memadam mesej, kami akan menggunakan DeleteMessageRequest :

sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(fifoQueueUrl) .withReceiptHandle(sqsMessages.get(0).getReceiptHandle())); 

7. Baris Surat Mati

Antrian huruf mati mestilah jenis yang sama dengan barisan asasnya - mestilah FIFO jika barisan asas adalah FIFO, dan standard jika barisan asas adalah standard. Untuk contoh ini, kami akan menggunakan barisan standard.

Perkara pertama yang perlu kita lakukan adalah membuat apa yang akan menjadi barisan surat mati kita:

String deadLetterQueueUrl = sqs.createQueue("baeldung-dead-letter-queue").getQueueUrl(); 

Seterusnya, kami akan mendapatkan ARN barisan baru kami (Nama Sumber Amazon):

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes( new GetQueueAttributesRequest(deadLetterQueueUrl) .withAttributeNames("QueueArn")); String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes() .get("QueueArn"); 

Akhirnya, kami menetapkan barisan yang baru dibuat ini menjadi barisan huruf mati antrian standard asal kami:

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest() .withQueueUrl(standardQueueUrl) .addAttributesEntry("RedrivePolicy", "{\"maxReceiveCount\":\"2\", " + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}"); sqs.setQueueAttributes(queueAttributesRequest); 

Paket JSON yang kami tetapkan dalam kaedah addAttributesEntry () ketika membina contoh SetQueueAttributesRequest kami mengandungi maklumat yang kami perlukan : maxReceiveCount adalah 2 , yang bermaksud bahawa jika mesej diterima berkali-kali, ia dianggap belum diproses dengan betul, dan dihantar ke barisan surat mati kami.

The deadLetterTargetArn atribut menunjukkan barisan standard kami kepada surat giliran mati kami baru diwujudkan.

8. Pemantauan

Kami dapat memeriksa berapa banyak mesej yang ada dalam barisan tertentu, dan berapa banyak yang ada dengan SDK. Pertama, kita perlu membuat GetQueueAttributesRequest.

Dari sana kami akan memeriksa keadaan barisan:

GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(standardQueueUrl) .withAttributeNames("All"); GetQueueAttributesResult getQueueAttributesResult = sqs.getQueueAttributes(getQueueAttributesRequest); System.out.println(String.format("The number of messages on the queue: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessages"))); System.out.println(String.format("The number of messages in flight: %s", getQueueAttributesResult.getAttributes() .get("ApproximateNumberOfMessagesNotVisible")));

Pemantauan yang lebih mendalam dapat dicapai dengan menggunakan Amazon Cloud Watch.

9. Kesimpulannya

Dalam artikel ini, kami telah melihat bagaimana menguruskan barisan SQS menggunakan AWS Java SDK.

Seperti biasa, semua contoh kod yang digunakan dalam artikel boleh didapati di GitHub.