我的问题是如何从DataStream
转换为List
,例如为了能够遍历它。
代码看起来像:
package flinkoracle;
//imports
//....
public class FlinkOracle {
final static Logger LOG = LoggerFactory.getLogger(FlinkOracle.class);
public static void main(String[] args) {
LOG.info("Starting...");
// get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
//get the source from Oracle DB
DataStream<?> source = env
.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("oracle.jdbc.driver.OracleDriver")
.setDBUrl("jdbc:oracle:thin:@localhost:1521")
.setUsername("user")
.setPassword("password")
.setQuery("select * from table1")
.setRowTypeInfo(rowTypeInfo)
.finish());
source.print().setParallelism(1);
try {
LOG.info("----------BEGIN----------");
env.execute();
LOG.info("----------END----------");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOG.info("End...");
}
}
提前非常感谢。 溴 塔马斯
Flink 提供了一个迭代器接收器来收集 DataStream 结果以进行测试和调试。它可以按如下方式使用:
import org.apache.flink.contrib.streaming.DataStreamUtils;
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
您可以将迭代器复制到新列表中,如下所示:
while (iter.hasNext())
list.add(iter.next());
Flink 还在 DataStream 上提供了一堆简单的 write*() 方法,主要用于调试目的。数据刷新到目标系统取决于输出格式的实现。这意味着并非所有发送到输出格式的元素都会立即显示在目标系统中。注意:这些 write*() 方法不参与 Flink 的检查点,在失败的情况下,这些记录可能会丢失。
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
来源:链接。
您可能需要添加以下依赖项才能使用 DataStreamUtils:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-contrib</artifactId>
<version>0.10.2</version>
</dependency>
在较新版本中,DataStreamUtils::collect
已被弃用。相反,您可以使用DataStream::executeAndCollect
如果给定限制,它将返回最多该大小的List
。
var list = source.executeAndCollect(100);
如果您不知道有多少个元素,或者您只是想遍历结果而不一次将它们全部加载到内存中,那么您可以使用 no-arg 版本来获取CloseableIterator
try (var iterator = source.executeAndCollect()) {
// do something
}