我有一份工作,包括从MongoDB的集合中读取数据并将其写入SQLServer。
由于集合没有相同的字段名,我必须在没有pojo的JDBC Batch Item Writer中动态设置查询,因为集合的数量超过100个,每个集合的文档平均有超过50个字段。
我尝试使用org.bson.Document而不是POJO和实现FactoryBean的类来编写sql。但是它不工作。
总结一下,下面是这份工作的示例:
//Step Configuration
@Bean
public MongoItemReader<Document> Reader() {
MongoItemReader<Document> reader = new MongoItemReader<>();
reader.setTemplate(mongoTemplate);
reader.setCollection("COLLECTION");
reader.setTargetType(Document.class);
reader.setQuery("{}");
reader.setSort(new HashMap<>() {{
put("_id", Sort.Direction.ASC);
}});
reader.setPageSize(1000);
return reader;
}
@Bean
public SqlString sqlString(){
return new SqlString()
}
@Bean
public JdbcBatchItemWriter<Document> writer() throws Exception {
JdbcBatchItemWriter<Document> Writer = new JdbcBatchItemWriter<>();
Writer.setDataSource(dataSource);
Writer.setAssertUpdates(true);
Writer.setItemSqlParameterSourceProvider(new TableSourceProvider());
Writer.setSql(sqlString.getObject());
return Writer;
}
// Sql Configuration
public class SqlString implements FactoryBean<String> {
private String table;
private String schema;
private Document document;
@Override
public String getObject() throws Exception {
final StringBuilder sql = new StringBuilder("INSERT INTO " + schema + "." + table + " ");
CopyOnWriteArraySet<String> keyset = (CopyOnWriteArraySet) document.keySet();
final Iterator<String> iterator = keyset.iterator();
String entry = iterator.next();
final StringBuilder key = new StringBuilder("(" + entry);
final StringBuilder value= new StringBuilder("(" + entry);
while (iterator.hasNext()) {
entry = iterator.next();
key.append(",").append(entry);
value.append(",").append(entry);
}
key.append(")");
value.append(")");
sql.append(key.toString());
sql.append(" VALUES ");
sql.append(value.toString());
return sql.toString();
}
@Override
public Class<?> getObjectType() {
return null;
}
}
//ItemSqlParameterSourceProvider
public class TableSourceProvider implements ItemSqlParameterSourceProvider<Document> {
@Override
public SqlParameterSource createSqlParameterSource(Document document) {
ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();
for (Map.Entry<String, Object> entry : document.entrySet()) {
map.put(entry.getKey(), entry.getValue());
}
return new MapSqlParameterSource(map);
}
}
我将为这种情况创建两个步骤:
- 步骤1:一个简单的微线程,计算创建sql查询所需的字段,并将查询放在作业执行上下文中
- 步骤2:一个面向块的步骤,它具有一个阶梯作用域的写入器bean,并配置了来自执行上下文的sql查询。