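I am trying to use the San Francisco road network dataset, which is freely available at this link:
cs.utah.edu/~lifeifei/SpatialDataset.htm
I am only using the edge dataset, and I want to partition the road network graph across a cluster of 3 machines (for now). However, when I call 'graph.partitionBy(PartitionStrategy)' on the graph, I get the error 'PartitionStrategy cannot be resolved or is not a field'. Can anyone tell me why this error occurs? My code is below: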
SparkConf conf = new SparkConf().setMaster("local").setAppName("graph");
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
ClassTag<Integer> intTag = scala.reflect.ClassTag$.MODULE$.apply(Integer.class);
ClassTag<Double> doubleTag = scala.reflect.ClassTag$.MODULE$.apply(Double.class);
List<Edge<Double>> edges = new ArrayList<>();
String inputFileName = "Dataset/SFEdge.txt";
// Edge dataset contains edgeId|SourceId|DestinationId|EdgeLength
// edges.add(new Edge<Double>(1, 2, 3.5));
// edges.add(new Edge<Double>(2, 3, 4.8));
readTextEdgeFile(edges, inputFileName);
JavaRDD<Edge<Double>> edgeRDD = javaSparkContext.parallelize(edges);
Graph<String, Double> graph = Graph.fromEdges(edgeRDD.rdd(), "", StorageLevel.MEMORY_ONLY(),
        StorageLevel.MEMORY_ONLY(), stringTag, doubleTag);
graph.edges().toJavaRDD().foreach(x -> System.out
        .println("Source: " + x.srcId() + " , Destination: " + x.dstId() + ", Distance: " + x.attr$mcD$sp()));
// The error is reported on the line below
graph.partitionBy(PartitionStrategy.RandomVertexCut$.MODULE$);
}
public static boolean readTextEdgeFile(List<Edge<Double>> edgeList, String txtFileName)
        throws FileNotFoundException, IOException {
    String line = "";
    String txtSplitBy = " ";
    boolean removedBOM = false;
    try (BufferedReader br = new BufferedReader(new FileReader(txtFileName))) {
        while ((line = br.readLine()) != null) {
            String[] record = line.split(txtSplitBy);
            if (record.length == 4) {
                // Compare string contents with equals(); != only compares references
                if (!removedBOM && !"0".equals(record[0])) {
                    record[0] = String.valueOf(0);
                    removedBOM = true;
                }
                // record[0] (the edge id) is not used; only source, destination and length are kept
                edgeList.add(new Edge<Double>(Integer.parseInt(record[1]), Integer.parseInt(record[2]),
                        Double.parseDouble(record[3])));
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return true;
}
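For reference, the per-line parsing can be tried out without Spark. This is only a minimal sketch, assuming each line holds four whitespace-separated fields in the order given in the comment above (edge id, source id, destination id, edge length). The names 'EdgeLineParser' and 'EdgeRecord' and the sample line are made up for illustration and are not part of the dataset or of GraphX.

```java
public class EdgeLineParser {

    /** Hypothetical holder for one parsed edge: source id, destination id, length. */
    public static final class EdgeRecord {
        public final long srcId;
        public final long dstId;
        public final double length;

        public EdgeRecord(long srcId, long dstId, double length) {
            this.srcId = srcId;
            this.dstId = dstId;
            this.length = length;
        }
    }

    /**
     * Parses one line of the assumed form "edgeId srcId dstId length".
     * Returns null when the line does not have four fields or a number is malformed.
     */
    public static EdgeRecord parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length != 4) {
            return null;
        }
        try {
            // fields[0] is the edge id; it is skipped here, as in the code above
            return new EdgeRecord(Long.parseLong(fields[1]), Long.parseLong(fields[2]),
                    Double.parseDouble(fields[3]));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Made-up sample line, not taken from the real dataset
        EdgeRecord r = parse("0 10 20 3.5");
        System.out.println("Source: " + r.srcId + " , Destination: " + r.dstId + ", Distance: " + r.length);
    }
}
```

Because the first field is never read, stray characters in front of the first edge id (such as a byte order mark) should not affect the parsed values, as long as the line still splits into four fields.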
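The problem was the dependency version. I think the newer version of GraphX could not find the partition strategy, so I changed the dependency to a lower version and it worked: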
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.12</artifactId>
    <version>2.4.8</version>
</dependency>
By the way, is there any way to find out whether the graph has actually been partitioned? Some kind of verification?
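One possible check, as far as I know, is to count the edges in each edge partition, for example with 'mapPartitionsWithIndex' on 'graph.edges().toJavaRDD()', and to read the partition count from 'graph.edges().getNumPartitions()'. Two things to keep in mind. First, 'partitionBy' returns a new graph rather than changing the existing one, so the check has to run on the returned graph. Second, with 'setMaster("local")' the edge RDD may have only one partition, in which case the overload 'partitionBy(strategy, numPartitions)' lets the count be set explicitly. The Spark-free sketch below only illustrates the tally itself. 'PartitionTally', 'partitionOf' and the sample edges are hypothetical, and the hash is a stand-in, not the one GraphX's 'RandomVertexCut' uses.

```java
import java.util.Arrays;

public class PartitionTally {

    /**
     * Hypothetical stand-in for a partition strategy: maps an edge to a partition id.
     * The hash is illustrative only and is NOT the one used by GraphX.
     */
    public static int partitionOf(long src, long dst, int numParts) {
        int h = Long.hashCode(src) * 31 + Long.hashCode(dst);
        // floorMod keeps the result in [0, numParts) even when h is negative
        return Math.floorMod(h, numParts);
    }

    /** Counts how many edges land in each partition; each edge is {src, dst}. */
    public static int[] edgesPerPartition(long[][] edges, int numParts) {
        int[] counts = new int[numParts];
        for (long[] e : edges) {
            counts[partitionOf(e[0], e[1], numParts)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up edges, not taken from the San Francisco dataset
        long[][] edges = { {1, 2}, {2, 3}, {3, 4}, {4, 1}, {1, 3}, {2, 4} };
        int[] counts = edgesPerPartition(edges, 3);
        // Every edge is counted once, so the entries add up to the number of edges
        System.out.println("Edges per partition: " + Arrays.toString(counts));
    }
}
```

On the real graph, the same idea would apply to the per-partition counts reported by Spark: if all edges sit in a single partition, the graph has effectively not been split.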