我正在尝试使用Apache Flink 1.3.2在Windows 10上使用Java 1.8.0_144 Ide Eclipse Mars实现日志分析器。
上下文:
- 有多种类型的logmessage。
- 为每种类型创建POJO。
- 为每种类型创建POJO的数据集实例。
- 然后使用表API查询如下所示。
这很好。
DataSet<String> rawLogs = env.readTextFile(input);// input is the data file path
DataSet<FirstBackupMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction());
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table LogMessageTable = tableEnv.fromDataSet(logMsgPOJODataSet);
Table result = tableEnv .sql("Select taskId from " + LogMessageTable);
tableEnv.toDataSet(result, Row.class).print();
要求:我正在尝试使用工厂模型概括此实施。为了做到这一点,我正在尝试将POJO类概括为logMessage 接口。在上述情况下:
public class FirstBackupMessage implements LogMessage
similarly
public class SecondBackupMessage implements LogMessage
public class ThirdBackupMessage implements LogMessage
在MapFunction实现中,我正在填充特定类实例,但映射函数的输出映射到通用引用,即logMessage在上面的情况下,它将是
DataSet<LogMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction());
//the LogMapFunction.map method is populating FirstBackupMessage
之后,如果我尝试查询pojo firstbackupmessage中存在的字段,但现在参考接口,即logmessage它引发了例外,说明找不到我正在查询的字段。
但是
奇怪的是,如果我用通用引用(即logmsgpojodataset.print()打印数据集,它将在特定的POJO中打印出POJO中的所有字段。
问题:在Flink表API中不允许/可用吗?
表API/SQL库在关系表上操作。通过调用TableEnvironment.fromDataSet(logMsgPOJODataSet)
,DataSet
logMsgPOJODataSet
在逻辑上转换为表。在此过程中,需要根据logMsgPOJODataSet
DataSet
的类型来标识新表格的架构。Flink的数据集API使用TypeInformation
确定DataSet
的数据类型。
由于logMsgPOJODataSet
DataSet
的类型是LogMessage
,因此表API不知道其任何子类型。因此,包括LogMessage
的所有字段,但没有一个子类型字段。
在任何情况下,都无法处理同一表中不同类型的行。所有行必须具有相同的模式。处理这种情况的两种方法将是:
- 使该架构成为所有子类型的超集,并具有无支撑类型的无效值。也许添加另一个指示子类型的字段。
- 添加保存所有子类型数据的通用
Map<String, String>
字段。
在这两种情况下,都需要使用数据集API进行转换,例如使用MapFunction
。