在Flink sourceFunction中获取ClassNotFound异常



我正在使用协议缓冲区将数据流发送到Apache Flink。我有两个课。一个是生产者,一个是消费者。制作人是Java线程类,它读取来自套接字的数据并将其定为Protobuf,然后将其存储在我的Blockingqueue中消费者是在Flink中实现源函数的类。我用以下方式测试了此程序:

DataStream<Event.MyEvent> stream = env.fromCollection(queue);

而不是自定义源,它可以正常工作。但是,当我尝试使用源函数类时,会引发此例外:

Caused by: java.lang.RuntimeException: Unable to find proto buffer class
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
...
Caused by: java.lang.ClassNotFoundException: event.Event$MyEvent
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
...

,在另一次尝试中,我将两个都分为一个(实现源函数的类)。我从套接字中获取数据,并用Protobuf将其列出,然后将其存储在Blockingqueue中,然后从Blockingqueue读取。我的代码也可以通过这种方法运行。

,但我想使用两个单独的类(多线程),但它引发了异常。我正在尝试在过去的两天内解决它,但也进行了很多搜索,但没有运气。任何帮助都将被提出。

生产者:

public class Producer implements Runnable {
    Boolean running = true;
    Socket socket = null, bufferSocket = null;
    PrintStream ps = null;
    BlockingQueue<Event.MyEvent> queue;
    final int port;
    public Producer(BlockingQueue<Event.MyEvent> queue, int port){
        this.port = port;
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            socket = new Socket("127.0.0.1", port);
            bufferSocket = new Socket(InetAddress.getLocalHost(), 6060);
            ps = new PrintStream(bufferSocket.getOutputStream());
            while (running) {
                queue.put(Event.MyEvent.parseDelimitedFrom(socket.getInputStream()));
                ps.println("Items in Queue: " + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

消费者:

public class Consumer implements SourceFunction<Event.MyEvent> {
    Boolean running = true;
    BlockingQueue<Event.MyEvent> queue;
    Event.MyEvent event;
    public Consumer(BlockingQueue<Event.MyEvent> queue){
        this.queue = queue;
    }
    @Override
    public void run(SourceContext<Event.MyEvent> sourceContext) {
        try {
            while (running) {
                event = queue.take();
                sourceContext.collect(event);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}

event.myevent是我的Protobuf类。我正在使用2.6.1版,并使用v2.6.1编译了类。我仔细检查了版本,以确保这不是问题。生产者课程正常。我用Flink V1.1.3和V1.1.4进行了测试。我正在本地模式下运行它。


编辑:答案包含在中间,单独发布并在此处删除。

更新12/28/2016

...但是我仍然很好奇。是什么导致此错误?是弗林克的错误还是我做错了什么?

...

问答器已经找到了使此工作的方法。我已经从问题中提取了相关部分。请注意,之所以无法解释。

我没有使用引用语法,因为它是很多文本,但是以下是由Asker共享的:

所以我终于让它开始了。我在sourceFunction(消费者)内创建了我的blockingqueue对象,并从源函数类(消费者)内部称为生产者类,而不是在程序的主要方法中制作blockingqueue和调用生产者类。现在有效!

这是我在Flink中的完整工作代码:

public class Main {
public static void main(String[] args) throws Exception {
    final int port, buffer;
    //final String ip;
    try {
        final ParameterTool params = ParameterTool.fromArgs(args);
        port = params.getInt("p");
        buffer = params.getInt("b");
    } catch (Exception e) {
        System.err.println("No port number and/or buffer size specified.");
        return;
    }
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Event.MyEvent> stream = env.addSource(new Consumer(port, buffer));
    //DataStream<Event.MyEvent> stream = env.fromCollection(queue);

    Pattern<Event.MyEvent, ?> crashedPattern = Pattern.<Event.MyEvent>begin("start")
            .where(new FilterFunction<Event.MyEvent>() {
                @Override
                public boolean filter(Event.MyEvent myEvent) throws Exception {
                    return (myEvent.getItems().getValue() >= 120);
                }
            })
            .<Event.MyEvent>followedBy("next").where(new FilterFunction<Event.MyEvent>() {
                @Override
                public boolean filter(Event.MyEvent myEvent) throws Exception {
                    return (myEvent.getItems().getValue() <= 10);
                }
            })
            .within(Time.seconds(3));
    PatternStream<Event.MyEvent> crashed = CEP.pattern(stream.keyBy(new KeySelector<Event.MyEvent, String>() {
        @Override
        public String getKey(Event.MyEvent myEvent) throws Exception {
            return myEvent.getEventType();
        }
    }), crashedPattern);
    DataStream<String> alarm = crashed.select(new PatternSelectFunction<Event.MyEvent, String>() {
        @Override
        public String select(Map<String, Event.MyEvent> pattern) throws Exception {
            Event.MyEvent start = pattern.get("start");
            Event.MyEvent next = pattern.get("next");
            return start.getEventType() + " | Speed from " + start.getItems().getValue() + " to " + next.getItems().getValue() + " in 3 secondsn";
        }
    });
    DataStream<String> rate = alarm.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .apply(new AllWindowFunction<String, String, TimeWindow>() {
                @Override
                public void apply(TimeWindow timeWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {
                    int sum = 0;
                    for (String s: iterable) {
                        sum ++;
                    }
                    collector.collect ("CEP Output Rate: " + sum + "n");
                }
            });
    rate.writeToSocket(InetAddress.getLocalHost().getHostName(), 7070, new SimpleStringSchema());
    env.execute("Flink Taxi Crash Streaming");
}
private static class Producer implements Runnable {
    Boolean running = true;
    Socket socket = null, bufferSocket = null;
    PrintStream ps = null;
    BlockingQueue<Event.MyEvent> queue;
    final int port;
    Producer(BlockingQueue<Event.MyEvent> queue, int port){
        this.port = port;
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            socket = new Socket("127.0.0.1", port);
            bufferSocket = new Socket(InetAddress.getLocalHost(), 6060);
            ps = new PrintStream(bufferSocket.getOutputStream());
            while (running) {
                queue.put(Event.MyEvent.parseDelimitedFrom(socket.getInputStream()));
                ps.println("Items in Queue: " + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
private static class Consumer implements SourceFunction<Event.MyEvent> {
    Boolean running = true;
    final int port;
    BlockingQueue<Event.MyEvent> queue;
    Consumer(int port, int buffer){
        queue = new ArrayBlockingQueue<>(buffer);
        this.port = port;
    }
    @Override
    public void run(SourceContext<Event.MyEvent> sourceContext) {
        try {
            new Thread(new Producer(queue, port)).start();
            while (running) {
                sourceContext.collect(queue.take());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}

相关内容

  • 没有找到相关文章

最新更新