我有两个while True
循环,每个循环从外部API读取一些数据。
db = Database(env='dev')
aStream = AStream(db)
proc1 = Process(target=aStream.stream_a_to_db())
proc1.start()
bStream = BStream(db)
proc2 = Process(target=bStream.stream_b_to_db())
proc2.start()
我的Database
课如下
class Database:
def __init__(self, env='dev'):
"""DB setup"""
self.db_url = self._set_db_url_by_env(env)
self.engine = create_engine(self.db_url, echo=True)
def create_db_session(self):
# Create all tables that do not already exist
Base.metadata.create_all(self.engine, Base.metadata.tables.values(), checkfirst=True)
# SqlAlchemy :: Session setup
Session = sessionmaker(bind=self.engine)
# SqlAlchemy :: Starts a session
return Session()
我不明白运行此代码的结果。它以先运行者为准,即如果我proc2
放在proc1
之前,proc2
流到 db 是唯一运行的流。
我尝试了两件事。
- 使用虚拟函数代替实际流。
def func1():
print("func1 up and running.")
def func2():
print("func2 up and running.")
proc1 = Process(target=func1)
proc1.start()
proc2 = Process(target=func2)
proc2.start()
这将按预期运行,func1
和func2
都在运行和打印。
- 只需将
bStream
移动到单独的python文件,并在两个终端选项卡中手动运行两个文件,python a_stream.py
和python b_stream.py
,而无需对其他代码进行任何更改。它们每个运行良好,都可以毫无问题地将数据流式传输到数据库。
基本上,我的目标不是有两个脚本并手动运行它们,而只是一个带有两个进程的脚本。正确的方法是什么?它必须与SQLAlchemy引擎/会话的工作方式有关。我是SQLAlchemy的新手。感谢这里的任何帮助!
根据文档:
对于使用 os.fork 系统调用的多进程应用程序,或者例如 Python 多处理模块,通常需要为每个子进程使用单独的引擎。这是因为引擎维护对最终引用 DBAPI 连接的连接池的引用 - 这些连接往往不能跨进程边界移植。配置为不使用池(通过使用 NullPool 实现)的引擎没有此要求。
因此,与其在主进程中创建引擎并将其传递给两个子进程, 在每个子流程中创建一个新引擎:
def worker(Stream, methodname):
db = Database(env='dev')
stream = Stream(db)
getattr(stream, methodname)()
args = [(AStream, 'stream_a_to_db'), (BStream, 'stream_b_to_db')]
procs = [Process(target=worker, args=a) for a in args]
for proc in procs:
proc.start()
for proc in procs:
proc.join()