>我在以下架构中有一个数据源 - "人员 ID"、"地址 ID"。我已将数据加载到PCollection中。
每个人都可以有多个地址ID,每个地址可以分配给多个人。我试图找到的是所有相关的人或共享相同地址ID的人,而不仅仅是在第一级到"n"级。
假设 P1 对 A1 和 A2 有优势,P2 对 A2 有优势,A3 和P3 对 A3 和 A4 有优势。在这种情况下,如果我画一个图,我可以找到 P1 与 P2 相关,因为两者都共享 A2。P1 也与 P3 相关,因为 P1 通过 A2 与 P2 相关,而 P2 通过 A3 与 P3 相关。
我的最终目标是找到这群有关系的人(通过地址ID(。到目前为止,我所做的是尝试利用 Join.innerJoin 来形成温和的表结构,并将其循环到我们需要的任何级别。
PCollection<PeopleAddress> PA = readEdges(); //
PCollection<KV<String, PeopleAddress>> KAddressPA = transform(PA); // String is the address ID
PCollection<KV<String, KV<PeopleAddress, PeopleAddress>> data = Join.innerJoin(KAddressPA, KAddressPA);
//Above PCollection will give all first level edges, from this we will form a PeopleToPeople connection
PCollection<PeoplePeopleConnection> PP = getConnection(data); // From LHS and RHS we will read the ids and store.
// With the new set of People People Connection we can get new set of PeopleAddress edges..
Class PeoplePeopleConnection { String basePId; String cPId; }
Class PeopleAddress { String pId; String aId; }
我正在考虑循环上面的代码n次以获得N级连接。但感觉有点过度劳累。许多边缘是重复的。我想知道有没有办法在PCollection中做到这一点。就像当我们找到一个地址与现有人员的连接时,一些如何将其链接到现有的人对象。有些人如何将新的PeopleConnection或PeopleAddress连接合并回单个PColleciton。
解决问题的不同视角?
好的,所以到目前为止,你会有成对的人住在同一个地址,对吧?
PCollection<PeoplePeopleConnection> PP = getConnection(data);
这些对形成一个没有地址的图——只有人,并且有distance=1
。我喜欢这个,因为它允许我们专注于人,并丢弃地址。
那么,鉴于(P1, P2)
,(P2, P3)
- 我们如何也(P1, P3)
?
我们可以做这样的事情:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId),
KV.of(pair.cPId, pair.basePId))));
然后,我们可以加入他们,就像你以前一样:
PCollection<KV<String, Iterable<String>> groupedData = twoWayPairs
.apply(GroupByKey.create());
给定(P1, P2)
和(P2, P3)
作为输入,这将返回(P2, [P1, P3])
、(P1, [P2])
、(P3, [P2])
。从这对中,我们可以导出(P1, P3)
作为distance=2
邻居的列表。
groupedData.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});
为什么我们要检查newPairs
是否为空?因为当newPairs
元素为空时,我们就会遇到不与其他元素链接的对的情况(例如(P1, [P2])
之前(。
所以,最后,你应该能够做这样的事情:
// We get the distance=1 elements:
PCollection<KV<String, String>> twoWayPairs = PP.apply(
FlatMapElements(pair -> Lists.of(KV.of(pair.basePId, pair.cPId),
KV.of(pair.cPId, pair.basePId))));
for(int i = 1; i < MAX_DISTANCE; i++) {
twoWayPairs = twoWayPairs
.apply(GroupByKey.create())
.apply(FlatMapElements((KV<String, Iterable<String>>) neighbors -> {
List<KV<String, String>> newPairs = cartesianProduct(neighbors.getValue());
if (newPairs.size() == 0) {
return Lists.of(KV.of(neighbors.getKey(), neighbors.getValue().get(0)),
KV.of(neighbors.getValue().get(0), neighbors.getKey()));
} else {
return newPairs;
}
});
}
这应该有助于产生具有distance<N
的邻居。
考虑到在这种情况下,被打乱的数据会显着增加,因此在进入非常远的距离之前要小心。