带有spring-batch的Json阵列读取器文件



我有一个文件作为输入,其中包含一个json数组:

[ {
  ...,
  ...
  },
  {
  ...,
  ...
  },
  {
  ...,
  ...
  }
]

我想在不违反春季批处理原则的情况下阅读它(与FlatFileReader或XmlReader的方式相同)

我没有找到任何方法来做这件事——读者已经在春季批次中实现了。

实现这个阅读器的最佳方式是什么?

提前感谢

假设您想对StaxEventItemReader进行建模,因为您想将JSON数组的每个项都读取为Spring Batch中的一个项,那么我推荐如下:

  • RecordSeparatorPolicy-你需要实现你自己的RecordSepartorPolicy,这表明你是否已经完成了整个项目的阅读。您还可以使用RecordSeparatoerPolicy#postProcess来清除需要处理的[]的开头和结尾以及逗号分隔符
  • LineTokenizer-然后您将想要创建自己的LineTokenzier来解析JSON。我今天刚刚为一个项目做了一个代码,所以你可以把它作为一个开始(认为它没有经过测试):

    public class JsonLineTokenizer implements LineTokenizer {
        @Override
        public FieldSet tokenize(String line) {
            List<String> tokens = new ArrayList<>();
            try {
                HashMap<String,Object> result =
                        new ObjectMapper().readValue(line, HashMap.class);
                tokens.add((String) result.get("field1"));
                tokens.add((String) result.get("field2")));
            } catch (IOException e) {
                throw new RuntimeException("Unable to parse json: " + line);
            }
            return new DefaultFieldSet(tokens.toArray(new String[0]), {"field1", "field2"});
        }
    }
    

这是我从您的建议和默认实现开始编写的记录分隔符策略。我使用内部纯字符串表示读取记录,但我发现用codehaus丢弃JSON对象解析JSON非常简单。

public class JsonRecordSeparatorPolicy extends SimpleRecordSeparatorPolicy {
/**
 * True if the line can be parsed to a JSON object.
 * 
 * @see RecordSeparatorPolicy#isEndOfRecord(String)
 */
@Override
public boolean isEndOfRecord(String line) {
    return StringUtils.countOccurrencesOf(line, "{") == StringUtils.countOccurrencesOf(line, "}")
            && (line.trim().endsWith("}") || line.trim().endsWith(",") || line.trim().endsWith("]") );
}
@Override
public String postProcess(String record) {
    if(record.startsWith("[")) record = record.substring(1);
    if(record.endsWith("]")) record = record.substring(0, record.length()-1);
    if(record.endsWith(",")) record = record.substring(0, record.length()-1);
    return super.postProcess(record);
}

}

最新更新