I want to write a String value to MongoDB as an ObjectId from Java Spark.
Here is what I tried:
// Struct with a single String field "oid" — the shape the connector maps to ObjectId
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("oid", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(structFields);

// UDF that wraps the id string in the connector's ObjectId field type
spark.sqlContext().udf().register("toObjectId", (String publisherId) -> {
    return new com.mongodb.spark.sql.fieldTypes.api.java.ObjectId(publisherId);
}, structType);

dataframe = dataframe.withColumn("pub_id",
    functions.callUDF("toObjectId", dataframe.col("publisherId").cast(DataTypes.StringType))
);

Map<String, String> writeOverrides = new HashMap<>();
writeOverrides.put("writeConcern.w", "majority");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
MongoSpark.save(dataframe, writeConfig);
But I get this error:
Caused by: scala.MatchError: com.mongodb.spark.sql.fieldTypes.api.java.ObjectId@8b2d2973 (of class com.mongodb.spark.sql.fieldTypes.api.java.ObjectId)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
How can I write an ObjectId to MongoDB from Spark through the Java API?
You don't need to use udf(). The MongoDB Spark connector recognizes a struct whose single String field is named "oid" and writes it as a BSON ObjectId, so a plain struct column is enough.

First import struct (and col):

import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.col;

Then use:

dataframe = dataframe.withColumn("pub_id", struct(col("publisherId").as("oid")));
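For reference, the connector identifies an ObjectId purely by document shape: a sub-document whose only field is a String named "oid". A minimal plain-Java sketch of the shape the struct column above produces, using java.util.Map to stand in for a BSON document (the class name ObjectIdShape and the sample hex id are illustrative, not from the connector API):

```java
import java.util.Map;

public class ObjectIdShape {
    // Build the sub-document shape the MongoDB Spark connector maps
    // to a BSON ObjectId on write: { "oid": "<24-char hex string>" }.
    static Map<String, Object> asObjectIdStruct(String hexId) {
        return Map.of("oid", hexId);
    }

    public static void main(String[] args) {
        // "pub_id" mirrors the column created with struct(col("publisherId").as("oid"))
        Map<String, Object> doc = Map.of(
            "publisherId", "5f2d8e1a9c3b4a0012345678",
            "pub_id", asObjectIdStruct("5f2d8e1a9c3b4a0012345678")
        );
        System.out.println(doc.get("pub_id")); // prints {oid=5f2d8e1a9c3b4a0012345678}
    }
}
```

The earlier udf() attempt failed because Catalyst cannot convert the ObjectId wrapper object into the declared struct row; building the struct column directly sidesteps that conversion entirely.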