I have the following code that reads data over JDBC:
// Row type of the query result, reused as the produced type of the input.
RowTypeInfo rowTypeInfo = new RowTypeInfo(getTypes(transInfo));
DataSet<Row> dbData =
    env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(bdpService.getDriver(dataSource))
            .setDBUrl(dbUrl)
            .setQuery(sql)
            .setUsername(dataSource.getUsername())
            .setPassword(dataSource.getPassword())
            .setRowTypeInfo(rowTypeInfo)
            .finish(),
        rowTypeInfo);
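Next I want to run Tuple-typed operations on the DataSet, such as groupBy. But its element type is Row. How can I convert it, or is there some other way to read the data from JDBC into a DataSet of Tuple?
Thanks a lot.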
I solved it as follows:
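// Imports needed by the helper (they go at the top of the enclosing file):
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.Row;

// Copies each Row into a Flink Tuple of the same arity, field by field.
public static class RowToTupleMapper implements ResultTypeQueryable<Tuple>, MapFunction<Row, Tuple> {
    private final Class<? extends Tuple> tupleClass;
    private final TypeInformation<?>[] typeInformations;

    public RowToTupleMapper(Class<? extends Tuple> tupleClass, TypeInformation<?>[] typeInformations) {
        this.tupleClass = tupleClass;
        this.typeInformations = typeInformations;
    }

    @Override
    public Tuple map(Row value) throws Exception {
        // Instantiate the concrete TupleN class through its no-argument constructor.
        Tuple tuple = tupleClass.getDeclaredConstructor().newInstance();
        for (int i = 0; i < value.getArity(); i++) {
            tuple.setField(value.getField(i), i);
        }
        return tuple;
    }

    // Reports the concrete tuple type, which cannot be derived from the abstract Tuple return type.
    @Override
    public TypeInformation<Tuple> getProducedType() {
        return new TupleTypeInfo<>(typeInformations);
    }
}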
Use this helper class like this:
// Pick the TupleN class whose arity matches the number of selected columns.
final String tupleClassName = "org.apache.flink.api.java.tuple.Tuple" + transInfo.getSelects().size();
final Class<? extends Tuple> tupleClass = Class.forName(tupleClassName).asSubclass(Tuple.class);
DataSet<Tuple> ret = dbData.map(new RowToTupleMapper(tupleClass, typeInformations));
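
For anyone who wants to try the core idea without a Flink dependency, here is a minimal plain-Java sketch of the same pattern: resolve a tuple class from the arity, instantiate it reflectively, and copy the fields by position. `SimpleTuple`, `SimpleTuple2`, `SimpleTuple3`, `tupleClassFor` and `toTuple` are hypothetical stand-ins invented for this illustration. They are not Flink classes, and a `List` stands in for `Row`.

```java
import java.util.Arrays;
import java.util.List;

class RowToTupleSketch {

    /** Hypothetical stand-in for a positional tuple type (NOT a Flink class). */
    public abstract static class SimpleTuple {
        private final Object[] fields;

        protected SimpleTuple(int arity) {
            this.fields = new Object[arity];
        }

        public int getArity() { return fields.length; }
        public Object getField(int pos) { return fields[pos]; }
        public void setField(Object value, int pos) { fields[pos] = value; }
    }

    /** Stand-in tuple classes named by arity, mirroring a "TupleN" naming scheme. */
    public static class SimpleTuple2 extends SimpleTuple {
        public SimpleTuple2() { super(2); }
    }

    public static class SimpleTuple3 extends SimpleTuple {
        public SimpleTuple3() { super(3); }
    }

    /** Resolves the stand-in tuple class for the given arity by its binary name. */
    static Class<? extends SimpleTuple> tupleClassFor(int arity) {
        String name = RowToTupleSketch.class.getName() + "$SimpleTuple" + arity;
        try {
            return Class.forName(name).asSubclass(SimpleTuple.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No stand-in tuple class for arity " + arity, e);
        }
    }

    /** Copies a row (modelled here as a List) into a tuple of matching arity. */
    public static SimpleTuple toTuple(List<?> row) {
        Class<? extends SimpleTuple> tupleClass = tupleClassFor(row.size());
        try {
            SimpleTuple tuple = tupleClass.getDeclaredConstructor().newInstance();
            for (int i = 0; i < row.size(); i++) {
                tuple.setField(row.get(i), i);
            }
            return tuple;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + tupleClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        SimpleTuple t = toTuple(Arrays.asList("alice", 42));
        System.out.println(t.getClass().getSimpleName() + ": " + t.getField(0) + ", " + t.getField(1));
    }
}
```

The sketch only supports arities 2 and 3 and throws for anything else. In the real mapper the field types are carried separately by the TypeInformation array, which is why the helper above also implements ResultTypeQueryable.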