Pelanggan MQTT di Java

1. Gambaran keseluruhan

Dalam tutorial ini, kita akan melihat bagaimana kita dapat menambahkan pesanan MQTT dalam projek Java menggunakan perpustakaan yang disediakan oleh projek Eclipse Paho.

2. MQTT Primer

MQTT (MQ Telemetry Transport) adalah protokol pesanan yang dibuat untuk mengatasi kebutuhan metode sederhana dan ringan untuk memindahkan data ke / dari perangkat berkuasa rendah, seperti yang digunakan dalam aplikasi industri.

Dengan peningkatan populariti peranti IoT (Internet of Things), MQTT telah menyaksikan peningkatan penggunaannya, yang menyebabkan standardisasi oleh OASIS dan ISO.

Protokol menyokong satu corak pesanan, iaitu corak Publish-Subscribe: setiap mesej yang dihantar oleh klien mengandungi "topik" yang berkaitan yang digunakan oleh broker untuk mengarahkannya ke pelanggan yang dilanggan. Nama topik boleh berupa rentetan sederhana seperti " oiltemp " atau rentetan seperti jalan " motor / 1 / rpm ".

Untuk menerima mesej, pelanggan melanggan satu atau lebih topik menggunakan nama sebenarnya atau rentetan yang mengandungi salah satu wildcard yang disokong ("#" untuk topik bertingkat dan "+" untuk satu peringkat ").

3. Penyediaan Projek

Untuk memasukkan perpustakaan Paho dalam projek Maven, kita harus menambahkan kebergantungan berikut:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Versi terbaru modul perpustakaan Eclipse Paho Java boleh dimuat turun dari Maven Central.

4. Persediaan Pelanggan

Semasa menggunakan perpustakaan Paho, perkara pertama yang perlu kita lakukan untuk mengirim dan / atau menerima mesej dari broker MQTT adalah mendapatkan pelaksanaan antara muka IMqttClient . Antaramuka ini mengandungi semua kaedah yang diperlukan oleh aplikasi untuk membuat sambungan ke pelayan, mengirim dan menerima mesej.

Paho keluar dari kotak dengan dua pelaksanaan antara muka ini, yang tidak segerak ( MqttAsyncClient ) dan yang segerak ( MqttClient ).Dalam kes kami, kami akan memberi tumpuan kepada versi segerak, yang mempunyai semantik yang lebih sederhana.

Penyediaan itu sendiri adalah proses dua langkah: pertama kita membuat contoh kelas MqttClient dan kemudian kita menyambungkannya ke pelayan kita. Subseksyen berikut memperincikan langkah-langkah tersebut.

4.1. Membuat Instance IMqttClient Baru

Coretan kod berikut menunjukkan cara membuat contoh segerak IMqttClient baru :

String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

Dalam kes ini, kami menggunakan konstruktor termudah yang tersedia, yang mengambil alamat titik akhir broker MQTT kami dan pengecam pelanggan , yang secara unik mengenal pasti pelanggan kami.

Dalam kes kami, kami menggunakan UUID rawak, jadi pengenal pelanggan baru akan dihasilkan pada setiap larian.

Paho juga menyediakan konstruktor tambahan yang dapat kita gunakan untuk menyesuaikan mekanisme ketekunan yang digunakan untuk menyimpan pesan yang tidak diakui dan / atau Layanan BerjadualExecutor yang digunakan untuk menjalankan tugas latar belakang yang diperlukan oleh pelaksanaan mesin protokol.

Titik akhir pelayan yang kami gunakan adalah broker MQTT awam yang dihoskan oleh projek Paho , yang membolehkan sesiapa sahaja yang mempunyai sambungan internet untuk menguji klien tanpa memerlukan pengesahan.

4.2. Menyambung ke Pelayan

Contoh MqttClient kami yang baru dibuat tidak disambungkan ke pelayan. Kami melakukannya dengan memanggil kaedah connect () , secara opsional melewati instance MqttConnectOptions yang membolehkan kami menyesuaikan beberapa aspek protokol.

Khususnya, kita dapat menggunakan pilihan tersebut untuk menyampaikan maklumat tambahan seperti kelayakan keselamatan, mod pemulihan sesi, mod penyambungan semula dan sebagainya.

The MqttConnectionOptions kelas mendedahkan orang-orang pilihan sebagai hartanah mudah yang kita boleh ditetapkan menggunakan kaedah setter normal. Kami hanya perlu menetapkan sifat yang diperlukan untuk senario kami - yang lain akan menganggap nilai lalai.

Kod yang digunakan untuk membuat sambungan ke pelayan biasanya kelihatan seperti ini:

MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setConnectionTimeout(10); publisher.connect(options);

Di sini, kami menentukan pilihan sambungan kami supaya:

  • Perpustakaan secara automatik akan cuba menyambung semula ke pelayan sekiranya berlaku kegagalan rangkaian
  • Ini akan membuang mesej yang tidak dihantar dari jangkaan sebelumnya
  • Waktu tamat sambungan ditetapkan kepada 10 saat

5. Menghantar Mesej

Menghantar mesej menggunakan MqttClient yang sudah tersambung sangat mudah. Kami menggunakan salah satu varian kaedah penerbitan () untuk mengirim muatan, yang selalu merupakan susunan bait, ke topik tertentu , menggunakan salah satu pilihan kualiti perkhidmatan berikut:

  • 0 - semantik “paling banyak sekali”, juga dikenal sebagai “api-dan-lupakan”. Gunakan pilihan ini apabila kehilangan mesej dapat diterima, kerana tidak memerlukan pengakuan atau kegigihan
  • 1 - semantik "sekurang-kurangnya sekali". Gunakan pilihan ini apabila kehilangan mesej tidak dapat diterima dan pelanggan anda dapat menangani pendua
  • 2 - semantik "sekali sahaja". Gunakan pilihan ini apabila kehilangan mesej tidak dapat diterima dan pelanggan anda tidak dapat menangani pendua

Dalam projek sampel kami, kelas EngineTemperatureSensor memainkan peranan sebagai sensor tiruan yang menghasilkan bacaan suhu baru setiap kali kami menggunakan kaedah panggilannya () .

Kelas ini menerapkan antara muka yang Boleh Dipanggil sehingga kita dapat menggunakannya dengan mudah dengan salah satu pelaksanaan ExecutorService yang terdapat dalam pakej java.util.concurrent :

public class EngineTemperatureSensor implements Callable { // ... private members omitted public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } MqttMessage msg = readEngineTemp(); msg.setQos(0); msg.setRetained(true); client.publish(TOPIC,msg); return null; } private MqttMessage readEngineTemp() { double temp = 80 + rnd.nextDouble() * 20.0; byte[] payload = String.format("T:%04.2f",temp) .getBytes(); return new MqttMessage(payload); } }

The MqttMessage merangkumi muatan itu sendiri, diminta Quality-of-Service dan juga yang tertangguh bendera mesej. Bendera ini menunjukkan kepada broker bahawa ia harus menyimpan mesej ini sehingga habis digunakan oleh pelanggan.

We can use this feature to implement a “last known good” behavior, so when a new subscriber connects to the server, it will receive the retained message right away.

6. Receiving Messages

In order to receive messages from the MQTT broker, we need to use one of the subscribe() method variants, which allow us to specify:

  • One or more topic filters for messages we want to receive
  • The associated QoS
  • The callback handler to process received messages

In the following example, we show how to add a message listener to an existing IMqttClient instance to receive messages from a given topic. We use a CountDownLatch as a synchronization mechanism between our callback and the main execution thread, decrementing it every time a new message arrives.

In the sample code, we've used a different IMqttClient instance to receive messages. We did it just to make more clear which client does what, but this is not a Paho limitation – if you want, you can use the same client for publishing and receiving messages:

CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);

The subscribe() variant used above takes an IMqttMessageListener instance as its second argument.

In our case, we use a simple lambda function that processes the payload and decrements a counter. If not enough messages arrive in the specified time window (1 minute), the await() method will throw an exception.

When using Paho, we don't need to explicitly acknowledge message receipt. If the callback returns normally, Paho assumes it a successful consumption and sends an acknowledgment to the server.

If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.

Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.

7. Conclusion

In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.

Perpustakaan ini menangani semua perincian protokol peringkat rendah, yang memungkinkan kita untuk fokus pada aspek lain dari penyelesaian kita, sambil meninggalkan ruang yang baik untuk menyesuaikan aspek penting dari ciri dalamannya, seperti ketekunan mesej.

Kod yang ditunjukkan dalam artikel ini boleh didapati di GitHub.