Best (PostgreSQL?)增量实体解析/记录链接的数据模型和处理



我正在解决一个问题,我想请你发表意见。

我们正在尝试通过简单的相等比较进行确定性的实体解析/记录链接。增量方式,在流事件上。我正在尝试弄清楚如何在PostgreSQL中做到这一点,同时考虑到扩展。

想象一下,您有一系列事件。为了举例说明,让我们坚持使用网络跟踪中的事件示例。事件可能如下所示(只留下最重要的部分,而不是整个有效负载(:

1. EventID: 1, Cookie: A
2. EventID: 2, Cookie: B
3. EventID: 3, Cookie: A
4. EventID: 4, Cookie: B

如您所见,到目前为止,我们有 2 个属性 - EventID 和 Cookie。因此,从此事件流中,我们希望有一些表示(数据模型(来描述事件 ID 1 和 3 连接到 Cookie A,事件 ID 2 和 4 连接到 Cookie B。 最简单的伪表示(这是我正在努力解决的,也是正在处理的(可能如下所示(设置大括号(:

EventIDs,Identifiers
{"1","3"},{"Cookie|A"}
{"2","4"},{"Cookie|B"}

让我们再做一轮:

5. EventID: 5, Cookie: A, Email: a@example.com
6. EventID: 6, Cookie: B, Email: b@emample.com

在此轮之后,实体表示将如下所示:

EventIDs,Identifiers
{"1","3","5"},{"Cookie|A","Email|a@example.com"}
{"2","4","6"},{"Cookie|B","Email|b@example.com"}

目前为止,一切都好。还有一场活动要设置总决赛:

7. EventID: 7, Cookie: A, Phone: 1234

实体:

EventIDs,Identifiers
{"1","3","5","7"},{"Cookie|A","Email|a@example.com","Phone|1234"}
{"2","4","6"},{"Cookie|B","Email|b@example.com"}

和大结局:

8. EventID: 8, Cookie: B, Phone: 1234

如您所见 - 通过这个新事件,我们发现了一个新链接,我们现在知道这两个实体已连接:

EventIDs,Identifiers
{"1","2","3","4","5","6","7","8"},{"Cookie|A","Cookie|B","Email|a@example.com","Email|b@example.com","Phone|1234"}

这是一个问题陈述:给定事件流,其中每个事件都有自己的唯一 ID 和 1 到 N 个不同类型的标识符,我们如何在 PostgreSQL(或其他工具(中解析(基本上是传递闭包的变化?我愿意接受建议(所以它可以通过增量处理扩展到 1BN 事件和 100M 个实体(以先到者为准(?

到目前为止,我们尝试了什么?Python - 自定义算法。运行良好,但内存受限 - 我们使用 Python 字典来存储实体 ID 及其标识符,以及倒置字典来保存标识符及其实体 ID 以进行快速查找(O(1( 用于查找(。可以想象,在内存中保存此类字典会消耗大量数据 - 在 30M 个实体中,我们的数据为 25GB。不会扩展。

PostgreSQL - 我基本上使用了我描述的相同的表来保存实体。然后,插入到此表的触发器查找 EventID 或 EntityID 与我们要插入的所有候选实体,从实体表中删除它们,将结果与要插入的新行合并并插入此新的合并实体。运行良好,不扩展 - 合并实体时的竞争条件。

Apache Spark - 我将事件流转换为一个图形,其中节点是标识符(电话,电子邮件等(,边缘定义为"标识符与同一事件中的其他标识符一起出现(,并使用了GraphFrame的连接组件算法。效果很好,但它是每次运行的整个历史记录的批处理。但我想它是渐进的。

如果您对我使用的代码感兴趣,请随时询问。为了这篇文章并保持合理的长度,我选择现在不包括它。

我将非常感谢任何将推动我们解决问题的指示、讨论和建议。

非常感谢,我期待着讨论!

您是否考虑过将EventID视为另一个Identifier(例如"EID|1"(会简化倒排索引的维护?

使用这个PostgreSQL表作为基础:

Table "public.matching"
Column     |  Type   | Collation | Nullable | Default 
---------------+---------+-----------+----------+---------
identifier    | text    |           | not null | 
grouping_id   | integer |           | not null | 
event_id_orig | integer |           | not null | 
Indexes:
"pk_matching" PRIMARY KEY, btree (identifier)
"idx_matching_grouping_id" hash (grouping_id)
"idx_matching_hash_pk" hash (identifier)

当新记录到达时,创建identifier集并将EID|x添加到该集,然后运行以下命令:

with event_ident as (
select 1 as event_id, unnest('{"EID|1","Cookie|A"}'::text[]) as identifier
), upd_crit as (
select distinct m.grouping_id, min(m.grouping_id) over () as min_grouping_id
from matching m
join event_ident e on e.identifier = m.identifier
), upd_run as (
update matching
set grouping_id = upd_crit.min_grouping_id
from upd_crit
where upd_crit.grouping_id = matching.grouping_id
and upd_crit.min_grouping_id != matching.grouping_id -- Added in response to comment
), insert_run as (
insert into matching
(identifier, grouping_id, event_id_orig)
select e.identifier,
coalesce(u.min_grouping_id, e.event_id) as grouping_id,
e.event_id as event_id_orig
from event_ident e
cross join (select min(min_grouping_id) as min_grouping_id from upd_crit) u
on conflict (identifier) do nothing
returning *
)
select * from insert_run;

在你前四次事件之后,我有这个:

=# select * from matching;
identifier | grouping_id | event_id_orig 
------------+-------------+---------------
EID|3      |           1 |             3
EID|1      |           1 |             1
Cookie|A   |           1 |             1
EID|4      |           2 |             4
EID|2      |           2 |             2
Cookie|B   |           2 |             2
(6 rows)

使用电子邮件地址的回合后:

=# select * from matching;
identifier      | grouping_id | event_id_orig 
---------------------+-------------+---------------
EID|5               |           1 |             5
Email|a@example.com |           1 |             5
EID|3               |           1 |             3
EID|1               |           1 |             1
Cookie|A            |           1 |             1
EID|6               |           2 |             6
Email|b@example.com |           2 |             6
EID|4               |           2 |             4
EID|2               |           2 |             2
Cookie|B            |           2 |             2
(10 rows)

EventId7 之后:

=# select * from matching;
identifier      | grouping_id | event_id_orig 
---------------------+-------------+---------------
EID|6               |           2 |             6
Email|b@example.com |           2 |             6
EID|4               |           2 |             4
EID|2               |           2 |             2
Cookie|B            |           2 |             2
EID|7               |           1 |             7
Phone|1234          |           1 |             7
EID|5               |           1 |             5
Email|a@example.com |           1 |             5
EID|3               |           1 |             3
EID|1               |           1 |             1
Cookie|A            |           1 |             1
(12 rows)

在添加您的总决赛记录后:

=# select * from matching;
identifier      | grouping_id | event_id_orig 
---------------------+-------------+---------------
EID|8               |           1 |             8
EID|6               |           1 |             6
Email|b@example.com |           1 |             6
EID|4               |           1 |             4
EID|2               |           1 |             2
Cookie|B            |           1 |             2
EID|7               |           1 |             7
Phone|1234          |           1 |             7
EID|5               |           1 |             5
Email|a@example.com |           1 |             5
EID|3               |           1 |             3
EID|1               |           1 |             1
Cookie|A            |           1 |             1
(13 rows)

要以您使用的格式检索,请执行以下操作:

select grouping_id, 
array_agg(distinct event_id_orig) as event_ids, 
array_agg(identifier) filter (where identifier not like 'EID|%') as identifiers
from matching
group by grouping_id;
-[ RECORD 1 ]-----------------------------------------------------------------------
grouping_id | 1
event_ids   | {1,2,3,4,5,6,7,8}
identifiers | {Email|b@example.com,Cookie|B,Phone|1234,Email|a@example.com,Cookie|A}

最新更新