我有1个来自Kafka的事件,其中此内容:
(date,
user_id,
app_id,
duration,
session_id,
....)
我使用此代码获取主题:
val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topicSessionDuration -> 2), StorageLevel.MEMORY_AND_DISK_2)
.map(_._2)
.map(RawSessionData(_))
我和:
一起存放在卡桑德拉(Cassandra) kafkaStream.map(session_duration => (
session_duration.year,
session_duration.month,
session_duration.day,
session_duration.publisher_id,
session_duration.app_id,
session_duration.user_id
)).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_by_app"))
在此事件中,我已经存储在15个桌子中,以用于不同的用途。会话数量,用户数量,持续时间....
我需要存储在另一个事件中,但是从这个事件开始,我需要存储30条不同的行(日期 0天到日期 30天)。
我尝试这样做:
for (a <- 0 to 30) {
val toto = a
kafkaStream.map(x => {
val date = new DateTime(x.date_create).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0).plusDays(toto)
(
date.getYear,
date.getMonthOfYear,
date.getDayOfMonth,
x.user_id
)
}).saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_30d"))
,但它不起作用。确实,它仅节省12或15或20行或其他数字。并非所有行。
我有一个错误:
Could not compute split, block input-11-1480932491800 not found
我可能做错了什么,但是什么?请问你能帮帮我吗?:)
最好将这种转换表示为:
kafkaStream.flatMap{x =>
(0 to 30).map{day =>
val date = new DateTime(x.date_create).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0).plusDays(day)
(
date.getYear,
date.getMonthOfYear,
date.getDayOfMonth,
x.user_id
)
}}.saveToCassandra(configServer.getString("cassandra.keyspace"), configServer.getString("cassandra.table.daily.user_30d"))