在ParDo上返回我自己的一个类时,数据流管道引发PicklingError



我有一个管道如下:

import base64
import gzip
import logging
import apache_beam as beam
import data.build.common.v1.common_pb2 as common_pb2
from data.pipeline.steps.console_proto_list import CONSOLE_PROTO_LIST
from google.protobuf.message import DecodeError
class GetClearMessage(beam.DoFn):
    """Decode a Pub/Sub payload into a ``BatchEntryPoint`` protobuf message.

    The incoming element is expected to carry the serialized proto in its
    ``data`` attribute; the payload may optionally be gzip-compressed, in
    which case the first parse attempt fails and we retry after inflating.

    NOTE(review): emitting the raw protobuf message downstream can trip
    Dataflow's FastPrimitivesCoder fallback (PicklingError) when the
    generated module is not importable on the workers — confirm the
    ``common_pb2`` package is shipped to workers or a proto coder is
    registered.
    """

    def process(self, element, **kwargs):
        logging.info('Unserializing data')
        logging.info(element)
        payload = element.data
        logging.info(payload)
        message = common_pb2.BatchEntryPoint()
        try:
            message.ParseFromString(payload)
        except DecodeError:
            # Not a plain serialized proto — assume gzip-compressed and retry.
            message.ParseFromString(gzip.decompress(payload))
        logging.info(message)
        # A DoFn may yield instead of returning a one-element list; the
        # emitted PCollection is identical.
        yield message
def batch_pipeline(pipeline):
    """Assemble the console-message batch pipeline.

    Reads messages (with attributes) from the console Pub/Sub subscription
    and fans the stream out into three independent ParDo branches.

    Args:
        pipeline: the root Beam pipeline object to attach transforms to.
    """
    console_message = (
        pipeline
        # BUG FIX: the label was originally a single-quoted string containing
        # an apostrophe ("console's"), which is a SyntaxError; the string
        # content itself is unchanged, only the quoting style differs.
        | "Get console's message from pub/sub" >> beam.io.ReadFromPubSub(
            subscription='projects/production-213911/subscriptions/ps-to-bq-console',
            with_attributes=True)
    )
    clear_message = console_message | beam.ParDo(GetClearMessage())
    gcloud_id = console_message | beam.ParDo(GetGcloudId())
    registry = console_message | beam.ParDo(GetTableData())
    # Debug branch intentionally left disabled by the author:
    # clear_message | beam.ParDo(Test())

我删除了一些类,因为没有必要理解这个问题。

当我在数据流上运行管道时,我经常会收到以下错误:

Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-4468']

请参阅下面的完整堆栈跟踪。

Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
element.data)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-314']

但是，正如您在 GetClearMessage 中可能看到的那样，我记录了一些数据。当我查看这个特定步骤的日志时，记录 batch_entry_point（BatchEntryPoint 的一个实例，也就是引起麻烦的那个类）时一切看起来都很正常。

你知道是什么导致了这种行为吗?


编辑

我试图在 ParDo(GetClearMessage) 的结果上添加一个步骤，但从未执行到这一步。所以我猜 pickle 错误是因为我想返回 BatchEntryPoint 的一个实例。

我不理解这种行为，你知道怎么解决吗？
感谢！

我没有解决这个问题,但我找到了一个解决方案,不返回batch_entry_point,而是返回其中的每个元素,如下所示:

# Workaround: instead of emitting the whole BatchEntryPoint message (which
# Dataflow's pickling-based coder rejects), emit each contained entrypoint
# individually so downstream steps receive picklable elements.
for i in batch_entry_point.entrypoints:
logging.info(i)
# NOTE(review): `obj` is defined outside this excerpt — presumably a dict
# built per element earlier in the DoFn; confirm against the full source.
obj['proto'] = i
yield obj

然后,每个元素都由管道的下一步处理

最新更新