建立一个完美的管道来永远运行任务



我在构建一个完美的管道时遇到了问题。假设我有一个文件,称之为streamA.py和streamB.py。这两个文件的目的是全天候连续流式传输数据,并每500条记录流式传输一次,将数据推送到redis流中
我创建了另一个名为redis_to_postgres.py的文件,该文件异步获取redis流中的所有数据,并将数据推送到postgresql,并从我最近推送的id的redis流清除内存。这是通过async完成的。我希望在上一个管道启动后每15分钟进行一次计时。

最实用的方法是什么?在这种情况下,我会创建3个单独的管道吗?一个用于streamA,一个用于streamB,第三个用于从redis读取并推送到postgresql,最后清理数据?或者我会创建一个以并行方式流式传输数据的管道,另一个只读取并推送到postgres吗?感谢

一个有趣的用例!您是在要求Prefect≤1.0还是Orion?对于Orion,有一篇博客文章更详细地讨论了这个问题,并展示了示例流程。

但我认为你要求的是Prefect≤1.0。

为了从Redis读取数据并将其加载到Postgres,比如说每10秒,你可以在预取任务中使用一个循环:

for iteration in range(1, 7):
logger.info("iteration nr %s", iteration)
read_from_redis_and_load_to_postgres() # your logic here
if iteration < 6:
logger.info("Sleeping for 10 seconds...")
time.sleep(10)

这个流程可以安排为每分钟运行一次。这将为您提供重试、可观察性和所有的预取功能,并且每10秒将数据加载到Postgres不会淹没您的数据库。

但是,对于您获得实时数据并连续将其加载到Redis流的部分,您可以将其作为单独的服务而不是预取流来运行,因为预取1.0流更倾向于批量处理,并且预计会在某个时刻结束,以判断流运行是否成功。如果你把它作为一个永远不会结束的预取流,它可能会失去流心跳,并被僵尸杀手过程杀死。因此,运行此部分可能更容易,例如作为一个全天候运行的独立容器化服务。您可以将其部署为单独的Kubernetes部署或ECS服务。

它还取决于许多因素,包括这个代码在做什么,这个API的可靠性有多高(从中提取数据的源系统是否有一定的速率限制?为什么有500条记录?这500条记录的填充频率是多少,以及你最终向Redis写入的频率是多少?(。

话虽如此,我很想看看你是否可以在Orion中实现它,就像博客文章的例子一样。我们目前正在收集有关Orion流媒体用例的反馈,因此如果您在Orion中实现这一点,我们很想听听您对此的反馈。

最新更新