Spring Batch menggunakan Partitioner

1. Gambaran keseluruhan

Dalam pengenalan sebelumnya kepada Spring Batch, kami memperkenalkan kerangka kerja sebagai alat pemrosesan kumpulan. Kami juga meneroka perincian konfigurasi dan implementasi untuk pelaksanaan tugas tunggal, satu proses.

Untuk melaksanakan pekerjaan dengan beberapa pemprosesan selari, pelbagai pilihan disediakan. Pada tahap yang lebih tinggi, terdapat dua mod pemprosesan selari:

  1. Proses Tunggal, pelbagai utas
  2. Pelbagai Proses

Dalam artikel ringkas ini, kita akan membincangkan pemisahan Langkah , yang dapat dilaksanakan untuk pekerjaan proses tunggal dan multi proses.

2. Membahagi Langkah

Spring Batch dengan partition memberikan kita kemudahan untuk membahagikan pelaksanaan Langkah :

Tinjauan Partitioning

Gambar di atas menunjukkan pelaksanaan Pekerjaan dengan Langkah berpisah .

Ada Langkah yang disebut "Tuan", yang pelaksanaannya dibagi menjadi beberapa langkah "Budak". Hamba-hamba ini dapat menggantikan tuan, dan hasilnya tetap tidak akan berubah. Baik tuan dan hamba adalah contoh Langkah . Budak boleh menjadi perkhidmatan jarak jauh atau hanya menjalankan benang secara tempatan.

Sekiranya diperlukan, kami dapat menyampaikan data dari tuan kepada hamba. Data meta (iaitu JobRepository ), memastikan bahawa setiap hamba dilaksanakan hanya sekali dalam satu pelaksanaan Job.

Berikut adalah rajah urutan yang menunjukkan bagaimana semuanya berfungsi:

Langkah Pemisahan

Seperti yang ditunjukkan, PartitionStep mendorong pelaksanaannya. The PartitionHandler bertanggungjawab membelah kerja-kerja "Master" ke dalam "Hamba". Langkah paling kanan adalah hamba.

3. The Maven POM

Ketergantungan Maven sama seperti yang disebutkan dalam artikel kami sebelumnya. Iaitu, Spring Core, Spring Batch dan ketergantungan untuk pangkalan data (dalam kes kami, SQLite ).

4. Konfigurasi

Dalam artikel pengenalan kami, kami melihat contoh menukar beberapa data kewangan dari CSV ke fail XML. Mari kita berikan contoh yang sama.

Di sini, kami akan menukar maklumat kewangan dari 5 fail CSV ke fail XML yang sesuai, menggunakan pelaksanaan multi-utas.

Kita boleh mencapainya dengan menggunakan partition Job and Step . Kami akan mempunyai lima utas, satu untuk setiap fail CSV.

Pertama sekali, mari buat Kerja:

@Bean(name = "partitionerJob") public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { return jobs.get("partitioningJob") .start(partitionStep()) .build(); }

Seperti yang kita lihat, Pekerjaan ini dimulakan dengan PartitioningStep . Inilah langkah utama kami yang akan dibahagikan kepada pelbagai langkah hamba:

@Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep") .partitioner("slaveStep", partitioner()) .step(slaveStep()) .taskExecutor(taskExecutor()) .build(); }

Di sini, kita akan membuat PartitioningStep menggunakan StepBuilderFactory . Untuk itu, kita perlu memberi maklumat mengenai SlaveSteps dan Partitioner .

The Partitioner adalah satu antara muka yang menyediakan kemudahan itu untuk mentakrifkan satu set nilai-nilai input bagi setiap hamba. Dengan kata lain, logik untuk membahagikan tugas menjadi utas masing-masing ada di sini.

Mari buat pelaksanaannya, yang disebut CustomMultiResourcePartitioner , di mana kita akan meletakkan nama fail input dan output di ExecutionContext untuk diteruskan ke setiap langkah budak:

public class CustomMultiResourcePartitioner implements Partitioner { @Override public Map partition(int gridSize) { Map map = new HashMap(gridSize); int i = 0, k = 1; for (Resource resource : resources) { ExecutionContext context = new ExecutionContext(); Assert.state(resource.exists(), "Resource does not exist: " + resource); context.putString(keyName, resource.getFilename()); context.putString("opFileName", "output"+k+++".xml"); map.put(PARTITION_KEY + i, context); i++; } return map; } }

Kami juga akan membuat kacang untuk kelas ini, di mana kami akan memberikan direktori sumber untuk fail input:

@Bean public CustomMultiResourcePartitioner partitioner() { CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner(); Resource[] resources; try { resources = resoursePatternResolver .getResources("file:src/main/resources/input/*.csv"); } catch (IOException e) { throw new RuntimeException("I/O problems when resolving" + " the input file pattern.", e); } partitioner.setResources(resources); return partitioner; }

Kami akan menentukan langkah hamba, seperti langkah lain dengan pembaca dan penulis. Pembaca dan penulis akan sama seperti yang kita lihat dalam contoh pengenalan kami, kecuali mereka akan menerima parameter nama fail dari StepExecutionContext.

Perhatikan bahawa kacang ini perlu dilipat selangkah sehingga mereka dapat menerima parameter stepExecutionContext , pada setiap langkah. Sekiranya mereka tidak dilindungi, kacang mereka akan dibuat pada mulanya, dan tidak akan menerima nama fail pada tahap langkah:

@StepScope @Bean public FlatFileItemReader itemReader( @Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { FlatFileItemReader reader = new FlatFileItemReader(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); reader.setResource(new ClassPathResource("input/" + filename)); DefaultLineMapper lineMapper = new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper); return reader; } 
@Bean @StepScope public ItemWriter itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException { StaxEventItemWriter itemWriter = new StaxEventItemWriter(); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(new ClassPathResource("xml/" + filename)); return itemWriter; }

Sambil menyebutkan pembaca dan penulis dalam langkah budak, kita dapat menyampaikan argumen sebagai nol, kerana nama fail ini tidak akan digunakan, kerana mereka akan menerima nama fail dari stepExecutionContext :

@Bean public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("slaveStep").chunk(1) .reader(itemReader(null)) .writer(itemWriter(marshaller(), null)) .build(); }

5. Kesimpulan

Dalam tutorial ini, kami membincangkan bagaimana melaksanakan pekerjaan dengan pemprosesan selari menggunakan Spring Batch.

Seperti biasa, pelaksanaan lengkap untuk contoh ini boleh didapati di GitHub.