Pengenalan kepada Apache Storm

1. Gambaran keseluruhan

Tutorial ini akan menjadi pengenalan kepada Apache Storm, sistem pengiraan masa nyata yang diedarkan.

Kami akan memberi tumpuan dan meliputi:

  • Apa sebenarnya Apache Storm dan masalah apa yang diselesaikannya
  • Senibina, dan
  • Cara menggunakannya dalam projek

2. Apa itu Apache Storm?

Apache Storm adalah sistem edaran sumber terbuka dan bebas untuk pengiraan masa nyata.

Ini memberikan toleransi kesalahan, skalabilitas, dan menjamin pemprosesan data, dan sangat baik dalam memproses aliran data yang tidak terikat.

Beberapa kes penggunaan yang baik untuk Storm boleh memproses operasi kad kredit untuk pengesanan penipuan atau memproses data dari rumah pintar untuk mengesan sensor yang rosak.

Storm membolehkan penyatuan dengan pelbagai pangkalan data dan sistem antrian yang terdapat di pasaran.

3. Ketergantungan Maven

Sebelum menggunakan Apache Storm, kita perlu memasukkan kebergantungan inti ribut dalam projek kita:

 org.apache.storm storm-core 1.2.2 provided 

Kita hanya harus menggunakan ruang lingkup yang disediakan jika kita ingin menjalankan aplikasi kita di cluster Storm.

Untuk menjalankan aplikasi secara tempatan, kita dapat menggunakan mod lokal yang disebut yang akan mensimulasikan cluster Storm dalam proses lokal, dalam hal ini kita harus menghapus yang disediakan.

4. Model Data

Model data Apache Storm terdiri daripada dua elemen: tupel dan aliran.

4.1. Tuple

A tupel adalah senarai awal yang dinamakan bidang dengan jenis dinamik. Ini bermaksud bahawa kita tidak perlu menyatakan jenis bidang secara eksplisit.

Ribut perlu tahu bagaimana membuat siri semua nilai yang digunakan dalam tuple. Secara lalai, ia sudah boleh membuat siri jenis primitif, rentetan dan tatasusunan bait .

Dan kerana Storm menggunakan serialisasi Kryo, kita perlu mendaftarkan serializer menggunakan Config untuk menggunakan jenis khusus. Kita boleh melakukan ini dengan salah satu daripada dua cara:

Pertama, kita boleh mendaftarkan kelas untuk bersiri menggunakan nama penuhnya:

Config config = new Config(); config.registerSerialization(User.class);

Dalam kes sedemikian, Kryo akan membuat siri kelas menggunakan FieldSerializer. Secara lalai, ini akan membuat siri semua medan tidak sementara kelas, baik swasta dan awam.

Atau sebagai gantinya, kami dapat menyediakan kelas untuk siri dan serializer yang kami mahu Storm gunakan untuk kelas itu:

Config config = new Config(); config.registerSerialization(User.class, UserSerializer.class);

Untuk membuat serializer khusus, kita perlu memperluaskan Serializer kelas generik yang mempunyai dua kaedah menulis dan membaca.

4.2. Aliran

A Stream adalah pengekstrakan teras dalam ekosistem storm. The Stream adalah urutan kurnia yang amat besar daripada tuples.

Ribut membolehkan memproses pelbagai aliran secara selari.

Setiap aliran mempunyai id yang diberikan dan diberikan semasa pengisytiharan.

5. Topologi

Logik aplikasi Storm masa nyata dimasukkan ke dalam topologi. Topologi terdiri daripada cerat dan baut .

5.1. Cerat

Cerat adalah sumber aliran. Mereka memancarkan tuple ke topologi.

Tuples boleh dibaca dari pelbagai sistem luaran seperti Kafka, Kestrel atau ActiveMQ.

Cerat boleh dipercayai atau tidak boleh dipercayai . Boleh dipercayai bermaksud bahawa muncung dapat membalas bahawa tuple yang gagal diproses oleh Storm. Tidak boleh dipercayai bermaksud bahawa muncung tidak membalas kerana akan menggunakan mekanisme api-dan-lupa untuk mengeluarkan tupel.

Untuk membuat spout kustom, kita perlu menerapkan antara muka IRichSpout atau memperluas kelas yang sudah menerapkan antaramuka, misalnya, kelas BaseRichSpout abstrak .

Mari buat cerat yang tidak boleh dipercayai :

public class RandomIntSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector outputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); outputCollector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis())); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp")); } }

RandomIntSpout tersuai kami akan menghasilkan bilangan bulat rawak dan cap waktu setiap saat.

5.2. Selak

Bolt memproses tupel dalam aliran. Mereka dapat melakukan pelbagai operasi seperti penyaringan, penggabungan atau fungsi khusus.

Beberapa operasi memerlukan beberapa langkah, dan dengan itu kita perlu menggunakan beberapa baut dalam kes seperti itu.

Untuk membuat Bolt tersuai , kita perlu menerapkan IRichBolt atau untuk antara muka IBasicBolt operasi yang lebih mudah .

Terdapat juga beberapa kelas pembantu yang tersedia untuk melaksanakan Bolt. Dalam kes ini, kami akan menggunakan BaseBasicBolt :

