Apache Flink: execution environment and multiple sinks



My question may cause some confusion, so please read the description first; it may help pin down my problem. I will add my code at the end of the question (any suggestions about my code structure/implementation are also welcome). Thanks in advance for any help!

My questions:

  1. How can I define multiple sinks in a Flink batch job without having it fetch data from the same source repeatedly?

  2. What is the difference between createCollectionsEnvironment() and getExecutionEnvironment()? Which one should I use in a local environment?

  3. What is env.execute() for? My code outputs the results without this statement; if I add it, it throws the following exception:


Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'. 
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) 
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) 
at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34) 
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) 
at MainClass.main(MainClass.java:114)
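For reference, a minimal, self-contained sketch that reproduces this exception (the class name and the fromElements sample data are placeholders of mine, not part of my actual job): print() is itself a sink and triggers execution eagerly, so a following env.execute() finds no new sinks to run.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SinkExceptionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> data = env.fromElements(1, 2, 3);

        // print() is a sink in itself and eagerly triggers execution of the plan.
        data.print();

        // The plan has already run, and no new sink has been defined since,
        // so this call throws the "No new data sinks" RuntimeException above.
        env.execute();
    }
}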

Description: I am new to programming. Recently I needed to process some data (grouping it, calculating the standard deviation, etc.) with Flink batch processing. However, I got to the point where I need to output two DataSets. The structure was like this:

Source (database) -> DataSet 1 (add an index using zipWithIndex()) -> DataSet 2 (do some calculation while keeping the index) -> DataSet 3

First I output DataSet 2; the indexes run, for example, from 1 to 10000. Then I output DataSet 3 and the indexes become 10001 to 20000, although I have not changed the values in any function. My guess is that when outputting DataSet 3, instead of reusing the previously computed DataSet 2, Flink fetches the data from the database again and redoes the calculation. With the zipWithIndex() function this not only produces wrong index numbers but also increases the number of connections to the db.
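A stripped-down sketch of the effect I am describing (the class name and sample data are placeholders for the database source): each print() is a sink that triggers its own execution, so the source and zipWithIndex() are evaluated once per sink.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class DoubleExecutionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> source = env.fromElements("a", "b", "c");
        DataSet<Tuple2<Long, String>> indexed = DataSetUtils.zipWithIndex(source);

        indexed.print(); // first sink: runs the plan once, reading the source
        indexed.print(); // second sink: runs the whole plan again from the source
    }
}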

I suppose this is related to the execution environment: using

ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

gives the "wrong" index numbers (10001-20000), while

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

gives the correct index numbers (1-10000). The time taken and the number of database connections also differ between the two, and the print order is reversed.

OS, database, and other environment details and versions: IntelliJ IDEA 2017.3.5 (Community Edition), Build #IC-173.4674.33, built on March 6, 2018; JRE: 1.8.0_152-release-1024-b15 amd64; JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o; Windows 10 10.0

My test code (Java):

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

    // Table is used to calculate the standard deviation, as I figured that there is no such calculation in DataSet.
    BatchTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

    // Get data from a MySQL database
    DataSet<Row> dbData =
            env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("com.mysql.cj.jdbc.Driver")
                            .setDBUrl($database_url)
                            .setQuery("select value from $table_name where id =33")
                            .setUsername("username")
                            .setPassword("password")
                            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.DOUBLE_TYPE_INFO))
                            .finish()
            );

    // Add an index for assigning groups (group capacity is 5)
    DataSet<Tuple2<Long, Row>> indexedData = DataSetUtils.zipWithIndex(dbData);

    // Replace the index (long) with a group number (int), and convert Row to double at the same time
    DataSet<Tuple2<Integer, Double>> rawData = indexedData.flatMap(new GroupAssigner());

    // Use groupBy() to combine the individual values of each group into a list, while calculating the mean
    // and range of each group; put them into a POJO named GroupDataClass
    DataSet<GroupDataClass> groupDS = rawData.groupBy("f0").combineGroup(new GroupCombineFunction<Tuple2<Integer, Double>, GroupDataClass>() {
        @Override
        public void combine(Iterable<Tuple2<Integer, Double>> iterable, Collector<GroupDataClass> collector) {
            Iterator<Tuple2<Integer, Double>> it = iterable.iterator();
            Tuple2<Integer, Double> var1 = it.next();
            int groupNum = var1.f0;
            // max and min are used to calculate the range; i and sum are used to calculate the mean
            double max = var1.f1;
            double min = max;
            double sum = max; // the first value must be included in the sum, otherwise the mean is off
            int i = 1;
            // The list stores the individual values
            List<Double> list = new ArrayList<>();
            list.add(max);
            while (it.hasNext()) {
                double next = it.next().f1;
                sum += next;
                i++;
                max = next > max ? next : max;
                min = next < min ? next : min;
                list.add(next);
            }
            // Store the group number, mean, range, and the 5 individual values of the group
            collector.collect(new GroupDataClass(groupNum, sum / i, max - min, list));
        }
    });

    // print, because if no sink is created, Flink will not even perform the calculation.
    groupDS.print();

    // Get the max group number and the range of each group to calculate the average range.
    // If the group numbers start at 1, the maximum group number equals the number of groups.
    // However, because this is the second sink, data flows from the source again, which doubles the group numbers.
    DataSet<Tuple2<Integer, Double>> rangeDS = groupDS.map(new MapFunction<GroupDataClass, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> map(GroupDataClass in) {
            return new Tuple2<>(in.groupNum, in.range);
        }
    }).max(0).andSum(1);

    // collect and print, because if no sink is created, Flink will not even perform the calculation.
    Tuple2<Integer, Double> rangeTuple = rangeDS.collect().get(0);
    double range = rangeTuple.f1 / rangeTuple.f0;
    System.out.println("range = " + range);
}

public static class GroupAssigner implements FlatMapFunction<Tuple2<Long, Row>, Tuple2<Integer, Double>> {
    @Override
    public void flatMap(Tuple2<Long, Row> input, Collector<Tuple2<Integer, Double>> out) {
        // zipWithIndex starts at 0, so index 0-4 goes to group 1, index 5-9 to group 2, etc.
        int n = (int) (input.f0 / 5) + 1;
        out.collect(new Tuple2<>(n, (Double) input.f1.getField(0)));
    }
}
  1. It is possible to connect a source to multiple sinks; the source is executed only once and the records are broadcast to the multiple sinks (see the sketch after this list). See this question: Can Flink write results into multiple files (like Hadoop's MultipleOutputFormat)?

  2. getExecutionEnvironment is the right way to get the environment when you want to run your job. createCollectionsEnvironment is a good way to play around with and test things. See the documentation.

  3. The exception's error message is quite explicit: if you call print or collect, your data flow is executed. So you have two choices:

  • Either you call print/collect at the end of your data flow: it is then executed and printed. That is good for testing. Bear in mind that you can only call collect/print once per data flow, otherwise it is executed several times while it is not completely defined;
  • Or you add a sink at the end of your data flow and call env.execute(). That is what you want to do once your flow is in a more mature shape.
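To make the second option concrete, here is a minimal sketch of a flow with two sinks and a single execution (the class name, sample data, and /tmp output paths are placeholder assumptions, not from the question): both sinks are declared before anything runs, and one env.execute() then executes the whole plan, so the source is read a single time and the records fan out to both sinks.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class TwoSinksDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the JDBC source in the question.
        DataSet<Double> values = env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0);

        // Some derived computation, analogous to groupDS/rangeDS above.
        DataSet<Double> doubled = values.map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double v) {
                return v * 2;
            }
        });

        // Two sinks on the same data flow; note there is no print()/collect() here.
        values.writeAsText("/tmp/sink-raw");
        doubled.writeAsText("/tmp/sink-doubled");

        // A single execute() runs the plan once: the source is read one time
        // and its records are broadcast to both sinks.
        env.execute("two sinks, one execution");
    }
}

Applied to the question's code, this would mean replacing groupDS.print() and rangeDS.collect() with two such sinks (or keeping a single collect()/print() at the very end), so that the JDBC source is queried once and zipWithIndex() numbers the rows only once.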
