How to store and parse real-time streaming data every N minutes with MQTT Paho (Python)



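I am working on a real-time data streaming project that parses and stores data every N minutes. My goal is to discard the first minute of data (as a buffer) and then store the server's data in 4-minute chunks. The data is then passed to other functions for clustering and computation (those functions are not included here).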

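I have put the timing conditions in the "on_message" function, which is also where the data is parsed. I don't think my structure and the way I call things are the right way to achieve this. Please let me know if you need any other details.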

on_message

def on_message(r_c_client, userdata, message):
    if message.topic == "scanning":
        c = datetime.now().time()
        current = (c.hour * 60 + c.minute) * 60 + c.second
        time.sleep(60)  # initial delay
        data = json.loads(message.payload.decode("utf-8"))
        x = data['host']
        y = data['data']
        hostList = store(x, y)
        while current >= total_Time:
            # time.sleep(60)  # initial delay
            nodeList = listToDf(hostList)
            nodeDf = df_reformat(nodeList)
            print(clustering_results_reformat(process_startTime, nodeDf))

store function

def store(host, data):
    # Assumes a module-level dict hostList: host -> list of readings
    if host in hostList:
        hostList[host].append(data)
    else:
        hostList[host] = [data]
    return hostList
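store() relies on a global hostList that is never reset between windows. A minimal alternative sketch, assuming each reading is just appended per host as above: collections.defaultdict(list) removes the membership check, and a drain() method hands over one window's data and starts fresh. HostStore and drain are hypothetical names, not part of the question's code.

```python
from collections import defaultdict


class HostStore:
    """Hypothetical helper: accumulates readings per host for the current window."""

    def __init__(self):
        # defaultdict(list) creates an empty list the first time a host is seen
        self._readings = defaultdict(list)

    def store(self, host, data):
        self._readings[host].append(data)

    def drain(self):
        """Return everything collected so far as a plain dict and start empty again."""
        snapshot = dict(self._readings)
        self._readings = defaultdict(list)
        return snapshot


hosts = HostStore()
hosts.store("node-1", -60)
hosts.store("node-2", -72)
hosts.store("node-1", -58)
print(hosts.drain())  # {'node-1': [-60, -58], 'node-2': [-72]}
print(hosts.drain())  # {}
```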

main

global process_startTime
t = datetime.now().time()
process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300  # 1-minute buffer + 4 minutes
print(t, process_startTime)
broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect

c_client.on_message = on_message
c_client.on_subscribe = on_subscribe

c_client.connect(broker_address, 1883)
c_client.loop_forever()
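One more pitfall in this timing code: (t.hour * 60 + t.minute) * 60 + t.second counts seconds since midnight, so the counter wraps back to 0 at midnight. The small check below (the dates are arbitrary examples) shows that a run started at 23:58:00 gets a total_Time the counter can never reach. Measuring elapsed time as a difference of time.monotonic() values avoids the wraparound.

```python
from datetime import datetime


def seconds_since_midnight(t):
    # Same formula as in the question: only the time of day is used.
    return (t.hour * 60 + t.minute) * 60 + t.second


start = seconds_since_midnight(datetime(2024, 1, 1, 23, 58, 0).time())
total = start + 300
latest_possible = seconds_since_midnight(datetime(2024, 1, 1, 23, 59, 59).time())
print(start, total, latest_possible)  # 86280 86580 86399
print(latest_possible >= total)       # False: the deadline is never reached

# Two minutes after midnight the counter has wrapped around.
after_midnight = seconds_since_midnight(datetime(2024, 1, 2, 0, 2, 0).time())
print(after_midnight)                 # 120
```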

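First, you should not block (sleep) in the on_message function. It is called for every message received and runs on the client's network-loop thread, so while it sleeps the client cannot move on to the next message.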
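If the per-message work itself gets heavy, a common pattern is to have on_message only enqueue the payload and let a separate thread decode and process it. A minimal sketch using the standard-library queue and threading modules; FakeMessage is a hypothetical stand-in for paho's message object (which has topic and payload attributes) so the sketch runs without a broker, and the None sentinel used to stop the worker is an assumption of this sketch.

```python
import json
import queue
import threading

# Payloads are handed from the MQTT callback to a worker thread through this queue.
incoming = queue.Queue()


def on_message(client, userdata, message):
    # Only enqueue here; no sleeping or heavy work on the network-loop thread.
    if message.topic == "scanning":
        incoming.put(message.payload)


def worker(results):
    # Hypothetical consumer: decodes payloads off the callback thread.
    while True:
        payload = incoming.get()
        if payload is None:  # sentinel: stop the worker
            break
        data = json.loads(payload.decode("utf-8"))
        results.append((data["host"], data["data"]))


class FakeMessage:
    # Stand-in for a received message, for running without a broker.
    def __init__(self, topic, payload):
        self.topic = topic
        self.payload = payload


results = []
t = threading.Thread(target=worker, args=(results,))
t.start()
on_message(None, None, FakeMessage("scanning", b'{"host": "node-1", "data": -60}'))
on_message(None, None, FakeMessage("other", b'{"host": "x", "data": 0}'))
incoming.put(None)
t.join()
print(results)  # [('node-1', -60)]
```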

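Next, record the start time outside the on_message function. You can then compare each message's arrival time against that value and decide whether to keep or process it.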

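import json
from datetime import datetime

def on_message(r_c_client, userdata, message):
    global process_startTime  # only read here, so `global` is optional
    if message.topic == "scanning":
        c = datetime.now().time()
        current = (c.hour * 60 + c.minute) * 60 + c.second
        # Keep only messages that arrive after the 1-minute buffer and before total_Time
        if process_startTime + 60 <= current <= total_Time:
            data = json.loads(message.payload.decode("utf-8"))
            x = data['host']
            y = data['data']
            store(x, y)  # appends to the module-level hostList dict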
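The same "buffer, then collect, then process" decision can be pulled into a small helper that records its start time once, outside the callback. A minimal sketch, assuming the 60 s buffer and 240 s window from the question; WindowGate is a hypothetical name, and time.monotonic() replaces the seconds-since-midnight arithmetic so the logic is unaffected by midnight or clock changes. The clock is injectable so the demo runs without waiting.

```python
import time


class WindowGate:
    """Hypothetical helper: classifies "now" relative to a start time.

    "buffer"  -> within the first buffer_s seconds (discard)
    "collect" -> from buffer_s up to buffer_s + window_s seconds (keep)
    "done"    -> after the window (process what was kept)
    """

    def __init__(self, buffer_s=60, window_s=240, clock=time.monotonic):
        self.clock = clock          # injectable so the logic can be tested without waiting
        self.buffer_s = buffer_s
        self.window_s = window_s
        self.start = clock()        # recorded once, outside on_message

    def state(self):
        elapsed = self.clock() - self.start
        if elapsed < self.buffer_s:
            return "buffer"
        if elapsed <= self.buffer_s + self.window_s:
            return "collect"
        return "done"


# Demo with a fake clock instead of real waiting.
now = [0.0]
gate = WindowGate(clock=lambda: now[0])
for t in (10, 60, 200, 300, 301):
    now[0] = t
    print(t, gate.state())
# 10 buffer, 60 collect, 200 collect, 300 collect, 301 done
```

Inside on_message the time check would then reduce to if gate.state() == "collect": store(x, y), and the main loop would process the data once gate.state() == "done".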

The main part should then look like this:

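import time
from datetime import datetime
import paho.mqtt.client as mqtt

hostList = {}  # filled by store() from on_message
t = datetime.now().time()
process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300  # 1-minute buffer + 4 minutes
print(t, process_startTime)
broker_address = '10.10.0.100'
# paho-mqtt 1.x signature (first argument is the client id);
# 2.x requires a CallbackAPIVersion as the first argument
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect
c_client.on_message = on_message
c_client.on_subscribe = on_subscribe
c_client.connect(broker_address, 1883)
while True:
    c_client.loop()  # handle network traffic and dispatch callbacks (default timeout 1 s)
    c = datetime.now().time()
    current = (c.hour * 60 + c.minute) * 60 + c.second
    if current >= total_Time:
        nodeList = listToDf(hostList)
        nodeDf = df_reformat(nodeList)
        print(clustering_results_reformat(process_startTime, nodeDf))
        # Stop after one window; to collect the next window, reset
        # process_startTime, total_Time and hostList here instead.
        break
    time.sleep(1)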
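Since the goal is to process data every N minutes rather than once, the window can roll over instead of ending the loop. A minimal sketch under the question's assumptions (drop the first 60 s once, then batch readings into consecutive 240 s windows); RollingCollector, add and poll are hypothetical names, and time.monotonic() is used for elapsed time. A reading that arrives after a window has ended is placed in the next window even if poll() has not run yet.

```python
import time
from collections import defaultdict, deque


class RollingCollector:
    """Hypothetical helper: drops an initial buffer, then batches readings per window.

    After the first buffer_s seconds, readings are grouped into consecutive
    windows of window_s seconds; poll() returns one finished window at a time.
    """

    def __init__(self, buffer_s=60, window_s=240, clock=time.monotonic):
        self.clock = clock
        self.window_s = window_s
        self.window_start = clock() + buffer_s  # first window opens after the buffer
        self.readings = defaultdict(list)       # host -> readings in the open window
        self.ready = deque()                    # finished windows not yet polled

    def _rotate(self):
        # Close every window whose end has passed (empty ones included).
        while self.clock() - self.window_start >= self.window_s:
            self.ready.append(dict(self.readings))
            self.readings = defaultdict(list)
            self.window_start += self.window_s

    def add(self, host, data):
        self._rotate()
        if self.clock() >= self.window_start:   # readings during the buffer are dropped
            self.readings[host].append(data)

    def poll(self):
        """Return the oldest finished window as {host: [readings]}, or None."""
        self._rotate()
        return self.ready.popleft() if self.ready else None


# Demo with a fake clock (seconds since start).
now = [0.0]
col = RollingCollector(clock=lambda: now[0])
for t, host, value in [(30, "a", 1), (90, "a", 2), (250, "b", 3), (320, "a", 4)]:
    now[0] = t
    col.add(host, value)
    batch = col.poll()
    if batch is not None:
        print(t, batch)   # 320 {'a': [2], 'b': [3]}
now[0] = 545
print(col.poll())         # {'a': [4]}
```

In the answer's structure, on_message would call collector.add(x, y) and the while loop would call collector.poll() after c_client.loop(), passing any returned batch to listToDf and the clustering code. Both calls happen in the thread that runs loop(), so no lock is needed in that setup.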
