Apache Beam 在使用本地直接流道时返回"Input values must not be mutated in any way."



我写了一个Apache Beam DoFn

static class FillLocation extends DoFn<TrackingRequest, TrackingRequest> {
        @ProcessElement
        public void processElement(ProcessContext c) {    
            TrackingRequest rq = c.element();
            rq.location = getLocationFromIP(rq.IP);         
            c.output(rq);
        }
}

在本地测试时给了我这个错误 PTransform ..非法突变价值..的类.....

 Input values must not be mutated in any way.
    at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.verifyUnmodified(ImmutabilityEnforcementFactory.java:96)
    at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.afterElement(ImmutabilityEnforcementFactory.java:71)
    at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:149)
    at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

您的函数修改了输入 TrackingRequest 元素的位置字段。数据流不允许这样做。

文档说:

输入 PCollection 的当前元素由 c.element(( 返回。它应该被认为是不可变的。数据流运行时不会改变元素,因此缓存等是安全的。元素不应被任何 DoFn 方法改变,因为它可能缓存在其他地方、由数据流运行时保留或以其他未指定的方式使用。

您可以创建输入元素的副本,修改字段,然后将副本作为输出发送出去。

相关内容

最新更新