"PBegin"对象在运行梁管道时没有属性"窗口"



在运行数据流作业时,我得到的'PBegin'对象没有属性'windowing'。我正在调用pardo函数中的connectclass类。

我正在尝试从Beam python SDK连接NOSQL数据库,并运行sql从表中提取数据。然后使用另一个pardo将输出写入单独的文件。

class Connector(beam.DoFn):
def __init__(self,username,seeds,keyspace,password,datacenter=None):
self.username = username
self.password = password
self.seeds = seeds
self.keyspace = keyspace
self.datacenter = datacenter
super(self.__class__, self).__init__()
def process(self, element):
if datacenter:
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
cluster = Cluster(contact_points=self.seeds,
load_balancing_policy=load_balancing_policy,
auth_provider=auth_provider)
session=cluster.connect(self.seeds,self.keyspace,self.username, self.password, self.datacenter)
rows = session.execute(SQL Query)
yield rows

刚刚偶然发现了同样的问题。尝试连接到RDBMS源,但我想就实现设计而言,NoSQL和SQL数据库之间没有区别。

除了Jayadeep Jayaraman所建议的之外,这可以通过使用ParDo来实现。实际上,使用ParDo进行连接是波束文档建议的如果这样做的限制对于您的用例是可以接受的:

对于有界(批(源,目前有两种创建波束源的选项:

使用ParDo和GroupByKey。

使用Source接口并扩展BoundedSource抽象子类。

ParDo是推荐的选项,因为实现Source可能很棘手。请参阅何时使用>源接口,以获取您可能希望使用源>>的一些用例的列表(例如动态工作再平衡(。

您没有显示如何使用DoFn。对我来说,记住DoFn作用于已经存在的PCollection的元素是很有帮助的。它本身无法从头开始创建您的DoFn。因此,为了克服您提到的问题,您可能希望从内存创建一个PCollection,其中包含用于从源中检索数据的查询的一个元素。然后将从源读取的ParDo应用到此PCollection。

BTW:我在Pcollection中为每个分区设计了一个元素,我想从RDBMS中读取,这样就可以从SQL数据库中并行读取数据。

解决方案可能如下所示:

p | beam.Create(["Your Query / source object qualifier goes here"]) 
| "Read from Database" >> beam.ParDo(YourConnector())

我还要提到使用DoFn的start_bundle和finish_bundle方法来设置/断开连接可能是个好主意

为此,您需要使用Beam IO。这里有一个关于如何在Python中构建自定义IO的指南[1]。

ParDo通常用于在PCollection上运行转换。您也可以查看SplitableDoFn来构建这样的东西。此处参考[2]

1-https://beam.apache.org/documentation/io/developing-io-python/

2-https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

相关内容

  • 没有找到相关文章

最新更新