将Scala序列化代码转换为执行并行操作



我有一个Scala代码,该代码将CSV作为输入,读取每一行,执行每一行的文档分类,然后存储将文档标签预测到MySQL数据库中。

摘要的问题是,有时CSV有3200行,完成整个操作需要很多时间。我需要转换此代码,例如CSV在执行者之间分发,执行文档预测并存储标签。

以下是代码段 -

    val reader = new CSVReader(new FileReader(args(4)))
    var readFirstLine = false;
    for (row <- reader.readAll) {
        if(readFirstLine) {
            var date = row(1).split(" ");
            var split_date = date(0).split('-').toList;
            val documentTransformed = tf.transform(row(2).split(" "))
            val emotionPredicted = model.predict(documentTransformed)
            val emotionMapped = emotionMaps(emotionPredicted);          
            //Insert Emotions               
            var query = "insert into emotions_values(user_id, year, month, day, emotion)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ emotionMapped +"')";
            statement.executeUpdate(query)
            val polarityPredicted = polarityModel.predict(documentTransformed)
            val polarityMapped = polarityMaps(polarityPredicted);
            //Insert Polarity
            var polarityQuery = "insert into polarity_values(user_id, year, month, day, polarity)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ polarityMapped +"')";
            statement.executeUpdate(polarityQuery)
        }
        else {
            readFirstLine = true;
        }
    }

您需要做的就是使用Spark中的内置CSV功能:

sparkSession.read
    .option("header", "true")
    .option("inferSchema", "true") //Maybe
    .csv(args(4))
    .rdd { row =>
       ...
    }

这将使您的CSV内容变成RDD,然后您可以根据需要进行操纵。请注意,只需将header选项设置为true即可忽略第一行。

我建议您研究是否可以使用csv方法返回的DataFrame - 这将使您能够利用Spark中的催化剂优化 - 与rdd方法返回的RDD相比。

最新更新