使用 Apache Bean, Dataflow 中的 PCollections 生成图形表示



>我在以下架构中有一个数据源 - "人员 ID"、"地址 ID"。我已将数据加载到PCollection中。

每个人都可以有多个地址ID,每个地址可以分配给多个人。我试图找到的是所有相关的人或共享相同地址ID的人,而不仅仅是在第一级到"n"级。

假设 P1 对 A1 和 A2 有优势,P2 对 A2 有优势,A3 和P3 对 A3 和 A4 有优势。在这种情况下,如果我画一个图,我可以找到 P1 与 P2 相关,因为两者都共享 A2。P1 也与 P3 相关,因为 P1 通过 A2 与 P2 相关,而 P2 通过 A3 与 P3 相关。

我的最终目标是找到这群有关系的人(通过地址ID(。到目前为止,我所做的是尝试利用 Join.innerJoin 来形成温和的表结构,并将其循环到我们需要的任何级别。

PCollection<PeopleAddress> PA =  readEdges(); // 
PCollection<KV<String, PeopleAddress>> KAddressPA = transform(PA); // String is the address ID
PCollection<KV<String, KV<PeopleAddress, PeopleAddress>> data = Join.innerJoin(KAddressPA, KAddressPA);
//Above PCollection will give all first level edges, from this we will form a PeopleToPeople connection
PCollection<PeoplePeopleConnection> PP = getConnection(data); // From LHS and RHS we will read the ids and store. 
// With the new set of People People Connection we can  get new set of PeopleAddress edges..
Class PeoplePeopleConnection { String basePId; String cPId; }
Class PeopleAddress { String pId; String  aId; }

我正在考虑循环上面的代码n次以获得N级连接。但感觉有点过度劳累。许多边缘是重复的。我想知道有没有办法在PCollection中做到这一点。就像当我们找到一个地址与现有人员的连接时,一些如何将其链接到现有的人对象。有些人如何将新的PeopleConnection或PeopleAddress连接合并回单个PColleciton。

解决问题的不同视角?

好的,所以到目前为止,你会有成对的人住在同一个地址,对吧?

PCollection<PeoplePeopleConnection> PP = getConnection(data);

这些对形成一个没有地址的图——只有人,并且有distance=1。我喜欢这个,因为它允许我们专注于人,并丢弃地址。

那么,鉴于(P1, P2)(P2, P3)- 我们如何也(P1, P3)

我们可以做这样的事情:

PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId), 
KV.of(pair.cPId, pair.basePId))));

然后,我们可以加入他们,就像你以前一样:

PCollection<KV<String, Iterable<String>> groupedData = twoWayPairs
.apply(GroupByKey.create());

给定(P1, P2)(P2, P3)作为输入,这将返回(P2, [P1, P3])(P1, [P2])(P3, [P2])。从这对中,我们可以导出(P1, P3)作为distance=2邻居的列表。

groupedData.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});

为什么我们要检查newPairs是否为空?因为当newPairs元素为空时,我们就会遇到不与其他元素链接的对的情况(例如(P1, [P2])之前(。

所以,最后,你应该能够做这样的事情:

// We get the distance=1 elements:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId), 
KV.of(pair.cPId, pair.basePId))));
for(int i = 1; i < MAX_DISTANCE; i++) {
twoWayPairs = twoWayPairs
.apply(GroupByKey.create())
.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});
}

这应该有助于产生具有distance<N的邻居。

考虑到在这种情况下,被打乱的数据会显着增加,因此在进入非常远的距离之前要小心。

最新更新