Pemesejan yang boleh dipercayai dengan Kumpulan JG

1. Gambaran keseluruhan

JGroups adalah API Java untuk pertukaran mesej yang boleh dipercayai. Ia mempunyai antara muka ringkas yang menyediakan:

  • timbunan protokol fleksibel, termasuk TCP dan UDP
  • pemecahan dan pemasangan semula pesanan besar
  • unicast dan multicast yang boleh dipercayai
  • pengesanan kegagalan
  • kawalan aliran

Serta banyak lagi ciri lain.

Dalam tutorial ini, kami akan membuat aplikasi mudah untuk bertukar-tukar pesan String antara aplikasi dan membekalkan keadaan bersama ke aplikasi baru ketika mereka bergabung dalam rangkaian.

2. Persediaan

2.1. Ketergantungan Maven

Kita perlu menambahkan satu kebergantungan pada pom.xml kami :

 org.jgroups jgroups 4.0.10.Final  

Versi terbaru perpustakaan boleh diperiksa di Maven Central.

2.2. Rangkaian

JGroups akan cuba menggunakan IPV6 secara lalai. Bergantung pada konfigurasi sistem kami, ini mungkin menyebabkan aplikasi tidak dapat berkomunikasi.

Untuk mengelakkan ini, kami akan menetapkan java.net.preferIPv4Stack ke harta sebenar semasa menjalankan aplikasi kami di sini:

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Sambungan kami ke rangkaian JGroups adalah JChannel. Saluran tersebut bergabung dengan kluster dan mengirim dan menerima mesej, serta maklumat mengenai keadaan rangkaian.

3.1. Membuat Saluran

Kami membuat JChannel dengan jalan ke fail konfigurasi. Sekiranya kita menghilangkan nama fail, ia akan mencari udp.xml dalam direktori kerja semasa.

Kami akan membuat saluran dengan fail konfigurasi bernama secara eksplisit:

JChannel channel = new JChannel("src/main/resources/udp.xml"); 

Konfigurasi JGroups boleh menjadi sangat rumit, tetapi konfigurasi UDP dan TCP lalai mencukupi untuk kebanyakan aplikasi. Kami telah memasukkan fail untuk UDP dalam kod kami dan akan menggunakannya untuk tutorial ini.

Untuk maklumat lebih lanjut mengenai mengkonfigurasi pengangkutan, lihat manual JGroups di sini.

3.2. Menyambungkan Saluran

Setelah membuat saluran, kita perlu bergabung dengan kluster. Kluster adalah sekumpulan nod yang bertukar mesej.

Untuk menyertai kluster memerlukan nama kluster:

channel.connect("Baeldung"); 

Node pertama yang cuba bergabung dengan kluster akan membuatnya jika tidak wujud. Kami akan melihat proses ini dalam tindakan di bawah.

3.3. Menamakan Saluran

Nod dikenali dengan nama sehingga rakan sebaya dapat mengirim pesan yang diarahkan dan menerima pemberitahuan tentang siapa yang masuk dan keluar dari kluster. JGroups akan memberikan nama secara automatik, atau kita dapat menetapkan sendiri:

channel.name("user1");

Kami akan menggunakan nama-nama ini di bawah, untuk mengesan kapan nod masuk dan keluar dari kluster.

3.4. Menutup Saluran

Pembersihan saluran sangat penting jika kita ingin rakan sebaya menerima pemberitahuan tepat pada masanya bahawa kita telah keluar.

Kami menutup JChannel dengan kaedah tutupnya :

channel.close()

4. Perubahan Pandangan Kluster

Dengan JChannel yang dibuat, kami kini bersedia untuk melihat keadaan rakan sebaya dalam kluster dan bertukar mesej dengan mereka.

JGroups mengekalkan keadaan kluster di dalam kelas View . Setiap saluran mempunyai satu paparan rangkaian. Apabila paparan berubah, ia dihantar melalui panggilan balik viewAccepted () .

Untuk tutorial ini, kami akan melanjutkan kelas ReceiverAdaptor API yang menerapkan semua kaedah antara muka yang diperlukan untuk aplikasi.

Ini adalah kaedah yang disyorkan untuk melaksanakan panggilan balik.

Mari tambah paparanDiterima ke aplikasi kami:

public void viewAccepted(View newView) { private View lastView; if (lastView == null) { System.out.println("Received initial view:"); newView.forEach(System.out::println); } else { System.out.println("Received new view."); List newMembers = View.newMembers(lastView, newView); System.out.println("New members: "); newMembers.forEach(System.out::println); List exMembers = View.leftMembers(lastView, newView); System.out.println("Exited members:"); exMembers.forEach(System.out::println); } lastView = newView; } 

