Apache Spark: Perbezaan antara Kerangka Data, Set Data dan RDD

1. Gambaran keseluruhan

Apache Spark adalah sistem pemprosesan data yang cepat dan diedarkan. Ia melakukan pemprosesan data dalam memori dan menggunakan cache dalam memori dan pelaksanaan yang dioptimumkan sehingga menghasilkan prestasi yang cepat. Ia menyediakan API peringkat tinggi untuk bahasa pengaturcaraan yang popular seperti Scala, Python, Java, dan R.

Dalam tutorial ringkas ini, kita akan membahas tiga konsep asas Spark: kerangka data, set data, dan RDD.

2. DataFrame

Spark SQL memperkenalkan abstraksi data berbentuk tabular yang disebut DataFrame sejak Spark 1.3. Sejak itu, ia menjadi salah satu ciri terpenting di Spark. API ini berguna semasa kita ingin menangani data tersebar dan separa berstruktur, diedarkan.

Dalam bahagian 3, kita akan membincangkan Set Data Terdistribusi yang Berdaya Tahan (RDD). DataFrames menyimpan data dengan cara yang lebih efisien daripada RDD, ini kerana mereka menggunakan keupayaan RDD yang tidak berubah, dalam memori, berdaya tahan, tersebar, dan selari tetapi mereka juga menerapkan skema pada data. DataFrames juga menerjemahkan kod SQL ke dalam operasi RDD tahap rendah yang dioptimumkan.

Kita boleh membuat DataFrames dengan tiga cara:

  • Menukar RDD yang ada
  • Menjalankan pertanyaan SQL
  • Memuatkan data luaran

Pasukan Spark memperkenalkan SparkSession dalam versi 2.0, ia menyatukan semua konteks yang berbeza dengan memastikan pemaju tidak perlu risau untuk membuat konteks yang berbeza:

SparkSession session = SparkSession.builder() .appName("TouristDataFrameExample") .master("local[*]") .getOrCreate(); DataFrameReader dataFrameReader = session.read();

Kami akan menganalisis fail Tourist.csv :

Dataset data = dataFrameReader.option("header", "true") .csv("data/Tourist.csv");

Sejak Spark 2.0 DataFrame menjadi Dataset jenis Row , jadi kami dapat menggunakan DataFrame sebagai alias untuk Dataset .

Kami dapat memilih lajur tertentu yang kami minati. Kami juga dapat menapis dan mengelompokkan mengikut lajur tertentu:

data.select(col("country"), col("year"), col("value")) .show(); data.filter(col("country").equalTo("Mexico")) .show(); data.groupBy(col("country")) .count() .show();

3. Set data

Set data adalah sekumpulan data berstruktur yang sangat ditaip . Mereka menyediakan gaya pengaturcaraan berorientasi objek yang sudah biasa ditambah dengan manfaat keselamatan jenis kerana kumpulan data dapat memeriksa sintaks dan menangkap kesalahan pada waktu kompilasi.

Dataset adalah lanjutan dari DataFrame, oleh itu kita dapat menganggap DataFrame sebagai pandangan yang tidak diketik dari set data.

Pasukan Spark merilis Dataset API di Spark 1.6 dan seperti yang mereka sebutkan: "tujuan Spark Datasets adalah untuk menyediakan API yang memungkinkan pengguna untuk dengan mudah mengekspresikan transformasi pada domain objek, sementara juga memberikan kelebihan kinerja dan ketahanan pelaksanaan Spark SQL enjin ”.

Pertama, kita perlu membuat kelas jenis TouristData :

public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters }

Untuk memetakan setiap rekod kami ke jenis yang ditentukan, kami perlu menggunakan Encoder. Pengekod menterjemahkan antara objek Java dan format binari dalaman Spark :

// SparkSession initialization and data load Dataset responseWithSelectedColumns = data.select(col("region"), col("country"), col("year"), col("series"), col("value").cast("double"), col("footnotes"), col("source")); Dataset typedDataset = responseWithSelectedColumns .as(Encoders.bean(TouristData.class));

Seperti DataFrame, kita dapat menapis dan mengelompokkan mengikut lajur tertentu:

typedDataset.filter((FilterFunction) record -> record.getCountry() .equals("Norway")) .show(); typedDataset.groupBy(typedDataset.col("country")) .count() .show();

