Apache Flink:不能将 writeAsCsv() 与子类元组的数据流一起使用



,如下所示:最佳实践 - 命名大型Tuplex类型。我在数据流中使用POJO而不是元组。

这就是我的pojo的定义:

public class PositionEvent extends Tuple8<Integer, String, Integer, 
    Integer, Integer, Integer, Integer, Integer>

如果我尝试将PositionEvent的数据流保存到CSV文件中,则会抛出一个例外:

source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
            .writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))

线程" main" java.lang.illegalargumentException中的例外:writeascsv()方法只能在元组的数据流上使用。

但是,如果我明确将PositionEvent施加到tuple8,则有效:

source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
            .map((PositionEvent e) ->
                    (Tuple8<Integer, String, Integer, Integer,
                            Integer, Integer, Integer, Integer>) e)
            .writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))

不应该检测到数据流中的对象是Tuple子类的?

====================

编辑:(感谢Twalthr)

好吧,这是我的pojo:

import org.apache.flink.api.java.tuple.Tuple8;
public class PositionEvent extends Tuple8<Integer, String, Integer,
        Integer, Integer, Integer, Integer, Integer> {
    public PositionEvent() {
    }
    public PositionEvent(int timestamp, String vid, int speed, int xway,
                         int lane, int dir, int seg, int pos) {
        super(timestamp, vid, speed, xway, lane, dir, seg, pos);
    }
    public int getSpeed() {
        return f2;
    }
}

这是我以前的pojo:

public class PositionEvent extends Tuple8<Integer, String, Integer,
        Integer, Integer, Integer, Integer, Integer> {
    public int timestamp;
    public String vid;
    public int speed;
    public int xway;
    public int lane;
    public int dir;
    public int seg;
    public int pos;
    public PositionEvent() {
    }
    public PositionEvent(int timestamp, String vid, int speed, int xway,
                         int lane, int dir, int seg, int pos) {
        super(timestamp, vid, speed, xway, lane, dir, seg, pos);
    }
}

现在我不需要明确施放我的pojo。

似乎不仅扩展了Tuple8,而且还添加了诸如e.speed之类的其他字段。这隐含使您的类型成为Pojo。要命名您的字段并保持有效的元组类型,您可以简单地实现Getter,但不要添加其他字段。否则,您可以简单地使用POJO代替元组。

也可能值得研究Flink的桌子&amp;SQL API。它的目的是通过自动处理所有类型来简化开发。

相关内容

  • 没有找到相关文章

最新更新