所以当我们使用Java
编写map/reduce
程序时,map收集数据,reducer接收每个键的值列表,如
Map(k, v) -> k1, v1
then shuffle and sort happens
then reducer gets it
reduce(k1, List<values>)
的工作。但是否有可能用streaming
做同样的python
?我使用这个作为参考,似乎reducer获得每行数据的命令行
也许这能帮到你。我从apache找到了这个…org
自定义将行拆分为键/值对的方式如前所述,当Map/Reduce框架从映射器的标准输出中读取一行时,它将该行拆分为一个键/值对。默认情况下,在第一个制表符之前的行前缀为键,其余行(不包括制表符)为值。
但是,您可以自定义此默认值。可以指定制表符以外的字段分隔符(默认值),还可以指定第n个(n>= 1)字符而不是一行中的第一个字符(默认值)作为键和值之间的分隔符。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar
-input myInputDirs
-output myOutputDir
-mapper org.apache.hadoop.mapred.lib.IdentityMapper
-reducer org.apache.hadoop.mapred.lib.IdentityReducer
-D stream.map.output.field.separator=.
-D stream.num.map.output.key.fields=4
在上面的示例中,-D stream.map.output.field.separator=.
指定"。"作为映射输出的字段分隔符,并且在一行中到第四个"。"的前缀将是键,而该行的其余部分(不包括第四个"。")将是值。如果一条线的长度小于4。,那么整行将是键,值将是一个空Text对象(就像new Text(")创建的对象)。
类似地,您可以使用-D stream.reduce.output.field.separator=SEP
和-D stream.num.reduce.output.fields=NUM
指定reduce输出行的第n个字段分隔符作为键和值之间的分隔符。
同样,您可以指定stream.map.input.field.separator
和stream.reduce.input.field.separator
作为map/reduce输入的输入分隔符。默认情况下,分隔符是制表符。
在Hadoop Streaming中,映射器将键值对写入sys.stdout
。Hadoop进行shuffle和排序,并将结果定向到sys.stdin
中的映射器。如何实际处理映射和reduce完全取决于您,只要您遵循该模型(映射到标准输出,从标准输入减少)。这就是为什么它可以在命令行上通过cat data | map | sort | reduce
独立于Hadoop进行测试。
reducer的输入是与映射的相同的键值对,但是是排序的。您可以像示例所演示的那样遍历结果并累积总数,或者您可以进一步将输入传递给itertools.groupby()
,这将为您提供与您习惯的k1, List<values>
相同的输入,并且在reduce()
内置中工作得很好。
关键是要由你来执行reduce。
PipeReducer是Hadoop流的reducer实现。reducer获取键/值,迭代它并将其作为键/值而不是键/值发送给STDIN。这是Hadoop流的默认行为。我没有看到任何选项可以改变这一点,除非Hadoop代码被修改。
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
.....
while (values.hasNext()) {
.....
inWriter_.writeKey(key);
inWriter_.writeValue(val);
.....
}
}
根据Hadoop的流引用:
- 键值默认为第一个选项卡前一行的前缀。
当Map/Reduce框架从映射器的标准输出中读取一行时,它将该行拆分为一个键/值对。默认情况下,在第一个制表符之前的行前缀为键,其余行(不包括制表符)为值。
- 分隔符和键位置可以自定义。
但是,您可以自定义此默认值。可以指定制表符以外的字段分隔符(默认值),还可以指定第n个(n>= 1)字符而不是一行中的第一个字符(默认值)作为键和值之间的分隔符。例如:
-
示例代码:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar -D stream.map.output.field.separator=. -D stream.num.map.output.key.fields=4 -input myInputDirs -output myOutputDir -mapper org.apache.hadoop.mapred.lib.IdentityMapper -reducer org.apache.hadoop.mapred.lib.IdentityReducer