我有一个简单的python管道,它在属性页上抓取数据,页面上的数据被划分为状态和属性类型。管道的步骤如下:
- 状态和属性类型的所有组合上的循环
- 对于每个组合,爬网程序都会遍历相应的页面并收集所有属性的URL
- 对于每个属性,数据在存储到SQLite数据库中之前都会进行爬网、清理和丰富
目前这是一个单线程且非常简单的过程。我想改进这一点,我正在寻找在我的新管道中使用的现代工具。既可以可视化处理的状态,也可以将其作为多处理管道运行。
目前,我有了使用Kafka和Airflow的第一个想法。一个进程在页面上抓取属性URL,并为每个URL创建Kafka消息。然后,第二个进程获取单个Kafka消息并对其进行处理;爬行、清洁、丰富、储存。同时,在Airflow中,我可以很好地概述进程的状态,甚至可以重试失败的进程。在那里,状态和属性类型的每个组合都被拆分为单独的DAG。
然而,问题是,我不能用多处理来进行爬网,因为这会导致对目标页面的请求过多,调用最终会被阻止。管道将出现故障。
我的新想法是还包括Kubernetes。我将有一个pod来抓取属性URL。然后,第二个pod将一次抓取一个属性URL。最后一个pod将负责处理属性数据(清理、丰富、存储),但我希望这个pod有X个实例,因为抓取数据会比处理数据更快
因为每个属性都有很多数据(大约20个字段,至少有一个字段包含属性的长描述),我认为Kafka不是在pod之间传输信息的好选择。但我看不出有其他选择可以包括工作队列。我唯一能想到的选择是,消息总是只包括列表的URL。但在爬网之后,数据存储在SQLite中,而将清理和丰富数据的最后一个pod将需要从SQLite DB中提取数据。这是一个合理的想法,还是有更好的选择?
我试着在谷歌上搜索关于如何使用Kubernetes+Airflow+Kafka
设置系统的教程和建议,但一无所获。有些页面只专门介绍在Kubernetes中运行Airflow,但从来没有关于Kafka的信息。这是否意味着这种组合是不可能的?如果是,为什么不呢?此外,你对我应该研究的更好的工具或完整的系统有什么建议吗?
如果我的问题过于模糊或开放,我很抱歉,我找不到其他地方可以找到以最佳方式建立这条管道的建议,并为我提供找工作的技能。
如果您必须要在K8上部署Kafka,您可以使用https://strimzi.io/这是一个成熟的开源运营商,可以很容易地在Kubernetes中部署Kafka。
然而,我认为您可以通过一些简单的云无服务器功能来实现您想要实现的功能,在GCP中,您可以使用pubsub+cloud功能。
在其他公共云中也有类似的服务。
如果你想知道为什么程序的运行时间这么长,你需要对其进行评测。
首先使用标准库中的cProfile
模块分析当前的单线程实现。尝试:
python -m cProfile -s cumtime yourscript.py <arguments>
这会让你知道你的程序在哪里花费时间。
我再怎么强调也不为过!有时,瓶颈完全不是你想象的那样。
例如,在我的一个程序中,它的大部分时间都花在了字符串格式化上!通过实验,我发现我可以通过使用适当的类型的specizer将时间减半:
In [1]: f = 139.3592529296875;
In [3]: %timeit str(f)
724 ns ± 0.31 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
In [4]: %timeit "{0}".format(f)
734 ns ± 1.21 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
In [5]: %timeit "{0:.5f}".format(f)
314 ns ± 0.0365 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
In [7]: %timeit "{0:f}".format(f)
313 ns ± 7.35 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
In [8]: %timeit "{0:e}".format(f)
382 ns ± 0.171 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
只有在您对脚本进行了分析之后,您才能考虑改进。一般来说,我建议从耗时最长的代码开始(但不是内置代码;对此你无能为力)。
请记住,最大的改进往往来自于更好的算法。
在同一程序的后期开发中,我在构建字符串之前将数值数据转换为字符串一次,因为所有的数字都被多次使用,所以节省了大量的重复工作。
Airflow用于调度程序,可能在Kubernetes pod中,但不是必需的;您可以运行一个独立的Airflow工作集群。
关于卡夫卡,气流并不是真正需要消费的,因为卡夫卡的话题是无穷无尽的、连续不断的。您可以发布一个多分区Kafka主题的url,然后运行与分区一样多的使用者(线程或进程),然后进行并行处理。由于处理是并行的,所以不要使用sqlite,因为这只需要一个实例来消耗所有数据。
例如,您仍然可以使用Kubernetes使用Knative或OpenFaaS来进行处理。
您也可以使用NATS或RabbitMQ,因为您只需要一个队列。或者Celery和Redis通常与Python一起使用。