是否不允许在Apache Flink Table API中查询POJO数据集的Supertype



我正在尝试使用Apache Flink 1.3.2在Windows 10上使用Java 1.8.0_144 Ide Eclipse Mars实现日志分析器。

上下文:

  • 有多种类型的logmessage。
  • 为每种类型创建POJO。
  • 为每种类型创建POJO的数据集实例。
  • 然后使用表API查询如下所示。

这很好。

DataSet<String> rawLogs = env.readTextFile(input);// input is the data file path
DataSet<FirstBackupMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction());
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); 
Table LogMessageTable = tableEnv.fromDataSet(logMsgPOJODataSet);
Table result = tableEnv .sql("Select taskId from " + LogMessageTable);
tableEnv.toDataSet(result, Row.class).print();

要求:我正在尝试使用工厂模型概括此实施。为了做到这一点,我正在尝试将POJO类概括为logMessage 接口。在上述情况下:

public class FirstBackupMessage implements LogMessage
similarly 
public class SecondBackupMessage implements LogMessage
public class ThirdBackupMessage implements LogMessage

在MapFunction实现中,我正在填充特定类实例,但映射函数的输出映射到通用引用,即logMessage在上面的情况下,它将是

DataSet<LogMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction());  
//the LogMapFunction.map method is populating FirstBackupMessage

之后,如果我尝试查询pojo firstbackupmessage中存在的字段,但现在参考接口,即logmessage它引发了例外,说明找不到我正在查询的字段。

但是

奇怪的是,如果我用通用引用(即logmsgpojodataset.print()打印数据集,它将在特定的POJO中打印出POJO中的所有字段。

问题:在Flink表API中不允许/可用吗?

表API/SQL库在关系表上操作。通过调用TableEnvironment.fromDataSet(logMsgPOJODataSet)DataSet logMsgPOJODataSet在逻辑上转换为表。在此过程中,需要根据logMsgPOJODataSet DataSet的类型来标识新表格的架构。Flink的数据集API使用TypeInformation确定DataSet的数据类型。

由于logMsgPOJODataSet DataSet的类型是LogMessage,因此表API不知道其任何子类型。因此,包括LogMessage的所有字段,但没有一个子类型字段。

在任何情况下,都无法处理同一表中不同类型的行。所有行必须具有相同的模式。处理这种情况的两种方法将是:

  1. 使该架构成为所有子类型的超集,并具有无支撑类型的无效值。也许添加另一个指示子类型的字段。
  2. 添加保存所有子类型数据的通用Map<String, String>字段。

在这两种情况下,都需要使用数据集API进行转换,例如使用MapFunction

相关内容

  • 没有找到相关文章

最新更新