Spark,从Uniq Kafka请求中插入30天数据



我有1个来自Kafka的事件,其中此内容:

(date,
user_id,
app_id,
duration,
session_id,
....)

我使用此代码获取主题:

val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(topicSessionDuration -> 2), StorageLevel.MEMORY_AND_DISK_2)
        .map(_._2)
        .map(RawSessionData(_))

我和:

一起存放在卡桑德拉(Cassandra)
   kafkaStream.map(session_duration => (
        session_duration.year,
        session_duration.month,
        session_duration.day,
        session_duration.publisher_id,
        session_duration.app_id,
        session_duration.user_id
        )).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_by_app"))

在此事件中,我已经存储在15个桌子中,以用于不同的用途。会话数量,用户数量,持续时间....

我需要存储在另一个事件中,但是从这个事件开始,我需要存储30条不同的行(日期 0天到日期 30天)。

我尝试这样做:

    for (a <- 0 to 30) {
       val toto = a
        kafkaStream.map(x => {
            val date = new DateTime(x.date_create).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0).plusDays(toto)               
            (
                date.getYear,
                date.getMonthOfYear,
                date.getDayOfMonth,
                x.user_id
                )
        }).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_30d"))

,但它不起作用。确实,它仅节省12或15或20行或其他数字。并非所有行。

我有一个错误:

Could not compute split, block input-11-1480932491800 not found

我可能做错了什么,但是什么?请问你能帮帮我吗?:)

最好将这种转换表示为:

kafkaStream.flatMap{x => 
    (0 to 30).map{day => 
        val date = new DateTime(x.date_create).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0).plusDays(day)               
        (
            date.getYear,
            date.getMonthOfYear,
            date.getDayOfMonth,
            x.user_id
        )
    }}.saveToCassandra(configServer.getString("cassandra.keyspace"),   configServer.getString("cassandra.table.daily.user_30d"))

最新更新