我有一个CSV文件,其中包含来自随机传感器的数据,记录了几分钟的时间。现在,我想将数据从CSV文件流式传输到我的python代码,就好像它直接从传感器本身接收数据一样。(该代码用于从两个不同的传感器/csv文件中获取读数并对其进行平均。有人建议使用Apache Spark来流式传输数据,但我觉得这对我来说有点太复杂了。可能有更简单的解决方案吗?
使用 pandas read_csv() 函数来读取小块的大 csv 文件,基本代码写如下:
import pandas as pd
chunksize = 100
for chunk in pd.read_csv('myfile.csv', chunksize=chunksize):
print(chunk)
此链接解释了其工作原理:http://pandas.pydata.org/pandas-docs/stable/io.html#io-chunking
你可以在python中使用类似tail -f
来实现这一点。 这应该做你想做的事。 http://lethain.com/tailing-in-python/
你也可以在Numpy/Matplotlib上使用Python。这是一种将 csv 数据速度流式传输为变量而不是额外文件的简单方法。
´import matplotlib.pyplot as plt
from matplotlib import style
import numpy as np
import io
def draw_graph_stream(csv_content):
csv_stream = io.StringIO(csv_content)
svg_stream = io.StringIO()
data = np.genfromtxt(csv_stream, delimiter = ';') # generate the stream
x = data[0,:] #first row in csv
y = np.mean(data[1:,:], axis=0) # first column with mean generate the average
plt.plot(x,y)
plt.savefig(svg_stream, format = 'svg') #just safe it as svg
svg_stream.seek(0) #Position 0 for reading after writing
return svg_stream.read()
print("Start test")
with io.open('/filepathtodata','r') as csv_file: #works like a Loop
print("Reading file")
csv_content = csv_file.read()
print("Drawing graph")
svg_content = draw_graph_stream(csv_content)
with io.open('thefilepathforsafe','w+') as svg_file:
print("Write back")
svg_file.write(svg_content)´
它在fastapi中工作,用于将数据流式传输到用户界面。
from starlette.responses import StreamingResponse
from io import BytesIO
temp_file_name = 'test.csv'
with open(temp_file_name, 'rb') as fh:
buffered_data = BytesIO(fh.read())
response = StreamingResponse(buffered_data, media_type="text/csv")
response.headers["Content-Disposition"] = f"attachment; filename=filename.csv"
return response