How to efficiently read array columns from a TSV file into a single npz file per column



I have a data file that looks like this:

58f0965a62d62099f5c0771d35dbc218        0.868632614612579       [0.028979932889342308, 0.004080114420503378, 0.03757167607545853]       [-0.006008833646774292, -0.010409083217382431, 0.01565541699528694]
36f7859ce47417470bc28384694f0ac4        0.835115909576416       [0.026130573824048042, -0.00358427781611681, 0.06635218113660812]       [-0.06970945745706558, 0.03816794604063034, 0.03491008281707764]
59f7d617bb662155b0d49ce3f27093ed        0.907200276851654       [0.009903069585561752, -0.009721670299768448, 0.0151780480518937]       [-0.03264783322811127, 0.0035394825972616673, -0.05089104175567627]

where the columns are, respectively:

  • the MD5 hash of the data point
  • the target float output
  • an array of floats that I want to read into an np.array object
  • another array of floats that I want to read into an np.array object

I have been reading the file like this to create a numpy array file for each of the two matrices of float arrays:

import numpy as np
from tqdm import tqdm
import pandas as pd

lol = []
with open('data.tsv') as fin:
    for line in tqdm(fin):
        md5hash, score, vector1, vector2 = line.strip().split('\t')
        row = {'md5_hash': md5hash, 'score': float(score),
               'vector1': np.array(eval(vector1)),
               'vector2': np.array(eval(vector2))
              }
        lol.append(row)

df = pd.DataFrame(lol)
training_vector1 = np.array(list(df['vector1']))
# Save the training vectors.
np.save('vector1.npz', training_vector1)
training_vector2 = np.array(list(df['vector2']))
# Save the training vectors.
np.save('vector2.npz', training_vector2)

While this works for small datasets, the actual dataset has many more floats in the arrays and close to 200 million rows. Here is a sample of 100 rows: https://gist.github.com/1f6f0b2501dc334db1e0038d36452f5d

How can I efficiently read the array columns from the TSV file into a single npz file per column?

First, a note on the overall problem. Any approach that loads 200M rows similar to the sample input you provided would require roughly 1.1 TB of memory. While that is possible, it is certainly not ideal. Therefore, rather than going down that path, I would recommend looking into approaches specifically designed for handling large datasets, such as HDF5.
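
For illustration, here is a minimal sketch of what streaming the rows into resizable HDF5 datasets could look like with h5py; the function name, file names, the 768-column width of the linked sample, and the batch/chunk sizes are my assumptions, not part of the question:

import h5py
import numpy as np

def tsv_to_hdf5(in_path="100-translation.embedded-3.tsv", out_path="vectors.h5",
                dim=768, batch=10_000):
    """Stream the two vector columns into resizable HDF5 datasets."""
    with open(in_path, "r") as fin, h5py.File(out_path, "w") as h5:
        d1 = h5.create_dataset("vector1", shape=(0, dim), maxshape=(None, dim),
                               dtype="f8", chunks=(1024, dim))
        d2 = h5.create_dataset("vector2", shape=(0, dim), maxshape=(None, dim),
                               dtype="f8", chunks=(1024, dim))
        buf1, buf2 = [], []

        def flush():
            # grow the datasets and write the buffered rows
            if not buf1:
                return
            n = d1.shape[0]
            d1.resize(n + len(buf1), axis=0)
            d2.resize(n + len(buf2), axis=0)
            d1[n:] = np.asarray(buf1)
            d2[n:] = np.asarray(buf2)
            buf1.clear()
            buf2.clear()

        for line in fin:
            _, _, t1, t2 = line.strip().split("\t")
            buf1.append([float(x) for x in t1[1:-1].split(",")])
            buf2.append([float(x) for x in t2[1:-1].split(",")])
            if len(buf1) >= batch:
                flush()
        flush()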

Having said that, the problem at hand is not particularly complex, but going through pandas and eval() is probably neither desirable nor beneficial.

The same goes for pre-processing with cut into CSV files, which are somewhat simpler to read.

Assuming that np.save() is equally fast regardless of how the arrays were produced, we could say that the following function replicates the processing from the OP reasonably well:

import numpy as np
import pandas as pd


def process_tsv_OP(filepath="100-translation.embedded-3.tsv"):
    lol = []
    with open(filepath, "r") as fin:
        for line in fin:
            md5hash, score, vector1, vector2 = line.strip().split('\t')
            row = {'md5_hash': md5hash, 'score': float(score),
                   'vector1': np.array(eval(vector1)),
                   'vector2': np.array(eval(vector2))
                  }
            lol.append(row)
    df = pd.DataFrame(lol)
    training_vector1 = np.array(list(df['vector1']))
    training_vector2 = np.array(list(df['vector2']))
    return training_vector1, training_vector2

This can be simplified by avoiding pandas and the "evil eval()" (as well as some copying around in memory):

def text2row(text):
    text = text[1:-1]
    return [float(x) for x in text.split(',')]


def process_tsv(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
        v1 = np.array(v1)
        v2 = np.array(v2)
    return v1, v2

It is easy to show that the two produce the same output:

def same_res(x, y):
    return all(np.allclose(i, j) for i, j in zip(x, y))

same_res(process_tsv(), process_tsv_OP())
# True

but the timings are quite different:

%timeit process_tsv_OP()
# 1 loop, best of 5: 300 ms per loop
%timeit process_tsv()
# 10 loops, best of 5: 86.1 ms per loop

(on the sample input file obtained with wget https://gist.githubusercontent.com/alvations/1f6f0b2501dc334db1e0038d36452f5d/raw/ee31c052a4dbda131df182f0237dbe6e5197dff2/100-translation.embedded-3.tsv)


Pre-processing the input with cut does not seem to be all that beneficial:

!time cut -f3 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv
# real  0m0.184s
# user  0m0.102s
# sys   0m0.233s
!time cut -f4 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv
# real  0m0.208s
# user  0m0.113s
# sys   0m0.279s
%timeit np.genfromtxt('vector1.csv', delimiter=','); np.genfromtxt('vector2.csv', delimiter=',')
# 1 loop, best of 5: 130 ms per loop

And, while some time can be saved by using pd.read_csv():

%timeit pd.read_csv('vector1.csv').to_numpy(); pd.read_csv('vector2.csv').to_numpy()
# 10 loops, best of 5: 85.7 ms per loop

this does not end up being faster than the original approach on the provided dataset once the cut step is included (although cut itself may well scale better for larger inputs).


If you really want to stick to the npy file format, you would at least want to append to your output in blocks. While this is not supported by NumPy alone, you can use NpyAppendArray (see also here). The modified process_tsv() would look like this:

import os
from npy_append_array import NpyAppendArray


def process_tsv_append(
    in_filepath="100-translation.embedded-3.tsv",
    out1_filepath="out1.npy",
    out2_filepath="out2.npy",
    append_every=10,
):
    # clear output files
    for filepath in (out1_filepath, out2_filepath):
        if os.path.isfile(filepath):
            os.remove(filepath)
    with \
            open(in_filepath, "r") as in_file, \
            NpyAppendArray(out1_filepath) as npaa1, \
            NpyAppendArray(out2_filepath) as npaa2:
        v1 = []
        v2 = []
        for i, line in enumerate(in_file, 1):
            _, _, text_r1, text_r2 = line.strip().split("\t")
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
            if i % append_every == 0:
                npaa1.append(np.array(v1))
                npaa2.append(np.array(v2))
                v1 = []
                v2 = []
        if len(v1) > 0:  # assumes len(v1) == len(v2)
            npaa1.append(np.array(v1))
            npaa2.append(np.array(v2))

process_tsv_append()
v1 = np.load("out1.npy")
v2 = np.load("out2.npy")
same_res(process_tsv(), (v1, v2))
# True

All of this can be sped up relatively blindly with Cython, but the speed-up appears to be marginal:

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True

import numpy as np

cpdef text2row_cy(text):
    return [float(x) for x in text[1:-1].split(',')]


cpdef process_tsv_cy(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row_cy(text_r1)
            r2 = text2row_cy(text_r2)
            v1.append(r1)
            v2.append(r2)
        v1 = np.array(v1)
        v2 = np.array(v2)
    return v1, v2
print(same_res(process_tsv_cy(), process_tsv_OP()))
# True
%timeit process_tsv_cy()
# 10 loops, best of 5: 72.4 ms per loop

Similarly, pre-allocating the arrays does not seem to help either:

def text2row_out(text, out):
    for i, x in enumerate(text[1:-1].split(',')):
        out[i] = float(x)


def process_tsv_alloc(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        # num lines
        num_lines = in_file.read().count("\n")
        # num cols
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split('\t')
        num_cols1 = len(text_r1.split(","))
        num_cols2 = len(text_r2.split(","))
        # populate arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        for i, line in enumerate(in_file):
            _, _, text_r1, text_r2 = line.strip().split('\t')
            text2row_out(text_r1, v1[i])
            text2row_out(text_r2, v2[i])
    return v1, v2

print(same_res(process_tsv_alloc(), process_tsv_OP()))
%timeit process_tsv_alloc()
# 10 loops, best of 5: 110 ms per loop

By rewriting everything to be closer to C, the running time can be reduced significantly with Numba (and probably also with Cython). To make our code compatible with Numba, and to actually benefit from its acceleration, we need to make substantial modifications:

  • open the file as bytes (UTF-8 is no longer supported, which is not a significant issue for the problem at hand)
  • read and process the file in blocks, which need to be sufficiently large, say on the order of 1 MB
  • write all the string-handling code by hand, in particular the string-to-float conversion

import numpy as np
import numba as nb

@nb.njit
def bytes2int(text):
    c_min = ord("0")
    c_max = ord("9")
    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        else:
            valid = False
            break
    return sign * number if valid else None

@nb.njit
def bytes2float_helper(text):
    sep = ord(".")
    c_min = ord("0")
    c_max = ord("9")
    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    sep_pos = 0
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        elif c == sep and sep_pos == 0:
            sep_pos = j
        else:
            valid = False
            break
    return sign * number, sep_pos, valid

@nb.njit
def bytes2float(text):
    exp_chars = b"eE"
    exp_pos = -1
    for exp_char in exp_chars:
        for i, c in enumerate(text[::-1]):
            if c == exp_char:
                exp_pos = i
                break
        if exp_pos > -1:
            break
    if exp_pos > 0:
        exp_number = bytes2int(text[-exp_pos:])
        if exp_number is None:
            exp_number = 0
        number, sep_pos, valid = bytes2float_helper(text[:-exp_pos - 1])
        result = number / 10.0 ** (sep_pos - exp_number) if valid else None
    else:
        number, sep_pos, valid = bytes2float_helper(text)
        result = number / 10.0 ** sep_pos if valid else None
    return result

@nb.njit
def btrim(text):
    space = ord(" ")
    tab = ord("\t")
    nl = ord("\n")
    cr = ord("\r")
    start = 0
    stop = 0
    for c in text:
        if c == space or c == tab or c == nl or c == cr:
            start += 1
        else:
            break
    for c in text[::-1]:
        if c == space:
            stop += 1
        else:
            break
    if start == 0 and stop == 0:
        return text
    elif stop == 0:
        return text[start:]
    else:
        return text[start:-stop]

@nb.njit
def text2row_nb(text, sep, num_cols, out, curr_row):
    last_i = 0
    j = 0
    for i, c in enumerate(text):
        if c == sep:
            x = bytes2float(btrim(text[last_i:i]))
            out[curr_row, j] = x
            last_i = i + 2
            j += 1
    x = bytes2float(btrim(text[last_i:]))
    out[curr_row, j] = x

@nb.njit
def process_line(line, psep, sep, num_psep, num_cols1, num_cols2, out1, out2, curr_row):
    if len(line) > 0:
        psep_pos = np.empty(num_psep, dtype=np.int_)
        j = 0
        for i, char in enumerate(line):
            if char == psep:
                psep_pos[j] = i
                j += 1
        text2row_nb(line[psep_pos[-2] + 2:psep_pos[-1] - 1], sep, num_cols1, out1, curr_row)
        text2row_nb(line[psep_pos[-1] + 2:-1], sep, num_cols2, out2, curr_row)

@nb.njit
def decode_block(block, psep, sep, num_lines, num_cols1, num_cols2, out1, out2, curr_row):
    nl = ord("\n")
    last_i = 0
    i = j = 0
    for c in block:
        if c == nl:
            process_line(block[last_i:i], psep, sep, 3, num_cols1, num_cols2, out1, out2, curr_row)
            j += 1
            last_i = i
            curr_row += 1
            if j >= num_lines:
                break
        i += 1
    return block[i + 1:], curr_row

@nb.njit
def count_nl(block, start=0):
    nl = ord("\n")
    for c in block:
        if c == nl:
            start += 1
    return start

def process_tsv_block(filepath="100-translation.embedded-3.tsv", size=2 ** 18):
    with open(filepath, "rb") as in_file:
        # count newlines
        num_lines = 0
        while True:
            block = in_file.read(size)
            if block:
                num_lines = count_nl(block, num_lines)
            else:
                break
        # count num columns
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split(b"\t")
        num_cols1 = len(text_r1.split(b","))
        num_cols2 = len(text_r2.split(b","))

        # fill output arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        remainder = b""
        curr_row = 0
        while True:
            block = in_file.read(size)
            if block:
                block = remainder + block
                num_lines = count_nl(block)
                if num_lines > 0:
                    remainder, curr_row = decode_block(block, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                else:
                    remainder = block
            else:
                num_lines = count_nl(remainder)
                if num_lines > 0:
                    remainder, curr_row = decode_block(remainder, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                break
    return v1, v2

The reward for all this work is a mere ~2x speed-up over process_tsv():

print(same_res(process_tsv_block(), process_tsv_OP()))
# True
%timeit process_tsv_block()
# 10 loops, best of 5: 48.8 ms per loop

Cut out the 3rd column and remove the first and last square brackets:

cut -f3 data.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv

Repeat the same for vector2:

cut -f4 data.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv

Read the CSVs into numpy in Python and save them to npy files:

import numpy as np
np.save('vector1.npy', np.genfromtxt('vector1.csv', delimiter=','))
np.save('vector2.npy', np.genfromtxt('vector2.csv', delimiter=','))

The other answers are all good; the version below is a variant that uses dask. Since the original data is in text format, let's use the dask.bag API.

First, import the modules and define a utility function:

from dask.array import from_delayed, from_npy_stack, to_npy_stack, vstack
from dask.bag import read_text
from numpy import array, nan, stack


def process_line(line):
    """Utility function adapted from the snippet in the question."""
    md5hash, score, vector1, vector2 = line.strip().split("\t")
    row = {
        "md5_hash": md5hash,
        "score": float(score),
        "vector1": array(eval(vector1)),
        "vector2": array(eval(vector2)),
    }
    return row

Next, create a bag:

bag = read_text("100-translation.embedded-3.tsv", blocksize="1mb").map(process_line)

Since the sample snippet is small, to simulate "big data" let's assume we can only load "1mb" at a time. This should create 3 partitions in the bag.
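
As a quick sanity check (not part of the original snippet), the partition count can be inspected directly:

# read_text with blocksize="1mb" splits the 100-row sample into partitions
print(bag.npartitions)  # expected: 3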

Next, isolate the vectors/arrays and convert them into dask.arrays:

# create delayed versions of the arrays
a1 = bag.pluck("vector1").map_partitions(stack).to_delayed()
a2 = bag.pluck("vector2").map_partitions(stack).to_delayed()
# convert the delayed objects to dask array
A1 = vstack(
    [from_delayed(a, shape=(nan, 768), dtype="float") for a in a1],
    allow_unknown_chunksizes=True,
)
A2 = vstack(
    [from_delayed(a, shape=(nan, 768), dtype="float") for a in a2],
    allow_unknown_chunksizes=True,
)

Now we can save the arrays as npy stacks:

to_npy_stack("_A1", A1)
to_npy_stack("_A2", A2)

Note that this processing is not ideal, since the workers will pass over the data twice (once per array), but with the current API I cannot think of a better way.
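
If the parsed rows happen to fit in (worker) memory, one possible workaround, offered here only as a side note and not part of the original approach, is to persist the bag before plucking the two columns, so the text is parsed only once; this trades memory for the second pass and defeats the purpose for truly large inputs:

# keep the parsed rows in memory so both pluck() calls reuse them
bag = bag.persist()
a1 = bag.pluck("vector1").map_partitions(stack).to_delayed()
a2 = bag.pluck("vector2").map_partitions(stack).to_delayed()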

Also, note that the npy stacks keep the "unknown" chunks as metadata, even though all the relevant information has been computed. This is something that could be improved in the dask codebase, but for now the easiest workaround is to load the data again, compute the chunk sizes, rechunk (to get a nice grid-like structure), and save again:

# rechunk into regular-sized format
A1 = from_npy_stack("_A1")
A1.compute_chunk_sizes()
A1 = A1.rechunk(chunks=(40, 768))
to_npy_stack("A1_final", A1)

# rechunk into regular-sized format
A2 = from_npy_stack("_A2")
A2.compute_chunk_sizes()
A2 = A2.rechunk(chunks=(40, 768))
to_npy_stack("A2_final", A2)

Of course, on the real dataset you would want to use much larger chunks. Also, the final save does not have to be an npy stack; depending on what you are after, it could instead be stored as HDF5 or zarr arrays.
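
For example, a minimal sketch of both options (assuming h5py and zarr are installed; the file and dataset names below are placeholders):

import dask.array as da

# write both arrays into a single HDF5 file (requires h5py)
da.to_hdf5("vectors.h5", {"/vector1": A1, "/vector2": A2})

# or write each array to its own zarr store (requires zarr)
A1.to_zarr("A1_store.zarr")
A2.to_zarr("A2_store.zarr")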

If you change the output format to raw binary files, you can process the input file line by line, without having to hold the full result in RAM:

import numpy as np

fh_in = open('data.tsv')
fh_vec1 = open('vector1.bin', 'wb')
fh_vec2 = open('vector2.bin', 'wb')

linecount = 0
for line in fh_in:
    hash_, score, vec1, vec2 = line.strip().split('\t')
    np.fromstring(vec1.strip('[]'), sep=',').tofile(fh_vec1)
    np.fromstring(vec2.strip('[]'), sep=',').tofile(fh_vec2)
    linecount += 1

# close the files so all buffered data is flushed to disk
fh_in.close()
fh_vec1.close()
fh_vec2.close()

Raw binary files do not store any information about dtype, shape, or byte order. To load them back into arrays, you can use np.fromfile or np.memmap and then call .reshape(linecount, -1) on the result.
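
For instance, a minimal sketch of loading the files back (assuming the default float64 dtype written by tofile() above):

import numpy as np

# read the whole raw file into memory and restore the 2D shape
vec1 = np.fromfile('vector1.bin', dtype=np.float64).reshape(linecount, -1)

# or memory-map it instead of reading it all at once
vec2 = np.memmap('vector2.bin', dtype=np.float64, mode='r').reshape(linecount, -1)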
