Spring Batch - Tasklets vs Chunks

1. Introduction

Spring Batch provides two different ways to implement a job: using tasklets and using chunks.

In this article, we'll learn how to configure and implement both approaches using a simple real-life example.

2. Dependencies

Let's start by adding the required dependencies:

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>4.2.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <version>4.2.0.RELEASE</version>
    <scope>test</scope>
</dependency>

To get the latest versions of spring-batch-core and spring-batch-test, please refer to Maven Central.

3. Our Use Case

Let's consider a CSV file with the following content:

Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992

The first position of each line represents a person's name and the second position represents their date of birth.

Our use case is to generate another CSV file that contains each person's name and age:

Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
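The ages in the output depend on the day the job runs, because an age counts only completed years. As a small sketch of that arithmetic in plain Java (the class name AgeSketch, the helper ageOn and the two reference dates are assumptions for illustration, not part of the article's project):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class AgeSketch {

    // Same date pattern as the input file: month/day/year.
    private static final DateTimeFormatter DOB_FORMAT =
        DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Whole years between the date of birth and the given reference date.
    static long ageOn(String dob, LocalDate referenceDate) {
        return ChronoUnit.YEARS.between(LocalDate.parse(dob, DOB_FORMAT), referenceDate);
    }

    public static void main(String[] args) {
        // Assumed reference dates: the day before a birthday and the birthday itself.
        System.out.println(ageOn("10/22/1972", LocalDate.of(2017, 10, 21))); // prints 44
        System.out.println(ageOn("10/22/1972", LocalDate.of(2017, 10, 22))); // prints 45
    }
}
```

Passing the reference date as a parameter keeps the calculation repeatable; the jobs in this article use LocalDate.now() instead, so their output changes over time.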

Now that our domain is clear, let's go ahead and build a solution using both approaches. We'll start with tasklets.

4. Tasklets Approach

4.1. Introduction and Design

Tasklets are meant to perform a single task within a step. Our job will consist of several steps that execute one after the other. Each step should perform only one defined task.

Our job will consist of three steps:

  1. Read lines from the input CSV file.
  2. Calculate the age of every person in the input CSV file.
  3. Write the name and age of each person to a new output CSV file.
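Before bringing Spring Batch into the picture, the three steps above can be sketched in plain Java. This is only an illustration of the "finish one step over all the data, then start the next" flow; the class ThreeStepSketch, the Person holder and the fixed reference date are assumptions and do not appear in the article's project:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

class ThreeStepSketch {

    // Hypothetical in-memory record used only by this sketch.
    static class Person {
        final String name;
        final LocalDate dob;
        long age;

        Person(String name, LocalDate dob) {
            this.name = name;
            this.dob = dob;
        }
    }

    private static final DateTimeFormatter DOB_FORMAT =
        DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Step 1: read every input line before anything else happens.
    static List<Person> readAll(List<String> csvLines) {
        List<Person> people = new ArrayList<>();
        for (String csvLine : csvLines) {
            String[] fields = csvLine.split(",");
            people.add(new Person(fields[0], LocalDate.parse(fields[1], DOB_FORMAT)));
        }
        return people;
    }

    // Step 2: compute the age of every person already held in memory.
    static void processAll(List<Person> people, LocalDate today) {
        for (Person person : people) {
            person.age = ChronoUnit.YEARS.between(person.dob, today);
        }
    }

    // Step 3: produce one "name,age" output line per person.
    static List<String> writeAll(List<Person> people) {
        List<String> output = new ArrayList<>();
        for (Person person : people) {
            output.add(person.name + "," + person.age);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        input.add("Mae Hodges,10/22/1972");
        input.add("Gary Potter,02/22/1953");

        List<Person> people = readAll(input);
        processAll(people, LocalDate.of(2017, 12, 1)); // assumed reference date
        System.out.println(writeAll(people));
    }
}
```

Note that the whole data set sits in memory between the steps, which is the trade-off this approach makes.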

Now that the big picture is ready, let's create one class per step.

LinesReader will be in charge of reading data from the input file:

public class LinesReader implements Tasklet {
    // ...
}

LinesProcessor will calculate the age of every person in the file:

public class LinesProcessor implements Tasklet {
    // ...
}

Finally, LinesWriter will be responsible for writing names and ages to an output file:

public class LinesWriter implements Tasklet {
    // ...
}

At this point, all our steps implement the Tasklet interface. That forces us to implement its execute method:

@Override
public RepeatStatus execute(StepContribution stepContribution,
  ChunkContext chunkContext) throws Exception {
    // ...
}

This method is where we'll add the logic for each step. Before starting with that code, let's configure our job.

4.2. Configuration

We need to add some configuration to Spring's application context. After adding the standard bean declarations for the classes created in the previous section, we're ready to create our job definition:

@Configuration
@EnableBatchProcessing
public class TaskletsConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    protected Step readLines() {
        return steps
          .get("readLines")
          .tasklet(linesReader())
          .build();
    }

    @Bean
    protected Step processLines() {
        return steps
          .get("processLines")
          .tasklet(linesProcessor())
          .build();
    }

    @Bean
    protected Step writeLines() {
        return steps
          .get("writeLines")
          .tasklet(linesWriter())
          .build();
    }

    @Bean
    public Job job() {
        return jobs
          .get("taskletsJob")
          .start(readLines())
          .next(processLines())
          .next(writeLines())
          .build();
    }

    // ...
}

This means that our "taskletsJob" will consist of three steps. The first one (readLines) will execute the tasklet defined in the bean linesReader and move to the next step: processLines. processLines will execute the tasklet defined in the bean linesProcessor and move to the final step: writeLines.

Our job flow is defined, and we're ready to add some logic!

4.3. Model and Utils

As we'll be manipulating lines in a CSV file, we're going to create a Line class:

public class Line implements Serializable {

    private String name;
    private LocalDate dob;
    private Long age;

    // standard constructor, getters, setters and toString implementation
}

Please note that Line implements Serializable. That is because Line will act as a DTO to transfer data between steps. According to Spring Batch, objects that are transferred between steps must be serializable.
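To make the Serializable requirement concrete, here is a minimal plain-Java sketch that writes an object to bytes and reads it back. It does not show how Spring Batch itself stores data; the class SerializableSketch, the helper roundTrip and the trimmed-down two-field Line are assumptions made for this illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDate;

class SerializableSketch {

    // Simplified stand-in for the article's Line class (assumption: two fields only).
    static class Line implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final LocalDate dob;

        Line(String name, LocalDate dob) {
            this.name = name;
            this.dob = dob;
        }
    }

    // Writes the object to a byte array and reads it back as a fresh copy.
    static Line roundTrip(Line original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Line) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Line copy = roundTrip(new Line("Mae Hodges", LocalDate.of(1972, 10, 22)));
        System.out.println(copy.name + " " + copy.dob);
    }
}
```

If Line did not implement Serializable, writeObject would throw a NotSerializableException, which is the kind of failure the marker interface guards against.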

Next, we can start thinking about reading and writing lines.

For that, we'll make use of OpenCSV:

<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>4.1</version>
</dependency>

Look for the latest OpenCSV version in Maven Central.

Once OpenCSV is included, we're also going to create a FileUtils class. It will provide methods for reading and writing CSV lines:

public class FileUtils {

    public Line readLine() throws Exception {
        // Open the reader lazily on first use
        if (CSVReader == null)
            initReader();
        String[] line = CSVReader.readNext();
        // No more rows in the file
        if (line == null)
            return null;
        return new Line(
          line[0],
          LocalDate.parse(
            line[1],
            DateTimeFormatter.ofPattern("MM/dd/yyyy")));
    }

    public void writeLine(Line line) throws Exception {
        // Open the writer lazily on first use
        if (CSVWriter == null)
            initWriter();
        String[] lineStr = new String[2];
        lineStr[0] = line.getName();
        lineStr[1] = line
          .getAge()
          .toString();
        CSVWriter.writeNext(lineStr);
    }

    // ...
}

Notice that readLine acts as a wrapper over OpenCSV's readNext method and returns a Line object.

In the same way, writeLine wraps OpenCSV's writeNext and receives a Line object. The full implementation of this class can be found in the GitHub project.

At this point, we're all set to start with each step implementation.

4.4. LinesReader

Let's go ahead and complete our LinesReader class:

public class LinesReader implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesReader.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        lines = new ArrayList<>();
        fu = new FileUtils(
          "taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Lines Reader initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
      ChunkContext chunkContext) throws Exception {
        // Read until the file is exhausted
        Line line = fu.readLine();
        while (line != null) {
            lines.add(line);
            logger.debug("Read line: " + line.toString());
            line = fu.readLine();
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        // Share the result with the following steps
        stepExecution
          .getJobExecution()
          .getExecutionContext()
          .put("lines", this.lines);
        logger.debug("Lines Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

LinesReader's beforeStep method creates a FileUtils instance over the input file path. Then, the execute method adds lines to a list until there are no more lines to read.

Our class also implements StepExecutionListener, which provides two extra methods: beforeStep and afterStep. We'll use those methods to initialize and close things before and after execute runs.

If we take a look at afterStep code, we'll notice the line where the result list (lines) is put in the job's context to make it available for the next step:

stepExecution
  .getJobExecution()
  .getExecutionContext()
  .put("lines", this.lines);

At this point, our first step has already fulfilled its responsibility: load CSV lines into a List in memory. Let's move to the second step and process them.

4.5. LinesProcessor

LinesProcessor will also implement StepExecutionListener and of course, Tasklet. That means that it will implement beforeStep, execute and afterStep methods as well:

public class LinesProcessor implements Tasklet, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(
      LinesProcessor.class);

    private List<Line> lines;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Pick up the list stored by LinesReader
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List<Line>) executionContext.get("lines");
        logger.debug("Lines Processor initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            long age = ChronoUnit.YEARS.between(
              line.getDob(),
              LocalDate.now());
            logger.debug("Calculated age " + age + " for line " + line.toString());
            line.setAge(age);
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Lines Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

The class loads the lines list from the job's context in beforeStep and calculates the age of each person in execute.

There's no need to put another result list in the context as modifications happen on the same object that comes from the previous step.
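The reason no second put is needed comes down to Java reference semantics: the context holds a reference to the list, so changes made to the objects in it are visible to whoever fetches the list later. A minimal sketch, in which a plain HashMap stands in for the job's execution context and the class SharedContextSketch with its simplified Line is an assumption for illustration only:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SharedContextSketch {

    // Minimal stand-in for the article's Line class.
    static class Line {
        final String name;
        Long age;

        Line(String name) {
            this.name = name;
        }
    }

    // "Reader" step: stores the list under the key "lines".
    static void read(Map<String, Object> context) {
        List<Line> lines = new ArrayList<>();
        lines.add(new Line("Mae Hodges"));
        context.put("lines", lines);
    }

    // "Processor" step: mutates the stored objects and never calls put() again.
    @SuppressWarnings("unchecked")
    static void process(Map<String, Object> context) {
        List<Line> lines = (List<Line>) context.get("lines");
        for (Line line : lines) {
            line.age = 45L; // placeholder value instead of a real age calculation
        }
    }

    // "Writer" step: fetches the same list and sees the ages set by process().
    @SuppressWarnings("unchecked")
    static Long firstAgeSeenByWriter(Map<String, Object> context) {
        List<Line> lines = (List<Line>) context.get("lines");
        return lines.get(0).age;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        read(context);
        process(context);
        System.out.println(firstAgeSeenByWriter(context)); // prints 45
    }
}
```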

And we're ready for our last step.

4.6. LinesWriter

LinesWriter's task is to go over the lines list and write each name and age to the output file:

public class LinesWriter implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);

    private List<Line> lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Pick up the list processed by the previous step
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List<Line>) executionContext.get("lines");
        fu = new FileUtils("output.csv");
        logger.debug("Lines Writer initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Lines Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

We're done with our job's implementation! Let's create a test to run it and see the results.

4.7. Running the Job

To run the job, we'll create a test:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
      throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

The ContextConfiguration annotation points to the Spring context configuration class that holds our job definition.

We'll need to add a couple of extra beans before running the test:

@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
    return new JobLauncherTestUtils();
}

@Bean
public JobRepository jobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory
      = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager());
    return (JobRepository) factory.getObject();
}

@Bean
public PlatformTransactionManager transactionManager() {
    return new ResourcelessTransactionManager();
}

@Bean
public JobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository());
    return jobLauncher;
}

Everything is ready! Go ahead and run the test!

After the job has finished, output.csv has the expected content and logs show the execution flow:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

That's it for Tasklets. Now we can move on to the Chunks approach.

5. Chunks Approach

5.1. Introduction and Design

As the name suggests, this approach performs actions over chunks of data. That is, instead of reading, processing and writing all the lines at once, it'll read, process and write a fixed amount of records (chunk) at a time.

Then, it'll repeat the cycle until there's no more data in the file.

As a result, the flow will be slightly different:

  1. While there're lines:
    • Do for X amount of lines:
      • Read one line
      • Process one line
    • Write X amount of lines.
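The outline above can be sketched as a plain-Java loop. This follows the outline literally (read one item, process it, repeat until the chunk is full, then write the chunk); the logs later in this article show that Spring Batch reads a whole chunk before processing it, so treat this as an illustration of the idea rather than of the framework's internals. The class ChunkLoopSketch and its run method are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class ChunkLoopSketch {

    // Reads and processes up to chunkSize items, "writes" them, and repeats
    // until the reader is exhausted. Each written chunk is returned for inspection.
    static <I, O> List<List<O>> run(Iterator<I> reader,
                                    Function<I, O> processor,
                                    int chunkSize) {
        List<List<O>> writtenChunks = new ArrayList<>();
        while (reader.hasNext()) {
            List<O> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && reader.hasNext()) {
                I item = reader.next();           // read one item
                chunk.add(processor.apply(item)); // process one item
            }
            writtenChunks.add(chunk);             // write the whole chunk
        }
        return writtenChunks;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Mae", "Gary", "Betty", "Wayne", "Adam");
        // prints [[MAE, GARY], [BETTY, WAYNE], [ADAM]]
        System.out.println(run(names.iterator(), String::toUpperCase, 2));
    }
}
```

At most chunkSize items are held at once, and the last chunk may be smaller when the input size is not a multiple of the chunk size.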

So, we also need to create three beans for the chunk-oriented approach:

public class LineReader {
    // ...
}

public class LineProcessor {
    // ...
}

public class LinesWriter {
    // ...
}

Before moving to implementation, let's configure our job.

5.2. Configuration

The job definition will also look different:

@Configuration
@EnableBatchProcessing
public class ChunksConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Line> itemReader() {
        return new LineReader();
    }

    @Bean
    public ItemProcessor<Line, Line> itemProcessor() {
        return new LineProcessor();
    }

    @Bean
    public ItemWriter<Line> itemWriter() {
        return new LinesWriter();
    }

    @Bean
    protected Step processLines(ItemReader<Line> reader,
      ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
        // chunk(2) sets the commit interval: two items per chunk
        return steps.get("processLines").<Line, Line> chunk(2)
          .reader(reader)
          .processor(processor)
          .writer(writer)
          .build();
    }

    @Bean
    public Job job() {
        return jobs
          .get("chunksJob")
          .start(processLines(itemReader(), itemProcessor(), itemWriter()))
          .build();
    }
}

In this case, there's only one step performing only one tasklet.

However, that tasklet defines a reader, a writer and a processor that will act over chunks of data.

Note that the commit interval, the value passed to chunk(), indicates the number of items to be processed in one chunk. With a value of 2, our job will read, process and write two lines at a time.

Now we're ready to add our chunk logic!

5.3. LineReader

LineReader will be in charge of reading one record and returning a Line instance with its content.

To become a reader, our class has to implement the ItemReader interface:

public class LineReader implements ItemReader<Line> {

    @Override
    public Line read() throws Exception {
        // Returning null signals that there is no more input
        Line line = fu.readLine();
        if (line != null)
            logger.debug("Read line: " + line.toString());
        return line;
    }
}

The code is straightforward: it just reads one line and returns it. We'll also implement StepExecutionListener for the final version of this class:

public class LineReader implements
  ItemReader<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LineReader.class);

    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Line Reader initialized.");
    }

    @Override
    public Line read() throws Exception {
        Line line = fu.readLine();
        if (line != null)
            logger.debug("Read line: " + line.toString());
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        logger.debug("Line Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

Note that beforeStep and afterStep execute before and after the whole step respectively, not once per chunk.

5.4. LineProcessor

LineProcessor follows pretty much the same logic as LineReader.

However, in this case, we'll implement ItemProcessor and its process() method:

public class LineProcessor implements ItemProcessor<Line, Line> {

    private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS
          .between(line.getDob(), LocalDate.now());
        logger.debug("Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }
}

The process() method takes an input line, processes it and returns an output line. Again, we'll also implement StepExecutionListener:

public class LineProcessor implements
  ItemProcessor<Line, Line>, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        logger.debug("Line Processor initialized.");
    }

    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS
          .between(line.getDob(), LocalDate.now());
        logger.debug(
          "Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Line Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

5.5. LinesWriter

Unlike the reader and the processor, LinesWriter writes an entire chunk of lines, so it receives a List of Lines:

public class LinesWriter implements
  ItemWriter<Line>, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);

    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("output.csv");
        logger.debug("Line Writer initialized.");
    }

    @Override
    public void write(List<? extends Line> lines) throws Exception {
        // Called once per chunk with all items of that chunk
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Line Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

LinesWriter code speaks for itself. And again, we're ready to test our job.

5.6. Running the Job

We'll create a new test, same as the one we created for the tasklets approach:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenChunksJob_whenJobEnds_thenStatusCompleted()
      throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

After configuring ChunksConfig as explained above for TaskletsConfig, we're all set to run the test!

Once the job is done, we can see that output.csv contains the expected result again, and the logs describe the flow:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

We have the same result and a different flow. Logs make evident how the job executes following this approach.

6. Conclusion

Different contexts will call for one approach or the other. While tasklets feel more natural for "one task after the other" scenarios, chunks provide a simple solution for paginated reads or for situations where we don't want to keep a significant amount of data in memory.

The complete implementation of this example can be found in the GitHub project.