在JDBC批处理项写入器中写入动态sql查询



我有一份工作,包括从MongoDB的集合中读取数据并将其写入SQLServer。

由于集合没有相同的字段名,我必须在没有pojo的JDBC Batch Item Writer中动态设置查询,因为集合的数量超过100个,每个集合的文档平均有超过50个字段。

我尝试使用org.bson.Document而不是POJO和实现FactoryBean的类来编写sql。但是它不工作。

总结一下,下面是这份工作的示例:

//Step Configuration
@Bean
public MongoItemReader<Document> Reader() {
MongoItemReader<Document> reader = new MongoItemReader<>();
reader.setTemplate(mongoTemplate);
reader.setCollection("COLLECTION");
reader.setTargetType(Document.class);
reader.setQuery("{}");
reader.setSort(new HashMap<>() {{
put("_id", Sort.Direction.ASC);
}});
reader.setPageSize(1000);
return reader;
}
@Bean
public SqlString sqlString(){
return new SqlString()
}
@Bean
public JdbcBatchItemWriter<Document> writer() throws Exception {
JdbcBatchItemWriter<Document> Writer = new JdbcBatchItemWriter<>();
Writer.setDataSource(dataSource);
Writer.setAssertUpdates(true);
Writer.setItemSqlParameterSourceProvider(new TableSourceProvider());
Writer.setSql(sqlString.getObject());
return Writer;
}

// Sql Configuration
public class SqlString implements FactoryBean<String> {
private String table;
private String schema;
private Document document;

@Override
public String getObject() throws Exception {


final StringBuilder sql = new StringBuilder("INSERT INTO " + schema + "." + table + " ");
CopyOnWriteArraySet<String> keyset = (CopyOnWriteArraySet) document.keySet();
final Iterator<String> iterator = keyset.iterator();
String entry = iterator.next();
final StringBuilder key = new StringBuilder("(" + entry);
final StringBuilder value= new StringBuilder("(" + entry);
while (iterator.hasNext()) {
entry = iterator.next();
key.append(",").append(entry);
value.append(",").append(entry);
}
key.append(")");
value.append(")");
sql.append(key.toString());
sql.append(" VALUES ");
sql.append(value.toString());

return sql.toString();
}
@Override
public Class<?> getObjectType() {
return null;
}
}
//ItemSqlParameterSourceProvider
public class TableSourceProvider implements ItemSqlParameterSourceProvider<Document> {
@Override
public SqlParameterSource createSqlParameterSource(Document document) {
ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();
for (Map.Entry<String, Object> entry : document.entrySet()) {
map.put(entry.getKey(), entry.getValue());
}
return new MapSqlParameterSource(map);
}
}

我将为这种情况创建两个步骤:

  • 步骤1:一个简单的微线程,计算创建sql查询所需的字段,并将查询放在作业执行上下文中
  • 步骤2:一个面向块的步骤,它具有一个阶梯作用域的写入器bean,并配置了来自执行上下文的sql查询。

相关内容

  • 没有找到相关文章

最新更新