How to convert the result of JDBCInputFormat into a DataSet of Tuple type



I have the following code that fetches data over JDBC:

// Build the row type once and reuse it for the input format and for createInput.
RowTypeInfo rowTypeInfo = new RowTypeInfo(getTypes(transInfo));
DataSet<Row> dbData =
    env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(bdpService.getDriver(dataSource))
            .setDBUrl(dbUrl)
            .setQuery(sql)
            .setUsername(dataSource.getUsername())
            .setPassword(dataSource.getPassword())
            .setRowTypeInfo(rowTypeInfo)
            .finish(),
        rowTypeInfo);

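Next I want to apply Tuple-style operations to the data set, such as groupBy. However, its element type is Row. How can I convert it, or is there another way to load the JDBC data into a DataSet of Tuple type?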

Thanks a lot.

I solved it as follows:

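import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.Row;

// Copies each Row field into a tuple of the given class and reports the tuple type to Flink.
public static class RowToTupleMapper implements ResultTypeQueryable<Tuple>, MapFunction<Row, Tuple> {
    private final Class<? extends Tuple> tupleClass;
    private final TypeInformation<?>[] typeInformations;

    public RowToTupleMapper(Class<? extends Tuple> tupleClass, TypeInformation<?>[] typeInformations) {
        this.tupleClass = tupleClass;
        this.typeInformations = typeInformations;
    }

    @Override
    public Tuple map(Row value) throws Exception {
        // Instantiate the concrete TupleN class reflectively, then copy field by field.
        Tuple tuple = tupleClass.getDeclaredConstructor().newInstance();
        for (int i = 0; i < value.getArity(); i++) {
            tuple.setField(value.getField(i), i);
        }
        return tuple;
    }

    @Override
    public TypeInformation<Tuple> getProducedType() {
        // Without this, Flink cannot infer the field types of the generic Tuple.
        return new TupleTypeInfo<>(typeInformations);
    }
}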

Use the helper class like this:

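// typeInformations is assumed to be the same array that was passed to RowTypeInfo, e.g. getTypes(transInfo).
final String tupleClassName = "org.apache.flink.api.java.tuple.Tuple" + transInfo.getSelects().size();
final Class<? extends Tuple> tupleClass = Class.forName(tupleClassName).asSubclass(Tuple.class);
DataSet<Tuple> ret = dbData.map(new RowToTupleMapper(tupleClass, typeInformations));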
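
One caveat with building the class name from the number of selected columns: as far as I know, Flink's built-in tuple classes stop at Tuple25, so a wider query would fail in Class.forName with a ClassNotFoundException that does not explain the cause. Below is a minimal sketch of a guard for that case. The class name TupleClassNames, the method forArity, and the 1..25 range are my own assumptions for illustration and are not part of Flink or of the code above.

```java
/** Hypothetical helper: builds the Flink tuple class name for a given column count. */
final class TupleClassNames {
    // Package prefix of Flink's Java tuple classes, as used in the snippet above.
    private static final String PREFIX = "org.apache.flink.api.java.tuple.Tuple";

    // Assumption: the largest built-in tuple class is Tuple25.
    static final int MAX_ARITY = 25;

    private TupleClassNames() {
    }

    /** Returns the fully qualified TupleN class name, or throws if the arity is out of range. */
    static String forArity(int arity) {
        if (arity < 1 || arity > MAX_ARITY) {
            throw new IllegalArgumentException(
                "Cannot map " + arity + " columns to a Flink tuple (supported: 1.." + MAX_ARITY + ")");
        }
        return PREFIX + arity;
    }
}
```

With this in place, the class name could be obtained via TupleClassNames.forArity(transInfo.getSelects().size()) before calling Class.forName, so a query with too many columns fails early with a readable message.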
