,如下所示:最佳实践 - 命名大型Tuplex类型。我在数据流中使用POJO而不是元组。
这就是我的pojo的定义:
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer>
如果我尝试将PositionEvent
的数据流保存到CSV文件中,则会抛出一个例外:
source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))
线程" main" java.lang.illegalargumentException中的例外:writeascsv()方法只能在元组的数据流上使用。
但是,如果我明确将PositionEvent
施加到tuple8,则有效:
source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.map((PositionEvent e) ->
(Tuple8<Integer, String, Integer, Integer,
Integer, Integer, Integer, Integer>) e)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))
不应该检测到数据流中的对象是Tuple
子类的?
====================
编辑:(感谢Twalthr)
好吧,这是我的pojo:
import org.apache.flink.api.java.tuple.Tuple8;
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {
public PositionEvent() {
}
public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}
public int getSpeed() {
return f2;
}
}
这是我以前的pojo:
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {
public int timestamp;
public String vid;
public int speed;
public int xway;
public int lane;
public int dir;
public int seg;
public int pos;
public PositionEvent() {
}
public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}
}
现在我不需要明确施放我的pojo。
似乎不仅扩展了Tuple8
,而且还添加了诸如e.speed
之类的其他字段。这隐含使您的类型成为Pojo。要命名您的字段并保持有效的元组类型,您可以简单地实现Getter,但不要添加其他字段。否则,您可以简单地使用POJO代替元组。
也可能值得研究Flink的桌子&amp;SQL API。它的目的是通过自动处理所有类型来简化开发。