我的数据是一个 10GB 的文件,格式如下:
[ 1234567890 ][ 2020052701020201 ][ value1 ][ value2 ][ key3 = value3 ]...[ keyn = valuen ]
注意:
- 可以有任意数量的 [ 键 = 值 ] 块。
- 字符
[
和]
在值本身中,例如:[ hello = wo[rld] ]
- 我无法控制 abinput 文件,除了我可以在我的脚本中更改/处理它。
- 我只需要几列,但是它们的值中有
[
和]
字符。
在我的简单for line in f:
函数中,我可以按' ][ '
模式进行拆分。 但是考虑到文件的大小,DAASK 是非常有利可图的。
我知道有了engine='c'
我就不能使用多字符分隔符,但是切换到engine='python'
会导致不可预测的结果。 下面是一个示例:
def init_ddf(filename):
return ddf.read_csv(
filename,
blocksize="1GB",
sep="]",
usecols=[1, 8],
na_filter=False,
names=["hello", World" ],
engine="c",
)
上面的代码按预期结果为ParserError: Too many columns specified: expected 25 and found 24
。此错误很难重现,因为它只是由于某些对我来说难以识别的特定行而发生的。并非每次有更多列时都会发生这种情况。所以在上面的函数中我改变了:engine="python"
和sep=" ][ "
.这适用于我测试的小样本数据。但是在 10G 文件中,我得到以下不可预测的行为:
def init_pyddf(filename, usecols, names):
return ddf.read_csv(
filename,
blocksize="1GB",
sep=" ][ ",
usecols=usecols,
na_filter=False,
names=names,
engine="python",
)
In [50]: !head /tmp/foo /tmp/bar
==> /tmp/foo <==
[ 1234567890 ][ 2020052701020201 ][ value1 ][ value2 ][ key3 = value3 ][ keyn = valuen ]
[ 1590471107 ][ 20200526T0731460 ][ THEOQQ ][ e = CL ][ Even = 175134 ][ rded = a12344 ][ blah = INVALID ][ N = T ][ ED = 13606 ]
==> /tmp/bar <==
[ 1234567890 ][ 2020052701020201 ][ value1 ][ value2 ][ key3 = value3 ][ keyn = valuen ]
[ 1590471107 ][ 20200526T0731460 ][ THEOQQ ][ e = CL ][ Even = 175134 ][ rded = a12344 ]
In [51]: init_pyddf("/tmp/foo", [1,2], ["time", "name"]).compute()
Out[51]:
time name
[ 1234567890 2020052701020201 value1 key3 = value3 keyn = valuen ]
[ 1590471107 20200526T0731460 THEOQQ Even = 175134 rded = a12344
In [52]: init_pyddf("/tmp/bar", [1,2], ["time", "name"]).compute()
Out[52]:
time name
0 2020052701020201 value1
1 20200526T0731460 THEOQQ
更多示例:
In [110]: !cat /tmp/dummy
[ 0 ][ 000000000000000000000000000 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ]
[ 1 ][ 20200526T073146.901861+0200 ][ T ][ E ][ E ][ F ][ W ][ N ][ E ][ E ][ 5 ]
In [111]: init_pyddf("/tmp/dummy", [1,7], ["time", "name"]).compute().head()
Out[111]:
time name
[ 0 0 0
[ 1 T E
In [112]: !cat /tmp/dummy
[ 0 ][ 000000000000000000000000000 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ]
[ 1 ][ 20200526T073146.901861+0200 ][ T ][ E ][ E ][ F ][ W ][ N ][ E ][ E ]
In [113]: init_pyddf("/tmp/dummy", [1,7], ["time", "name"]).compute().head()
Out[113]:
time name
0 000000000000000000000000000 0
1 20200526T073146.901861+0200 N
In [119]: !cat /tmp/dummy
[ 0 ][ 000000000000000 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ][ 0 ]
[ 1 ][ 20200526T073146 ][ T ][ D ][ F ][ W ][ e ][ E ][ E ][ I ][ T ][ T ][ S ][ S ][ B ][ A ][ E ][ F ][ S ][ P][ T = Y ][ 0 ]
In [120]: init_pyddf("/tmp/dummy", [1,7], ["time", "name"]).compute()
Out[120]:
time name
[ 0 000000000000000 0 0 0 0 0 0 0 0 ] NaN None None
[ 1 20200526T073146 T D F W e E E I T S S
鉴于您有一个更复杂的基于文本的文件格式,您可以首先从 Dask Bag 开始,使用普通的 Python 函数生成 python 字典,然后使用to_dataframe
方法将该 Bag 转换为 Dask 数据帧。
import dask.bag
b = dask.bag.read_text("my-files.*.txt")
def parse(line: str) -> dict:
...
records = b.map(parse)
df = b.to_dataframe()