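I'm working on a real-time data streaming project that parses and stores data every N minutes. My goal is to discard the first minute of data (as a buffer) and then store the next 4 minutes of data from the server. The stored data is then passed to other functions (not included here) for clustering and further computation.
I've put the timing conditions in the `on_message` function, which is also where the data is parsed. I don't think my structure and function calls are the right way to achieve this. Let me know if you need any other details.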
`on_message` function:

```python
def on_message(r_c_client, userdata, message):
    if (message.topic == "scanning"):
        c = datetime.now().time()
        current = (c.hour * 60 + c.minute) * 60 + c.second
        time.sleep(60) #initial delay
        data = json.loads(message.payload.decode("utf-8"))
        x = data['host']
        y = data['data']
        hostList = store(x, y)
        while (current>=total_Time ):
            #time.sleep(60) #initial delay
            nodeList = listToDf(hostList)
            nodeDf= df_reformat(nodeList)
            print clustering_results_reformat(process_startTime, nodeDf)
```
`store` function:

```python
def store(host, data):
    if host in hostList:
        hostList[host].append(data)
    else:
        hostList[host] = [data]
    return hostList
```
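The `store` helper groups incoming readings by host. As a minimal equivalent sketch, `collections.defaultdict(list)` removes the explicit membership check; the host names and integer readings below are hypothetical placeholders for whatever `data['host']` and `data['data']` actually contain.

```python
from collections import defaultdict

# Module-level buffer: host name -> list of readings received from that host.
# defaultdict(list) creates an empty list the first time a host is seen,
# replacing the "if host in hostList ... else ..." branch.
hostList = defaultdict(list)


def store(host, data):
    """Append one reading to the buffer for `host` and return the whole buffer."""
    hostList[host].append(data)
    return hostList


# Hypothetical hosts and readings, just to show the grouping.
store("node-a", 1)
store("node-b", 2)
store("node-a", 3)
print(dict(hostList))  # {'node-a': [1, 3], 'node-b': [2]}
```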
Main:

```python
global process_startTime
t = datetime.now().time()
process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300 #4 minutes + 1 minute
print t , process_startTime
broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect
c_client.on_message = on_message
c_client.on_subscribe = on_subscribe
c_client.connect(broker_address, 1883)
c_client.loop_forever()
```
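First, you should not block (sleep) inside `on_message`. It is called for every message that arrives, so while it sleeps the client cannot move on to the next message; everything queues up behind it.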
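To illustrate the effect, here is a toy single-threaded dispatcher (hypothetical, not paho's actual code). paho-mqtt runs `on_message` on whichever thread drives its network loop (`loop()`, `loop_forever()`, or the thread started by `loop_start()`), so a sleeping callback delays every later message in the same way. The sleep is shortened to 50 ms so the demo finishes quickly.

```python
import time


def slow_on_message(message):
    # Stands in for a callback that blocks, like on_message with time.sleep(60).
    time.sleep(0.05)


def dispatch(messages, callback):
    """Toy single-threaded dispatcher (hypothetical).

    Handles messages strictly one after another and records, for each one,
    how many seconds after the start its callback began running.
    """
    start = time.monotonic()
    started_at = []
    for msg in messages:
        started_at.append(time.monotonic() - start)
        callback(msg)
    return started_at


delays = dispatch(["m1", "m2", "m3", "m4"], slow_on_message)
# Each message has to wait for the sleeps of all earlier messages.
print([round(d, 2) for d in delays])
```

With a 60-second sleep instead of 50 ms, the fourth message would wait about three minutes before it is even looked at.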
Next, keep track of the start time outside `on_message`. Then, for each message, compare the current time with that value and decide whether to keep/process it.
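```python
def on_message(r_c_client, userdata, message):
    # process_startTime and total_Time are set once in the main section;
    # this callback only reads them.
    global process_startTime
    if message.topic == "scanning":
        c = datetime.now().time()
        current = (c.hour * 60 + c.minute) * 60 + c.second
        # Keep the message only after the 1-minute buffer and before the window closes.
        if process_startTime + 60 <= current <= total_Time:
            data = json.loads(message.payload.decode("utf-8"))
            x = data['host']
            y = data['data']
            store(x, y)  # appends to the module-level hostList
```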
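The same filtering can be pulled into a small pure function and exercised without a broker. This is a minimal sketch under a few assumptions: it uses `time.monotonic()` instead of seconds-since-midnight, `in_capture_window`, `fake_message` and the `now` parameter are hypothetical helpers added for testing, and the fake message is a `SimpleNamespace` exposing only the `.topic` and `.payload` attributes that `on_message` reads.

```python
import json
import time
from types import SimpleNamespace

BUFFER_SECONDS = 60          # discard the first minute
CAPTURE_SECONDS = 4 * 60     # then keep four minutes of data

hostList = {}                         # host -> list of readings
process_startTime = time.monotonic()  # recorded once, outside the callback


def in_capture_window(now, start, buffer=BUFFER_SECONDS, capture=CAPTURE_SECONDS):
    """True if a message arriving at `now` is past the buffer and before the window closes.

    `now` and `start` must come from the same clock (seconds).
    """
    elapsed = now - start
    return buffer <= elapsed <= buffer + capture


def store(host, data):
    hostList.setdefault(host, []).append(data)
    return hostList


def on_message(client, userdata, message, now=None):
    # `now` is a hypothetical test hook; paho passes only the first three
    # arguments, so in normal operation the clock is read here.
    if now is None:
        now = time.monotonic()
    if message.topic != "scanning":
        return
    if in_capture_window(now, process_startTime):
        data = json.loads(message.payload.decode("utf-8"))
        store(data['host'], data['data'])


def fake_message(host, value, topic="scanning"):
    """Hypothetical stand-in with the .topic/.payload attributes on_message reads."""
    payload = json.dumps({"host": host, "data": value}).encode("utf-8")
    return SimpleNamespace(topic=topic, payload=payload)


on_message(None, None, fake_message("node-a", 1), now=process_startTime + 10)   # in buffer: dropped
on_message(None, None, fake_message("node-a", 2), now=process_startTime + 120)  # kept
on_message(None, None, fake_message("node-b", 3), now=process_startTime + 400)  # too late: dropped
on_message(None, None, fake_message("node-c", 4, topic="other"),
           now=process_startTime + 120)                                         # wrong topic: ignored
print(hostList)  # {'node-a': [2]}
```

Keeping the time check in a separate function makes the boundaries (60 s and 300 s after the start, both inclusive, matching the condition above) easy to test on their own.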
The main section should then look like this:
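```python
import json
import time
from datetime import datetime

import paho.mqtt.client as mqtt

hostList = {}  # host -> list of readings; filled by store() from on_message

# on_connect, on_subscribe, on_message, store, listToDf, df_reformat and
# clustering_results_reformat are the functions defined elsewhere in your script.
t = datetime.now().time()
process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300  # 1 minute buffer + 4 minutes of data
print(t, process_startTime)
broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect
c_client.on_message = on_message
c_client.on_subscribe = on_subscribe
c_client.connect(broker_address, 1883)
while True:
    c_client.loop()  # handles network traffic and calls on_message; waits up to 1 s
    c = datetime.now().time()
    current = (c.hour * 60 + c.minute) * 60 + c.second
    if current >= total_Time:
        # The capture window has closed: process the collected data once.
        nodeList = listToDf(hostList)
        nodeDf = df_reformat(nodeList)
        print(clustering_results_reformat(process_startTime, nodeDf))
        break  # without this, the block above would re-run on every iteration
    time.sleep(1)
```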
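One caveat with the seconds-since-midnight arithmetic: if the script starts shortly before midnight, the deadline lands past the last second of the day and the `current >= total_Time` check never fires. The first part of this sketch reproduces that with fixed times; the second part is a hypothetical helper, `run_capture`, that uses `time.monotonic()` instead and takes the clock as a parameter so the loop can be tested with a fake clock.

```python
import time
from datetime import time as dtime


def seconds_of_day(t):
    """The (hour, minute, second) -> seconds conversion used in the post."""
    return (t.hour * 60 + t.minute) * 60 + t.second


# Started at 23:58:00: the deadline lands past the end of the day.
start = seconds_of_day(dtime(23, 58, 0))             # 86280
deadline = start + 300                               # 86580, but a day only has 86400 s
five_minutes_later = seconds_of_day(dtime(0, 3, 0))  # 180
print(five_minutes_later >= deadline)                # False: the check never fires


def run_capture(loop_once, process, duration=300, clock=time.monotonic):
    """Service the client until `duration` seconds have passed, then call `process` once.

    loop_once: services the network, e.g. lambda: c_client.loop(timeout=1.0)
    process:   runs the clustering on the collected data
    clock:     a monotonic seconds counter; injectable for testing
    """
    end = clock() + duration
    while clock() < end:
        loop_once()
    return process()
```

If you adopt this, the window check in `on_message` has to read the same monotonic clock, and `process` could wrap your existing `listToDf` / `df_reformat` / `clustering_results_reformat` calls.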