Testing Apache Beam locally with SqlTransform in the Python SDK: I get the error "Execution of [%s] not implemented in runner %s."



I get an error when I use SqlTransform on a local Apache Beam instance. Here is a simple test:

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# FruitRecipe is the asker's own record type; its definition is not shown.
with beam.Pipeline() as p:
    pc = (p | beam.Create([
              FruitRecipe("pie", "strawberry", 3, 1.5),
              FruitRecipe("muffin", "blueberry", 2, 2.),
          ])
          | beam.Map(lambda x: beam.Row(recipe=x[0],       # str
                                        fruit=x[1],        # str
                                        quantity=x[2],     # int
                                        unit_cost=x[3],    # float
                                        is_berry=x[1].endswith('berry'))))  # bool
    pc | SqlTransform("SELECT * FROM PCOLLECTION WHERE quantity > 1")

Error:

RuntimeError                              Traceback (most recent call last)
     11                               is_berry = x[1].endswith('berry')))) # bool
     12 
---> 13     pc | SqlTransform("SELECT * FROM PCOLLECTION WHERE quantity>1") #| beam.Map(print)

~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/pvalue.py in __or__(self, ptransform)
139 
140   def __or__(self, ptransform):
--> 141     return self.pipeline.apply(ptransform, self)
142 
143 
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
689         transform.type_check_inputs(pvalueish)
690 
--> 691       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
692 
693       if type_options is not None and type_options.pipeline_type_check:
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
196       m = getattr(self, 'apply_%s' % cls.__name__, None)
197       if m:
--> 198         return m(transform, input, options)
199     raise NotImplementedError(
200         'Execution of [%s] not implemented in runner %s.' % (transform, self))
~PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
226   def apply_PTransform(self, transform, input, options):
227     # The base case of apply is to call the transform's expand.
--> 228     return transform.expand(input)
229 
230   def run_transform(self,
~/PROJECTS/Apache_Beam/env/lib/python3.8/site-packages/apache_beam/transforms/external.py in expand(self, pvalueish)
316       response = service.Expand(request)
317       if response.error:
--> 318         raise RuntimeError(response.error)
319       self._expanded_components = response.components
320       if any(env.dependencies
RuntimeError: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.core.construction.RehydratedComponents.getPCollection(RehydratedComponents.java:139)
at org.apache.beam.sdk.expansion.service.ExpansionService.lambda$expand$0(ExpansionService.java:422)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1577)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1602)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:417)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:491)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.core.construction.RehydratedComponents.getCoder(RehydratedComponents.java:168)
at org.apache.beam.runners.core.construction.PCollectionTranslation.fromProto(PCollectionTranslation.java:51)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:108)
at org.apache.beam.runners.core.construction.RehydratedComponents$3.load(RehydratedComponents.java:98)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 27 more
Caused by: java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1
at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProtoWithoutNullable(SchemaTranslation.java:316)
at org.apache.beam.sdk.schemas.SchemaTranslation.fieldTypeFromProto(SchemaTranslation.java:232)
at org.apache.beam.sdk.schemas.SchemaTranslation.fieldFromProto(SchemaTranslation.java:226)
at org.apache.beam.sdk.schemas.SchemaTranslation.schemaFromProto(SchemaTranslation.java:212)
at org.apache.beam.runners.core.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:169)
at org.apache.beam.runners.core.construction.CoderTranslators$8.fromComponents(CoderTranslators.java:151)
at org.apache.beam.runners.core.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:170)
at org.apache.beam.runners.core.construction.CoderTranslation.fromProto(CoderTranslation.java:145)
at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:87)
at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:82)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 38 more
If you add a cast around each field in the beam.Row call, matching the type you expect, that should resolve the issue. For example:

| beam.Map(lambda x: beam.Row(recipe=str(x[0]),
                              fruit=str(x[1]),
                              quantity=int(x[2]),
                              unit_cost=float(x[3]),
                              is_berry=bool(x[1].endswith('berry')))))

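To explain in more detail: the error java.lang.IllegalArgumentException: Encountered unsupported logical type URN: beam:logical:pythonsdk_any:v1 indicates that Beam Python could not determine the type of one of the fields in the PCollection passed to SqlTransform. Usually this is not a big problem: Beam Python simply uses a fallback (called beam:logical:pythonsdk_any:v1) that encodes the values of any such field with Python serialization (i.e. pickle). This works fine because downstream Python transforms are perfectly capable of reading pickle-encoded data. It can cost some performance, but it won't break your pipeline.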
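To make the fallback concrete, here is a toy model in plain Python. It is not Beam's code: the `PORTABLE` table, the type names in it, and `to_schema` are hypothetical stand-ins; only the URN string comes from the error above. A field whose inferred type is known maps to a portable type, and anything else (here `typing.Any`) falls back to the pickle-based logical type.

```python
import typing

# URN taken from the error message; everything else is a hypothetical toy model.
PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1"

# Hypothetical mapping from inferred Python types to portable schema type names.
PORTABLE = {str: "STRING", int: "INT64", float: "DOUBLE", bool: "BOOLEAN"}


def to_schema(inferred: dict) -> dict:
    """Map each field's inferred Python type to a portable type name,
    falling back to the pickle-based 'any' logical type when it is unknown."""
    return {name: PORTABLE.get(t, PYTHON_ANY_URN) for name, t in inferred.items()}


# Without casts the inferred types are unknown, so every field falls back.
print(to_schema({"recipe": typing.Any, "quantity": typing.Any}))
# With concrete types every field gets a portable type.
print(to_schema({"recipe": str, "quantity": int, "unit_cost": float, "is_berry": bool}))
```

For a pipeline that stays entirely in Python, the fallback entries are harmless because another Python process can unpickle them.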

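In the case of SqlTransform, however, we actually use the implementation from the Java SDK, which knows nothing about Python serialization. So when it encounters beam:logical:pythonsdk_any:v1, it gives up.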
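The failure on the Java side can be sketched the same way. In this toy model, a hypothetical `check_schema` function stands in for the Java expansion service: it only understands a fixed set of portable type names (an assumption, not Java's actual list) and rejects anything else, using the same wording as the IllegalArgumentException in the stack trace.

```python
# Toy stand-in for a non-Python consumer such as the Java expansion service.
# The set of known types is an assumption made for illustration only.
PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1"
KNOWN_TO_JAVA = {"STRING", "INT64", "DOUBLE", "BOOLEAN"}


def unsupported(schema: dict) -> list:
    """Return the field types this consumer cannot decode."""
    return [t for t in schema.values() if t not in KNOWN_TO_JAVA]


def check_schema(schema: dict) -> None:
    for field_type in unsupported(schema):
        # Mirrors the message of the IllegalArgumentException in the trace.
        raise ValueError(f"Encountered unsupported logical type URN: {field_type}")


check_schema({"recipe": "STRING", "quantity": "INT64"})  # accepted
try:
    check_schema({"recipe": PYTHON_ANY_URN})  # a pickled field is rejected
except ValueError as err:
    print(err)
```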

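The fix I suggested above, adding a cast around each value, ensures that Beam Python infers a specific type for every field, which we can then encode in a portable way.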
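Why the casts help can be illustrated with a deliberately simplified static inference over the lambda's source, using Python's standard `ast` module. This is a hypothetical sketch, not Beam's actual type inference: it assumes the lambda parameter `x` has an unknown type, so an expression like `x[0]` is `Any`, while a call to a builtin cast such as `str(...)` has a known result type regardless of its argument.

```python
import ast
import typing

# Builtin casts whose result type is known no matter what the argument is.
CASTS = {"str": str, "int": int, "float": float, "bool": bool}


def infer_expr(node: ast.expr) -> object:
    """Toy inference: known type for builtin casts, Any for everything else."""
    if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
            and node.func.id in CASTS):
        return CASTS[node.func.id]
    # x[0], x[1].endswith(...), etc.: the type of x is unknown here.
    return typing.Any


def infer_row_fields(lambda_src: str) -> dict:
    """Infer a type per keyword argument of the Row(...) call in a lambda."""
    lam = ast.parse(lambda_src, mode="eval").body
    if not isinstance(lam, ast.Lambda) or not isinstance(lam.body, ast.Call):
        raise ValueError("expected 'lambda x: <call>(...)'")
    return {kw.arg: infer_expr(kw.value) for kw in lam.body.keywords}


before = infer_row_fields(
    "lambda x: beam.Row(recipe=x[0], quantity=x[2], is_berry=x[1].endswith('berry'))")
after = infer_row_fields(
    "lambda x: beam.Row(recipe=str(x[0]), quantity=int(x[2]),"
    " is_berry=bool(x[1].endswith('berry')))")
print(before)
print(after)
```

Only the source text is parsed, so `beam` does not need to be importable for this sketch to run.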

I filed BEAM-11690 for this issue; we should raise a more helpful error message in this case. Thanks for reporting it!
