如何在Flink中使用Java读取Avrofile进入元组班



我正在尝试读取一个avro文件并在其上执行一些操作,一切正常,但是当我使用它们时,它会得到以下异常:

aggregating on field positions is only possible on tuple data types

然后,我更改我的类以实现tuple4(因为我有4个字段),但是当我想收集结果时,获得avrotypepeexception未知类型:t0

这是我的数据和工作类:

public class Nation{
public Integer N_NATIONKEY;
public String N_NAME;
public Integer N_REGIONKEY;
public String N_COMMENT;
public Integer getN_NATIONKEY() {
    return N_NATIONKEY;
}
public void setN_NATIONKEY(Integer n_NATIONKEY) {
    N_NATIONKEY = n_NATIONKEY;
}
public String getN_NAME() {
    return N_NAME;
}
public void setN_NAME(String n_NAME) {
    N_NAME = n_NAME;
}
public Integer getN_REGIONKEY() {
    return N_REGIONKEY;
}
public void setN_REGIONKEY(Integer n_REGIONKEY) {
    N_REGIONKEY = n_REGIONKEY;
}
public String getN_COMMENT() {
    return N_COMMENT;
}
public void setN_COMMENT(String n_COMMENT) {
    N_COMMENT = n_COMMENT;
}
public Nation() {
}

public static void main(String[] args) throws Exception {
    Configuration parameters = new Configuration();
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Path path2 = new Path("/Users/violet/Desktop/nation.avro");
    AvroInputFormat<Nation> format = new AvroInputFormat<Nation>(path2,Nation.class);
    format.configure(parameters);
    DataSet<Nation> nation = env.createInput(format);
    nation.aggregate(Aggregations.SUM,0);

    JobExecutionResult res = env.execute();
}

这是元组类和上述作业的相同代码:

public class NationTuple extends Tuple4<Integer,String,Integer,String> {
Integer N_NATIONKEY(){ return this.f0;}
String N_NAME(){return this.f1;}
Integer N_REGIONKEY(){ return this.f2;}
String N_COMMENT(){ return this.f3;}

}

我尝试了这堂课,并获得了Typexception(到处都是国家而不是国家)

我不认为要实现tuple4的班级是正确的方法。取而代

static Tuple4<Integer, String, Integer, String> toTuple(Nation nation) {
  return Tuple4.of(nation.N_NATIONKEY, ...);
}

,然后在您的拓扑调用中:

inputData.map(p -> toTuple(p)).returns(new TypeHint<Tuple4<Integer, String, Integer, String>(){});

唯一细微的部分是您需要提供一种类型的提示,以便Flink可以弄清楚您的功能返回哪种元组。

另一种解决方案是在进行聚合时使用字段名称代替元组字段索引。例如:

groupBy("N_NATIONKEY", "N_REGIONKEY")

这一切都在此处解释:https://ci.apache.org/projects/flink/flink/flink-docs-stable/dev/api_coneptes.html#specifying-keys-keys

相关内容

  • 没有找到相关文章

最新更新