Using FlatFileItemWriter in a multi-threaded step



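While reading the Spring documentation on FlatFileItemWriter I got a bit confused. It says the writer is not thread-safe, so I thought of wrapping it in a SynchronizedItemStreamWriter. But the Javadoc at https://docs.spring.io/spring-batch/docs/current-SNAPSHOT/api/org/springframework/batch/item/support/SynchronizedItemStreamWriter.html says: "For example, using a FlatFileItemWriter in a multi-threaded step does NOT require synchronizing writes". Could you please explain?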

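Thread safety in Spring Batch file writers has two aspects: updating the execution context and writing to the output file. These are two different things, and both should be considered carefully when using these writers in a multi-threaded step. I will explain each aspect separately.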

1. Thread safety with regard to updating the execution context

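For restartability, file writers (flat file, JSON, XML, etc.) update the execution context under a key called current.count. On restart, this key tells the writer how many items were written in the previous run so that it can truncate the file to the last known good offset. If such a writer is shared in a multi-threaded context, the different threads using it may overwrite this key, which is a problem. That is why it is recommended to turn state management off with the saveState flag in this case. See the "Multi-threaded Step" section of the reference documentation for details.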
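As a minimal sketch of the saveState advice above, assuming Spring Batch is on the classpath, a flat file writer with state management turned off could be declared as follows. The class name, bean name, persons.txt path, the choice of PassThroughLineAggregator and the nested Person class are illustrative assumptions and not part of the original example.

```java
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class FlatFileWriterConfiguration {

    // Hypothetical item type, standing in for your own domain class.
    public static class Person {
        private final int id;
        private final String name;

        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return id + "," + name;
        }
    }

    @Bean
    public FlatFileItemWriter<Person> flatFileItemWriter() {
        return new FlatFileItemWriterBuilder<Person>()
                .name("personFlatFileItemWriter")
                .resource(new FileSystemResource("persons.txt"))
                // Writes item.toString() as one line per item.
                .lineAggregator(new PassThroughLineAggregator<>())
                // Do not record current.count in the execution context.
                .saveState(false)
                .build();
    }
}
```

With saveState(false) the writer no longer records its position, so a restarted step cannot resume writing from the last committed offset. That is the trade-off accepted here in exchange for sharing the writer across threads.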

2. Thread safety with regard to writing to the output file

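Writing data to a structured JSON or XML file is different from writing to a flat file. If you use a non-synchronized JsonFileItemWriter or StaxEventItemWriter in a multi-threaded step, the output of one thread can overwrite that of another, and you end up with an incorrect output file. Here is a quick example with StaxEventItemWriter: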

import javax.sql.DataSource;
import javax.xml.bind.annotation.XmlRootElement;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.SynchronizedItemStreamWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.batch.item.xml.builder.StaxEventItemWriterBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    // The cursor-based reader is not thread-safe, so it is wrapped in a synchronized decorator.
    @Bean
    public SynchronizedItemStreamReader<Person> itemReader() {
        String sql = "select * from person";
        JdbcCursorItemReader<Person> personItemReader = new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql(sql)
                .beanRowMapper(Person.class)
                .build();
        SynchronizedItemStreamReader<Person> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
        synchronizedItemStreamReader.setDelegate(personItemReader);
        return synchronizedItemStreamReader;
    }

    // Synchronized variant: swap this in for the itemWriter() bean below to get a well-formed file.
//  @Bean
//  public SynchronizedItemStreamWriter<Person> itemWriter() {
//      Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
//      marshaller.setClassesToBeBound(Person.class);
//      StaxEventItemWriter<Person> personStaxEventItemWriter = new StaxEventItemWriterBuilder<Person>()
//              .name("personItemWriter")
//              .resource(new FileSystemResource("persons.xml"))
//              .marshaller(marshaller)
//              .rootTagName("persons")
//              .build();
//      SynchronizedItemStreamWriter<Person> synchronizedItemStreamWriter = new SynchronizedItemStreamWriter<>();
//      synchronizedItemStreamWriter.setDelegate(personStaxEventItemWriter);
//      return synchronizedItemStreamWriter;
//  }

    // Non-synchronized XML writer: concurrent threads corrupt the output file.
    @Bean
    public StaxEventItemWriter<Person> itemWriter() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(Person.class);
        return new StaxEventItemWriterBuilder<Person>()
                .name("personItemWriter")
                .resource(new FileSystemResource("persons.xml"))
                .marshaller(marshaller)
                .rootTagName("persons")
                .build();
    }

    // The task executor turns this into a multi-threaded step: chunks are processed concurrently.
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(5)
                        .reader(itemReader())
                        .writer(itemWriter())
                        .taskExecutor(new SimpleAsyncTaskExecutor())
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    // Embedded HSQLDB holding the Spring Batch metadata tables and a person table with 10 rows.
    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.HSQL)
                .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    @XmlRootElement
    static class Person {

        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }
}

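This example reads items from a database table and writes them to a file. With a non-synchronized writer, the sample produces the following persons.xml file: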

<?xml version='1.0' encoding='UTF-8'?>
<persons>
<name>foo7</name>
</person><name>foo5</name><id/></id>><idson<name>foo7</name></person><name>
foo5
</name><id/></id><name>foo7</name></person><name>foo9</name></person><name>
foo7
</name></person><name>foo5
</name><id/></id>e></id></person></persons></person>><id>6</id><person>
<id>9</id>
<name>foo6</name>
</name><person>
<id>10</id>
<name>foo10</name>
</person></persons>

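This is clearly incorrect. With a synchronized writer (the commented-out itemWriter bean in the example), the output file is generated correctly. FlatFileItemWriter does not have this problem even when it is not synchronized, because a flat file contains no extra XML elements for opening/closing tags and no JSON tokens for delimiting arrays and objects (such as [, ] and ,). Writes therefore do not need to be synchronized in that case. This is what the Javadoc of SynchronizedItemStreamWriter describes: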

This decorator is useful when using a non thread-safe item writer in a multi-threaded
step. Typical delegate examples are the JsonFileItemWriter and StaxEventItemWriter.
It should be noted that synchronizing writes might introduce some performance
degradation, so this decorator should be used wisely and only when necessary.
For example, using a FlatFileItemWriter in a multi-threaded step does NOT
require synchronizing writes, so using this decorator in such use case might
be counter productive.
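The difference between line-oriented and structured output can be illustrated with a plain-Java sketch that does not use Spring Batch at all. It assumes that each thread hands its whole chunk to a shared java.io.BufferedWriter in a single write call; that is an assumption made for this illustration, not a statement about Spring Batch internals. The class and method names are hypothetical.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FlatFileInterleavingSketch {

    // Writes `threads` chunks of `chunkSize` lines concurrently through one
    // shared writer, one write call per chunk, and returns the resulting lines.
    static List<String> writeConcurrently(int threads, int chunkSize) {
        StringWriter sink = new StringWriter();
        BufferedWriter shared = new BufferedWriter(sink);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int chunk = t;
            pool.execute(() -> {
                // Build the whole chunk first: every record is a self-contained line.
                StringBuilder lines = new StringBuilder();
                for (int i = 0; i < chunkSize; i++) {
                    int id = chunk * chunkSize + i + 1;
                    lines.append(id).append(",foo").append(id).append('\n');
                }
                try {
                    // A single write call on a java.io.Writer is guarded by its internal lock.
                    shared.write(lines.toString());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
            shared.flush();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Arrays.asList(sink.toString().split("\n"));
    }

    public static void main(String[] args) {
        // Chunk order varies from run to run, but every line stays intact.
        for (String line : writeConcurrently(4, 5)) {
            System.out.println(line);
        }
    }
}
```

The order of the chunks is not deterministic, yet the result is still a valid flat file because no record depends on surrounding open/close markers. An XML or JSON document has no such tolerance, which is why the structured writers need the synchronized decorator.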
