数据流错误:'Clients have non-trivial state that is local and unpickleable'



>我有一个可以在本地执行的管道,没有任何错误。我曾经在本地运行的管道中收到此错误

'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.

我相信我通过降级到 apache-beam=2.3.0 来解决这个问题 然后在本地它将完美运行。

现在我正在使用数据流运行器,在要求.txt文件中,我有以下依赖项

apache-beam==2.3.0
google-cloud-bigquery==1.1.0
google-cloud-core==0.28.1
google-cloud-datastore==1.6.0
google-cloud-storage==1.10.0
protobuf==3.5.2.post1
pytz==2013.7

但我再次收到这个可怕的错误

'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.

为什么它给了我DataflowRunner的错误,而不是DirectRunner? 他们不应该使用相同的依赖项/环境吗? 任何帮助将不胜感激。

我已经读过这是解决它的方法,但是当我尝试它时,我仍然收到相同的错误

class MyDoFn(beam.DoFn):
def start_bundle(self, process_context):
self._dsclient = datastore.Client()
def process(self, context, *args, **kwargs):
# do stuff with self._dsclient

与 https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191 相比

我之前的参考帖子,我在本地修复了这个问题:

在 apache-beam 作业中使用 start_bundle(( 不起作用。不可腌制的存储。客户端((

提前感谢!

start_bundle方法中初始化不可挑剔的客户端是一种正确的方法,Beam IO 通常遵循这种方法,请参阅 datastoreio.py 作为示例。这是一个管道,它在DoFn中使用GCS python客户端执行简单操作。我在Apache Beam 2.16.0上运行它没有问题。如果您仍然可以重现您的问题,请提供其他详细信息。

gcs_client.py文件:

import argparse
import logging
import time
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage
class MyDoFn(beam.DoFn):
def start_bundle(self):
self.storage_client = storage.Client()
def process(self, element):
bucket = self.storage_client.get_bucket("existing-gcs-bucket")
blob = bucket.blob(str(int(time.time())))
blob.upload_from_string("payload")
return element
logging.getLogger().setLevel(logging.INFO)
_, options = argparse.ArgumentParser().parse_known_args()
pipeline_options = PipelineOptions(options)
p = beam.Pipeline(options=pipeline_options)
_ = p | beam.Create([None]) | beam.ParDo(MyDoFn())
p.run().wait_until_finish()

要求.txt文件:

google-cloud-storage==1.23.0

命令行:

python -m gcs_client 
--project=insert_your_project 
--runner=DataflowRunner 
--temp_location gs://existing-gcs-bucket/temp/ 
--requirements_file=requirements.txt 
--save_main_session

在让数据流将一堆行写入 Bigtable 时,我遇到了类似的问题。将--save-main-session设置为False似乎已经解决了它。

相关内容

最新更新