ETL with Spring Cloud Data Flow

1. Overview

Spring Cloud Data Flow is a cloud-native toolkit for building real-time data pipelines and batch processes. Spring Cloud Data Flow is ready to be used for a range of data processing use cases like simple import/export, ETL processing, event streaming, and predictive analytics.

In this tutorial, we'll learn an example of real-time Extract, Transform and Load (ETL) using a stream pipeline that extracts data from a JDBC database, transforms it into simple POJOs and loads it into MongoDB.

2. ETL and Event-Stream Processing

ETL (extract, transform and load) is commonly referred to as the process of loading data from several databases and systems into a common data warehouse. In this data warehouse, heavy data analysis processing can be done without compromising the overall performance of the system.

However, new trends are changing the way this is done. ETL still has a role in moving data into data warehouses and data lakes.

Nowadays, this can be done with streams in an event-stream architecture with the help of Spring Cloud Data Flow.

3. Spring Cloud Data Flow

With Spring Cloud Data Flow (SCDF), developers can create data pipelines in two flavors:

  • Long-lived real-time stream applications using Spring Cloud Stream
  • Short-lived batch task applications using Spring Cloud Task

In this article, we'll cover the first, a long-lived streaming application based on Spring Cloud Stream.

3.1. Spring Cloud Stream Applications

The SCDF stream pipelines are composed of steps, where each step is an application built in Spring Boot style using the Spring Cloud Stream micro-framework. These applications are integrated by a messaging middleware like Apache Kafka or RabbitMQ.

These applications are classified into sources, processors, and sinks. Comparing to the ETL process, we could say that the source is the "extract", the processor is the "transformer" and the sink is the "load" part.
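Expressed in the stream DSL that we'll use later, such a pipeline is simply three applications connected by pipes; this is only a conceptual sketch, the real application names and options appear in section 5:

some-source | some-processor | some-sink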

In some cases, we can use an application starter in one or more steps of the pipeline. This means that we don't need to implement a new application for that step, but can instead configure an existing application starter that's already implemented.

A list of application starters can be found here.

3.2. Spring Cloud Data Flow Server

The last piece of the architecture is the Spring Cloud Data Flow Server. The SCDF Server does the deployment of the applications and the pipeline streams using the Spring Cloud Deployer Specification. This specification supports the SCDF cloud-native flavor by deploying to a range of modern runtimes, such as Kubernetes, Apache Mesos, Yarn, and Cloud Foundry.

Also, we can run the stream as a local deployment.

More information about the SCDF architecture can be found here.

4. Environment Setup

Before starting, we need to choose the pieces of this complex deployment. The first piece to define is the SCDF Server.

For testing, we'll use SCDF Server Local for local development. For a production deployment, we can later choose a cloud-native runtime, like SCDF Server Kubernetes. The list of server runtimes can be found here.

Now, let's check the system requirements for running this server.

4.1. System Requirements

To run the SCDF Server, we'll have to define and set up two dependencies:

  • a messaging middleware, and
  • an RDBMS.

For the messaging middleware, we'll work with RabbitMQ, and we choose PostgreSQL as the RDBMS for storing our pipeline stream definitions.

To run RabbitMQ, download the latest version here and start a RabbitMQ instance using the default configuration, or run the following Docker command:

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

As the last setup step, install and run the PostgreSQL RDBMS on the default port 5432. After this, create a database where SCDF can store its stream definitions using the following script:

CREATE DATABASE dataflow;
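Alternatively, mirroring the RabbitMQ option above, PostgreSQL can also be run as a Docker container; this is just a sketch, and the container name, image tag and password are assumptions (the dataflow database still has to be created inside it):

docker run --name dataflow-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres:10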

4.2. Spring Cloud Data Flow Server Local

To run the SCDF Server Local, we can choose to start the server using docker-compose, or we can start it as a Java application.

Here, we'll run the SCDF Server Local as a Java application. To configure the application, we have to define the configuration as Java application parameters. We'll need Java 8 on the system path.

To host the jars and dependencies, we need to create a home folder for our SCDF Server and download the SCDF Server Local distribution into this folder. You can download the most recent distribution of SCDF Server Local here.

We also need to create a lib folder and put a JDBC driver there. The latest version of the PostgreSQL driver is available here.

Finally, let's run the SCDF local server:

$ java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
    --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
    --spring.datasource.username=postgres_username \
    --spring.datasource.password=postgres_password \
    --spring.datasource.driver-class-name=org.postgresql.Driver \
    --spring.rabbitmq.host=127.0.0.1 \
    --spring.rabbitmq.port=5672 \
    --spring.rabbitmq.username=guest \
    --spring.rabbitmq.password=guest

We can check whether it's running by looking at this URL:

//localhost:9393/dashboard

4.3. Spring Cloud Data Flow Shell

The SCDF Shell is a command line tool that makes it easy to compose and deploy our applications and pipelines. These Shell commands run over the Spring Cloud Data Flow Server REST API.
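Since the Shell is only a client for that REST API, the same information is also available over plain HTTP. For example, once the server from the previous section is running, a request like the following (a sketch, assuming the default port) lists the registered applications:

curl http://localhost:9393/apps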

Download the latest version of the Shell jar, available here, into your SCDF home folder. Once that's done, run the following command (update the version as needed):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar

Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>

If instead of “dataflow:>” you get “server-unknown:>” in the last line, you are not running the SCDF Server at localhost. In this case, run the following command to connect to another host:

server-unknown:>dataflow config server //{host}

Now, Shell is connected to the SCDF Server, and we can run our commands.

The first thing we need to do in Shell is to import the application starters. Find the latest version here for RabbitMQ+Maven in Spring Boot 2.0.x, and run the following command (again, update the version, here “Darwin-SR1“, as needed):

$ dataflow:>app import --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

To check the installed applications, run the following Shell command:

$ dataflow:> app list

As a result, we should see a table containing all the installed applications.

Also, SCDF offers a graphical interface, named Flo, that we can access by this address: //localhost:9393/dashboard. However, its use isn't in the scope of this article.

5. Composing an ETL Pipeline

Let's now create our stream pipeline. For doing this, we'll use the JDBC Source application starter to extract information from our relational database.

Also, we'll create a custom processor for transforming the information structure and a custom sink to load our data into a MongoDB.

5.1. Extract – Preparing a Relational Database for Extraction

Let's create a database with the name of crm and a table with the name of customer:

CREATE DATABASE crm;
CREATE TABLE customer (
    id bigint NOT NULL,
    imported boolean DEFAULT false,
    customer_name character varying(50),
    PRIMARY KEY(id)
);

Note that we're using a flag imported, which will store which record has already been imported. We could also store this information in another table, if necessary.

Now, let's insert some data:

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2. Transform – Mapping JDBC Fields to the MongoDB Fields Structure

For the transformation step, we'll do a simple translation of the field customer_name from the source table to a new field, name. Other transformations could be done here, but let's keep the example short.

To do this, we'll create a new project with the name customer-transform. The easiest way to do this is by using the Spring Initializr site to create the project. After reaching the website, choose a Group and an Artifact name. We'll use com.customer and customer-transform, respectively.

Once this is done, click on the button “Generate Project” to download the project. Then, unzip the project and import it into your favorite IDE, and add the following dependency to the pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Now we're set to start coding the field name conversion. To do this, we'll create the Customer class to act as an adapter. This class will receive the customer_name via the setName() method and will output its value via the getName() method.

The @JsonProperty annotations will do the transformation while deserializing from JSON to Java:

public class Customer {
    private Long id;
    private String name;

    @JsonProperty("customer_name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonProperty("name")
    public String getName() {
        return name;
    }

    // Getters and Setters
}

The processor needs to receive data from an input, do the transformation and bind the outcome to an output channel. Let's create a class to do this:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Customer convertToPojo(Customer payload) {
        return payload;
    }
}

In the above code, we can observe that the transformation occurs automatically. The input receives the data as JSON and Jackson deserializes it into a Customer object using the set methods.

The opposite happens for the output: the data is serialized to JSON using the get methods.
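To make this concrete, assuming the row inserted earlier and Jackson's default handling of the id field, the message arriving at the processor and the one it emits would look roughly like this:

// input consumed from the jdbc source
{"id":1,"customer_name":"John Doe"}

// output sent to the sink
{"id":1,"name":"John Doe"}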

5.3. Load – Sink in MongoDB

Similarly to the transform step, we'll create another maven project, now with the name customer-mongodb-sink. Again, access the Spring Initializr, for the Group choose com.customer, and for the Artifact choose customer-mongodb-sink. Then, type MongoDB in the dependencies search box and download the project.

Next, unzip and import it to your favorite IDE.

Then, add the same extra dependency as in the customer-transform project.

Now we'll create another Customer class, for receiving input in this step:

import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="customer")
public class Customer {
    private Long id;
    private String name;

    // Getters and Setters
}

For sinking the Customer, we'll create a Listener class that will save the customer entity using the CustomerRepository:

@EnableBinding(Sink.class)
public class CustomerListener {

    @Autowired
    private CustomerRepository repository;

    @StreamListener(Sink.INPUT)
    public void save(Customer customer) {
        repository.save(customer);
    }
}

And the CustomerRepository, in this case, is a MongoRepository from Spring Data:

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {
}

5.4. Stream Definition

Now, both custom applications are ready to be registered on SCDF Server. To accomplish this, compile both projects using the Maven command mvn install.
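For example, assuming both projects sit side by side in the same workspace folder:

$ cd customer-transform && mvn install
$ cd ../customer-mongodb-sink && mvn install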

We then register them using the Spring Cloud Data Flow Shell:

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

Finally, to check whether the applications are stored in SCDF, let's run the application list command in the Shell:

app list

As a result, we should see both applications in the resulting table.

5.4.1. Stream Pipeline Domain-Specific Language – DSL

A DSL defines the configuration and data flow between the applications. The SCDF DSL is simple: the first word defines the name of the application, followed by its configurations.

Also, the syntax is a Unix-inspired pipeline syntax that uses vertical bars, also known as "pipes", to connect multiple applications:

http --port=8181 | log

This creates an HTTP application served on port 8181 which sends any received body payload to a log.
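If we actually created and deployed this stream, we could exercise it with a simple POST to the http source; a hypothetical test, assuming a local deployment:

curl -X POST http://localhost:8181 -d "some payload"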

Now, let's see how to create the DSL stream definition of the JDBC Source.

5.4.2. JDBC Source Stream Definition

The key configurations for the JDBC Source are query and update. query will select unread records, while update will change a flag to prevent the current records from being reread.

Also, we'll configure the JDBC Source to poll with a fixed delay of 30 seconds and to fetch a maximum of 1000 rows per poll. Finally, we'll define the connection configurations, like driver class name, username, password and connection URL:

jdbc  --query='SELECT id, customer_name FROM public.customer WHERE imported = false' --update='UPDATE public.customer SET imported = true WHERE id in (:id)' --max-rows-per-poll=1000 --fixed-delay=30 --time-unit=SECONDS --driver-class-name=org.postgresql.Driver --url=jdbc:postgresql://localhost:5432/crm --username=postgres --password=postgres

More JDBC Source configuration properties can be found here.

5.4.3. Customer MongoDB Sink Stream Definition

As we didn't define the connection configurations in the application.properties of customer-mongodb-sink, we'll configure them through DSL parameters.
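For reference, the property-file alternative would be to put the connection setting straight into the sink project; shown here only as a contrast to the DSL approach we'll actually use:

# customer-mongodb-sink, src/main/resources/application.properties
spring.data.mongodb.uri=mongodb://localhost/main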

Our application is fully based on the MongoDataAutoConfiguration. You can check out the other possible configurations here. Basically, we'll define the spring.data.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4. Create and Deploy the Stream

First, to create the final stream definition, go back to the Shell and execute the following command (the line breaks below have just been inserted for readability; run it as a single line):

stream create --name jdbc-to-mongodb
--definition "jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported=false'
--fixed-delay=30
--max-rows-per-poll=1000
--update='UPDATE customer SET imported=true WHERE id in (:id)'
--time-unit=SECONDS
--password=postgres
--driver-class-name=org.postgresql.Driver
--username=postgres
--url=jdbc:postgresql://localhost:5432/crm
| customer-transform
| customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main"

This stream DSL defines a stream named jdbc-to-mongodb. Next, we'll deploy the stream by its name:

stream deploy --name jdbc-to-mongodb 

Finally, we should see the locations of all available logs in the log output:

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc
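Once the first poll has run, we can verify that the record reached MongoDB; a quick check with the mongo shell, assuming the main database from the URI above:

$ mongo main
> db.customer.find()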

6. Conclusion

In this article, we've seen a full example of an ETL data pipeline using Spring Cloud Data Flow.

Most importantly, we saw the configurations of an application starter, created an ETL stream pipeline using the Spring Cloud Data Flow Shell, and implemented custom applications for reading, transforming and writing our data.

As always, the example code can be found in the GitHub project.