Mapping each column based on its Scala data type


44,8602,37.19
35,5368,65.89
35,3391,40.64
44,6694,14.98
val sc = new SparkContext("local[*]", "TotalSpentByCustomer")
val input = sc.textFile("C:\Sparcuscopy.csv")
val fields = input.map(x => (x.split("t")(1).toInt, 1, 2.toFloat, 2))
val d = fields.reduceByKey((x,y) => x+y)
val results = d.collect()
results.foreach(print)

I get the following error:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float, Int)]
val d = fields.reduceByKey((x,y) => x+y)

Please advise: is this the correct way to parse the fields?

The reduceByKey call fails because reduceByKey is only defined on RDDs of key/value pairs (2-tuples), not on an RDD of 4-tuples. As an alternative, you can use the Spark SQL library to load the CSV file into a DataFrame with a given schema, and convert it to an RDD when needed.

// remove these two lines when not using Jupyter
interp.load.ivy("org.apache.spark" %% "spark-sql" % "3.2.0")
interp.load.ivy("org.apache.spark" %% "spark-core" % "3.2.0")
import org.apache.spark.sql.types.{StructType, StructField, FloatType, IntegerType};
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
// create a new schema for reading the csv
val schema = new StructType()
.add("Field1", IntegerType, true)
.add("Field2", IntegerType ,true)
.add("Field3", FloatType, true)
val df = spark.read.format("csv")
.schema(schema)
.load("/vagrant/test/test.csv") //replace with desired path
// select only column 1 & 2, not sure if this was intended by the questioner 
val selected = df.select("Field1","Field2")
// convert the DataFrame to an RDD
val d = selected.rdd
d.collect().foreach(println)

For the input given in the question, this produces the following output:

[44,8602]
[35,5368]
[35,3391]
[44,6694]
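
If the ultimate goal, as the app name TotalSpentByCustomer suggests, is to sum the amount per customer, you can reshape the RDD into key/value pairs and then call reduceByKey. The following is a minimal sketch, assuming Field1 is the customer id and Field3 is the amount spent (the names come from the schema above):

import org.apache.spark.sql.Row

// map each Row to a (customerId, amount) pair; reduceByKey only works on RDDs of 2-tuples
val totals = df.select("Field1", "Field3").rdd
  .map { case Row(id: Int, amount: Float) => (id, amount) }
  .reduceByKey(_ + _)

totals.collect().foreach(println)
// prints one (customerId, total) pair per customer, e.g. roughly (44,52.17) and (35,106.53)
// for the sample rows above, subject to float rounding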