public class PrintingBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { System.out.println(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }

PrintingBolt khusus ini hanya akan mencetak semua tuple ke konsol.

6. Membuat Topologi Ringkas

Mari satukan idea-idea ini menjadi topologi sederhana. Topologi kami akan mempunyai satu cerat dan tiga baut.

6.1. RawakNumberSpout

Pada mulanya, kami akan membuat cerat yang tidak boleh dipercayai. Ia akan menghasilkan bilangan bulat rawak dari julat (0,100) setiap saat:

public class RandomNumberSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector collector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); collector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); int operation = random.nextInt(101); long timestamp = System.currentTimeMillis(); Values values = new Values(operation, timestamp); collector.emit(values); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.2. PenapisanBolt

Seterusnya, kami akan membuat bolt yang akan menyaring semua elemen dengan operasi sama dengan 0:

public class FilteringBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { int operation = tuple.getIntegerByField("operation"); if (operation > 0) { basicOutputCollector.emit(tuple.getValues()); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.3. AgregatBolt

Seterusnya, mari buat Bolt yang lebih rumit yang akan mengumpulkan semua operasi positif dari setiap hari.

Untuk tujuan ini, kami akan menggunakan kelas khusus yang dibuat khas untuk menerapkan baut yang beroperasi di tingkap dan bukannya beroperasi pada tupel tunggal: BaseWindowedBolt .

Windows are an essential concept in stream processing, splitting the infinite streams into finite chunks. We can then apply computations to each chunk. There are generally two types of windows:

Time windows are used to group elements from a given time period using timestamps. Time windows may have a different number of elements.

Count windows are used to create windows with a defined size. In such a case, all windows will have the same size and the window will not be emitted if there are fewer elements than the defined size.

Our AggregatingBolt will generate the sum of all positive operations from a time window along with its beginning and end timestamps:

public class AggregatingBolt extends BaseWindowedBolt { private OutputCollector outputCollector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.outputCollector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute(TupleWindow tupleWindow) { List tuples = tupleWindow.get(); tuples.sort(Comparator.comparing(this::getTimestamp)); int sumOfOperations = tuples.stream() .mapToInt(tuple -> tuple.getIntegerByField("operation")) .sum(); Long beginningTimestamp = getTimestamp(tuples.get(0)); Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1)); Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp); outputCollector.emit(values); } private Long getTimestamp(Tuple tuple) { return tuple.getLongByField("timestamp"); } }

Note that, in this case, getting the first element of the list directly is safe. That's because each window is calculated using the timestamp field of the Tuple, so there has to be at least one element in each window.

6.4. FileWritingBolt

Finally, we'll create a bolt that will take all elements with sumOfOperations greater than 2000, serialize them and write them to the file:

public class FileWritingBolt extends BaseRichBolt { public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); private BufferedWriter writer; private String filePath; private ObjectMapper objectMapper; @Override public void cleanup() { try { writer.close(); } catch (IOException e) { logger.error("Failed to close writer!"); } } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try { writer = new BufferedWriter(new FileWriter(filePath)); } catch (IOException e) { logger.error("Failed to open a file for writing.", e); } } @Override public void execute(Tuple tuple) { int sumOfOperations = tuple.getIntegerByField("sumOfOperations"); long beginningTimestamp = tuple.getLongByField("beginningTimestamp"); long endTimestamp = tuple.getLongByField("endTimestamp"); if (sumOfOperations > 2000) { AggregatedWindow aggregatedWindow = new AggregatedWindow( sumOfOperations, beginningTimestamp, endTimestamp); try { writer.write(objectMapper.writeValueAsString(aggregatedWindow)); writer.newLine(); writer.flush(); } catch (IOException e) { logger.error("Failed to write data to file.", e); } } } // public constructor and other methods }

Note that we don't need to declare the output as this will be the last bolt in our topology

6.5. Running the Topology

Finally, we can pull everything together and run our topology:

public static void runTopology() { TopologyBuilder builder = new TopologyBuilder(); Spout random = new RandomNumberSpout(); builder.setSpout("randomNumberSpout"); Bolt filtering = new FilteringBolt(); builder.setBolt("filteringBolt", filtering) .shuffleGrouping("randomNumberSpout"); Bolt aggregating = new AggregatingBolt() .withTimestampField("timestamp") .withLag(BaseWindowedBolt.Duration.seconds(1)) .withWindow(BaseWindowedBolt.Duration.seconds(5)); builder.setBolt("aggregatingBolt", aggregating) .shuffleGrouping("filteringBolt");  String filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt(filePath); builder.setBolt("fileBolt", file) .shuffleGrouping("aggregatingBolt"); Config config = new Config(); config.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Test", config, builder.createTopology()); }

To make the data flow through each piece in the topology, we need to indicate how to connect them. shuffleGroup allows us to state that data for filteringBolt will be coming from randomNumberSpout.

For each Bolt, we need to add shuffleGroup which defines the source of elements for this bolt. The source of elements may be a Spout or another Bolt. And if we set the same source for more than one bolt, the source will emit all elements to each of them.

In this case, our topology will use the LocalCluster to run the job locally.

7. Conclusion

Dalam tutorial ini, kami memperkenalkan Apache Storm, sistem pengiraan masa nyata yang diedarkan. Kami membuat cerat, beberapa baut, dan menyatukannya menjadi topologi lengkap.

Dan, seperti biasa, semua contoh kod boleh didapati di GitHub.