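I want to create a custom load function (UDF) for Pig. I created a SimpleTextLoader by following
https://pig.apache.org/docs/r0.11.0/udf.html. I successfully built a jar file from this code, registered it in Pig, and ran the Pig script. I don't know how to solve this problem; any help would be appreciated.
Below is my Java code: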
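// Package inferred from the class name used in the Pig script below.
package sampleloader;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SimpleTextLoader extends LoadFunc {
    protected RecordReader in = null;
    private byte fieldDel = '\t'; // default field delimiter: tab
    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static final int BUFFER_SIZE = 1024;

    public SimpleTextLoader() {
    }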
    // Accepts a single character, or an escape such as \t, \xHH (hex) or \uNNN.
    public SimpleTextLoader(String delimiter) {
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte) delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
                case 't':
                    this.fieldDel = (byte) '\t';
                    break;
                case 'x':
                    this.fieldDel =
                        Integer.valueOf(delimiter.substring(2), 16).byteValue();
                    break;
                case 'u':
                    this.fieldDel =
                        Integer.valueOf(delimiter.substring(2)).byteValue();
                    break;
                default:
                    throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimiter must be a single character");
        }
    }
    // Appends the bytes in [start, end) to the tuple under construction.
    private void readField(byte[] buf, int start, int end) {
        if (mProtoTuple == null) {
            mProtoTuple = new ArrayList<Object>();
        }
        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            // nextKeyValue() returns false once the input is exhausted, so
            // return null only when no record remains (the example in the
            // Pig UDF manual tests !notDone).
            boolean notDone = in.nextKeyValue();
            if (!notDone) {
                return null;
            }
            Text value = (Text) in.getCurrentValue();
            System.out.println("printing value " + value);
            byte[] buf = value.getBytes();
            int len = value.getLength();
            int start = 0;
            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    readField(buf, start, i);
                    start = i + 1;
                }
            }
            // pick up the last field
            readField(buf, start, len);
            Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
            mProtoTuple = null;
            System.out.println(t);
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }
    }
    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        in = reader;
    }
}
Below is my Pig script:
REGISTER /home/hadoop/netbeans/sampleloader/dist/sampleloader.jar;
a = load '/input.txt' using sampleloader.SimpleTextLoader();
store a into 'output';
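You are calling sampleloader.SimpleTextLoader(), which does nothing because it is just an empty constructor.
Use sampleloader.SimpleTextLoader(String delimiter) instead, which is the constructor that sets up the actual splitting.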
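To see what the delimiter argument does, here is a minimal stand-alone sketch that needs neither Pig nor Hadoop on the classpath. DelimiterSketch, parseDelimiter, and splitLine are hypothetical names made up for this illustration; the logic is copied from the constructor and the field-splitting loop in the question's code, not from any Pig API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone helper; not part of Pig or of the loader above.
final class DelimiterSketch {

    // Same rules as SimpleTextLoader(String delimiter): one literal character,
    // or an escape such as \t, \xHH (hex) or \uNNN (parsed as decimal).
    static byte parseDelimiter(String delimiter) {
        if (delimiter.length() == 1) {
            return (byte) delimiter.charAt(0);
        }
        if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
                case 't':
                    return (byte) '\t';
                case 'x':
                    return Integer.valueOf(delimiter.substring(2), 16).byteValue();
                case 'u':
                    return Integer.valueOf(delimiter.substring(2)).byteValue();
                default:
                    throw new IllegalArgumentException("Unknown delimiter " + delimiter);
            }
        }
        throw new IllegalArgumentException("Delimiter must be a single character");
    }

    // Splits one line on the delimiter byte; an empty field becomes null,
    // mirroring readField() in the loader.
    static List<String> splitLine(String line, byte fieldDel) {
        byte[] buf = line.getBytes(StandardCharsets.UTF_8);
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= buf.length; i++) {
            // The end of the buffer closes the last field.
            if (i == buf.length || buf[i] == fieldDel) {
                fields.add(start == i ? null
                        : new String(buf, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // "\\t" is the two characters backslash and t, as typed in a script.
        byte tab = parseDelimiter("\\t");
        System.out.println(splitLine("a\tb\t\tc", tab)); // prints [a, b, null, c]

        byte comma = parseDelimiter(",");
        System.out.println(splitLine("1,2,3", comma));   // prints [1, 2, 3]
    }
}
```

In the Pig script, the delimiter would be passed as a quoted string, for example sampleloader.SimpleTextLoader(','). As far as I know, Pig hands constructor arguments over as plain strings, so an argument written as '\t' should arrive as a backslash followed by t, which is the case the escape branch handles.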