A Guide to Apache Crunch

1. Introduction

In this tutorial, we'll demonstrate Apache Crunch with an example data processing application. We'll run this application using the MapReduce framework.

We'll start by covering a few Apache Crunch concepts briefly. Then we'll jump into a sample app. In this app we'll do text processing:

  • First, we'll read the lines from a text file
  • Then, we'll split them into words and remove some common words
  • Then, we'll group the remaining words to get a list of unique words and their counts
  • Finally, we'll write this list to a text file

2. What Is Crunch?

MapReduce is a distributed, parallel programming framework for processing large amounts of data on a cluster of servers. Software frameworks such as Hadoop and Spark implement MapReduce.

Crunch provides a framework for writing, testing and running MapReduce pipelines in Java. Here, we don't write the MapReduce jobs directly. Rather, we define the data pipeline (i.e. the operations to perform the input, processing, and output steps) using the Crunch APIs. The Crunch Planner maps them to MapReduce jobs and executes them when needed.

Therefore, every Crunch data pipeline is coordinated by an instance of the Pipeline interface. This interface also defines methods for reading data into a pipeline via Source instances and writing data out from a pipeline to Target instances.
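For instance, assuming we already have a Pipeline instance (created as shown in Section 4 below), a minimal read/write sketch using the From and To factory classes from the Crunch API looks like this; the paths are just placeholders:

// Read a text file into the pipeline through a Source
// and write the resulting collection back out through a Target.
PCollection<String> lines = pipeline.read(From.textFile("/path/to/input.txt"));
pipeline.write(lines, To.textFile("/path/to/output"));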

We have 3 interfaces for representing data (a short sketch follows this list):

  1. PCollection<S> – an immutable, distributed collection of elements
  2. PTable<K, V> – an immutable, distributed, unordered multi-map of keys and values
  3. PGroupedTable<K, V> – a distributed, sorted map of keys of type K to an Iterable V that may be iterated over exactly once
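As a quick sketch of how these three types relate, using the in-memory MemPipeline for illustration (the variable names are arbitrary):

// PCollection: an immutable, distributed collection of elements.
PCollection<String> words = MemPipeline.collectionOf("one", "two", "two");

// PTable: an immutable, distributed multi-map of keys and values,
// here obtained by counting the unique elements of the PCollection.
PTable<String, Long> counts = words.count();

// PGroupedTable: the result of grouping a PTable by its keys; each key maps
// to an Iterable of values that may be iterated over exactly once.
PGroupedTable<String, Long> grouped = counts.groupByKey();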

DoFn is the base class for all data processing functions. It corresponds to the Mapper, Reducer and Combiner classes in MapReduce. We spend most of the development time writing and testing logical computations using it.
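For reference, the essential contract we implement when extending DoFn looks roughly like this (simplified; the real class also defines lifecycle methods such as initialize and cleanup):

// Simplified view of DoFn<S, T>: read one input record of type S and
// emit zero or more output records of type T through the Emitter.
public abstract void process(S input, Emitter<T> emitter);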

Now that we're more familiar with Crunch, let's use it to build the example application.

3. Setting up a Crunch Project

First of all, let's set up a Crunch project with Maven. We can do so in two ways:

  1. Add the required dependencies in the pom.xml file of an existing project
  2. Use an archetype to generate a starter project

Let's have a quick look at both approaches.

3.1. Maven Dependencies

In order to add Crunch to an existing project, let's add the required dependencies in the pom.xml file.

First, let's add the crunch-core library:

<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>0.15.0</version>
</dependency>

Next, let's add the hadoop-client library to communicate with Hadoop. We use the version matching the Hadoop installation:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

We can check Maven Central for the latest versions of the crunch-core and hadoop-client libraries.

3.2. Maven Archetype

Another approach is to quickly generate a starter project using the Maven archetype provided by Crunch:

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype 

When prompted by the above command, we provide the Crunch version and the project artifact details.

4. Configuring the Crunch Pipeline

Once we have the project set up, we need to create a Pipeline object. Crunch has 3 Pipeline implementations:

  • MRPipeline – executes within Hadoop MapReduce
  • SparkPipeline – executes as a series of Spark pipelines
  • MemPipeline – executes in-memory on the client and is useful for unit testing

Usually, we develop and test using an instance of MemPipeline. Then we use an instance of MRPipeline or SparkPipeline for the actual execution.

If we needed an in-memory pipeline, we could use the static method getInstance to get a MemPipeline instance:

Pipeline pipeline = MemPipeline.getInstance();

But for now, let's create an MRPipeline instance to execute the application with Hadoop:

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
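For completeness, constructing a SparkPipeline looks similar. This is only a sketch and assumes the crunch-spark module is on the classpath; the Spark master URL and application name below are placeholders:

// Executes the pipeline on Spark; "local" and the app name are placeholder values.
Pipeline sparkPipeline = new SparkPipeline("local", "crunch-word-count");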

5. Reading Input Data

After creating the pipeline object, we want to read input data. The Pipeline interface provides a convenience method to read input from a text file, readTextFile(pathName).

Let's call this method to read the input text file:

PCollection<String> lines = pipeline.readTextFile(inputPath);

The above code reads the text file as a collection of Strings.

As the next step, let's write a test case for reading input:

@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);

    assertEquals(21, lines.asCollection()
      .getValue()
      .size());
}

In this test, we verify that we get the expected number of lines when reading a text file.

6. Data Processing Steps

After reading the input data, we need to process it. The Crunch API contains a number of subclasses of DoFn to handle common data processing scenarios:

  • FilterFn – filters members of a collection based on a boolean condition
  • MapFn – maps each input record to exactly one output record (a short sketch follows this list)
  • CombineFn – combines a number of values into a single value
  • JoinFn – performs joins such as inner join, left outer join, right outer join and full outer join
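For instance, a MapFn only requires a single map method. Here is a minimal sketch, not part of the word-count application, that maps each word to its upper-case form:

// A minimal MapFn sketch: maps each input String to exactly one output String.
public class UpperCaseFn extends MapFn<String, String> {

    @Override
    public String map(String word) {
        return word.toUpperCase();
    }
}

We could apply it like any other DoFn, for example with words.parallelDo(new UpperCaseFn(), Writables.strings()).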

Let's implement the following data processing logic by using these classes:

  1. Split each line in the input file into words
  2. Remove the stop words
  3. Count the unique words

6.1. Split a Line of Text Into Words

First of all, let's create the Tokenizer class to split a line into words.

We'll extend the DoFn class. This class has an abstract method called process. This method processes the input records from a PCollection and sends the output to an Emitter.

We need to implement the splitting logic in this method:

public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter
      .onPattern("\\s+")
      .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}

In the above implementation, we've used the Splitter class from the Guava library to extract words from a line.

Next, let's write a unit test for the Tokenizer class:

@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {

    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
        Tokenizer splitter = new Tokenizer();
        splitter.process(" hello world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}

The above test verifies that the correct words are returned.

Finally, let's split the lines read from the input text file using this class.

The parallelDo method of the PCollection interface applies the given DoFn to all the elements and returns a new PCollection.

Let's call this method on the lines collection and pass an instance of Tokenizer:

PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

As a result, we get the list of words in the input text file. We'll remove the stop words in the next step.

6.2. Remove Stop Words

Similarly to the previous step, let's create a StopWordFilter class to filter out stop words.

However, we'll extend FilterFn instead of DoFn. FilterFn has an abstract method called accept. We need to implement the filtering logic in this method:

public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet
      .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
        "or", "s", "such", "t", "that", "the", "their", "then", "there",
        "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}

Next, let's write the unit test for StopWordFilter class:

public class StopWordFilterUnitTest {

    @Test
    public void givenFilter_whenStopWordPassed_thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
          .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
          Lists.newArrayList(noStopWords.materialize()));
    }
}

This test verifies that the filtering logic is performed correctly.

Finally, let's use StopWordFilter to filter the list of words generated in the previous step. The filter method of the PCollection interface applies the given FilterFn to all the elements and returns a new PCollection.

Let's call this method on the words collection and pass an instance of StopWordFilter:

PCollection<String> noStopWords = words.filter(new StopWordFilter());

As a result, we get the filtered collection of words.

6.3. Count Unique Words

After getting the filtered collection of words, we want to count how often each word occurs. The PCollection interface has a number of methods to perform common aggregations (a short usage sketch follows the list):

  • min – returns the minimum element of the collection
  • max – returns the maximum element of the collection
  • length – returns the number of elements in the collection
  • count – returns a PTable that contains the count of each unique element of the collection
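As a quick sketch of the first three methods, each of which returns a PObject handle whose getValue call materializes the result (the variable names are arbitrary):

// Each aggregation returns a PObject handle; getValue() materializes the result.
PObject<Long> numberOfWords = noStopWords.length();
PObject<String> smallestWord = noStopWords.min();
PObject<String> largestWord = noStopWords.max();

Long total = numberOfWords.getValue();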

Let's use the count method to get the unique words along with their counts:

// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();

7. Specify Output

As a result of the previous steps, we have a table of words and their counts. We want to write this result to a text file. The Pipeline interface provides convenience methods to write output:

void write(PCollection<?> collection, Target target);

void write(PCollection<?> collection, Target target,
  Target.WriteMode writeMode);

<T> void writeTextFile(PCollection<T> collection, String pathName);

Therefore, let's call the writeTextFile method:

pipeline.writeTextFile(counts, outputPath); 
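Alternatively, if we needed more control over the output, we could go through an explicit Target. Here is a sketch using the To factory class and the overwrite mode:

// Write the counts to a text file Target, overwriting any existing output.
pipeline.write(counts, To.textFile(outputPath), Target.WriteMode.OVERWRITE);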

8. Manage Pipeline Execution

All the steps so far have just defined the data pipeline. No input has been read or processed. This is because Crunch uses a lazy execution model.

It doesn't run the MapReduce jobs until a method that controls job planning and execution is invoked on the Pipeline interface:

  • run – prepares an execution plan to create the required outputs and then executes it synchronously
  • done – runs any remaining jobs required to generate outputs and then cleans up any intermediate data files created
  • runAsync – similar to run method, but executes in a non-blocking fashion

Therefore, let's call the done method to execute the pipeline as MapReduce jobs:

PipelineResult result = pipeline.done(); 

The above statement runs the MapReduce jobs to read the input, process it and write the result to the output directory.
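If we preferred non-blocking execution instead, a rough sketch with runAsync would look like this (PipelineExecution is the handle returned by the call; waitUntilDone may throw InterruptedException):

// Start the jobs without blocking, then wait for completion when convenient.
PipelineExecution execution = pipeline.runAsync();
execution.waitUntilDone();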

9. Putting the Pipeline Together

So far we have developed and unit tested the logic to read input data, process it and write to the output file.

Next, let's put them together to build the entire data pipeline:

public int run(String[] args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}

10. Hadoop Launch Configuration

The data pipeline is thus ready.

However, we need the code to launch it. Therefore, let's write the main method to launch the application:

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    // The run method shown in the previous section completes this class.
}

ToolRunner.run parses the Hadoop configuration from the command line and executes the MapReduce job.

11. Run Application

The complete application is now ready. Let's run the following command to build it:

mvn package 

As a result of the above command, we get the packaged application and a special job jar in the target directory.

Let's use this job jar to execute the application on Hadoop:

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>

The application reads the input file and writes the result to the output file. The output file contains unique words along with their counts similar to the following:

[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]

In addition to Hadoop, we can run the application within an IDE, as a stand-alone application or as unit tests.

12. Conclusion

In this tutorial, we created a data processing application running on MapReduce. Apache Crunch makes it easy to write, test and execute MapReduce pipelines in Java.

As usual, the full source code can be found over on Github.