TupleTag not found in DoFn



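I have a DoFn that should split its input into two separate PCollections. The pipeline builds and runs until the DoFn needs to emit output, at which point I get the following exception: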

"java.lang.IllegalArgumentException: Unknown output tag Tag<edu.mayo.mcc.cdh.pipeline.PubsubToAvro$PubsubMessageToArchiveDoFn$2.<init>:219#2587af97b4865538>
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:216)...

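If I declare the TupleTags I use inside the ParDo, I get that error, but if I declare them outside the ParDo, I get a syntax error saying the OutputReceiver can't find the tags. Here are the apply call and the ParDo/DoFn: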

PCollectionTuple results = messages.apply("Map to Archive",
    ParDo.of(new PubsubMessageToArchiveDoFn())
        .withOutputTags(noTag, TupleTagList.of(medaPcollection)));
PCollection<AvroPubsubMessageRecord> medaPcollectionTransformed = results.get(medaPcollection);
PCollection<AvroPubsubMessageRecord> noTagPcollectionTransformed = results.get(noTag);

static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
    final TupleTag<AvroPubsubMessageRecord> medaPcollection = new TupleTag<AvroPubsubMessageRecord>(){};
    final TupleTag<AvroPubsubMessageRecord> noTag = new TupleTag<AvroPubsubMessageRecord>(){};

    @ProcessElement
    public void processElement(ProcessContext context, MultiOutputReceiver out) {
        String appCode;
        PubsubMessage message = context.element();
        String msgStr = new String(message.getPayload(), StandardCharsets.UTF_8);
        try {
            JSONObject jsonObject = new JSONObject(msgStr);
            LOGGER.info("json: {}", jsonObject);
            appCode = jsonObject.getString("app_code");
            LOGGER.info(appCode);
            // Compare string contents with equals(); == only compares references.
            if ("MEDA".equals(appCode)) {
                LOGGER.info("Made it to MEDA tag");
                out.get(medaPcollection).output(new AvroPubsubMessageRecord(
                    message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
            } else {
                LOGGER.info("Made it to default tag");
                out.get(noTag).output(new AvroPubsubMessageRecord(
                    message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
            }
        } catch (Exception e) {
            LOGGER.info("Error Processing Message: {}\n{}", msgStr, e);
        }
    }
}

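Can you try it without the MultiOutputReceiver out parameter in the processElement method?

Then emit through context.output, passing the corresponding TupleTag together with the element.

Your example with only the context: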

static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
    final TupleTag<AvroPubsubMessageRecord> medaPcollection = new TupleTag<AvroPubsubMessageRecord>(){};
    final TupleTag<AvroPubsubMessageRecord> noTag = new TupleTag<AvroPubsubMessageRecord>(){};

    @ProcessElement
    public void processElement(ProcessContext context) {
        String appCode;
        PubsubMessage message = context.element();
        String msgStr = new String(message.getPayload(), StandardCharsets.UTF_8);
        try {
            JSONObject jsonObject = new JSONObject(msgStr);
            LOGGER.info("json: {}", jsonObject);
            appCode = jsonObject.getString("app_code");
            LOGGER.info(appCode);
            // Compare string contents with equals(); == only compares references.
            if ("MEDA".equals(appCode)) {
                LOGGER.info("Made it to MEDA tag");
                context.output(medaPcollection, new AvroPubsubMessageRecord(
                    message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
            } else {
                LOGGER.info("Made it to default tag");
                context.output(noTag, new AvroPubsubMessageRecord(
                    message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
            }
        } catch (Exception e) {
            LOGGER.info("Error Processing Message: {}\n{}", msgStr, e);
        }
    }
}

Here is also an example that works for me:

public class WordCountFn extends DoFn<String, Integer> {
    private final TupleTag<Integer> outputTag = new TupleTag<Integer>() {};
    private final TupleTag<Failure> failuresTag = new TupleTag<Failure>() {};

    @ProcessElement
    public void processElement(ProcessContext ctx) {
        try {
            // Could throw ArithmeticException.
            final String word = ctx.element();
            ctx.output(1 / word.length());
        } catch (Throwable throwable) {
            final Failure failure = Failure.from("step", ctx.element(), throwable);
            ctx.output(failuresTag, failure);
        }
    }

    public TupleTag<Integer> getOutputTag() {
        return outputTag;
    }

    public TupleTag<Failure> getFailuresTag() {
        return failuresTag;
    }
}

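For my first output (the good case), there is no need to pass a TupleTag: ctx.output(1 / word.length());

For my second output (the failure case), I pass the Failure tag together with the corresponding element.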
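To show how the pipeline side might line up with a DoFn like this, here is a minimal sketch. It assumes the Beam Java SDK and a runner such as the direct runner are on the classpath. The names SharedTagsSketch, WordLengthFn, LENGTHS and EMPTY_WORDS are made up for illustration and do not come from the code above. The point is only that the tag objects passed to withOutputTags are the same ones the DoFn emits to.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class SharedTagsSketch {
  // Hypothetical tags, created once and shared by the DoFn and the wiring code.
  public static final TupleTag<Integer> LENGTHS = new TupleTag<Integer>() {};
  public static final TupleTag<String> EMPTY_WORDS = new TupleTag<String>() {};

  // Hypothetical DoFn: word lengths go to the main output, empty words to a side output.
  static class WordLengthFn extends DoFn<String, Integer> {
    @ProcessElement
    public void processElement(ProcessContext ctx) {
      String word = ctx.element();
      if (word.isEmpty()) {
        // Additional output: the tag must be one of those registered below.
        ctx.output(EMPTY_WORDS, word);
      } else {
        // Main output: no tag needed.
        ctx.output(word.length());
      }
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    PCollectionTuple results = pipeline
        .apply(Create.of("beam", "", "tags"))
        .apply("Split", ParDo.of(new WordLengthFn())
            // Register the same tag instances that the DoFn emits to.
            .withOutputTags(LENGTHS, TupleTagList.of(EMPTY_WORDS)));
    PCollection<Integer> lengths = results.get(LENGTHS);
    PCollection<String> emptyWords = results.get(EMPTY_WORDS);
    pipeline.run().waitUntilFinish();
  }
}
```

With the getter style used in WordCountFn above, the equivalent would presumably be to create one DoFn instance and pass its getOutputTag() and getFailuresTag() values to withOutputTags, so that both sides again refer to the same tags.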

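I was able to solve this by making my ParDo an anonymous function instead of a class. I inlined the whole function, and after doing that it had no trouble finding the output tags. Thanks for the suggestion!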
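A likely reason inlining helped: an anonymous DoFn captures the tag variables from the enclosing scope, so the tags it emits to are the same objects that were passed to withOutputTags. In the original class, the DoFn created its own tags, and a TupleTag built with the no-argument constructor outside a static initializer gets a randomly generated id, so those tags did not match the registered ones. The sketch below is a simplified plain-Java model of that behavior, not Beam's actual implementation. Tag, Outputs and emitSucceeds are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class TagIdentityModel {
  /** Hypothetical stand-in for a tag whose identity is a randomly generated id. */
  public static final class Tag {
    final String id = UUID.randomUUID().toString();

    @Override
    public boolean equals(Object other) {
      return other instanceof Tag && ((Tag) other).id.equals(id);
    }

    @Override
    public int hashCode() {
      return id.hashCode();
    }
  }

  /** Hypothetical stand-in for the outputs registered on a transform. */
  public static final class Outputs {
    private final Map<Tag, StringBuilder> sinks = new HashMap<>();

    public Outputs(Tag... registered) {
      for (Tag tag : registered) {
        sinks.put(tag, new StringBuilder());
      }
    }

    /** Emits a value to a registered tag and rejects tags that were never registered. */
    public void output(Tag tag, String value) {
      StringBuilder sink = sinks.get(tag);
      if (sink == null) {
        throw new IllegalArgumentException("Unknown output tag " + tag.id);
      }
      sink.append(value);
    }
  }

  /** Returns true if emitting with usedWhenEmitting works when only registered is known. */
  public static boolean emitSucceeds(Tag registered, Tag usedWhenEmitting) {
    try {
      new Outputs(registered).output(usedWhenEmitting, "x");
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    Tag shared = new Tag();
    // Same instance on both sides, as with an inlined anonymous DoFn.
    System.out.println(emitSucceeds(shared, shared));
    // Two separately created tags, as with fields declared inside the DoFn class.
    System.out.println(emitSucceeds(new Tag(), new Tag()));
  }
}
```

In this model the first call succeeds and the second is rejected, which mirrors the "Unknown output tag" exception in the question.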
