Parse CSV as DataFrame/DataSet with Apache Spark and Java



I am new to Spark, and I want to use group-by & reduce to find the following from a CSV (one line per employee):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the above CSV by grouping on Department, Designation, State, with additional columns for sum(costToCompany) and the total employee count.

The result should look like this:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there a way to achieve this using transformations and actions? Or should we go for RDD operations?

Procedure

  • Create a class (schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you are using Java)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor , getters and setters  
    }
    
  • Load the CSV (JSON) file

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
    
    JavaRDD<Record> rdd_records = data.map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1], Long.parseLong(fields[2].trim()), fields[3]);
             return sd;
          }
    });
    

At this point, you have two approaches:

A. SparkSQL

  • Register a table (using your defined schema class)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • Query the table with the desired group-by query

    JavaSchemaRDD res = sqlContext.sql(
        "select department, designation, state, sum(costToCompany), count(*) "
      + "from record_table "
      + "group by department, designation, state");
    
  • Here you can also run any other queries you need, using the same SQL approach.
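For example, a hypothetical follow-up query on the same record_table (the 'Lead' filter and the totalCost alias below are made up for illustration, the column names come from the schema above):

    // Hypothetical example: total cost per state, restricted to Leads
    JavaSchemaRDD leadCostByState = sqlContext.sql(
        "select state, sum(costToCompany) as totalCost "
      + "from record_table "
      + "where designation = 'Lead' "
      + "group by state");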

B. Spark

  • Map using a composite key: Department, Designation, State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 = 
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.department + record.designation + record.state,
            new Tuple2<Long, Integer>(record.costToCompany,1)
          );
          return t2;
        }
    });

  • reduceByKey on the composite key, summing the costToCompany column and accumulating the record count per key

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });
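To inspect the aggregated pairs, a quick sketch could collect and print them (collect() is only reasonable for small results); in a real job you would more likely save them out:

    // Print each composite key with its (totalCost, employeeCount) pair
    for (Tuple2<String, Tuple2<Long, Integer>> entry : final_rdd_records.collect()) {
        System.out.println(entry._1() + " -> totalCost=" + entry._2()._1()
            + ", empCount=" + entry._2()._2());
    }
    // Or persist the result, e.g. final_rdd_records.saveAsTextFile("path/output");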
    
A CSV file can be parsed with Spark's built-in CSV reader. It will return a DataFrame/Dataset on a successful read of the file. On top of the DataFrame/Dataset, you can easily apply SQL-like operations.

Using Spark 2.x (and above) with Java

Create a SparkSession object, aka spark

import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

Create a schema for the Row with StructType

import org.apache.spark.sql.types.StructType;
StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

Create a DataFrame from the CSV file and apply the schema to it

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

There are more options for reading data from a CSV file; a few common ones are sketched below.
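For example, commonly used reader options (the values shown are illustrative and depend on your file) can be chained onto spark.read():

Dataset<Row> dfWithOptions = spark.read()
    .option("header", "true")        // treat the first line as column names
    .option("delimiter", ",")        // field separator
    .option("inferSchema", "true")   // let Spark infer column types instead of supplying a schema
    .option("mode", "DROPMALFORMED") // drop rows that do not match the schema
    .csv("hdfs://path/input.csv");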

Now we can aggregate the data in two ways:

1. SQL way

Register the DataFrame as a temporary view to perform SQL operations on it

df.createOrReplaceTempView("employee");

Run SQL queries on the registered view

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");
sqlResult.show(); //for testing

We can even execute SQL directly on the CSV file, without creating a table, using Spark SQL; a minimal sketch follows.
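A minimal sketch, reusing the same hypothetical input path as above (note that without a schema or header option the columns come back as _c0, _c1, ...):

Dataset<Row> directCsv = spark.sql("SELECT * FROM csv.`hdfs://path/input.csv`");
directCsv.show();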


2. Object chaining / programmatic / Java-like way

Do the necessary imports for the SQL functions

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

Use groupBy and agg on the DataFrame/Dataset to perform count and sum on the data

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default
dfResult.show();//for testing
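If the aggregated result needs to be persisted rather than just shown, the DataFrameWriter can write it back out; the output path below is hypothetical:

dfResult.write()
    .mode("overwrite")               // replace any existing output
    .csv("hdfs://path/output_agg");  // hypothetical output location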

Dependent libraries

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"

The following might not be entirely correct, but it should give you some idea of how to juggle the data. It's not pretty and should be replaced with case classes etc., but as a quick example of how to use the Spark API, I hope it's enough :)

val rawlines = sc.textFile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
  .map(_.split(","))  // or use a proper CSV parser
  .map(row => Employee(row(0), row(1), row(2).toDouble, row(3)))
// the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map(em => ((em.dep, em.des, em.state), (1, em.cost)))
val results = keyVals.reduceByKey { (a, b) =>
  (a._1 + b._1, a._2 + b._2)  // (a.count + b.count, a.cost + b.cost)
}
// debug output
results.take(100).foreach(println)
results
  .map( keyval => someThingToFormatAsCsvStringOrWhatever )
  .saveAsTextFile("hdfs://.../results")

Or you could use Spark SQL:

val sqlContext = new SQLContext(sparkContext)
// case classes can easily be registered as tables
import sqlContext.createSchemaRDD  // implicit conversion from an RDD of case classes to a SchemaRDD
employees.registerAsTable("employees")
val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*)
  from employees
  group by dep,des,state""")

For JSON, if your text file contains one JSON object per line, you can use sqlContext.jsonFile(path) to let Spark SQL load it as a SchemaRDD (the schema will be inferred automatically). Then you can register it as a table and query it with SQL. You can also manually load the text file as an RDD[String] containing one JSON object per record and use sqlContext.jsonRDD(rdd) to turn it into a SchemaRDD. jsonRDD is useful when you need to pre-process your data.
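A small Java sketch of both routes, assuming Spark 1.3+, where the SchemaRDD described above became DataFrame, and a hypothetical path/input.json file with one JSON object per line:

// Load the JSON file directly; the schema is inferred automatically
DataFrame jsonTable = sqlContext.jsonFile("path/input.json");  // hypothetical path
jsonTable.registerTempTable("json_table");
sqlContext.sql("SELECT * FROM json_table").show();

// Or pre-process the raw lines first, then convert the RDD of JSON strings
JavaRDD<String> rawJson = sc.textFile("path/input.json")
    .filter(line -> !line.trim().isEmpty());  // example pre-processing step
DataFrame fromRdd = sqlContext.jsonRDD(rawJson);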
