尝试在Spark DataFrame上使用地图



我最近开始尝试Spark和Java。最初,我使用RDD进行了著名的WordCount示例,一切都按预期进行。现在,我正在尝试实现自己的示例,但使用数据框而不是RDD。

所以我正在从

的文件中读取数据集
DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .option("header", "true")
        .load(inputFilePath);

,然后我尝试选择一个特定的列,并将简单的转换应用于每个行

df = df.select("start")
        .map(text -> text + "asd");

但是,该汇编发现了第二行的问题,我不完全理解(开始列以string类型推断)。

在接口scala.function 1

中找到的多个非重叠的抽象方法

为什么我的lambda函数将其视为scala函数,错误消息实际上是什么意思?

如果在数据框架上使用select函数,则会返回数据框架。然后,您将功能应用于Row数据类型,而不是行的值。之后,您应该先获得值,以便进行以下操作:

df.select("start").map(el->el.getString(0)+"asd")

,但是您将获得RDD为返回值,而不是DF

我用concat实现此

df.withColumn( concat(col('start'), lit('asd'))

当您映射相同的文本两次时,我不确定您是否也想更换字符串的第一部分?但是,如果您是,我会做:

df.withColumn('start', concat(
                      when(col('start') == 'text', lit('new'))
                      .otherwise(col('start))
                     , lit('asd')
                     )

使用大数据时,此解决方案会扩展,因为它构成了两个列,而不是迭代值。

相关内容

  • 没有找到相关文章

最新更新