运行Beam管道时出现以下错误:
java.lang.IollegalArgumentException:无法序列化DoFnWithExecutionInformation{doFn=WriteWithAppendToGoFile$CreateTrailerDoFn@57b711b6,mainOutputTag=标记,sideInputMapping={},schemaInformation=DoFnSchemaInformation{elementConverters=[],fieldAccessDescriptor=*}}
原因:java.io.NotSerializableException:PipelineOptions对象不可序列化,不应嵌入到转换中(您是在字段中还是在匿名类中捕获了PipelineOption对象?(。相反,如果您使用的是DoFn,请在运行时通过ProcessContext/StartBundleContext/FinishBundleContext.getPipelineOptions((访问PipelineOptions,或者在管道构建时从PipelineOption中预提取必要的字段。
代码
private class CreateTrailerDoFn<T extends Extract> extends DoFn <T,String> {
@ProcessElement
public void processElement(ProcessContext context) {
final int[] count = {0};
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new java.util.Date());
T data = context.element();
data.getContent().forEach(row -> {
LOG.info(String.format("data : %s", row));
count[0]++;
});
String trailerRow = String.format("%s,%s", count[0], timeStamp);
LOG.info(trailerRow);
context.output(trailerRow);
}
}
粘贴的错误来自粘贴的代码之外的某些部分。错误消息确实指示了该怎么做,@bruno volpato的回答很可能是问题所在。