Kami juga boleh melakukan operasi seperti menyaring mengikut lajur yang memadankan julat tertentu atau mengira jumlah lajur tertentu, untuk mendapatkan nilai keseluruhannya:

typedDataset.filter((FilterFunction) record -> record.getYear() != null && (Long.valueOf(record.getYear()) > 2010 && Long.valueOf(record.getYear())  record.getValue() != null && record.getSeries() .contains("expenditure")) .groupBy("country") .agg(sum("value")) .show();

4. RDD

Dataset Terdistribusi Berdaya Tahan atau RDD adalah abstraksi pengaturcaraan utama Spark. Ini mewakili kumpulan elemen yang: tidak berubah, tahan, dan diedarkan .

RDD merangkumi set data yang besar, Spark secara automatik akan menyebarkan data yang terdapat dalam RDD di seluruh kluster kami dan menyelaraskan operasi yang kami lakukan ke atasnya .

Kita boleh membuat RDD hanya melalui operasi data dalam penyimpanan stabil atau operasi pada RDD lain.

Toleransi kesalahan sangat penting ketika kita berurusan dengan kumpulan data yang besar dan data diedarkan pada mesin kluster. RDD tahan kerana mekanisme pemulihan kesalahan bawaan Spark. Spark bergantung pada fakta bahawa RDD menghafal bagaimana ia dibuat sehingga kita dapat dengan mudah mengesan garis keturunan untuk memulihkan partisi .

Terdapat dua jenis operasi yang dapat kita lakukan pada RDD: Transformasi dan Tindakan .

4.1. Transformasi

Kita dapat menerapkan Transformasi ke RDD untuk memanipulasi datanya. Setelah manipulasi ini dilakukan, kita akan mendapat RDD baru, kerana RDD adalah objek yang tidak dapat diubah .

Kami akan memeriksa cara melaksanakan Peta dan Penapis, dua transformasi yang paling biasa.

Pertama, kita perlu membuat JavaSparkContext dan memuat data sebagai RDD dari fail Tourist.csv :

SparkConf conf = new SparkConf().setAppName("uppercaseCountries") .setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD tourists = sc.textFile("data/Tourist.csv");

Seterusnya, mari kita gunakan fungsi peta untuk mendapatkan nama negara dari setiap rekod dan menukar namanya menjadi huruf besar. Kami dapat menyimpan set data yang baru dihasilkan ini sebagai fail teks pada cakera:

JavaRDD upperCaseCountries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase(); }).distinct(); upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

If we want to select only a specific country, we can apply the filter function on our original tourists RDD:

JavaRDD touristsInMexico = tourists .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico")); touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. Actions

Actions will return a final value or save the results to disc, after doing some computation on the data.

Two of the recurrently used actions in Spark are Count and Reduce.

Let's count the total countries on our CSV file:

// Spark Context initialization and data load JavaRDD countries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1]; }).distinct(); Long numberOfCountries = countries.count();

Now, we'll calculate the total expenditure by country. We'll need to filter the records containing expenditure in their description.

Instead of using a JavaRDD, we'll use a JavaPairRDD. A pair of RDD is a type of RDD that can store key-value pairs. Let's check it next:

JavaRDD touristsExpenditure = tourists .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure")); JavaPairRDD expenditurePairRdd = touristsExpenditure .mapToPair(line -> { String[] columns = line.split(COMMA_DELIMITER); return new Tuple2(columns[1], Double.valueOf(columns[6])); }); List
    
      totalByCountry = expenditurePairRdd .reduceByKey((x, y) -> x + y) .collect();
    

5. Conclusion

Ringkasnya, kita harus menggunakan DataFrames atau Dataset ketika kita memerlukan API khusus domain, kita memerlukan ungkapan tingkat tinggi seperti agregasi, jumlah, atau pertanyaan SQL. Atau semasa kita mahukan keselamatan jenis pada waktu kompilasi.

Sebaliknya, kita harus menggunakan RDD ketika data tidak terstruktur dan kita tidak perlu menerapkan skema tertentu atau ketika kita memerlukan transformasi dan tindakan tingkat rendah.

Seperti biasa, semua contoh kod boleh didapati di GitHub.