我写了一个Apache Beam DoFn
static class FillLocation extends DoFn<TrackingRequest, TrackingRequest> {
@ProcessElement
public void processElement(ProcessContext c) {
TrackingRequest rq = c.element();
rq.location = getLocationFromIP(rq.IP);
c.output(rq);
}
}
在本地测试时给了我这个错误 PTransform ..非法突变价值..的类.....
Input values must not be mutated in any way.
at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.verifyUnmodified(ImmutabilityEnforcementFactory.java:96)
at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.afterElement(ImmutabilityEnforcementFactory.java:71)
at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:149)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
您的函数修改了输入 TrackingRequest 元素的位置字段。数据流不允许这样做。
文档说:
输入 PCollection 的当前元素由 c.element(( 返回。它应该被认为是不可变的。数据流运行时不会改变元素,因此缓存等是安全的。元素不应被任何 DoFn 方法改变,因为它可能缓存在其他地方、由数据流运行时保留或以其他未指定的方式使用。
您可以创建输入元素的副本,修改字段,然后将副本作为输出发送出去。