我有一个Kafka主题,称为 a 。
主题中的数据格式 a 是:
{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000}
现在在消费者方面,我只想获得一个小时的最新数据,这意味着每小时我需要根据create_at
从主题获得最新值我的预期输出是:
{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
我认为这可以通过KSQL解决,但我不确定。请帮助我。
预先感谢。
是的,您可以使用ksql。尝试以下内容:
CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAT) WITH (kafka_topic = 'topic_name', value_format = 'JSON');
CREATE TABLE maxRow AS SELECT id, name, max(STRINGTOTIMESTAMP(created_at, 'yyyy-mm-dd hh:mm:ss.SSS')) AS creted_at FROM s1 WINDOW TUMBLING (size 1 hour) GROUP BY id, name;
结果将以Linux时间戳格式的created_at
时间。您可以在新查询中使用TimestAmptoString UDF将其更改为所需的格式。如果您发现任何问题,请让我知道。