I am reading the source code of SingleOutputStreamOperator#returns. Its javadoc is:
/**
* Adds a type information hint about the return type of this operator. This method
* can be used in cases where Flink cannot determine automatically what the produced
* type of a function is. That can be the case if the function uses generic type variables
* in the return type that cannot be inferred from the input type.
*
* <p>Use this method the following way:
* <pre>{@code
* DataStream<Tuple2<String, Double>> result =
* stream.flatMap(new FunctionWithNonInferrableReturnType())
* .returns(new TypeHint<Tuple2<String, Double>>(){});
* }</pre>
*
* @param typeHint The type hint for the returned data type.
* @return This operator with the type information corresponding to the given type hint.
*/
It mentions FunctionWithNonInferrableReturnType to show why the returns method is needed, but I have not managed to write such a NonInferrableReturnType class myself. How can I write a simple one?
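When the documentation says NonInferrableReturnType, it means a function whose return type is a type variable such as <T> (or any other letter you like). So you can create a MapFunction that returns T. But if your goal is, for example, to return String, you then have to call .returns(TypeInformation.of(String.class)) on the result.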
public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
    @Override
    @SuppressWarnings("unchecked") // T is erased at runtime, so this cast cannot be checked
    public T map(AbstractDataModel value) throws Exception {
        return (T) value.getValue();
    }
}
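Here I am using the classes from your previous question, the one about compilation failing when creating a Flink MapFunction with a supertype. The same code without .returns(TypeInformation.of(String.class)) compiles, but throws a runtime exception. Its message says that the return type of the function could not be determined automatically due to type erasure, and that you can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.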
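To see why the type cannot be inferred, here is a minimal sketch that does not depend on Flink. It uses java.util.function.Function as a stand-in for MapFunction, and the class names (ErasureDemo, GenericFn, ConcreteFn) are made up for illustration. Reflection can only report what the class declaration says, so a generic function yields the bare type variable T, while a function with a fixed output type yields the real class. Flink's actual type extraction is more involved than this, but it faces the same limitation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {

    // Generic function: the output type is only the type variable T.
    public static class GenericFn<T> implements Function<String, T> {
        @SuppressWarnings("unchecked")
        @Override
        public T apply(String value) {
            return (T) value;
        }
    }

    // Concrete function: the output type is fixed in the class declaration.
    public static class ConcreteFn implements Function<String, Integer> {
        @Override
        public Integer apply(String value) {
            return value.length();
        }
    }

    // Returns the output type argument of Function as declared by the function's class.
    public static Type declaredOutputType(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Function.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        throw new IllegalArgumentException("Class does not directly implement Function");
    }

    public static void main(String[] args) {
        // The <String> written here is erased; only the declaration "T" is visible.
        System.out.println(declaredOutputType(new GenericFn<String>())); // prints "T"
        System.out.println(declaredOutputType(new ConcreteFn()));        // prints "class java.lang.Integer"
    }
}
```

Note that writing new GenericFn<String>() does not help: the type argument exists only at compile time, which is why the hint has to be supplied separately through returns(...).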
public class NonInferrableReturnTypeStreamJob {

    private final List<AbstractDataModel> abstractDataModelList;
    private final ValenciaSinkFunction sink;

    public NonInferrableReturnTypeStreamJob() {
        this.abstractDataModelList = new ArrayList<AbstractDataModel>();
        this.abstractDataModelList.add(new ConcreteModel("a", "1"));
        this.abstractDataModelList.add(new ConcreteModel("a", "2"));
        this.sink = new ValenciaSinkFunction();
    }

    public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
        this.abstractDataModelList = abstractDataModelList;
        this.sink = sink;
    }

    public static void main(String[] args) throws Exception {
        NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
        concreteModelTest.execute();
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(this.abstractDataModelList)
                .map(new MyMapFunctionNonInferrableReturnType())
                .returns(TypeInformation.of(String.class))
                .addSink(sink);
        env.execute();
    }
}
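The job above passes TypeInformation.of(String.class), which is enough for a non-generic type. For a generic type such as Tuple2<String, Double>, the javadoc in the question uses new TypeHint<...>(){} instead, and the trailing {} matters: it creates an anonymous subclass whose generic superclass keeps the full type. The following is a rough sketch of that general technique in plain Java. The Hint class is a hypothetical stand-in written for this example, not Flink's TypeHint implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenDemo {

    // Hypothetical stand-in for a type hint: abstract, so callers must subclass it.
    public abstract static class Hint<T> {
        private final Type type;

        protected Hint() {
            // An anonymous subclass records "Hint<actual type>" as its generic superclass.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException("Hint must be created with a type argument");
            }
            this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // The {} creates an anonymous subclass, which preserves List<String> for reflection.
        Hint<List<String>> hint = new Hint<List<String>>() {};
        System.out.println(hint.getType()); // prints "java.util.List<java.lang.String>"
    }
}
```

A plain Class object cannot carry this information, since there is no List<String>.class in Java; that is the reason a hint object is needed for generic return types.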
If you like, here is an integration test for this example:
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;

public class NonInferrableReturnTypeStreamJobTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster;

    private final int minAvailableProcessors = 4;
    private final boolean runInParallel;

    public NonInferrableReturnTypeStreamJobTest() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this.runInParallel = availableProcessors >= minAvailableProcessors;
        if (this.runInParallel) {
            flinkCluster = new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(minAvailableProcessors)
                            .setNumberTaskManagers(1)
                            .build());
        }
    }

    @Test
    public void execute() throws Exception {
        List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
        abstractDataModelList.add(new ConcreteModel("a", "1"));
        abstractDataModelList.add(new ConcreteModel("a", "2"));
        ValenciaSinkFunction.values.clear();

        NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
        streamJob.execute();

        List<String> results = ValenciaSinkFunction.values;
        assertEquals(2, results.size());
        assertTrue(results.containsAll(Arrays.asList("1", "2")));
    }
}