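I'm currently experimenting with streaming Dataflow pipelines (in Python). I read a stream of data that I'd like to write to a PostgreSQL Cloud SQL instance. To do so, I'm looking for a proper place to create the database connection. Since I write the data using a ParDo function, I thought DoFn.setup() would be a good place.
According to several resources this should be a good place, because setup() is called only once (when the worker starts up).
I ran some tests, but setup() seems to be called far more often than only on worker initialization. It seems to run as often as start_bundle() (which is called after every so many elements).
I created a simple pipeline that reads some messages from PubSub, extracts the object's file name, and outputs the file name. In addition, it logs the number of times setup() and start_bundle() are called: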
import argparse
import logging
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

setup_counter = 0
bundle_counter = 0


class GetFileName(beam.DoFn):
    """
    Generate file path from PubSub message attributes
    """

    def _now(self):
        return datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    def setup(self):
        global setup_counter
        moment = self._now()
        logging.info("setup() called %s" % moment)
        setup_counter = setup_counter + 1
        logging.info(f"""setup_counter = {setup_counter}""")

    def start_bundle(self):
        global bundle_counter
        moment = self._now()
        logging.info("Bundle started %s" % moment)
        bundle_counter = bundle_counter + 1
        logging.info(f"""Bundle_counter = {bundle_counter}""")

    def process(self, element):
        attr = dict(element.attributes)
        objectid = attr["objectId"]
        # not sure if this is the prettiest way to create this uri, but works for the poc
        path = f'{objectid}'
        yield path


def run(input_subscription, pipeline_args=None):
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True
    )
    with beam.Pipeline(options=pipeline_options) as pipeline:
        files = (pipeline
                 | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription,
                                                                with_attributes=True)
                 | "Get filepath" >> beam.ParDo(GetFileName())
                 )
        files | "Print results" >> beam.Map(logging.info)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        dest="input_subscription",
        required=True,
        help="The Cloud Pub/Sub subscription to read from."
    )
    known_args, pipeline_args = parser.parse_known_args()
    run(
        known_args.input_subscription,
        pipeline_args
    )
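Based on this, when running the job on DirectRunner, I expected setup() to be logged only once (after the pipeline starts) and start_bundle() to be logged an arbitrary number of times.
However, setup() seems to be called just as often as start_bundle().
Looking at the logs: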
python main.py
> --runner DirectRunner
> --input_subscription <my_subscription>
> --direct_num_workers 1
> --streaming true
...
INFO:root:setup() called 2022/11/16 15:11:13
INFO:root:setup_counter = 1
INFO:root:Bundle started 2022/11/16 15:11:13
INFO:root:Bundle_counter = 1
INFO:root:avro/20221116135543584-hlgeinp.avro
INFO:root:avro/20221116135543600-hlsusop.avro
INFO:root:avro/20221116135543592-hlmvtgp.avro
INFO:root:avro/20221116135543597-hlsuppp.avro
INFO:root:avro/20221116135553122-boevtdp.avro
INFO:root:avro/20221116135553126-bomipep.avro
INFO:root:avro/20221116135553127-hlsuppp.avro
INFO:root:avro/20221116135155024-boripep.avro
INFO:root:avro/20221116135155020-bolohdp.avro
INFO:root:avro/20221116135155029-hlmvaep.avro
...
INFO:root:setup() called 2022/11/16 15:11:16
INFO:root:setup_counter = 2
INFO:root:Bundle started 2022/11/16 15:11:16
INFO:root:Bundle_counter = 2
INFO:root:high-volume/20221112234700584-hlprenp.avro
INFO:root:high-volume/20221113011240903-hlprenp.avro
INFO:root:high-volume/20221113010654305-hlprenp.avro
INFO:root:high-volume/20221113010822785-hlprenp.avro
INFO:root:high-volume/20221113010927402-hlprenp.avro
INFO:root:high-volume/20221113011248805-hlprenp.avro
INFO:root:high-volume/20221112234730001-hlprenp.avro
INFO:root:high-volume/20221112234738994-hlprenp.avro
INFO:root:high-volume/20221113010956395-hlprenp.avro
INFO:root:high-volume/20221113011648293-hlprenp.avro
...
INFO:root:setup() called 2022/11/16 15:11:18
INFO:root:setup_counter = 3
INFO:root:Bundle started 2022/11/16 15:11:18
INFO:root:Bundle_counter = 3
INFO:root:high-volume/20221113012008604-hlprenp.avro
INFO:root:high-volume/20221113011337394-hlprenp.avro
INFO:root:high-volume/20221113011307598-hlprenp.avro
INFO:root:high-volume/20221113011345403-hlprenp.avro
INFO:root:high-volume/20221113012000982-hlprenp.avro
INFO:root:high-volume/20221113011712190-hlprenp.avro
INFO:root:high-volume/20221113011640005-hlprenp.avro
INFO:root:high-volume/20221113012751380-hlprenp.avro
INFO:root:high-volume/20221113011914286-hlprenp.avro
INFO:root:high-volume/20221113012439206-hlprenp.avro
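Can someone explain this behavior? I'm wondering whether my understanding of what setup() does is incorrect, or whether this can be explained in another way. Based on this test, setup() does not seem to be a good place to set up a DB connection.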
According to the Beam documentation, the setup method can be called more than once:
DoFn.setup(): Called whenever the DoFn instance is deserialized on the worker.
This means it can be called more than once per worker because multiple instances of a given DoFn subclass may be created
(e.g., due to parallelization, or due to garbage collection
after a period of disuse).
This is a good place to connect to database instances, open network connections or other resources.
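Even so, it is still the best place to instantiate and create a database connection pool.
teardown is the best place to close the connections on each worker: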
DoFn.teardown(): Called once (as a best effort) per DoFn instance when the DoFn instance is shutting down.
This is a good place to close database instances, close network connections or other resources.
Note that teardown is called as a best effort and is not guaranteed. For example,
if the worker crashes, teardown might not be called.
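As a rough illustration of the setup()/teardown() pairing described above, here is a minimal sketch, not a definitive implementation. The names WriteFileName and connect_stub are hypothetical, and sqlite3 (in-memory) only stands in for a real PostgreSQL driver so that the example can run anywhere. The fallback base class is there purely so the sketch can be exercised without Beam installed. The connection is created per DoFn instance in setup(), so a repeated setup() call on a new instance simply opens a new connection for that instance.

```python
import sqlite3

try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:  # assumption: lets the sketch run without Beam installed
    _DoFnBase = object


def connect_stub():
    """Hypothetical connection factory; replace with your PostgreSQL driver's connect call."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE files (path TEXT)")
    return conn


class WriteFileName(_DoFnBase):
    """Writes each incoming file name through a connection owned by this DoFn instance."""

    def __init__(self, connect=connect_stub):
        super().__init__()
        self._connect = connect
        # Not opened here: the DoFn is serialized and shipped to workers,
        # and live connections generally cannot be pickled.
        self._conn = None

    def setup(self):
        # Called each time a DoFn instance is deserialized on a worker,
        # so possibly more than once per worker.
        self._conn = self._connect()

    def process(self, element):
        self._conn.execute("INSERT INTO files (path) VALUES (?)", (element,))
        yield element

    def finish_bundle(self):
        # Commit once per bundle rather than once per element.
        self._conn.commit()

    def teardown(self):
        # Best effort only: may never run if the worker crashes.
        if self._conn is not None:
            self._conn.close()
            self._conn = None
```

In a pipeline this would be applied with beam.ParDo(WriteFileName()). Because teardown() is not guaranteed to run, the sketch commits in finish_bundle() instead of relying on teardown() to flush pending writes.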