如何在谷歌云数据流中实现PostgreSQL的连接池



#源配置

source_config = relational_db.SourceConfiguration(
drivername=CONFIG['drivername'],
host=CONFIG['host'],
port=CONFIG['port'],
database=CONFIG['database'],
username=CONFIG['username'],
password=CONFIG['password']
)
# Target database table
customer_purchase_config = relational_db.TableConfiguration(
name = 'customer_purchase',
create_if_missing = False,
primary_key_columns = ['purchaseId']
)

带光束。管道(选项=选项(为p:

res = (
p
| "Read data from PubSub"
>> beam.io.ReadFromPubSub(subscription=SUB).with_output_types(bytes)
|'Transformation' >> (beam.ParDo(PubSubToDict()))
)
customer_purchase = res | beam.ParDo(Customer_Purchase())
customer_purchase | 'Writing to customer_purchase' >> relational_db.Write(
source_config=source_config,
table_config=customer_purchase_config
)

因此,当我尝试使用这些配置时,我可以在PostgreSQL中插入和更新数据,但当我收到巨大的输入峰值时,我的连接限制正在达到,来自工作节点的重试次数也在增加,所以有没有任何方法可以定义连接池,以便我可以重用连接。

查看此pull-request示例,了解如何在Dataflow-Python SDK到Cloud SQL中管理连接池和批处理写入。

其思想是利用DoFn.Setup()方法,该方法在创建对象时运行一次,并将为每个python解释器创建一个静态连接池。这个池将在对象的生命周期中存在,然后您将通过租用连接并将连接返回到池中以供重用来打开和关闭会话。

经过一些测试,问题不在于连接的数量,而在于每秒创建和关闭的新连接的数量。每个新连接都需要SQL服务器上的一个线程来处理事务,这是一种反模式,会导致大量开销。假设您使用的是Beam Nuggets,库会创建一个新的连接,并在每个捆绑包之后处理它。如果你有小的捆绑包,那么它创建新连接的速度会太快。这给SQL服务器端带来了一个问题,因为同一受影响记录的每个事务现在都由不同的线程处理,这可能导致线程之间的行锁争用,并导致性能瓶颈,如Cloud SQL文档中所述。

另外需要记住的是钥匙的数量和捆绑包的大小。决定捆绑包大小的因素之一是每个密钥可用的数据量。从Pub/Sub读取时,默认为1024个密钥,目的是帮助过滤重复的消息。可以使用混洗步骤(GBK(GroupByKey将关键帧重新设置为设定数量的关键帧,例如40。这也有助于增加捆绑包的大小。如果实现正确,您应该将相同记录的相同事务组合到相同的密钥中,然后将其作为一个完整的事务批处理到Cloud SQL。这允许云SQL优化性能,并减少锁定行的线程开销。

我的建议是使用GroupByKey使密钥少于1024个,这会导致更大的捆绑包大小,并将受影响记录的每个事务分组在一起。然后,您可以使用该补丁批处理到CSQL,并增加了处理连接池的好处。这就减少了新连接,从而减少了云SQL的开销。

我在这里有一个异步python池选项,用于生产。它运行得非常好。

Python Postgres psycopg2线程连接池已耗尽

相关内容

最新更新