spark-cassandra-connector - repartitionByCassandraReplica re



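I have a 16-node cluster where every node has both Spark and Cassandra installed, and I'm using Spark-Cassandra Connector 3.0.0. I'm trying to join a Dataset with a Cassandra table on its partition key, while also using .repartitionByCassandraReplica.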

However, it seems I just get an empty RDD with 0 partitions (the predf.collect() line below)! Any idea why?

Encoder<ExperimentForm> ExpEncoder = Encoders.bean(ExperimentForm.class);
//FYI experimentlist is a List<String>
Dataset<ExperimentForm> dfexplistoriginal = sp.createDataset(experimentlist, Encoders.STRING()).toDF("experimentid").as(ExpEncoder);
JavaRDD<ExperimentForm> predf = CassandraJavaUtil.javaFunctions(dfexplistoriginal.toJavaRDD()).repartitionByCassandraReplica("mdb","experiment",experimentlist.size(),CassandraJavaUtil.someColumns("experimentid"),CassandraJavaUtil.mapToRow(ExperimentForm.class));
System.out.println(predf.collect()); //Here it gives an empty dataset with 0 partitions
Dataset<ExperimentForm> newdfexplist =  sp.createDataset(predf.rdd(), ExpEncoder);
Dataset<Row> readydfexplist = newdfexplist.as(Encoders.STRING()).toDF("experimentid");
Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "mdb");
                put("table", "experiment");
            }
        })
        .load().select(col("experimentid"), col("description"), col("intensity")).join(readydfexplist, "experimentid");

In case it's needed, this is the experiment table in Cassandra:

CREATE TABLE experiment(
    experimentid varchar,
    description text,
    rt float,
    intensity float,
    mz float,
    identifier text,
    chemical_formula text,
    filename text,
    PRIMARY KEY ((experimentid), description, rt, intensity, mz, identifier, chemical_formula, filename));

And this is the ExperimentForm class:

public class ExperimentForm {
    private String experimentid;

    public String getExperimentid() {
        return experimentid;
    }

    public void setExperimentid(String experimentid) {
        this.experimentid = experimentid;
    }
}

Let me know if you need any additional information.

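The answer is basically the same as for this question: "Spark-Cassandra: repartitionByCassandraReplica or converting dataset to JavaRDD and back do not maintain number of partitions?"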

You just need to do both repartitionByCassandraReplica and joinWithCassandraTable on the RDD, and only then convert the result back to a Dataset.
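The RDD-only approach described above might look roughly like the following. This is a minimal sketch, not a tested solution: it assumes the connector's Java API (`CassandraJavaUtil.javaFunctions`, `repartitionByCassandraReplica`, `joinWithCassandraTable`) and needs a running cluster with the `mdb.experiment` table. `ExperimentRow`, the placeholder ids, and the value `10` for the partitions-per-host argument are hypothetical, and making `ExperimentForm` `Serializable` is an assumption added here.

```java
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class ReplicaJoinSketch {

    // The bean from the question; Serializable is an assumption added for this sketch.
    public static class ExperimentForm implements Serializable {
        private String experimentid;
        public String getExperimentid() { return experimentid; }
        public void setExperimentid(String experimentid) { this.experimentid = experimentid; }
    }

    // Hypothetical bean holding the columns read back from Cassandra.
    public static class ExperimentRow implements Serializable {
        private String experimentid;
        private String description;
        private Float intensity;
        public String getExperimentid() { return experimentid; }
        public void setExperimentid(String experimentid) { this.experimentid = experimentid; }
        public String getDescription() { return description; }
        public void setDescription(String description) { this.description = description; }
        public Float getIntensity() { return intensity; }
        public void setIntensity(Float intensity) { this.intensity = intensity; }
    }

    public static void main(String[] args) {
        // Connection settings are expected to come from spark-submit / spark-defaults.
        SparkSession sp = SparkSession.builder().appName("replica-join-sketch").getOrCreate();
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sp.sparkContext());

        List<String> experimentlist = Arrays.asList("exp1", "exp2"); // placeholder ids

        // Build the keys directly as an RDD instead of round-tripping through a Dataset.
        JavaRDD<ExperimentForm> keys = jsc.parallelize(experimentlist).map(id -> {
            ExperimentForm form = new ExperimentForm();
            form.setExperimentid(id);
            return form;
        });

        // Move each key to a Spark partition on a node that holds a replica of it.
        JavaRDD<ExperimentForm> byReplica = CassandraJavaUtil.javaFunctions(keys)
                .repartitionByCassandraReplica("mdb", "experiment", 10, // 10 = partitions per host (assumed)
                        CassandraJavaUtil.someColumns("experimentid"),
                        CassandraJavaUtil.mapToRow(ExperimentForm.class));

        // Join on the partition key while still on the RDD, so the locality is kept.
        CassandraJavaPairRDD<ExperimentForm, ExperimentRow> joined =
                CassandraJavaUtil.javaFunctions(byReplica)
                        .joinWithCassandraTable("mdb", "experiment",
                                CassandraJavaUtil.someColumns("experimentid", "description", "intensity"),
                                CassandraJavaUtil.someColumns("experimentid"),
                                CassandraJavaUtil.mapRowTo(ExperimentRow.class),
                                CassandraJavaUtil.mapToRow(ExperimentForm.class));

        // Only now convert back to a Dataset.
        Dataset<ExperimentRow> result =
                sp.createDataset(joined.values().rdd(), Encoders.bean(ExperimentRow.class));
        result.show();
        sp.stop();
    }
}
```

The point of the ordering is that the join runs before any conversion back to a Dataset, so the replica-aware partitioning is still in effect when Cassandra is queried.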
