处理持续的实时数据流



所以我有一个项目想法,要求我处理传入的实时数据,并不断跟踪有关实时数据的一些指标。然后,我时不时地希望能够请求我正在计算的指标,并使用该数据做一些事情。

目前我有一个简单的Python脚本,使用套接字库来获取实时数据。它基本上只是…

metric1 = 0
metric2 = ''
while True:
    response = socket.recv(512).decode('utf-8')
    if response.startswith('PING'):
        sock.send("PONGn".encode('utf-8'))
    else:
        process(response)

在上面的中,process(response)将用每个响应的数据更新metric1metric2。(例如,它们可能分别是mean len(response)和most common response)

我想做的是在启动项目后不断运行上述脚本,并偶尔在本地运行的脚本中查询metric1metric2。我猜我将不得不考虑在我几乎没有经验的服务器上运行代码。

做我想做的事情最容易使用的工具是什么?我对各种语言都很熟悉,所以如果有其他语言的库或工具更适合所有这些,请告诉我

谢谢!

我做过一个类似的项目,不确定它是否特别适用于你的情况,但也许它可以给你一个起点。

虽然我很清楚将Pandas dataframe用于实时目的并不是最佳实践,但在我的情况下,它已经足够快了(我实际上愿意听取有关如何改进我的工作流程的建议!),下面是我的代码:

all_prices = pd.Dataframe()
readprice():
 global all_prices
 
 msg = mysock.recv(16384)
    msg_stringa=str(msg,'utf-8')
    
    new_price = pd.read_csv(StringIO(msg_stringa) , sep=";", error_bad_lines=False, 
                    index_col=None, header=None, engine='c', names=range(33),
                    decimal = '.')
...
...
all_prices = all_prices.append(new_price, ignore_index=True).copy()

所以'all_prices' Pandas Dataframe是全局的,新的价格被附加到一般的'all_prices' DF。其他函数可以使用这个全局DF来读取内容。在两个或多个线程之间共享变量时要非常小心,这可能会导致错误。更多信息在这里:http://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/

在我的例子中,我不将DF共享给并行线程,其他线程在追加后启动,而不是同时启动。

最新更新