不能使用两个进程通过 SQLAlchemy 流式传输到数据库



我有两个while True循环,每个循环从外部API读取一些数据。

db = Database(env='dev')
aStream = AStream(db)
proc1 = Process(target=aStream.stream_a_to_db())
proc1.start()
bStream = BStream(db)
proc2 = Process(target=bStream.stream_b_to_db())
proc2.start()

我的Database课如下

class Database:
def __init__(self, env='dev'):
"""DB setup"""
self.db_url = self._set_db_url_by_env(env)
self.engine = create_engine(self.db_url, echo=True)
def create_db_session(self):
# Create all tables that do not already exist
Base.metadata.create_all(self.engine, Base.metadata.tables.values(), checkfirst=True)
# SqlAlchemy :: Session setup
Session = sessionmaker(bind=self.engine)
# SqlAlchemy :: Starts a session
return Session()

我不明白运行此代码的结果。它以先运行者为准,即如果我proc2放在proc1之前,proc2流到 db 是唯一运行的流。

我尝试了两件事。

  1. 使用虚拟函数代替实际流。
def func1():
print("func1 up and running.")
def func2():
print("func2 up and running.")
proc1 = Process(target=func1)
proc1.start()

proc2 = Process(target=func2)
proc2.start()

这将按预期运行,func1func2都在运行和打印。

  1. 只需将bStream移动到单独的python文件,并在两个终端选项卡中手动运行两个文件,python a_stream.pypython b_stream.py,而无需对其他代码进行任何更改。它们每个运行良好,都可以毫无问题地将数据流式传输到数据库。

基本上,我的目标不是有两个脚本并手动运行它们,而只是一个带有两个进程的脚本。正确的方法是什么?它必须与SQLAlchemy引擎/会话的工作方式有关。我是SQLAlchemy的新手。感谢这里的任何帮助!

根据文档:

对于使用 os.fork 系统调用的多进程应用程序,或者例如 Python 多处理模块,通常需要为每个子进程使用单独的引擎。这是因为引擎维护对最终引用 DBAPI 连接的连接池的引用 - 这些连接往往不能跨进程边界移植。配置为不使用池(通过使用 NullPool 实现)的引擎没有此要求。

因此,与其在主进程中创建引擎并将其传递给两个子进程, 在每个子流程中创建一个新引擎:

def worker(Stream, methodname):
db = Database(env='dev')
stream = Stream(db)
getattr(stream, methodname)()
args = [(AStream, 'stream_a_to_db'), (BStream, 'stream_b_to_db')]
procs = [Process(target=worker, args=a) for a in args]
for proc in procs:
proc.start()
for proc in procs:
proc.join()

相关内容

  • 没有找到相关文章

最新更新