Event-Driven Data with Apache Druid

1. Introduction

In this tutorial, we'll understand how to work with event data and Apache Druid. We'll cover the basics of event data and Druid architecture. As part of that, we'll create a simple data pipeline leveraging various features of Druid, covering various modes of data ingestion and different ways to query the prepared data.

2. Basic Concepts

Before we plunge into the operational details of Apache Druid, let's first go through some of the basic concepts. The space we are interested in is real-time analytics of event data on a massive scale.

Hence, it's imperative to understand what we mean by event data and what it takes to analyze it in real-time at scale.

2.1. What Is Event Data?

Event data refers to a piece of information about a change that occurs at a specific point in time. Event data is almost ubiquitous in present-day applications. From classical application logs to modern sensor data generated by all sorts of things, it's practically everywhere. It's often characterized by machine-readable information generated at a massive scale.

It powers several functions like prediction, automation, communication, and integration, to name a few. Additionally, it's of significance in event-driven architecture.

2.2. What Is Apache Druid?

Apache Druid is a real-time analytics database designed for fast analytics over event-oriented data. Druid was started in 2011, open-sourced under the GPL license in 2012, and moved to the Apache License in 2015. It's managed by the Apache Foundation with community contributions from several organizations. It provides real-time ingestion, fast query performance, and high availability.

The name Druid refers to the fact that its architecture can shift to solve different types of data problems. It's often employed in business intelligence applications to analyze a high volume of real-time and historical data.

3. Druid Architecture

Druid is a column-oriented and distributed data source written in Java. It's capable of ingesting massive quantities of event data and offering low-latency queries on top of this data. Moreover, it offers the possibility to slice and dice the data arbitrarily.

It's quite interesting to understand how the Druid architecture supports these features. In this section, we'll go through some of the important parts of the Druid architecture.

3.1. Data Storage Design

It's important to understand how Druid structures and stores its data, which allows for partitioning and distribution. Druid partitions the data by default during processing and stores it into chunks and segments.

Druid stores data in what we know as a "datasource", which is logically similar to a table in a relational database. A Druid cluster can handle multiple datasources in parallel, ingested from various sources.

Each datasource is partitioned, based on time by default, and further based on other attributes if so configured. A time range of data is known as a "chunk"; for instance, an hour's worth of data if the data is partitioned by the hour.

Every chunk is further partitioned into one or more "segments", which are single files comprising many rows of data. A datasource may have anywhere from a few segments to millions of segments.
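
To make this concrete, a segment identifier encodes the datasource, the interval of its time chunk, and a version. For a day-partitioned datasource like the one we'll ingest later, a segment might be identified by something along these lines (the version timestamp here is purely illustrative):

wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2015-09-12T03:32:48.457Z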

3.2. Druid Processes

Druid has a multi-process and distributed architecture. Hence, each process can be scaled independently, allowing us to create flexible clusters. Let's understand the important processes that are part of Druid:

  • Coordinator: This process is mainly responsible for segment management and distribution and communicates with historical processes to load or drop segments based on configurations
  • Overlord: This is the main process that is responsible for accepting tasks, coordinating task distribution, creating locks around tasks, and returning statuses to callers
  • Broker: This is the process to which all queries are sent for execution in a distributed cluster; it collects metadata from Zookeeper and routes queries to the processes serving the right segments
  • Router: This is an optional process that can be used to route queries to different broker processes, thus providing query isolation for queries over more important data
  • Historical: These are the processes that store queryable data; they maintain a constant connection with Zookeeper and watch for segment information that they have to load and serve
  • MiddleManager: These are the worker processes that execute the submitted tasks; they forward the tasks to Peons running in separate JVMs, thus providing resource and log isolation

3.3. External Dependencies

Apart from the core processes, Druid depends on several external dependencies for its cluster to function as expected.

Let's see how a Druid cluster is formed together with the core processes and external dependencies.

Druid uses deep storage to store any data that has been ingested into the system. This is not used to respond to queries but is used as a backup of the data and to transfer data between processes. It can be anything from a local filesystem to a distributed object store like S3 or HDFS.

The metadata storage is used to hold shared system metadata like segment usage information and task information. However, it's never used to store the actual data. It's a relational database like Apache Derby, PostgreSQL, or MySQL.

Druid uses Apache Zookeeper for the management of the current cluster state. It facilitates a number of operations in a Druid cluster like coordinator/overlord leader election, the segment publishing protocol, and the segment load/drop protocol.
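
To tie these together, a rough sketch of a few common runtime properties shows how a cluster points at its ZooKeeper ensemble, metadata database, and deep storage (all hostnames, the bucket, and the database name below are placeholders, not values from this tutorial):

druid.zk.service.host=zookeeper.internal:2181
druid.metadata.storage.type=postgresql
druid.metadata.storage.connector.connectURI=jdbc:postgresql://metadata-db.internal:5432/druid
druid.storage.type=s3
druid.storage.bucket=my-druid-deep-storage
druid.storage.baseKey=druid/segments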

4. Druid Setup

Druid is designed to be deployed as a scalable, fault-tolerant cluster. However, setting up a production-grade Druid cluster is not trivial. As we have seen earlier, there are many processes and external dependencies to set up and configure. Since it's possible to create a cluster in a flexible manner, we must pay attention to our requirements to set up the individual processes appropriately.

Also, Druid is only supported in Unix-like environments and not on Windows. Moreover, Java 8 or later is required to run Druid processes. There are several single-server configurations available for setting up Druid on a single machine for running tutorials and examples. However, for running a production workload, it's recommended to set up a full-fledged Druid cluster with multiple machines.

For the purpose of this tutorial, we'll set up Druid on a single machine with the help of the official Docker image published on the Docker Hub. This enables us to run Druid on Windows as well, which, as we have discussed earlier, is not otherwise supported. There is a Docker compose file available, which creates a container for each Druid process and its external dependencies.

We have to provide configuration values to Druid as environment variables. The easiest way to achieve this is to provide a file called “environment” in the same directory as the Docker compose file.
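
For instance, a few entries of such an environment file, loosely modeled on the official example, could look like the following; the druid_-prefixed variables mirror the runtime properties we sketched earlier, with dots replaced by underscores, and the values are illustrative:

DRUID_XMX=1g
DRUID_XMS=1g
DRUID_MAXDIRECTMEMORYSIZE=3g
druid_zk_service_host=zookeeper
druid_metadata_storage_type=postgresql
druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid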

Once we have the Docker compose and the environment file in place, starting up Druid is as simple as running a command in the same directory:

docker-compose up

This will bring up all the containers required for a single-machine Druid setup. We have to be careful to provide enough memory to the Docker machine, as Druid consumes a significant amount of resources.

5. Ingesting Data

The first step towards building a data pipeline using Druid is to load data into Druid. This process is referred to as data ingestion or indexing in Druid architecture. We have to find a suitable dataset to proceed with this tutorial.

Now, as we have gathered so far, we have to pick data that represent events and have some temporal nature, to make the most out of the Druid infrastructure.

The official guide for Druid uses simple and elegant data containing Wikipedia page edits for a specific date. We'll continue to use that for our tutorial here.

5.1. Data Model

Let's begin by examining the structure of the data we have with us. Most data pipelines we create are quite sensitive to data anomalies, and hence, it's necessary to clean up the data as much as possible.

Although there are sophisticated ways and tools to perform data analysis, we'll begin by visual inspection. A quick analysis reveals that the input data has events captured in JSON format, with a single event containing typical attributes:

{ "time": "2015-09-12T02:10:26.679Z", "channel": "#pt.wikipedia", "cityName": null, "comment": "Houveram problemas na última edição e tive de refazê-las, junto com as atualizações da página.", "countryIsoCode": "BR", "countryName": "Brazil", "isAnonymous": true, "isMinor": false, "isNew": false, "isRobot": false, "isUnpatrolled": true, "metroCode": null, "namespace": "Main", "page": "Catarina Muniz", "regionIsoCode": null, "regionName": null, "user": "181.213.37.148", "delta": 197, "added": 197, "deleted": 0 }

While there are quite a number of attributes defining this event, there are a few that are of special interest to us when working with Druid:

  • Timestamp
  • Dimensions
  • Metrics

Druid requires us to identify a particular attribute as its timestamp column. In most situations, Druid's data parser is able to automatically detect the best candidate. But we always have a choice to select from, especially if we do not have a fitting attribute in our data.

Dimensions are the attributes that Druid stores as-is. We can use them for any purpose like grouping, filtering, or applying aggregators. We have a choice to select dimensions in the ingestion specification, which we'll discuss further in the tutorial.

Metrics are the attributes that, unlike dimensions, are stored in aggregated form by default. We can choose an aggregation function for Druid to apply to these attributes during ingestion. Together with roll-up enabled, these can lead to compact data representations.
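
For instance, if we did want Druid to aggregate during ingestion, a sketch of a metricsSpec using the added and deleted attributes of our dataset (an assumption for illustration, not what we'll configure later) could look like this, paired with roll-up enabled in the granularitySpec:

"metricsSpec": [
  { "type": "count", "name": "count" },
  { "type": "longSum", "name": "added", "fieldName": "added" },
  { "type": "longSum", "name": "deleted", "fieldName": "deleted" }
]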

5.2. Ingestion Methods

Now, we'll discuss the various ways we can perform data ingestion in Druid. Typically, event-driven data are streaming in nature, which means they are generated continuously at varying rates over time, like Wikipedia edits.

However, we may have data batched for a period of time to go over, where data is more static in nature, like all Wikipedia edits that happened last year.

We may also have diverse data use cases to solve, and Druid has fantastic support for most of them. Let's go over two of the most common ways to use Druid in a data pipeline:

  • Streaming Ingestion
  • Batched Ingestion

The most common way to ingest streaming data into Druid is through Apache Kafka, from which Druid can read data directly. Druid supports other platforms like Kinesis as well. We have to start supervisors on the Overlord process, which creates and manages Kafka indexing tasks. We can start a supervisor by submitting a supervisor spec as a JSON file over an HTTP POST command to the Overlord process.
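
To give a feel for it, a heavily trimmed supervisor spec for a hypothetical Kafka topic might look roughly like the sketch below; the topic name and bootstrap servers are placeholders, and a real spec would carry a full dataSchema like the batch example later in this tutorial. We would submit it with an HTTP POST to the Overlord's /druid/indexer/v1/supervisor endpoint.

{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia-stream",
      "timestampSpec": { "column": "time", "format": "iso" },
      "dimensionsSpec": { "dimensions": ["channel", "page", "user"] },
      "granularitySpec": { "segmentGranularity": "hour", "queryGranularity": "none" }
    },
    "ioConfig": {
      "topic": "wikipedia-events",
      "inputFormat": { "type": "json" },
      "consumerProperties": { "bootstrap.servers": "localhost:9092" }
    },
    "tuningConfig": { "type": "kafka" }
  }
}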

Alternatively, we can ingest data in batches, for example, from a local or remote file. Druid offers Hadoop-based batch ingestion for ingesting data from the Hadoop filesystem in Hadoop file formats. More commonly, we can choose the native batch ingestion, either sequential or parallel. This is a more convenient and simpler approach as it does not have any external dependencies.

5.3. Defining the Task Specification

For this tutorial, we'll set up a native batch ingestion task for the input data we have. We have the option of configuring the task from the Druid console, which gives us an intuitive graphical interface. Alternately, we can define the task spec as a JSON file and submit it to the overlord process using a script or the command line.

Let's first define a simple task spec for ingesting our data in a file called wikipedia-index.json:

{ "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "wikipedia", "dimensionsSpec" : { "dimensions" : [ "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode", "namespace", "page", "regionIsoCode", "regionName", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "timestampSpec": { "column": "time", "format": "iso" }, "metricsSpec" : [], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "day", "queryGranularity" : "none", "intervals" : ["2015-09-12/2015-09-13"], "rollup" : false } }, "ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/tutorial/", "filter" : "wikiticker-2015-09-12-sampled.json.gz" }, "inputFormat" : { "type": "json" }, "appendToExisting" : false }, "tuningConfig" : { "type" : "index_parallel", "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000 } } }

Let's understand this task spec with respect to the basics we've gone through in previous sub-sections:

  • We have chosen the index_parallel task, which provides us native batch ingestion in parallel
  • The datasource we'll be using in this task has the name “wikipedia”
  • The timestamp for our data is coming from the attribute “time”
  • There are a number of data attributes we are adding as dimensions
  • We're not using any metrics for our data in the current task
  • Roll-up, which is enabled by default, should be disabled for this task
  • The input source for the task is a local file named wikiticker-2015-09-12-sampled.json.gz
  • We're not using any secondary partition, which we can define in the tuningConfig

This task spec assumes that we've downloaded the data file wikiticker-2015-09-12-sampled.json.gz and kept it on the local machine where Druid is running. This may be trickier when we're running Druid as a Docker container. Fortunately, Druid comes with this sample data present by default at the location quickstart/tutorial.

5.4. Submitting the Task Specification

Finally, we can submit this task spec to the overlord process through the command line using a tool like curl:

curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia-index.json http://localhost:8081/druid/indexer/v1/task

Normally, the above command returns the ID of the task if the submission is successful. We can verify the state of our ingestion task through the Druid console or by performing queries, which we'll go through in the next section.
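
If we prefer to stay on the command line, we can also poll the status of the submitted task using the returned ID; with <task-id> as a placeholder, the call would look like:

curl http://localhost:8081/druid/indexer/v1/task/<task-id>/status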

5.5. Advanced Ingestion Concepts

Druid is best suited for cases where we have a massive scale of data to deal with, certainly not the kind of data we've seen in this tutorial! To enable such features at scale, the Druid architecture provides suitable tools and tricks.

While we'll not use them in this tutorial, let's quickly discuss roll-up and partitioning.

Event data can soon grow in size to massive volumes, which can affect the query performance we can achieve. In many scenarios, it may be possible for us to summarise data over time. This is what we know as roll-up in Druid. When roll-up is enabled, Druid makes an effort to roll up rows with identical dimensions and timestamps during ingestion. While it can save space, roll-up does lead to a loss in query precision; hence, we must use it judiciously.
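
As a hedged illustration, enabling roll-up for a spec like ours would amount to flipping the flag in the granularitySpec and coarsening the query granularity, for example to an hour, alongside a non-empty metricsSpec:

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "day",
  "queryGranularity": "hour",
  "rollup": true
}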

Another potential way to achieve better performance in the face of rising data volume is distributing the data and, hence, the workload. By default, Druid partitions the data based on timestamps into time chunks containing one or more segments. Further, we can decide to do secondary partitioning using natural dimensions to improve data locality. Moreover, Druid sorts data within every segment by timestamp first and then by other dimensions that we configure.
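
For secondary partitioning, a sketch of a native batch tuningConfig using hash partitioning on a dimension (the choice of channel here is an assumption for illustration) might look like the fragment below; note that hash or range partitioning in native batch goes hand in hand with perfect roll-up, so it would not apply to our roll-up-disabled spec as-is:

"tuningConfig": {
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "partitionsSpec": {
    "type": "hashed",
    "partitionDimensions": ["channel"],
    "targetRowsPerSegment": 5000000
  }
}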

6. Querying Data

Once we have successfully performed the data ingestion, it should be ready for us to query. There are multiple ways to query data in Druid. The simplest way to execute a query in Druid is through the Druid console. However, we can also execute queries by sending HTTP commands or using a command-line tool.

The two prominent ways to construct queries in Druid are native queries and SQL-like queries. We're going to construct some basic queries in both these ways and send them over HTTP using curl. Let's find out how we can create some simple queries on the data we have ingested earlier in Druid.

6.1. Native Queries

Native queries in Druid use JSON objects, which we can send to a broker or a router for processing. We can send the queries over the HTTP POST command, amongst other ways, to do the same.

Let's create a JSON file by the name simple_query_native.json:

{ "queryType" : "topN", "dataSource" : "wikipedia", "intervals" : ["2015-09-12/2015-09-13"], "granularity" : "all", "dimension" : "page", "metric" : "count", "threshold" : 10, "aggregations" : [ { "type" : "count", "name" : "count" } ] }

This is a simple query that fetches the top ten pages that received the highest number of page edits between the 12th and 13th of September, 2015.

Let's post this over HTTP using curl:

curl -X 'POST' -H 'Content-Type:application/json' -d @simple_query_native.json http://localhost:8888/druid/v2?pretty

This response contains the details of the top ten pages in JSON format:

[ { "timestamp" : "2015-09-12T00:46:58.771Z", "result" : [ { "count" : 33, "page" : "Wikipedia:Vandalismusmeldung" }, { "count" : 28, "page" : "User:Cyde/List of candidates for speedy deletion/Subpage" }, { "count" : 27, "page" : "Jeremy Corbyn" }, { "count" : 21, "page" : "Wikipedia:Administrators' noticeboard/Incidents" }, { "count" : 20, "page" : "Flavia Pennetta" }, { "count" : 18, "page" : "Total Drama Presents: The Ridonculous Race" }, { "count" : 18, "page" : "User talk:Dudeperson176123" }, { "count" : 18, "page" : "Wikipédia:Le Bistro/12 septembre 2015" }, { "count" : 17, "page" : "Wikipedia:In the news/Candidates" }, { "count" : 17, "page" : "Wikipedia:Requests for page protection" } ] } ]

6.2. Druid SQL

Druid has a built-in SQL layer, which offers us the liberty to construct queries in familiar SQL-like constructs. It leverages Apache Calcite to parse and plan the queries. However, Druid SQL converts the SQL queries to native queries on the query broker before sending them to data processes.

Let's see how we can create the same query as before, but using Druid SQL. As before, we'll create a JSON file by the name simple_query_sql.json:

{ "query":"SELECT page, COUNT(*) AS counts / FROM wikipedia WHERE \"__time\" / BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' / GROUP BY page ORDER BY Edits DESC LIMIT 10" }

Please note that the query has been broken into multiple lines for readability, but it should appear on a single line. Again, as before, we'll POST this query over HTTP, but to a different endpoint:

curl -X 'POST' -H 'Content-Type:application/json' -d @simple_query_sql.json http://localhost:8888/druid/v2/sql

The output should be very similar to what we achieved earlier with the native query.

6.3. Query Types

We saw, in the earlier section, a type of query where we fetched the top ten results for the metric “count” based on an interval. This is just one type of query that Druid supports, and it's known as the TopN query. Of course, we can make this simple TopN query much more interesting by using filters and aggregations. But that is not in the scope of this tutorial. However, there are several other queries in Druid that may interest us.

Some of the popular ones include Timeseries and GroupBy.

Timeseries queries return an array of JSON objects, where each object represents a value as described in the time-series query — for instance, the daily average of a dimension for the last month.
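
A minimal sketch of such a Timeseries query against our datasource, assuming we simply want the number of edits per day in our ingested interval, might look like:

{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "granularity": "day",
  "intervals": ["2015-09-12/2015-09-13"],
  "aggregations": [
    { "type": "count", "name": "edits" }
  ]
}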

GroupBy queries return an array of JSON objects, where each object represents a grouping as described in the group-by query. For example, we can query for the daily average of a dimension for the past month grouped by another dimension.
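
Similarly, a sketch of a GroupBy query counting edits per channel over the same interval could look like:

{
  "queryType": "groupBy",
  "dataSource": "wikipedia",
  "granularity": "all",
  "dimensions": ["channel"],
  "intervals": ["2015-09-12/2015-09-13"],
  "aggregations": [
    { "type": "count", "name": "edits" }
  ]
}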

There are several other query types, including Scan, Search, TimeBoundary, SegmentMetadata, and DatasourceMetadata.

6.4. Advanced Query Concepts

Druid offers some complex methods to create sophisticated queries for creating interesting data applications. These include various ways to slice and dice the data while still being able to provide incredible query performance.

While a detailed discussion of them is beyond the scope of this tutorial, let's discuss some of the important ones like Joins and Lookups, Multitenancy, and Query Caching.

Druid supports two ways of joining the data. The first is the join operators, and the second is query-time lookups. However, for better query performance, it's advisable to avoid query-time joins.
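
For instance, assuming a hypothetical lookup named country_names that maps ISO codes to country names, a Druid SQL query using the LOOKUP function, which is generally cheaper than a query-time join, might look like:

{
  "query": "SELECT LOOKUP(countryIsoCode, 'country_names') AS country, COUNT(*) AS edits FROM wikipedia GROUP BY LOOKUP(countryIsoCode, 'country_names') ORDER BY edits DESC LIMIT 10"
}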

Multitenancy refers to the feature of supporting multiple tenants on the same Druid infrastructure while still offering them logical isolation. It's possible to achieve this in Druid through separate data sources per tenant or data partitioning by the tenant.

And finally, query caching is the key to performance in data-intensive applications. Druid supports query result caching at the segment and the query result levels. Further, the cache data can reside in memory or in external persistent storage.
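
As a rough sketch, segment-level caching on historicals and result-level caching on brokers are toggled through runtime properties along these lines (values are illustrative):

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.broker.cache.useResultLevelCache=true
druid.broker.cache.populateResultLevelCache=true
druid.cache.type=caffeine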

7. Language Bindings

Although Druid has excellent support for creating ingestion specs and defining queries in JSON, it may be tedious sometimes to define these queries in JSON, especially when queries get complex. Unfortunately, Druid doesn't offer a client library in any specific language to help us in this regard. But there are quite a few language bindings that have been developed by the community. One such client library is also available for Java.

We'll quickly see how we can build the TopN query we used earlier using this client library in Java.

Let's begin by defining the required dependency in Maven:

<dependency>
    <groupId>in.zapr.druid</groupId>
    <artifactId>druidry</artifactId>
    <version>2.14</version>
</dependency>

After this, we should be able to use the client library and create our TopN query:

DateTime startTime = new DateTime(2015, 9, 12, 0, 0, 0, DateTimeZone.UTC);
DateTime endTime = new DateTime(2015, 9, 13, 0, 0, 0, DateTimeZone.UTC);
Interval interval = new Interval(startTime, endTime);
Granularity granularity = new SimpleGranularity(PredefinedGranularity.ALL);
DruidDimension dimension = new SimpleDimension("page");
TopNMetric metric = new SimpleMetric("count");
DruidTopNQuery query = DruidTopNQuery.builder()
  .dataSource("wikipedia")
  .dimension(dimension)
  .threshold(10)
  .topNMetric(metric)
  .granularity(granularity)
  .aggregators(Arrays.asList(new LongSumAggregator("count", "count")))
  .intervals(Collections.singletonList(interval))
  .build();

After this, we can simply generate the required JSON structure, which we can use in the HTTP POST call:

ObjectMapper mapper = new ObjectMapper();
String requiredJson = mapper.writeValueAsString(query);
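
From here, one possible way to send the generated JSON is the standard Java HTTP client, reusing the router endpoint from our earlier curl examples; this is a minimal sketch rather than part of the druidry library:

// Requires java.net.http (Java 11+); send() throws IOException/InterruptedException
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
  .uri(URI.create("http://localhost:8888/druid/v2?pretty"))
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(requiredJson))
  .build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());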

8. Conclusion

In this tutorial, we went through the basics of event data and Apache Druid architecture.

Further, we set up a primary Druid cluster using Docker containers on our local machine. Then, we also went through the process of ingesting a sample dataset in Druid using the native batch task. After this, we saw the different ways we have to query our data in Druid. Lastly, we went through a client library in Java to construct Druid queries.

We have just scratched the surface of the features that Druid has to offer. There are several possibilities in which Druid can help us build our data pipeline and create data applications. The advanced ingestion and querying features are the obvious next steps to learn in order to effectively leverage the power of Druid.

Moreover, creating a suitable Druid cluster that scales the individual processes as per the need should be the target to maximize the benefits.