我正在尝试处理日志文件并将几乎相似的结果保存到两个不同的地方,而不需要重新处理整个日志文件。
。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Integer> ds = env.fromCollection(bigData());
MapOperator<Integer, Integer> hardWorkDS = ds.map(i -> {
System.out.println("enter hard work");
return hardWork(i);
});
saveToDB(hardWorkDS.collect());
saveToAnotherDB(hardWorkDS.map(i -> moreWork(i)).collect());
此代码将数据源中的元素数的两倍打印为"enter hard work"。我知道这就是它应该如何工作,因为"collect()"从每次调用开始计算整个数据。
是否有一个变通办法,我可以做,以不处理相同的数据两次?
我知道这在流媒体中是可能的,但我不能在这个中使用流媒体。
DataSet程序可以根据需要拥有任意多的数据集。只需在DataSet.output(OutputFormat)
中添加更多的sink,并调用env.execute()
来启动程序。Flink提供了一个JDBCOutputFormat
,您可以使用它将数据写入数据库。
正如您注意到的,您不应该使用collect()
,因为它会立即执行程序。除了防止多个数据接收之外,collect()
还有一个缺点,即它在将数据写入数据库之前先将数据获取到客户端。直接从OutputFormat
写入数据是一种更具可扩展性的解决方案。