Mapping a PySpark RDD twice to group by different keys



I have the following RDD:

timeRange = (access_logs
             .map(lambda log: (log.date_time, 1))
             .reduceByKey(lambda a, b: a + b)
             .map(lambda s: s)
             .take(2000))
print("IpAddresses by time range: {}".format(timeRange))

My parsing schema looks like this:

import re
from pyspark.sql import Row

def parse_apache_log_line(logline):
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise Exception("Invalid logline: %s" % logline)
    return Row(
        ip_address    = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = match.group(4),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = int(match.group(9))
    )

Sample log line:

129.192.176.24 - - [25/May/2015:23:11:16 +0000] "GET / HTTP/1.0" 200 3557 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; snprtz|S26320700000083|2600#Service Pack 1#2#5#154321|isdn)"
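
The APACHE_ACCESS_LOG_PATTERN referenced above is not shown in the question; a minimal sketch of an assumed pattern with the same nine capture groups, matching the sample line, would be:

import re

# Assumed pattern -- the real APACHE_ACCESS_LOG_PATTERN is not shown in the question.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+)'

sample = ('129.192.176.24 - - [25/May/2015:23:11:16 +0000] '
          '"GET / HTTP/1.0" 200 3557 "-" "Mozilla/4.0 (...)"')
m = re.search(APACHE_ACCESS_LOG_PATTERN, sample)
print(m.group(1), m.group(4))  # 129.192.176.24 25/May/2015:23:11:16 +0000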

I want to group and display by timestamp, and then, within each timestamp, group and display the IP addresses with their counts. Right now my mapping gives me something like ('25/May/2015:23:11:15 +0000', 1995), but what I'm looking for is something like ('25/May/2015:23:11:15 +0000', ('1.2.3.4', 20)).

You just need to reduce by the composite key (date_time, ip_address) in the first step, and then group by date_time.

Try this:

timeRange = (access_logs
             .map(lambda log: ((log.date_time, log.ip_address), 1))
             .reduceByKey(lambda a, b: a + b)
             .map(lambda x: (x[0][0], (x[0][1], x[1])))  # <=> (date_time, (ip_address, count))
             .groupByKey()
             .map(lambda x: (x[0], list(x[1])))  # final step materializes a list, since groupByKey gives a ResultIterable
             )
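
A quick way to sanity-check the shape of the result is to run the same chain over a few hypothetical rows (assuming a running SparkContext named sc; the Row fields mirror the parser above):

from pyspark.sql import Row

# Hypothetical toy data standing in for the parsed access_logs RDD.
logs = sc.parallelize([
    Row(date_time='25/May/2015:23:11:15 +0000', ip_address='1.2.3.4'),
    Row(date_time='25/May/2015:23:11:15 +0000', ip_address='1.2.3.4'),
    Row(date_time='25/May/2015:23:11:15 +0000', ip_address='5.6.7.8'),
    Row(date_time='25/May/2015:23:11:16 +0000', ip_address='1.2.3.4'),
])

grouped = (logs
           .map(lambda log: ((log.date_time, log.ip_address), 1))
           .reduceByKey(lambda a, b: a + b)
           .map(lambda x: (x[0][0], (x[0][1], x[1])))
           .groupByKey()
           .map(lambda x: (x[0], list(x[1])))
           .collect())
# Ordering may vary, but the shape is:
# [('25/May/2015:23:11:15 +0000', [('1.2.3.4', 2), ('5.6.7.8', 1)]),
#  ('25/May/2015:23:11:16 +0000', [('1.2.3.4', 1)])]

Because the counts are already aggregated by reduceByKey in the first step, the groupByKey here only shuffles one (ip_address, count) pair per distinct IP within each timestamp, so the grouped lists stay small.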
