Confusing error during "process_outputs" on a Google Dataflow worker in Apache Beam



I'm running the following Apache Beam test pipeline successfully on Google Dataflow. It uses Datastore as both source and sink. Many of the entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work() on all entities of a particular kind across the given namespaces. Note that a similar test pipeline that does the same thing on non-namespaced entities also runs successfully:

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper
from .pipelines.dataflow_settings import (
    PROJECT, NAMESPACES_PER_PIPELINE
)

class NamespacedDatastoreMigration(_DatastoreMigrationBase):
    """
    Map a do-function over a query multiplexed across several namespaces.

    The inheritor must implement the following:
    - a PROJECT class attribute
    - a do-function (_do_work())
    - a method to get the namespaces across which to shard the query
      (get_namespaces())
    """
    _NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE  # 25

    def __init__(self, argv, migration_history_obj, model):
        super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
        self._namespaces = self.get_namespaces()
        self._pipelines = self._create_pipelines(argv)

    def get_namespaces(self):
        query_pb = query_pb2.Query()
        helper.set_kind(query_pb, "__namespace__")
        client = apache_helper.get_datastore(PROJECT)
        namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)

        namespaces = []
        for n in namespace_entities:
            # Get namespace name or id
            key_path = n.key.path[-1]
            if key_path.HasField('id'):
                name_or_id = key_path.id
            else:
                name_or_id = key_path.name

            # Avoid duplicates and test namespaces
            if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
                namespaces.append(name_or_id)

        return namespaces

    def run(self):
        for pipeline in self._pipelines:
            pipeline.run()

    def _create_pipelines(self, argv):
        pipelines = []
        for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
            p = beam.Pipeline(argv=argv)
            (
                (
                    p | 'ReadNamespace_{}'.format(
                        ns
                    ) >> datastoreio.ReadFromDatastore(
                        project=self.PROJECT,
                        query=self.query(),
                        namespace=ns
                    )
                    for ns in namespaces
                )
                | 'JoinNamespaceEntities' >> beam.Flatten()
                | self.__class__.__name__ >> beam.FlatMap(self._do_work)
                | self._get_sink()
            )
            pipelines.append(p)

        return pipelines

model = "App"
NamespacedDatastoreMigration(
    argv,
    kwargs.get('migration_history_obj'),  # Irrelevant here
    model  # Entity kind
).run()

where argv is:

argv = [
    '--project={0}'.format(PROJECT),
    '--job_name=' + name,  # A human-readable descriptor that's been cleaned
    '--staging_location=gs://{0}/migrations/'.format(BUCKET),
    '--temp_location=gs://{0}/migrations/'.format(BUCKET),
    '--setup_file=./setup.py',
    '--runner=DataflowRunner'
]

This is based on the following base class:

class _DatastoreMigrationBase(object):
    PROJECT = PROJECT

    def __init__(self, argv, migration_history_obj, model):
        self.migration_history_obj = migration_history_obj

        if not model:
            raise Exception('This operation requires a model class name.')
        self.model = model

    def query(self):
        # Instantiate a filter protobuf
        filter_pb = query_pb2.Filter()

        # Get all non-deleted model instances
        helper.set_property_filter(
            filter_pb,
            'deleted',
            query_pb2.PropertyFilter.EQUAL,
            False
        )

        # Instantiate a query protobuf
        query_pb = query_pb2.Query(
            filter=filter_pb
        )
        helper.set_kind(query_pb, self.model)
        return query_pb

    def _get_source(self):
        return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
            self.PROJECT,
            self.query(),
        )

    @staticmethod
    def _do_work(entity):
        return entity

    def _get_sink(self):
        return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
            self.PROJECT
        )

However, when I subclass NamespacedDatastoreMigration like so:

from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration

class CampaignActionField(NamespacedDatastoreMigration):
    @staticmethod
    def _do_work(entity):
        target_url = entity.properties.get('target_url').string_value
        message = entity.properties.get('message').string_value
        path = entity.properties.get('path').string_value
        if target_url and not message and not path:
            entity.properties.get('action').string_value = 'webhook'

        return entity

model = "Campaign"  # Entity kind
CampaignActionField(
    create_argv(kwargs.get('name')),  # "ED-2275 Campaign action field"
    kwargs.get('migration_history_obj'),  # Irrelevant here
    model
).run()

and this new pipeline runs on Dataflow, it fails. At first, everything goes fine. By which I mean I see the following INFO logs:

2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.
2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.

# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:
2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create
2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...
2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close

But then I get this traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
  File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']

I assume this has something to do with the difference between the two _do_work() functions in NamespacedDatastoreMigration and CampaignActionField, since the former succeeds while the latter fails, and _do_work() is the only difference between them (apart from the kind of entity being transformed). But I can't figure out what exactly is going wrong or how to fix it. Does anyone have any ideas?

It turns out that changing the FlatMap to a Map in the _create_pipelines method of NamespacedDatastoreMigration fixed this for me. I had also, foolishly, been calling NamespacedDatastoreMigration with a non-namespaced model, which is why it succeeded while CampaignActionField (which uses a namespaced model) did not.
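
For anyone hitting the same TypeError: the last frame of the traceback (for result in results: in _OutputProcessor.process_outputs) is the clue. beam.FlatMap expects its callable to return an iterable and iterates over the return value to flatten the output, while beam.Map emits the return value as a single element. A minimal sketch of the difference, using a hypothetical FakeEntity as a stand-in for the Datastore entity protobuf:

import apache_beam as beam

class FakeEntity(object):
    """Hypothetical stand-in for a Datastore Entity protobuf; not iterable."""
    pass

def _do_work(entity):
    return entity  # returns one plain object, not an iterable

with beam.Pipeline() as p:
    entities = p | beam.Create([FakeEntity(), FakeEntity()])

    # beam.Map treats the return value as a single output element, so
    # returning a bare object is fine:
    entities | 'MapVersion' >> beam.Map(_do_work)

    # beam.FlatMap iterates over the return value to flatten it, so the
    # same function fails at execution time with
    # "TypeError: 'FakeEntity' object is not iterable". To keep FlatMap,
    # the callable would have to return (or yield) an iterable instead:
    # entities | 'FlatMapVersion' >> beam.FlatMap(lambda e: [_do_work(e)])

In other words, a _do_work() that returns a bare entity only works under Map; under FlatMap it would have to return or yield an iterable of entities.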
