Given that I have some SparkSql RDD results:
CassandraRow{location_id: 163169767097254, context: drinking beer}
CassandraRow{location_id: 376101312892, context: drinking beer}
CassandraRow{location_id: 218866401458875, context: drinking beer}
CassandraRow{location_id: 163169767097254, context: drinking beer}
CassandraRow{location_id: 103760882995742, context: drinking beer}
CassandraRow{location_id: 214680441881239, context: drinking beer}
CassandraRow{location_id: 376101312892, context: ice creams}
CassandraRow{location_id: 193809797319052, context: drinking beer}
CassandraRow{location_id: 106017852771295, context: drinking beer}
CassandraRow{location_id: 166686436690629, context: drinking beer}
CassandraRow{location_id: 203328349712668, context: drinking beer}
CassandraRow{location_id: 103760882995742, context: vacations}
CassandraRow{location_id: 203328349712668, context: drinking beer}
CassandraRow{location_id: 214680441881239, context: drinking beer}
CassandraRow{location_id: 214680441881239, context: drinking beer}
CassandraRow{location_id: 376101312892, context: drinking beer}
CassandraRow{location_id: 166686436690629, context: vacations}
CassandraRow{location_id: 218866401458875, context: ice creams}
I want to group them by location_id to get a result like this:
List(
218866401458875 -> List(ice creams, drinking beer),
166686436690629 -> List(vacations, drinking beer),
376101312892 -> List(ice creams, drinking beer)
// and so on
)
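As a minimal sketch of that target shape on plain Scala collections (no Spark involved; the values are a few of the rows above turned into `(Long, String)` tuples, and the names `rows` and `byLocation` are illustrative), grouping by the id and keeping only the contexts looks like this. The `.distinct` is an assumption based on the desired output listing each context once; drop it if repeated contexts should be kept.

```scala
// A handful of (location_id, context) pairs taken from the rows in the question.
val rows: List[(Long, String)] = List(
  (218866401458875L, "drinking beer"),
  (376101312892L, "drinking beer"),
  (376101312892L, "ice creams"),
  (218866401458875L, "ice creams"),
  (166686436690629L, "drinking beer"),
  (166686436690629L, "vacations"),
  (376101312892L, "drinking beer")
)

// groupBy(_._1) keeps whole (id, context) tuples as the values,
// so each group is mapped down to just its contexts.
val byLocation: Map[Long, List[String]] =
  rows.groupBy(_._1).map { case (id, group) => id -> group.map(_._2).distinct }

byLocation.foreach { case (id, contexts) => println(s"$id -> $contexts") }
```

On a `List`, `groupBy` keeps the original order inside each group, so every context list here appears in the order the rows were encountered.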
Here is my code so far:
val mappedContext = context.map(c => (c.getLong("location_id"), c.getString("context")))
val grouped = mappedContext.groupBy(_._1).mapValues(_.map(_._2))
grouped.foreach(k => {
  println(k)
})
But I got:
(203328349712668,[Ljava.lang.String;@682d0831)
(106017852771295,[Ljava.lang.String;@3ed36a4c)
(193809797319052,[Ljava.lang.String;@1649ca17)
(214680441881239,[Ljava.lang.String;@9c9d648)
(103760882995742,[Ljava.lang.String;@9253bc0)
(376101312892,[Ljava.lang.String;@2c01a1a2)
(166686436690629,[Ljava.lang.String;@74c0cf47)
(163169767097254,[Ljava.lang.String;@33fc3c01)
(218866401458875,[Ljava.lang.String;@13a7ea85)
(500767133335647,[Ljava.lang.String;@328a55e2)
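The `[Ljava.lang.String;@…` text is the JVM's default `toString` for a `String[]`: the array's type descriptor followed by a hexadecimal hash code, with none of the elements. A small illustration (the array contents are just two of the contexts from the question) of turning such an array into readable output:

```scala
// A Java array has no content-aware toString, so printing it shows
// the type descriptor and a hash code instead of the elements.
val contexts: Array[String] = Array("ice creams", "drinking beer")

println(contexts)                // prints something like [Ljava.lang.String;@682d0831 (the hash varies)
println(contexts.toList)         // List(ice creams, drinking beer)
println(contexts.mkString(", ")) // ice creams, drinking beer
```

Converting to a `List` (or calling `mkString`) before printing is enough to see the grouped values.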
You should try groupByKey (instead of groupBy):
context.map(c => c.getLong("location_id") -> c.getString("context")).groupByKey.foreach(println)
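A fuller sketch of the same idea, assuming `spark-core` (Spark 1.3 or later, where pair-RDD methods such as `groupByKey` are available without an extra import) is on the classpath and a local master is acceptable. It builds a small RDD of `(location_id, context)` pairs in place of the Cassandra rows, groups with `groupByKey`, converts each `Iterable[String]` to a `List` via `mapValues`, and collects the result to the driver as a `Map`. The app name, the sample pairs and the `.distinct` (to match the desired output listing each context once) are illustrative choices; in `spark-shell`, reuse the existing `sc` instead of creating one.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local SparkContext for the example only.
val sc = new SparkContext(new SparkConf().setAppName("group-by-location").setMaster("local[*]"))

// Stand-in for context.map(c => c.getLong("location_id") -> c.getString("context")).
val pairs = sc.parallelize(Seq(
  (376101312892L, "drinking beer"),
  (376101312892L, "ice creams"),
  (376101312892L, "drinking beer"),
  (166686436690629L, "drinking beer"),
  (166686436690629L, "vacations")
))

// groupByKey gives RDD[(Long, Iterable[String])]; mapValues turns each
// Iterable into a List, then collect brings the groups to the driver.
val grouped: Map[Long, List[String]] =
  pairs.groupByKey().mapValues(_.toList.distinct).collect().toMap

grouped.foreach { case (id, contexts) => println(s"$id -> $contexts") }

sc.stop()
```

`groupByKey` does not guarantee the order of values within a group, so compare the lists as sets if order matters to you. `collect()` is only sensible when the grouped result fits in driver memory.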