javadoc of SingleOutputStreamOperator#returns(TypeHint<T> typeHint) 方法



我正在阅读SingleOutputStreamOperator#returns的源代码,它的javadoc是:

/**
* Adds a type information hint about the return type of this operator. This method
* can be used in cases where Flink cannot determine automatically what the produced
* type of a function is. That can be the case if the function uses generic type variables
* in the return type that cannot be inferred from the input type.
*
* <p>Use this method the following way:
* <pre>{@code
*     DataStream<Tuple2<String, Double>> result =
*         stream.flatMap(new FunctionWithNonInferrableReturnType())
*               .returns(new TypeHint<Tuple2<String, Double>>(){});
* }</pre>
*
* @param typeHint The type hint for the returned data type.
* @return This operator with the type information corresponding to the given type hint.
*/

它提到FunctionWithNonInferrableReturnType来展示return方法的必要性,但我无法编写NonInferrableReturnType这样的类。我怎样才能写一个简单的?

当文档显示NonInferrableReturnType时,这意味着我们可以使用类型变量<T>,或您喜欢的任何其他字母。因此,您可以创建一个返回TMapFunction。但是,例如,如果您的目标是返回String,则必须使用.returns(TypeInformation.of(String.class)

public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
@Override
public T map(AbstractDataModel value) throws Exception {
return (T) value.getValue();
}
}

在这里,我使用的是您上一个问题中的类。当创建具有超类型的MapFunction时,编译失败。没有.returns(TypeInformation.of(String.class))的相同代码编译但引发运行时异常:

由于类型擦除,无法自动确定

。你可以通过在转换调用的结果,或者让您的函数实现"ResultTypeQueryable"接口。

public class NonInferrableReturnTypeStreamJob {
private final List<AbstractDataModel> abstractDataModelList;
private final ValenciaSinkFunction sink;
public NonInferrableReturnTypeStreamJob() {
this.abstractDataModelList = new ArrayList<AbstractDataModel>();
this.abstractDataModelList.add(new ConcreteModel("a", "1"));
this.abstractDataModelList.add(new ConcreteModel("a", "2"));
this.sink = new ValenciaSinkFunction();
}
public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
this.abstractDataModelList = abstractDataModelList;
this.sink = sink;
}
public static void main(String[] args) throws Exception {
NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
concreteModelTest.execute();
}
public void execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(this.abstractDataModelList)
.map(new MyMapFunctionNonInferrableReturnType())
.returns(TypeInformation.of(String.class))
.addSink(sink);
env.execute();
}
}

如果你愿意的话,这里是这个例子的集成测试:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;
public class NonInferrableReturnTypeStreamJobTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster;
private final int minAvailableProcessors = 4;
private final boolean runInParallel;
public NonInferrableReturnTypeStreamJobTest() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
this.runInParallel = availableProcessors >= minAvailableProcessors;
if (this.runInParallel) {
flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(minAvailableProcessors)
.setNumberTaskManagers(1)
.build());
}
}
@Test
public void execute() throws Exception {
List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
abstractDataModelList.add(new ConcreteModel("a", "1"));
abstractDataModelList.add(new ConcreteModel("a", "2"));
ValenciaSinkFunction.values.clear();
NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
streamJob.execute();
List<String> results = ValenciaSinkFunction.values;
assertEquals(2, results.size());
assertTrue(results.containsAll(Arrays.asList("1", "2")));
}
}

相关内容

  • 没有找到相关文章

最新更新