如何在hadoop中实现OR联接(滚烫/级联)



只需将联接字段作为reducer键发送,就可以轻松地通过单个键联接数据集。但用几个键连接记录,其中至少有一个键应该是相同的,这对我来说并不容易

示例我有日志,我想根据用户参数对它们进行分组,我想通过(ipAddress,sessionId,visitorCockies)加入它们

因此,如果log1.ip==log2.ip或log1.session=log2.session或log1.cockie=log2.coockie,则log1应与log2分组。也许可以创建复合密钥或一些概率方法,如minHash。。。

有可能吗?

问题是,MapReduce联接通常是通过为某些字段上匹配的记录提供相同的reduce键来实现的,以便将它们发送到相同的reductor。因此,任何绕过这一点的方法都有点像黑客,但这是可能的。。。

以下是我的建议:对于每个输入记录,生成三个副本,每个副本都有一个新的"key"字段,该字段的前缀是它来自的字段。例如,假设您有以下输入:

(ip=1.2.3.4, session=ABC, cookie=123)
(ip=3.4.5.6, session=DEF, cookie=456)

然后你会生成

(ip=1.2.3.4, session=ABC, cookie=123, key=ip_1.2.3.4)
(ip=1.2.3.4, session=ABC, cookie=123, key=session_ABC)
(ip=1.2.3.4, session=ABC, cookie=123, key=cookie_123)
(ip=3.4.5.6, session=DEF, cookie=456, key=ip_3.4.5.6)
(ip=3.4.5.6, session=DEF, cookie=456, key=session_DEF)
(ip=3.4.5.6, session=DEF, cookie=456, key=cookie_456)

然后你可以简单地在这个新的领域分组。

我不太熟悉烫伤/级联(尽管我一直想了解更多关于它的信息),但这肯定符合Hadoop中连接的一般方式。

按照Joe的描述创建单独的联接后,需要消除重复的联接。数据中的两个元组是重复的,如果它们在"OR联接"中使用的所有字段中都相等。因此,如果之后对表示所有相关字段的键进行自然联接,则会将所有重复项分组在一起。因此,您可以用相应元组的一次出现来替换它们。

让我们看一个例子:假设你有一个包含字段(A,B,C,D)的元组,你感兴趣的字段是A,B和C。对于每个元组,您都将初始元组流与自身连接起来。用(A0,B0,C0,D0)表示第一个流,用(A1,B1,C1,D1)表示第二个流。结果将是元组(A0、B0、C0、D0、A1、B1、C1、D1)。对于这些元组中的每一个,您都要创建一个元组(A0A1B0B1C0C1、A0、B0、C0、D0、A1、B1、C1、D1),因此所有重复项都将在后续的reducer中分组在一起。对于每个组,只返回一个包含的元组。

您能描述更多关于"通过多个键连接记录"的信息吗?

如果您知道工作流中可以连接特定键的点,那么最好的方法可能是定义一个具有多个连接的流,而不是试图操作一个复杂的数据结构,以便在一步中解决N个键。

这里有一个示例应用程序,展示了如何在Cascading中处理不同类型的联接:https://github.com/Cascading/CoPA

对于级联,我最终创建了一个Filter,用于检查OR内任何条件的输出是否为true。级联过滤器输出True/False值,这些值可以选择性使用。

提示:使用类型别名使Scalding代码易于阅读

注意0:这个解决方案特别好,因为它总是只有一个映射作业,即使有更多的键要加入。

注意1:假设每个管道都没有重复的键,否则你必须让'key也有一个索引,它来自哪个日志,而mapTo将是一个flatMapTo,并且有点复杂。

注意2:为了简单起见,这将丢弃连接字段,为了保留它们,您需要一个大而丑陋的元组(ip1、ip2、session1、session2等)。如果您真的想要,我可以写一个保留它们的示例。

注意3:如果你真的想合并重复的值,你可以在后面加一个groupBy logEntry1和logEntry2,生成一个logEntryList,然后生成cat(正如评论中提到的,这对于联接来说是不正常的)。这将再创建2个映射作业。

type String2 = (String, String)
type String3 = (String, String, String)
def addKey(log: Pipe): Pipe = log.flatMap[String3, String](('ip, 'session, 'cookie) -> 'key)(
_.productIterator.toList.zipWithIndex.map {
case (key: String, index: Int) => index.toString + key
}
)
(addKey(log1) ++ addKey(log2)).groupBy('key)(_.toList[String]('logEntry -> 'group))
.mapTo[Iterable[String], String2]('group -> ('logEntry1, 'logEntry2))(list => (list.head, list.last))

最新更新