春季批处理并行处理 - 运行时文件分配



我有一个简单的弹簧批次,例如,从文件中读取100万个记录并在控制台上打印。

现在,我想在n个服务器上部署此批次,例如n = 5。

如何确保所有服务器实例都没有读取相同的记录?

如 - 如何适当地将记录分开(100万/5(以获得优化的结果?

请帮助代码示例。谢谢。

正如迈克尔(Michael(所建议的那样,您可以使用系统命令将文件拆分,然后使用MultiResourcePartitioner并行处理拆分文件。这就是我的做法

@Bean
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        ClassLoader cl = this.getClass().getClassLoader();
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);
        Resource[] resources = resolver.getResources("file:" + filePath + "/"+"*.csv");     
        partitioner.setResources(resources);
        partitioner.partition(10);      
        return partitioner;
    }
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }   
    @Bean
    @Qualifier("masterStep")
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                .partitioner(ProcessDataStep())
                .partitioner("ProcessDataStep",partitioner())   
                .taskExecutor(taskExecutor())
                .listener(pcStressStepListener)
                .build();
    }

    @Bean
    @Qualifier("processData")
    public Step processData() {
        return stepBuilderFactory.get("processData")
                .<pojo, pojo> chunk(5000)
                .reader(reader)             
                .processor(processor())
                .writer(writer)         
                .build();
    }

    @Bean(name="reader")
    @StepScope
    public FlatFileItemReader<pojo> reader(@Value("#{stepExecutionContext['fileName']}") String filename) {
        FlatFileItemReader<pojo> reader = new FlatFileItemReader<>();
        reader.setResource(new UrlResource(filename));
        reader.setLineMapper(new DefaultLineMapper<pojo>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(FILE HEADER);

                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<pojo>() {
                    {
                        setTargetType(pojo.class);
                    }
                });
            }
        });
        return reader;
    }   

最新更新