I have extended WritableComparable and would like to use it as the mapper output value.
public class SenderRecieverPair implements WritableComparable<BinaryComparable> {

    Set<InternetAddress> pair = new TreeSet<InternetAddress>(new Comparator<InternetAddress>() {
        @Override
        public int compare(InternetAddress add1, InternetAddress add2) {
            return add1.getAddress().compareToIgnoreCase(add2.getAddress());
        }
    });

    public SenderRecieverPair() {
        super();
    }

    public SenderRecieverPair(InternetAddress add1, InternetAddress add2) {
        super();
        pair.add(add1);
        pair.add(add1);
    }

    public Set<InternetAddress> getPair() {
        return pair;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        for (Iterator<InternetAddress> iterator = pair.iterator(); iterator.hasNext();) {
            InternetAddress email = (InternetAddress) iterator.next();

            String mailAddress = email.getAddress();
            if (mailAddress == null) {
                mailAddress = "";
            }
            byte[] address = mailAddress.getBytes("UTF-8");
            WritableUtils.writeVInt(out, address.length);
            out.write(address, 0, address.length);

            String displayName = email.getPersonal();
            if (displayName == null) {
                displayName = "";
            }
            byte[] display = displayName.getBytes("UTF-8");
            WritableUtils.writeVInt(out, display.length);
            out.write(display, 0, display.length);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        for (int i = 0; i < 2; i++) {
            int length = WritableUtils.readVInt(in);
            byte[] container = new byte[length];
            in.readFully(container, 0, length);
            String mailAddress = new String(container, "UTF-8");

            length = WritableUtils.readVInt(in);
            container = new byte[length];
            in.readFully(container, 0, length);
            String displayName = new String(container, "UTF-8");

            InternetAddress address = new InternetAddress(mailAddress, displayName);
            pair.add(address);
        }
    }

    @Override
    public int compareTo(BinaryComparable o) {
        // TODO Auto-generated method stub
        return 0;
    }
}
However, I get the following error. Please help me understand and correct this.
2013-07-29 06:49:26,753 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2013-07-29 06:49:26,891 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
2013-07-29 06:49:27,004 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2013-07-29 06:49:27,095 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2013-07-29 06:49:27,095 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
2013-07-29 06:49:27,965 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
2013-07-29 06:49:27,988 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2013-07-29 06:49:27,991 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: java.io.EOFException
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:967)
at org.apache.hadoop.util.QuickSort.fix(QuickSort.java:30)
at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:83)
at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:59)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1253)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1154)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:581)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:648)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:322)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:250)
at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:299)
at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:320)
at com.edureka.sumit.enron.datatype.SenderRecieverPair.readFields(SenderRecieverPair.java:68)
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122)
... 14 more
2013-07-29 06:49:27,993 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
Thanks.
Is this intentional?
public SenderRecieverPair(InternetAddress add1, InternetAddress add2) {
    super();
    pair.add(add1);
    pair.add(add1);
}
You are adding add1 twice, so in the write loop you will only get one element out of the set, not two.
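If both addresses were indeed meant to be stored, the fix would presumably just be:

public SenderRecieverPair(InternetAddress add1, InternetAddress add2) {
    super();
    pair.add(add1);
    pair.add(add2); // add the second address, not add1 again
}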
A couple of observations:

- If you know you are only ever storing a pair in SenderRecieverPair, then I wouldn't use a Set - store the two objects explicitly as instance variables. The set allows you to inadvertently add extra values, and your write method will write out 0, 1, 2 or more entries depending on the set size, while your readFields method explicitly expects 2 in its for loop (a sketch of that alternative follows the example code below).
- Secondly, if you insist on using a set, you should know that Hadoop re-uses object instances between calls to your map/reduce tasks. This means that each time the map/reduce method is called, the actual object reference is the same; only the underlying contents are changed by the call to readFields. In your case, not calling pair.clear() as the first step of your readFields method means the set will keep growing between calls.
- Finally, it would be much simpler to use Text objects in your InternetAddress class to store the email address and display name. Serialization then becomes trivial, because you can delegate to that object, which in turn delegates to the Text objects:
For example:
public class InternetAddress implements WritableComparable<InternetAddress> {
    protected Text emailAddress = new Text();
    protected Text displayName = new Text();

    // getters and setters for the above two fields
    // ..

    // compareTo method
    // ..

    @Override
    public void write(DataOutput out) throws IOException {
        emailAddress.write(out);
        displayName.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        emailAddress.readFields(in);
        displayName.readFields(in);
    }
}
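As a rough sketch of the omitted compareTo (assuming you want the same case-insensitive ordering by email address that the original TreeSet comparator used, with the display name as a tie-breaker - adjust as needed):

@Override
public int compareTo(InternetAddress other) {
    // Assumption: order primarily by email address, ignoring case, as in the
    // original TreeSet comparator; fall back to the display name on ties.
    int cmp = emailAddress.toString().compareToIgnoreCase(other.emailAddress.toString());
    if (cmp != 0) {
        return cmp;
    }
    return displayName.compareTo(other.displayName);
}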
public class SenderRecieverPair implements WritableComparable<BinaryComparable> {
    protected Set<InternetAddress> pair = new TreeSet<InternetAddress>();

    // other methods omitted
    // ..

    @Override
    public void write(DataOutput out) throws IOException {
        int safety = 0;
        for (Iterator<InternetAddress> iterator = pair.iterator(); iterator.hasNext();) {
            InternetAddress address = iterator.next();
            address.write(out);
            if (++safety > 2) {
                throw new IOException("More than two items in pair");
            }
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pair.clear();
        // Note: a more efficient approach would be to re-use the objects already
        // in the set (which is even easier if you don't use a set and just store
        // the two objects as instance variables)
        InternetAddress a1 = new InternetAddress();
        a1.readFields(in);
        pair.add(a1);

        InternetAddress a2 = new InternetAddress();
        a2.readFields(in);
        pair.add(a2);
    }
}
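For reference, here is a rough sketch of the instance-variable alternative mentioned in the first observation above. It assumes the Text-based InternetAddress writable from the previous snippet, types the comparable against SenderRecieverPair itself rather than BinaryComparable, and uses hypothetical field names (sender/reciever):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SenderRecieverPair implements WritableComparable<SenderRecieverPair> {

    // Assumption: exactly two addresses, stored directly as fields instead of in a Set
    protected InternetAddress sender = new InternetAddress();
    protected InternetAddress reciever = new InternetAddress();

    @Override
    public void write(DataOutput out) throws IOException {
        // Always writes exactly two records, which is what readFields expects
        sender.write(out);
        reciever.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Re-uses the existing field objects, so nothing accumulates between calls
        sender.readFields(in);
        reciever.readFields(in);
    }

    @Override
    public int compareTo(SenderRecieverPair other) {
        // Assumption: order by sender first, then by reciever
        int cmp = sender.compareTo(other.sender);
        return cmp != 0 ? cmp : reciever.compareTo(other.reciever);
    }
}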
Oh, and I don't see a hashCode method. If you're using the HashPartitioner (the default) and passing these objects between the mapper and reducer, then you should definitely override that method.
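A minimal sketch of that, for the instance-variable version sketched above (and assuming InternetAddress itself delegates hashCode/equals to its Text fields); a Set-based version would hash the set contents instead:

@Override
public int hashCode() {
    // Combine both address hashes so equal pairs hash identically
    // (relies on InternetAddress overriding hashCode, e.g. via its Text fields)
    return 31 * sender.hashCode() + reciever.hashCode();
}

@Override
public boolean equals(Object obj) {
    if (!(obj instanceof SenderRecieverPair)) {
        return false;
    }
    SenderRecieverPair other = (SenderRecieverPair) obj;
    return sender.equals(other.sender) && reciever.equals(other.reciever);
}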
A java.io.EOFException is thrown when you try to read past the end of the stream. So I think the loop in your readFields method, which always reads two entries even when write has serialized fewer, could be what is behind your problem.
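If you do keep the Set, one defensive option (just a sketch, not the only fix) is to serialize the element count first, so readFields never reads more entries than write actually produced. This uses the Text-based InternetAddress from the answer above:

@Override
public void write(DataOutput out) throws IOException {
    // Record how many entries follow, so the reader never over-reads
    WritableUtils.writeVInt(out, pair.size());
    for (InternetAddress address : pair) {
        address.write(out);
    }
}

@Override
public void readFields(DataInput in) throws IOException {
    pair.clear();
    int size = WritableUtils.readVInt(in);
    for (int i = 0; i < size; i++) {
        InternetAddress address = new InternetAddress();
        address.readFields(in);
        pair.add(address);
    }
}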