如何将时间戳作为额外列添加到数据帧中



*大家好,

我有一个简单的问题要问大家。 我有一个RDD,使用createStream方法从kafka流中创建。现在我想在转换为数据帧之前将时间戳作为值添加到此 rdd。我尝试使用 withColumn() 向数据帧添加一个值,但返回此错误*

val topicMaps = Map("topic" -> 1)
    val now = java.util.Calendar.getInstance().getTime()
    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
      messages.foreachRDD(rdd =>
          {
            val sqlContext = new org.apache.spark.sql.SQLContext(sc)
            import sqlContext.implicits._
            val dataframe = sqlContext.read.json(rdd.map(_._2))

        val d =dataframe.withColumn("timeStamp_column",dataframe.col("now"))

val d =dataframe.withColumn("timeStamp_column",dataframe.col("now")) org.apache.spark.sql.AnalysisException:无法解析 (action, device_os_ver, device_type, event_name, item_name, lat, lon, memberid, productUpccd, tenantid); at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:15

正如我所知道的那样,数据帧不能被改变,因为它们是不可变的,但RDD也是不可变的。那么最好的方法是什么。如何为 RDD 提供值(动态向 RDD 添加时间戳)。

尝试current_timestamp函数。

import org.apache.spark.sql.functions.current_timestamp    
df.withColumn("time_stamp", current_timestamp())

要添加具有常量(如时间戳)的新列,可以使用lit函数:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis))

这对我有用。我通常会在此之后执行写入。

val d = dataframe.withColumn("SparkLoadedAt", current_timestamp())

在 Scala/Databricks 中:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("Timestamp",current_timestamp())

查看我的输出

我在评论中看到有些人在获取字符串时间戳时遇到问题。 这是一种使用 spark 3 日期时间格式执行此操作的方法

import org.apache.spark.sql.functions._
val d =dataframe. 
  .withColumn("timeStamp_column", date_format(current_timestamp(), "y-M-d'T'H:m:sX"))

相关内容

  • 没有找到相关文章