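Hi, I have a use case where I read data from the DB and need to merge rows. For example, I have an order with many orderDetails. My SQL query returns orderDetails rows, and OrderNum is a FK in orderDetails. So when I process the orderDetails, I have to filter them by orderNum and call an external endpoint. In my case I am using a RepositoryItemReader.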
MyReader.java
public RepositoryItemReader<List<POJO>> readCurrentSeasonOrder() {
// some code
}
MyProcessor.java
public POJO2 process(List<POJO> items) throws Exception {
// some code
}
When I run the batch job, I get the exception:
POJO cannot be cast to java.util.List
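I couldn't find any example that fits my use case. Any help is appreciated.
A list of objects is not a good encapsulation of an item in Spring Batch's chunk-oriented processing model.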
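The ClassCastException in the question is consistent with this: RepositoryItemReader hands out the elements of each page it fetches one at a time, so declaring it as RepositoryItemReader<List<POJO>> does not make it return lists. Because generics are erased, nothing fails inside the reader; the mismatch only surfaces when the processor treats an item as a List. The stdlib-only sketch below reproduces that mechanism outside Spring Batch; Pojo, PagingReader and ErasureDemo are hypothetical stand-ins, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the question's POJO.
class Pojo {
    final String orderNum;
    Pojo(String orderNum) { this.orderNum = orderNum; }
}

// Hypothetical stand-in for a paging reader: it returns the elements
// of the fetched page one at a time, not the page itself.
class PagingReader<T> {
    private final Iterator<?> rows;

    PagingReader(List<?> pageContent) { this.rows = pageContent.iterator(); }

    @SuppressWarnings("unchecked")
    T read() {
        // Unchecked cast: T is erased to Object, so this line never fails.
        return rows.hasNext() ? (T) rows.next() : null;
    }
}

class ErasureDemo {
    // Mirrors the processor signature from the question.
    static int process(List<Pojo> items) { return items.size(); }

    static String run() {
        List<Pojo> page = new ArrayList<>();
        page.add(new Pojo("A1"));
        // Declared as a reader of lists, but it really returns single Pojo rows.
        PagingReader<List<Pojo>> reader = new PagingReader<>(page);
        try {
            // The compiler-inserted cast to List happens here and fails.
            process(reader.read());
            return "no exception";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }
}
```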
Option 1: You can change your query to join order and orderDetails and return items of type Order. This way, one item will be an Order.
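Option 2: Another approach is to use the driving query pattern. The idea is to have the reader return Orders and use a processor to enrich each order with its details. This works for small and medium datasets but performs poorly for large ones, because of the additional query per item. This is inherent to the algorithm itself, not to Spring Batch.
I would favor option 1, where the database does the join efficiently and gives you order items pre-filled with their details.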
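With a plain SQL join (as opposed to an ORM fetch join), the query still returns one row per order detail, so something has to fold consecutive rows with the same order number into one Order item. Below is a minimal stdlib-only sketch of that grouping, assuming the join is ordered by orderNum; JoinedRow, Order and OrderAggregatingReader are hypothetical names, and the in-memory iterator stands in for a database cursor. In a real job, the same read-ahead logic is commonly built by delegating to a cursor-based reader wrapped in Spring Batch's SingleItemPeekableItemReader.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical row of "order join orderDetails ... order by orderNum".
class JoinedRow {
    final String orderNum;
    final String product;
    JoinedRow(String orderNum, String product) {
        this.orderNum = orderNum;
        this.product = product;
    }
}

// Hypothetical aggregate: one Order with its details pre-filled.
class Order {
    final String orderNum;
    final List<String> details = new ArrayList<>();
    Order(String orderNum) { this.orderNum = orderNum; }
}

// Groups consecutive rows with the same orderNum into one Order.
// read() returns null at the end of input, like an ItemReader.
class OrderAggregatingReader {
    private final Iterator<JoinedRow> rows;
    private JoinedRow peeked; // first row of the next order, read ahead

    OrderAggregatingReader(Iterator<JoinedRow> rows) {
        this.rows = rows;
        this.peeked = rows.hasNext() ? rows.next() : null;
    }

    Order read() {
        if (peeked == null) {
            return null; // no more input
        }
        Order order = new Order(peeked.orderNum);
        // Consume rows for as long as they belong to the current order.
        while (peeked != null && Objects.equals(peeked.orderNum, order.orderNum)) {
            order.details.add(peeked.product);
            peeked = rows.hasNext() ? rows.next() : null;
        }
        return order;
    }
}
```

The sketch keeps no restart state, and it relies on the ORDER BY: if the rows of one order are not contiguous, that order is split into several items.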
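Edit: adding an example of the driving query pattern
The following example shows the idea of the driving query pattern. The reader returns items of type Person, and the processor enriches each person item with its address. Note how the reader only fetches the id and name of each person, not the address. You can adapt it to Order and OrderDetails for your case.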
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.sample;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
/**
* @author Mahmoud Ben Hassine
*/
@Configuration
@EnableBatchProcessing
public class MyJob {
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
.addScript("/org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public JdbcCursorItemReader<Person> itemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.name("personItemReader")
.dataSource(dataSource())
.sql("select id, name from person")
.beanRowMapper(Person.class)
.build();
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return new ItemProcessor<Person, Person>() {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public Person process(Person person) {
// One extra query per item: this is the cost of the driving query pattern
Address address = jdbcTemplate.queryForObject("select * from address where personId = ?", new BeanPropertyRowMapper<>(Address.class), person.getId());
person.setAddress(address);
return person;
}
};
}
@Bean
public ItemWriter<Person> itemWriter() {
return items -> {
for (Person item : items) {
System.out.println("item = " + item);
}
};
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("job")
.start(steps.get("step")
.<Person, Person>chunk(2)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
jdbcTemplate.update("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
jdbcTemplate.update("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
public static class Person {
private long id;
private String name;
private Address address;
public Person() {
}
public Person(long id, String name) {
this.id = id;
this.name = name;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Address getAddress() {
return address;
}
public void setAddress(Address address) {
this.address = address;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
", address=" + address +
'}';
}
}
public static class Address {
private int id;
private int personId;
private String street;
public Address() {
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getStreet() {
return street;
}
public void setStreet(String street) {
this.street = street;
}
public int getPersonId() {
return personId;
}
public void setPersonId(int personId) {
this.personId = personId;
}
@Override
public String toString() {
return "Address{" +
"id=" + id +
", street='" + street + '\'' +
'}';
}
}
}
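Adapted to the question's Order and OrderDetails, the processor side of the driving query pattern could look roughly like the stdlib-only sketch below. The Order, OrderDetail and OrderDetailsEnricher types are hypothetical, and detailLookup stands in for the per-order query a real processor would run, for example with JdbcTemplate.query(sql, rowMapper, orderNum). The counter makes the one-extra-query-per-item cost visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical detail row for the question's domain.
class OrderDetail {
    final long orderNum;
    final String product;
    OrderDetail(long orderNum, String product) {
        this.orderNum = orderNum;
        this.product = product;
    }
}

// Hypothetical order as returned by the driving query (no details yet).
class Order {
    final long orderNum;
    List<OrderDetail> details = Collections.emptyList();
    Order(long orderNum) { this.orderNum = orderNum; }
}

// Processor side of the driving query pattern: one detail lookup per order.
class OrderDetailsEnricher {
    // Stands in for a JdbcTemplate query keyed by orderNum.
    private final Function<Long, List<OrderDetail>> detailLookup;
    private int lookups = 0; // number of extra queries issued so far

    OrderDetailsEnricher(Function<Long, List<OrderDetail>> detailLookup) {
        this.detailLookup = detailLookup;
    }

    Order process(Order order) {
        lookups++;
        order.details = new ArrayList<>(detailLookup.apply(order.orderNum));
        return order;
    }

    int lookups() { return lookups; }
}
```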
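Following Spring Batch's concepts and principles, it is generally recommended to encapsulate each item in a domain-specific object rather than in a generic list of objects.
But if you still want to do it this way, here is an example I wrote:
First, create a custom item reader: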
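import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.persistence.TypedQuery;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemReader;
import org.springframework.transaction.annotation.Transactional;
// On Spring Batch 5 / Spring Boot 3, use jakarta.persistence instead of javax.persistence

// Each item returned by read() is a whole page (a List) of MyData
public class CustomItemReader implements ItemReader<List<MyData>>, StepExecutionListener {
@PersistenceContext
private EntityManager entityManager;
private final int pageSize = 100;
private Long totalRecords;
private int currentPage = 0;
@Override
public void beforeStep(StepExecution stepExecution) {
// Count the matching rows once, and reset the page cursor in case this bean is reused across job runs
Query countQuery = entityManager.createQuery("SELECT COUNT(md) FROM MyData md WHERE md.status = 'Active'");
totalRecords = (Long) countQuery.getSingleResult();
currentPage = 0;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
// Keep the step's exit status unchanged
return stepExecution.getExitStatus();
}
@Override
@Transactional(readOnly = true)
public List<MyData> read() {
// Returning null signals the end of input to Spring Batch
if (totalRecords == null || totalRecords == 0 || currentPage >= Math.ceil(1.0 * totalRecords / pageSize)) {
return null;
}
// Offset paging needs a stable sort order (assumes MyData has an id attribute)
TypedQuery<MyData> dataQuery = entityManager.createQuery("SELECT md FROM MyData md WHERE md.status = 'Active' ORDER BY md.id", MyData.class);
dataQuery.setFirstResult(currentPage * pageSize);
dataQuery.setMaxResults(pageSize);
List<MyData> listOfMyData = dataQuery.getResultList();
currentPage++;
// Stop early if rows disappeared since the count was taken
return listOfMyData.isEmpty() ? null : listOfMyData;
}
}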
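To check the reader's stop condition, here is a stdlib-only sketch of the same paging arithmetic over an in-memory list. ListPageReader is a hypothetical name, and subList stands in for setFirstResult/setMaxResults. With 250 matching rows and a page size of 100, it returns pages of 100, 100 and 50 rows, then null.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for CustomItemReader's paging logic: each read()
// returns one page (a List) and null once all pages have been served.
class ListPageReader<T> {
    private final List<T> source; // stands in for the 'Active' MyData rows
    private final int pageSize;
    private int currentPage = 0;

    ListPageReader(List<T> source, int pageSize) {
        this.source = source;
        this.pageSize = pageSize;
    }

    List<T> read() {
        // Same offset as setFirstResult(currentPage * pageSize).
        int from = currentPage * pageSize;
        // Equivalent to currentPage >= ceil(total / pageSize), including total == 0.
        if (from >= source.size()) {
            return null;
        }
        // Same limit as setMaxResults(pageSize); the last page may be shorter.
        int to = Math.min(from + pageSize, source.size());
        currentPage++;
        return new ArrayList<>(source.subList(from, to));
    }
}
```

Like the JPA version, this offset-based paging assumes the source does not change while the step runs; if processing switches rows out of the 'Active' status, later offsets shift and some rows can be skipped.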
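Then you can write your item processor:
public class MyItemProcessor implements ItemProcessor<List<MyData>, List<MyData>> {
@Override
public List<MyData> process(List<MyData> items) throws Exception {
// do your logic
return items;
}
}
The return type can be anything, but if you return a List, your item writer must take List as its item type (for example ItemWriter<List<MyData>>).
Note: in this example, the effective chunk size is defined in the item reader by private final int pageSize = 100. Set the chunk size to 1 in your job configuration, so that each chunk contains exactly one list of up to 100 items.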