Postgres和scoped_session,但仍然致命:剩余的连接槽保留给非复制超级用户连接



我正在使用 sqlalchemysessionmakerscoped_session为我的线程创建一个连接池,这样我就可以避免标题中的错误,但不幸的是我仍然得到它。我一直在阅读类似的问题并阅读博客,但不幸的是,我仍然无法解决这个问题。 我的应用程序正在侦听 pubsub,并在数据到达时在数据库中写入一些内容。该应用程序收到大量消息,因此在一定数量之后,我会收到错误。我以为使用sessionmakerscoped_session可以轻松处理这种情况,但我显然错过了一些东西。 以下是简化的代码:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from google.cloud import pubsub_v1
TOPIC_SUBSCRIBER = os.environ.get('PUBSUB_SUBSCRIBER')
PROJECT_ID = os.environ.get('PROJECT_ID')
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(PROJECT_ID, TOPIC_SUBSCRIBER)
db_uri = os.environ.get('DATABASE_URI')
engine = create_engine(db_uri)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
def event_handler(message):
session_db = Session()
# Do stuff
Session.remove()

def run():
streaming_pull_future = client.subscribe(
subscription_path, callback=event_handler
)
print("Listening for messages on {}".format(subscription_path))
# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
try:
streaming_pull_future.result()
except Exception as e:  # noqa
streaming_pull_future.cancel()
print("ERROR: {}".format(str(e)))

if __name__ == '__main__':
run()

该错误是由于 pubsub 创建过多线程并使用过多连接使数据库过载。我通过限制可以同时处理的并发消息的数量来解决它。这里还讨论了该方法。 这是我的代码:

def run():
# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=MAX_WORKERS)
custom_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
custom_scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(custom_executor)
streaming_pull_future = client.subscribe(
subscription_path, callback=callback, flow_control=flow_control, scheduler=custom_scheduler
)
print("Listening for messages on {}".format(subscription_path))
# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
try:
streaming_pull_future.result()
except Exception as e:  # noqa
streaming_pull_future.cancel()
print("ERROR: {}".format(str(e)))

上面的代码解决了这个问题,但是我认为sessionmakerscoped_session会自己管理并发,而是我必须在其他地方处理它。我很想听听一些SQLAlchemy专家对此的看法。 我希望这将在未来对某人有所帮助。

相关内容

  • 没有找到相关文章

最新更新