SimpleTextLoader UDF in Pig



我想为 Pig UDF 创建一个自定义加载函数,我使用链接创建了一个 SimpleTextLoader

https://pig.apache.org/docs/r0.11.0/udf.html,我已经成功为这段代码生成了jar文件,在pig中注册并运行了Pig脚本。我不知道如何解决这个问题,任何帮助将不胜感激。

下面是我的Java代码

public class SimpleTextLoader extends LoadFunc{
    protected RecordReader in = null;
    private byte fieldDel = 't';
    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static final int BUFFER_SIZE = 1024;
      public SimpleTextLoader() {
    }
public SimpleTextLoader(String delimiter) 
{
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte)delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\') {
            switch (delimiter.charAt(1)) {
            case 't':
                this.fieldDel = (byte)'t';
                break;
            case 'x':
               fieldDel =
                    Integer.valueOf(delimiter.substring(2), 16).byteValue();
               break;
            case 'u':
                this.fieldDel =
                    Integer.valueOf(delimiter.substring(2)).byteValue();
                break;
            default:
                throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimeter must be a single character");
        }
    }
private void readField(byte[] buf, int start, int end) {
        if (mProtoTuple == null) {
            mProtoTuple = new ArrayList<Object>();
        }
        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }    @Override
    public Tuple getNext() throws IOException {
         try {
            boolean notDone = in.nextKeyValue();
            if (notDone) {
                return null;
            }
            Text value = (Text) in.getCurrentValue();
            System.out.println("printing value"  +value);
            byte[] buf = value.getBytes();
            int len = value.getLength();
            int start = 0;
            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    readField(buf, start, i);
                    start = i + 1;
                }
            }
            // pick up the last field
            readField(buf, start, len);
            Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
            mProtoTuple = null;
            System.out.println(t);
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    @Override
    public void setLocation(String string, Job job) throws IOException {
        FileInputFormat.setInputPaths(job,string);
    }
    @Override
    public InputFormat getInputFormat() throws IOException {
      return new TextInputFormat();
    }
    @Override
    public void prepareToRead(RecordReader reader, PigSplit ps) throws IOException {
        in=reader;
    }
}

下面是我的猪脚本

REGISTER /home/hadoop/netbeans/sampleloader/dist/sampleloader.jar
a= load '/input.txt' using sampleloader.SimpleTextLoader();
store a into 'output';

您正在使用不执行任何操作的sampleloader.SimpleTextLoader(),因为它只是一个空的构造函数。
而是使用正在执行拆分实际操作的sampleloader.SimpleTextLoader(String delimiter)

相关内容

  • 没有找到相关文章

最新更新