Apache Spark在Scala中嵌套迭代以生成统计RDD



我有一个由rowkey=client_id,campaigns={campaign_id:ccampaign_name}的Json数组制作的RDD

val clientsRDD = resultRDD.map(ClientRow.parseClientRow)
// change  RDD of ClientRow  objects to a DataFrame
val clientsDF = clientsRDD.toDF()
// Return the schema of this DataFrame
clientsDF.printSchema()
// print each line DataFrame
clientsDF.collect().foreach(println)

输出:

root
|-- rowkey: string (nullable = true)
|-- campaigns: string (nullable = true)
[1,[{"1000":"campaign1"},{"1001":"campaign2"}]]
[2,[{"1002":"campaign3"}]]

我还有一个RDD,里面装满了HBase的所有客户和活动数据的记录。

记录RDD

rowkey                 type         body
client_id-campaign_id, record_type, record_text 

我的目标是为每个客户(考虑其所有活动)和每个活动生成统计信息,例如计算所有client_id记录,按类型分组,以及计算每个活动记录,按类别分组。

client1
records:100, login:20, actions:80
client1 campaign1  
records:70, login:16, actions:50
client1 campaign2
records:30, login:4, actions:30

最后我想写统计数据。

在Spark with Scala中实现这一点的最佳方法是什么?我是否必须迭代clientsRDD(映射?),并为每一行生成不同的RDD映射记录RDD?

首先需要为活动字段定义模式:这意味着使用定义模式

val schema = StructType(Seq(StructField("rowkey", StringType, true),
StructField("campaigns", StructType(
StructField("id", StringType, true) ::
StructField("name", StringType, true) :: Nil
))

))

然后您可以在活动字段中使用explode方法使行变平。

val df = sqlContext.createDataFrame(clientsRDD, schema)
df.select(col("rowkey"), explode(col("campaigns")).as("campaign")).filter(col("campaign.id") === 1)

相关内容

  • 没有找到相关文章

最新更新