我们有一个数据流作业,它在Pubsub中消费消息,进行一些转换,并在CloudSQL Postgres实例上执行DML (INSERT, UPDATE, DELETE)。我们观察到瓶颈是在数据库中。代码是用Python编写的,使用SQLAlchemy作为与Postgres接口的库
我们观察到的常见问题有:
- 最大连接数,创建多个连接池
- 当有大量数据从Pubsub传入时,负责写入数据库的DoFn抛出这些异常:
Task was destroyed but it is pending! task: <Task pending name='Task-194770'...
Task exception was never retrieved future: <Task finished name='Task-196602'...
RuntimeError: aiohttp.client_exceptions.ClientResponseError: 429, message='Too Many Requests', url=URL('https://sqladmin.googleapis.com/sql/v1beta4/projects/.../instances/db-csql:generateEphemeralCert') [while running 'write_data-ptransform-48']
似乎云SQL API在这里达到了速率限制。
这些应该是我们理想的场景:
- 无论Dataflow创建的工人数量和数量如何,我们应该在整个管道中只有一个ConnectionPool(一个单例),具有静态连接数(分配给Dataflow作业的最大连接数为50,数据库中配置的最大连接数为200)。
- 在来自Pubsub的高流量时刻,应该有一些机制来限制进入数据库的请求速率。或者不为负责写入数据库的DoFn扩展工作线程的数量。
你能推荐一种方法来完成这个吗?
根据我的经验,单个全局连接池是不可能的,因为你不能将连接对象传递给worker (pickle/unpickle)。这是真的吗?
您应该尝试批量调用数据库。伪代码看起来像这样(取自beam编程指南)
class BufferDoFn(DoFn):
BUFFER = BagStateSpec('buffer', EventCoder())
IS_TIMER_SET = ReadModifyWriteStateSpec('is_timer_set', BooleanCoder())
OUTPUT = TimerSpec('output', TimeDomain.REAL_TIME)
def process(self,
buffer=DoFn.StateParam(BUFFER),
is_timer_set=DoFn.StateParam(IS_TIMER_SET),
timer=DoFn.TimerParam(OUTPUT)):
buffer.add(element)
if not is_timer_set.read():
timer.set(Timestamp.now() + Duration(seconds=10))
is_timer_set.write(True)
@on_timer(OUTPUT)
def output_callback(self,
buffer=DoFn.StateParam(BUFFER),
is_timer_set=DoFn.StateParam(IS_TIMER_SET)):
send_rpc(list(buffer.read()))
buffer.clear()
is_timer_set.clear()
原则上,您需要编写一个可拆分的dofn,并使用计时器和状态。