How to efficiently match a list of timestamps against a list of timestamp ranges in Pandas



I have 3 pandas DataFrames:

import pandas as pd

df_a = pd.DataFrame(data={
    'id': [1, 5, 3, 2],
    'ts': [3, 5, 11, 14],
    'other_cols': ['...'] * 4
})
df_b = pd.DataFrame(data={
    'id': [2, 1, 3],
    'ts': [7, 8, 15],
    'other_cols': ['...'] * 3
})
df_c = pd.DataFrame(data={
    'id': [154, 237, 726, 814, 528, 237, 248, 514],
    'ts': [1, 2, 4, 6, 9, 10, 12, 13],
    'other_cols': ['...'] * 8
})

Here is the problem I need to solve:

  • For each id in df_a, find the corresponding id in df_b and its timestamp. Let's call them ts_a and ts_b.
  • Find all rows of df_c whose ts lies between min(ts_a, ts_b) and max(ts_a, ts_b), and compute some custom function over those rows. The function is a pd function in 95% of cases, but it can be any Python function.

Here are the matching (id, ts) rows of df_c for each id:

  • id 1: [726, 4], [814, 6]
  • id 2: [528, 9], [237, 10], [248, 12], [514, 13]
  • id 3: [248, 12], [514, 13]
  • id 5: only found in A and not in B, so nothing should be done

The output format doesn't really matter, so anything that maps an id to f(rows for that id) will do the job.

For example, say I need to apply a simple len function on the results, I would get:

res
id
1    2
2    4
3    2
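
For reference, here is a minimal brute-force sketch of what I mean. It is far too slow for my real data, but it defines the expected result (custom_fn is just a stand-in for the real function, and ids are assumed unique within df_b, as in the example):

# Minimal brute-force reference (slow); `custom_fn` stands in for the real
# function, and df_b ids are assumed unique as in the example data.
def brute_force(df_a, df_b, df_c, custom_fn=len):
    res = {}
    ts_b_by_id = df_b.set_index('id')['ts']
    for _, row in df_a.iterrows():
        if row['id'] not in ts_b_by_id.index:
            continue  # e.g. id 5: only in df_a, nothing to do
        lo, hi = sorted([row['ts'], ts_b_by_id[row['id']]])
        res[row['id']] = custom_fn(df_c[df_c['ts'].between(lo, hi)])
    return res

# brute_force(df_a, df_b, df_c)  ->  {1: 2, 3: 2, 2: 4}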

Here is my latest attempt. I think it is quite fast, but of course the speed will depend entirely on the contents of the tables you try it on. Let me know how it works for you.

Synthetic data generation:

import random
import pandas as pd

a_len = int(1e7)
c_len = int(1e8)
df_a = pd.DataFrame(data={
    'id': random.sample(population=range(a_len), k=int(a_len * .99)),
    'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
    'other_cols': ['...'] * int(a_len * .99)
})
df_a.sort_values(by=["ts"], inplace=True)
df_b = pd.DataFrame(data={
    'id': random.sample(population=range(a_len), k=int(a_len * .99)),
    'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
    'other_cols': ['...'] * int(a_len * .99)
})
df_b.sort_values(by=["ts"], inplace=True)
df_c = pd.DataFrame(data={
    'id': range(c_len),
    'ts': random.choices(population=range(int(a_len * 1e7)), k=c_len),
    'other_cols': ['...'] * c_len
})
df_c.sort_values(by=["ts"], inplace=True)

Some statistics from one example generation of these tables are:

size_by_id = df_c_labeled.groupby(by=["id"]).size()
size_by_id.max()
>>> 91
size_by_id.median()
>>> 26.0

The algorithm, using pandas.IntervalIndex:

import functools
import numpy as np
import pandas as pd

def cartesian_product(*arrays):
    """https://stackoverflow.com/a/11146645/7059681"""
    la = len(arrays)
    dtype = np.result_type(*arrays)
    arr = np.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(np.ix_(*arrays)):
        arr[..., i] = a
    return arr.reshape(-1, la).T

# inner join on id
df_ts = pd.merge(
    left=df_a[["id", "ts"]],
    right=df_b[["id", "ts"]],
    how="inner",
    on="id",
    suffixes=["_a", "_b"]
)
# a = min ts, b = max ts
df_ts["ts_a"], df_ts["ts_b"] = (
    df_ts[["ts_a", "ts_b"]].min(axis=1),
    df_ts[["ts_a", "ts_b"]].max(axis=1),
)
a_min = df_ts["ts_a"].min()
b_max = df_ts["ts_b"].max()

interval_index = pd.IntervalIndex.from_arrays(
    left=df_ts["ts_a"],
    right=df_ts["ts_b"],
    closed="both",
)

# rename to avoid collisions
df_c.rename(columns={"id": "id_c", "ts": "ts_c"}, inplace=True)
ts_c = df_c["ts_c"].to_numpy()

df_c_idxs_list, df_ts_idxs_list = [], []

# the first item in ts_c that is at least equal to a_min
c_lo = 0
while ts_c[c_lo] < a_min:
    c_lo += 1
c_idx = c_lo
c_hi = len(ts_c)

while c_lo < c_hi and ts_c[c_lo] <= b_max:
    # the index of the next greatest ts in ts_c
    # depending on how many duplicate values you have in ts_c,
    # it may be faster to binary search instead of incrementing one by one
    # c_idx = bisect.bisect_right(a=ts_c, x=ts_c[c_lo], lo=c_idx, hi=c_hi)
    while c_idx < c_hi and ts_c[c_idx] == ts_c[c_lo]:
        c_idx += 1

    # the indices of the intervals containing ts_c[c_lo]
    unique_ts_idxs = np.where(interval_index.contains(ts_c[c_lo]))[0]

    # all the indices equal to ts_c[c_lo]
    unique_c_idxs = df_c.iloc[c_lo: c_idx].index

    # all the pairs of these indices
    c_idxs, ts_idxs = cartesian_product(unique_c_idxs, unique_ts_idxs)
    df_c_idxs_list.append(c_idxs)
    df_ts_idxs_list.append(ts_idxs)

    c_lo = c_idx

df_c_idxs = np.concatenate(df_c_idxs_list)
df_ts_idxs = np.concatenate(df_ts_idxs_list)

df_c_labeled = pd.concat(
    [
        df_ts.loc[df_ts_idxs, :].reset_index(drop=True),
        df_c.loc[df_c_idxs, :].reset_index(drop=True)
    ],
    axis=1
)

print(df_c_labeled)
   id  ts_a  ts_b  id_c  ts_c other_cols
0   1     3     8   726     4        ...
1   1     3     8   814     6        ...
2   2     7    14   528     9        ...
3   2     7    14   237    10        ...
4   3    11    15   248    12        ...
5   2     7    14   248    12        ...
6   3    11    15   514    13        ...
7   2     7    14   514    13        ...

Now we can do some groupby things:

id_groupby = df_c_labeled.groupby(by="id")
id_groupby["ts_c"].size()
id
1    2
2    4
3    2
Name: ts_c, dtype: int64
id_groupby["ts_c"].max() - id_groupby["ts_c"].min()
id
1    2
2    4
3    1
Name: ts_c, dtype: int64
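
Since the question allows arbitrary Python functions, the same grouped frame also works with a plain function; here is a small sketch with a made-up function just for illustration:

# Any plain-Python function can be applied per id as well; `count_even_ids`
# is a made-up example standing in for the real custom function.
def count_even_ids(group):
    # count how many matched df_c rows have an even id_c
    return int((group["id_c"] % 2 == 0).sum())

id_groupby.apply(count_even_ids)
# id
# 1    2
# 2    3
# 3    2
# dtype: int64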

Overview

This problem can be solved efficiently in two parts.

The first part consists of finding the rows of df_a and df_b that match on id, together with the range of df_c rows based on ts. This can be done very quickly with a parallel Numba implementation (while consuming only a fraction of the input datasets in memory).

The second part consists of computing the user-defined functions, which may be pandas functions. This latter operation is inherently slow and memory-hungry. Indeed, pandas functions mostly operate on dataframes/series, and those are not efficient here. Iterating over a pandas dataframe containing generic pure-Python types is known to be very slow. Building many small dataframes is slow (even creating an empty dataframe has a fairly high overhead), although it is memory-efficient. Creating one big dataframe is much faster, but it is clearly not memory-efficient, since it essentially forces many rows to be copied (tens or even hundreds of times, due to the number of df_c items to extract per df_a/df_b row). In the end, the fastest pandas-only solution will be far slower than the optimal time (by at least an order of magnitude). Also note that parallelism barely helps here, because the GIL prevents multithreaded code from being fast and pickling prevents multiprocessing from being fast. Moreover, tools like Numba or Cython cannot help with generic user-defined pure-Python functions. AFAIK, the only way to make this part really fast and memory-efficient is simply not to apply generic pandas functions on huge dataframes, nor generic pure-Python functions.


Part 1: extracting the dataframe rows

The first part can be done with parallel Numba (JIT compiler) code. Although Numba does not support pandas directly, pandas mostly uses Numpy internally, which Numba supports well. The computation can be split into many chunks computed efficiently in parallel. The main idea is to build a fast index of df_b so that df_a and df_b can be merged in linear time, and to use binary searches to find the ranges of matching rows in df_c. The resulting code is very fast. The problem is that the output format is not very convenient for part 2. Here is the code:

import numba as nb
import numpy as np
import pandas as pd

# Feel free to change the signature based on the actual type of your dataframe.
# Smaller types take less memory and tend to be faster because of that.
@nb.njit('(int64[::1], int64[::1], int64[::1], int64[::1], int64[::1])', parallel=True)
def find_matching_rows(df_a_id, df_a_ts, df_b_id, df_b_ts, df_c_ts):
    # Build an index of `df_b` IDs
    b_index = {df_b_id[i]: i for i in range(df_b_id.size)}

    # Mark the `df_a` rows found in `df_b` (parallel)
    found = np.empty(df_a_id.size, np.bool_)
    for a_row in nb.prange(df_a_id.size):
        a_id = df_a_id[a_row]
        found[a_row] = a_id in b_index

    # Count the number of valid rows (parallel)
    count = 0
    for a_row in nb.prange(df_a_id.size):
        count += found[a_row]

    # Count the number of valid items per chunk and
    # the offsets of the output of each chunk (mainly parallel)
    chunk_size = 32768
    chunk_count = (found.size + chunk_size - 1) // chunk_size
    count_by_chunk = np.empty(chunk_count, np.int32)
    for i in nb.prange(chunk_count):
        count_by_chunk[i] = np.sum(found[i*chunk_size:(i+1)*chunk_size])
    out_offsets = np.zeros(chunk_count + 1, np.int32)
    for i in range(chunk_count):
        out_offsets[i+1] = out_offsets[i] + count_by_chunk[i]
    assert out_offsets[chunk_count] == count

    # Main chunk-based computation (parallel)
    a_rows = np.empty(count, np.int32)              # `df_a` indices
    b_rows = np.empty(count, np.int32)              # `df_b` indices
    c_rows = np.empty((count, 2), np.int32)         # Start/end indices
    for chunk_id in nb.prange(chunk_count):
        a_row_start = chunk_id * chunk_size
        a_row_end = min(df_a_id.size, a_row_start + chunk_size)
        offset = out_offsets[chunk_id]
        for a_row in range(a_row_start, a_row_end):
            # Discard ids of `df_a` not in `df_b`
            if not found[a_row]:
                continue
            a_id = df_a_id[a_row]
            b_row = b_index[a_id]
            ts_a, ts_b = df_a_ts[a_row], df_b_ts[b_row]
            ts_min, ts_max = min(ts_a, ts_b), max(ts_a, ts_b)
            c_start_row = np.searchsorted(df_c_ts, ts_min, 'left')  # Included
            c_end_row = np.searchsorted(df_c_ts, ts_max, 'right')   # Excluded
            # If there is no row found in `df_c`
            if c_start_row >= c_end_row:
                c_start_row = c_end_row = -1                        # Not discarded (may be useful)
            # Save results
            a_rows[offset] = a_row
            b_rows[offset] = b_row
            c_rows[offset, 0] = c_start_row
            c_rows[offset, 1] = c_end_row
            offset += 1

    return (a_rows, b_rows, c_rows)

Here is how to call the function:

a_rows, b_rows, c_rows = find_matching_rows(
    df_a['id'].values, df_a['ts'].values,
    df_b['id'].values, df_b['ts'].values,
    df_c['ts'].values
)

Part 2: dataframes and user-defined functions

As mentioned before, the generic approach is inherently inefficient (in both speed and memory usage). One solution is to adapt your operations so that they are applied directly inside the previous Numba code. This makes the overall implementation both very fast (i.e. parallel and JIT-compiled) and memory-efficient (i.e. computed on the fly, with no need for huge temporary dataframes). That being said, Numba does not support generic pure-Python object types or pandas functions, so this may require some significant rework of the code for your actual dataframes.
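
For instance, if the custom function is as simple as the len example from the question, it can be computed directly from the index arrays without materializing anything; a minimal sketch (not part of the code above):

# Minimal sketch of the `len` example from the question, computed directly
# from the index arrays (no big temporary dataframe is materialized).
counts = c_rows[:, 1] - c_rows[:, 0]          # 0 where start == end == -1
res = dict(zip(df_a['id'].values[a_rows], counts))
# e.g. with the small example inputs: {1: 2, 3: 2, 2: 4}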

The less efficient alternative is to build a big temporary dataframe from the index-based arrays produced by find_matching_rows. Here is an example of Numba code doing that:

@nb.njit('(int32[::1], int32[::1], int32[:,::1])')
def build_df_index(a_rows, b_rows, c_rows):
    n = a_rows.size

    # Count the total number of rows to be computed in df_c
    count = 0
    for i in range(n):
        count += c_rows[i, 1] - c_rows[i, 0]

    new_a_rows = np.empty(count, np.int32)
    new_b_rows = np.empty(count, np.int32)
    new_c_rows = np.empty(count, np.int32)
    offset = 0
    for i in range(n):
        for j in range(c_rows[i, 1] - c_rows[i, 0]):
            new_a_rows[offset] = a_rows[i]
            new_b_rows[offset] = b_rows[i]
            new_c_rows[offset] = c_rows[i, 0] + j
            offset += 1

    return (new_a_rows, new_b_rows, new_c_rows)

The resulting index arrays can be used to build the final dataframe, for example with df_a.iloc[new_a_rows], df_b.iloc[new_b_rows] and df_c.iloc[new_c_rows]. If your actual dataframes only contain uniform types, or types supported by Numba, then you can generate this temporary dataframe directly with Numba (much faster than pandas' iloc, especially if done in parallel).
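
As a rough sketch of what that last step could look like on the pandas side (assuming df_c is the ts-sorted frame that was passed to find_matching_rows, so the positional indices line up):

# Rough sketch: materialize one labeled frame from the index arrays and
# group it by the df_a id; size() is the `len` example from the question.
new_a_rows, new_b_rows, new_c_rows = build_df_index(a_rows, b_rows, c_rows)
labeled = pd.concat(
    [
        df_a.iloc[new_a_rows].reset_index(drop=True).add_suffix('_a'),
        df_c.iloc[new_c_rows].reset_index(drop=True).add_suffix('_c'),
    ],
    axis=1,
)
print(labeled.groupby('id_a').size())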

I agree with @QuangHong. Processing data this large may not be very efficient.

However, I tried the sample input using pandas.

Merge df_a and df_b on the id column. Inner join, since we need rows that are present in both:

df_merge_a_b = df_a.merge(df_b, on=['id'], how='inner')

Find the min and max of the corresponding timestamps:

df_merge_a_b["min_ab"] = df_merge_a_b[["ts_x", "ts_y"]].min(axis=1)
df_merge_a_b["max_ab"] = df_merge_a_b[["ts_x", "ts_y"]].max(axis=1)

With the min and max in place, for each row of the merged dataframe, find the rows of df_c whose ts lies between them:

def get_matching_rows(row):
    min_ab = row["min_ab"]
    max_ab = row["max_ab"]
    result = df_c[df_c["ts"].between(min_ab, max_ab)]
    print(result)
    ## apply custom function on result and return

df_merge_a_b.apply(lambda x: get_matching_rows(x), axis=1)

Sample output:

id  ts other_cols
2  726   4        ...
3  814   6        ...
id  ts other_cols
6  248  12        ...
7  514  13        ...
id  ts other_cols
4  528   9        ...
5  237  10        ...
6  248  12        ...
7  514  13        ...

Apply the custom function on each result and concatenate all the outputs together.
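
For example, a small variant of the helper above that returns the value of the custom function instead of printing the rows (len here is only a stand-in for the real function):

# Variant of the helper above that returns the custom function's value
# instead of printing it (len is just a stand-in for the real function).
def apply_custom_fn(row, custom_fn=len):
    matching = df_c[df_c["ts"].between(row["min_ab"], row["max_ab"])]
    return custom_fn(matching)

res = df_merge_a_b.set_index("id").apply(apply_custom_fn, axis=1)
# res maps each id to the function's value, e.g. 1 -> 2, 2 -> 4, 3 -> 2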

Probably not super efficient... but wanted to give this a try in pandas on the sample input.

# Set some indices, note how df_c is different.
df_a = df_a.set_index('id')
df_b = df_b.set_index('id')
# Looks like maybe your `ts` is already sorted? If so, `sort_index()` isn't necessary~
df_c = df_c.set_index('ts').sort_index()
# concat them together, then get the min and max from each ts.
df = pd.concat([df_a, df_b])
# Producing the min/max this way should be fast.
# sort=False is optional for performance and means your output will be jumbled like shown below~
df = df.groupby(level=-1, sort=False)['ts'].agg(['min', 'max'])
# Making this work with `raw=True` should improve performance.
# Being able to use `loc` should help.
out = df.apply(lambda x: df_c.loc[x[0]:x[1], 'id'].to_dict(), axis=1, raw=True)
print(out)

Output:

id
1                       {4: 726, 6: 814}
5                                     {}
3                     {12: 248, 13: 514}
2    {9: 528, 10: 237, 12: 248, 13: 514}
dtype: object

I don't have much confidence in this approach, but I'd be curious to know how it performs~


After setting (and, if necessary, sorting) the indexes, the one-liner would be:

# Only concating `ts` will be faster, no need to drag everything along.
out = (pd.concat([df_a[['ts']], df_b[['ts']]])
       .groupby(level=-1, sort=False)['ts']
       .agg(['min', 'max'])
       .apply(lambda x: df_c.loc[x[0]:x[1], 'id'].to_dict(), axis=1, raw=True)
       # See this alternative if only ts are needed:
       # .apply(lambda x: set(df_c.loc[x[0]:x[1], 'id'].index), axis=1, raw=True)
      )

To add one possible optimization to the existing answers: if there are duplicates among the (min, max) combinations, you can perform the df_c lookups/computations only for the unique (min, max) values (or implement caching).

This may cut down the computation considerably if the resolution of the timestamps is fairly low (e.g. days), but it probably won't help much if the resolution is high (e.g. picoseconds). Of course, if you want fast approximate answers, you can always round the timestamps to within a tolerable margin of error.

In practice, this looks like:

from pandas import DataFrame, merge

df_a = DataFrame(
    data={"id": [1, 5, 3, 2], "ts": [3, 5, 11, 14], "other_cols": ["..."] * 4}
)
df_b = DataFrame(data={"id": [2, 1, 3], "ts": [7, 8, 15], "other_cols": ["..."] * 3})
df_c = DataFrame(
    data={
        "id": [154, 237, 726, 814, 528, 237, 248, 514],
        "ts": [1, 2, 4, 6, 9, 10, 12, 13],
        "other_cols": ["..."] * 8,
    }
)

# indexing and min/max are adapted from the answers by @srinath, @ringo and @BeRT2me
df_a = df_a.set_index("id")["ts"]  # keep only info of interest
df_b = df_b.set_index("id")["ts"]  # keep only info of interest
df = merge(df_a, df_b, how="inner", left_index=True, right_index=True)
df["min"] = df[["ts_x", "ts_y"]].min(axis=1)
df["max"] = df[["ts_x", "ts_y"]].max(axis=1)
df = df[["min", "max"]]

# find unique min-max combinations (drop index to avoid confusion)
unique = df.drop_duplicates().reset_index(drop=True)

# proceed to actual calculations (below is just an example)
# make sure df_c is indexed by ts so we can look values up
df_c = df_c.set_index("ts").sort_index()

# if computation is costly this can be done in parallel, but
# AFAIK this would require using another library, e.g. dask
for tmin, tmax in unique.values:
    sub = df_c.loc[tmin:tmax]
    print(tmin, tmax, len(sub))
# 3 8 2
# 11 15 2
# 7 14 4
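
To map the per-range results back onto each id, the cached values can then just be looked up again; a small sketch of that idea (with len standing in for the real computation):

# Sketch of the caching idea: compute once per unique (min, max) pair,
# then map the cached results back onto every id (len is a stand-in here).
cache = {(tmin, tmax): len(df_c.loc[tmin:tmax]) for tmin, tmax in unique.values}
res = df.apply(lambda row: cache[(row["min"], row["max"])], axis=1)
# res maps each id to its result, e.g. 1 -> 2, 2 -> 4, 3 -> 2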

I'm assuming that the lookups into df_c are the rate-limiting step, so I re-index to enable direct lookups instead of searches. I'm also assuming there is enough RAM: see Scaling to large datasets.

import pandas as pd

df_a = pd.DataFrame(data={             # <-- original data frames
    'id': [1, 5, 3, 2],
    'ts': [3, 5, 11, 14],
    'other_columns': ['...'] * 4})
df_b = pd.DataFrame(data={
    'id': [2, 1, 3],
    'ts': [7, 8, 15],
    'other_columns': ['...'] * 3})
df_c = pd.DataFrame(data={
    'id': [154, 237, 726, 814, 528, 237, 248, 514],
    'ts': [1, 2, 4, 6, 9, 10, 12, 13],
    'other_columns': ['...'] * 8})

Combine df_a and df_b via an inner join:

# combine df_a and df_b to find end-points
xs = pd.merge(
    left = df_a.set_index('id').loc[:, 'ts'],
    right = df_b.set_index('id').loc[:, 'ts'],
    on = 'id', sort = True, validate = 'one_to_one',)
xs['ts_lower'] = xs.min(axis=1)
xs['ts_upper'] = xs.max(axis=1)
endpoints = xs[['ts_lower', 'ts_upper']]
print(endpoints)
    ts_lower  ts_upper
id
1          3         8
2          7        14
3         11        15

Modify df_c so that its ts column becomes the index, then expand the index to include the values from endpoints. This lets us index directly into df_c instead of searching:

# a) df_c: convert 'ts' to index
df_c = df_c.set_index('ts').sort_index().loc[:, 'id']

# b) df_c: expand the index
idx = (
    df_c.index
    .append(pd.Index(endpoints['ts_lower'].values))
    .append(pd.Index(endpoints['ts_upper'].values))
    .drop_duplicates()
    .sort_values()
)
df_c = df_c.reindex(idx, method = 'ffill')

Navigate df_c via direct lookups:

endpoints['val_lower'] = df_c[endpoints['ts_lower']].values
endpoints['val_upper'] = df_c[endpoints['ts_upper']].values
print(endpoints)
    ts_lower  ts_upper  val_lower  val_upper
id
1          3         8        237        814
2          7        14        814        514
3         11        15        237        514
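
If the actual matching rows (or a function of them) are needed rather than just the values at the endpoints, one possible follow-up is to keep a copy of the ts-indexed df_c from step (a), taken before the reindex/ffill. I call it df_c_by_ts below, a name introduced only for this sketch, with len standing in for the custom function:

# Hypothetical follow-up: `df_c_by_ts` is assumed to be a copy of
# df_c.set_index('ts').sort_index() saved before the reindex/ffill step.
# .loc slices are inclusive on both ends, which matches the question.
res = endpoints.apply(
    lambda row: len(df_c_by_ts.loc[row['ts_lower']:row['ts_upper']]),
    axis=1,
)
print(res)
# id
# 1    2
# 2    4
# 3    2
# dtype: int64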