Setiap Lihat mengandungi List of Alamat objek, yang mewakili setiap ahli kelompok. JGroups menawarkan kaedah kemudahan untuk membandingkan satu pandangan dengan pandangan lain, yang kami gunakan untuk mengesan anggota kluster yang baru atau keluar.

5. Menghantar Mesej

Pengendalian mesej dalam JGroups sangat mudah. A Mesej mengandungi bait pelbagai dan Alamat objek yang sepadan dengan penghantar dan penerima.

Untuk tutorial ini kami menggunakan Strings yang dibaca dari baris perintah, tetapi mudah untuk melihat bagaimana aplikasi dapat menukar jenis data lain.

5.1. Broadcast Messages

A Message is created with a destination and a byte array; JChannel sets the sender for us. If the target is null, the entire cluster will receive the message.

We'll accept text from the command line and send it to the cluster:

System.out.print("Enter a message: "); String line = in.readLine().toLowerCase(); Message message = new Message(null, line.getBytes()); channel.send(message); 

If we run multiple instances of our program and send this message (after we implement the receive() method below), all of them would receive it, including the sender.

5.2. Blocking Our Messages

If we don't want to see our messages, we can set a property for that:

channel.setDiscardOwnMessages(true); 

When we run the previous test, the message sender does not receive its broadcast message.

5.3. Direct Messages

Sending a direct message requires a valid Address. If we're referring to nodes by name, we need a way to look up an Address. Fortunately, we have the View for that.

The current View is always available from the JChannel:

private Optional getAddress(String name) { View view = channel.view(); return view.getMembers().stream() .filter(address -> name.equals(address.toString())) .findAny(); } 

Address names are available via the class toString() method, so we merely search the List of cluster members for the name we want.

So we can accept a name on from the console, find the associated destination, and send a direct message:

Address destination = null; System.out.print("Enter a destination: "); String destinationName = in.readLine().toLowerCase(); destination = getAddress(destinationName) .orElseThrow(() -> new Exception("Destination not found"); Message message = new Message(destination, "Hi there!"); channel.send(message); 

6. Receiving Messages

We can send messages, now let's add try to receive them now.

Let's override ReceiverAdaptor's empty receive method:

public void receive(Message message) { String line = Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); } 

Since we know the message contains a String, we can safely pass getObject() to System.out.

7. State Exchange

When a node enters the network, it may need to retrieve state information about the cluster. JGroups provides a state transfer mechanism for this.

When a node joins the cluster, it simply calls getState(). The cluster usually retrieves the state from the oldest member in the group – the coordinator.

Let's add a broadcast message count to our application. We'll add a new member variable and increment it inside receive():

private Integer messageCount = 0; public void receive(Message message) { String line = "Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); if (message.getDest() == null) { messageCount++; System.out.println("Message count: " + messageCount); } } 

We check for a null destination because if we count direct messages, each node will have a different number.

Next, we override two more methods in ReceiverAdaptor:

public void setState(InputStream input) { try { messageCount = Util.objectFromStream(new DataInputStream(input)); } catch (Exception e) { System.out.println("Error deserialing state!"); } System.out.println(messageCount + " is the current messagecount."); } public void getState(OutputStream output) throws Exception { Util.objectToStream(messageCount, new DataOutputStream(output)); } 

Similar to messages, JGroups transfers state as an array of bytes.

JGroups supplies an InputStream to the coordinator to write the state to, and an OutputStream for the new node to read. The API provides convenience classes for serializing and deserializing the data.

Note that in production code access to state information must be thread-safe.

Finally, we add the call to getState() to our startup, after we connect to the cluster:

channel.connect(clusterName); channel.getState(null, 0); 

getState() accepts a destination from which to request the state and a timeout in milliseconds. A null destination indicates the coordinator and 0 means do not timeout.

Apabila kami menjalankan aplikasi ini dengan sepasang nod dan bertukar mesej siaran, kami melihat kenaikan jumlah mesej.

Kemudian jika kita menambah pelanggan ketiga atau berhenti dan memulakan salah satunya, kita akan melihat nod yang baru disambungkan mencetak kiraan mesej yang betul.

8. Kesimpulannya

Dalam tutorial ini, kami menggunakan JGroups untuk membuat aplikasi untuk pertukaran mesej. Kami menggunakan API untuk memantau node mana yang tersambung ke dan meninggalkan kluster dan juga untuk memindahkan keadaan kluster ke node baru ketika bergabung.

Contoh kod, seperti biasa, boleh didapati di GitHub.