所以我有一个项目想法,要求我处理传入的实时数据,并不断跟踪有关实时数据的一些指标。然后,我时不时地希望能够请求我正在计算的指标,并使用该数据做一些事情。
目前我有一个简单的Python脚本,使用套接字库来获取实时数据。它基本上只是…
metric1 = 0
metric2 = ''
while True:
response = socket.recv(512).decode('utf-8')
if response.startswith('PING'):
sock.send("PONGn".encode('utf-8'))
else:
process(response)
在上面的中,process(response)
将用每个响应的数据更新metric1
和metric2
。(例如,它们可能分别是mean len(response)和most common response) 我想做的是在启动项目后不断运行上述脚本,并偶尔在本地运行的脚本中查询metric1
和metric2
。我猜我将不得不考虑在我几乎没有经验的服务器上运行代码。
做我想做的事情最容易使用的工具是什么?我对各种语言都很熟悉,所以如果有其他语言的库或工具更适合所有这些,请告诉我
谢谢!
我做过一个类似的项目,不确定它是否特别适用于你的情况,但也许它可以给你一个起点。
虽然我很清楚将Pandas dataframe用于实时目的并不是最佳实践,但在我的情况下,它已经足够快了(我实际上愿意听取有关如何改进我的工作流程的建议!),下面是我的代码:
all_prices = pd.Dataframe()
readprice():
global all_prices
msg = mysock.recv(16384)
msg_stringa=str(msg,'utf-8')
new_price = pd.read_csv(StringIO(msg_stringa) , sep=";", error_bad_lines=False,
index_col=None, header=None, engine='c', names=range(33),
decimal = '.')
...
...
all_prices = all_prices.append(new_price, ignore_index=True).copy()
所以'all_prices' Pandas Dataframe是全局的,新的价格被附加到一般的'all_prices' DF。其他函数可以使用这个全局DF来读取内容。在两个或多个线程之间共享变量时要非常小心,这可能会导致错误。更多信息在这里:http://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/
在我的例子中,我不将DF共享给并行线程,其他线程在追加后启动,而不是同时启动。