How to compute two sums over the same dataset with Apache Flink



I have a simple stream whose data has this form:

id | name | eventType | eventTime
----------------------------------
1    A       PLAY        (ts of when the client fired the event)
1    B       IMPRESSION
2    A       CLICK

The end goal is to divide the number of events with eventType CLICK by the number of events with eventType IMPRESSION, grouped by id and name, over a 60-second tumbling window.
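To make the target computation concrete, here is a minimal plain-Java sketch with no Flink involved. It counts both event types in a single pass per (id, name, window) and then derives the ratio. The `Event` class, the `id_name@windowStart` key format, and the method names are made up for illustration, and timestamps are assumed to be non-negative epoch milliseconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink): counts CLICK and IMPRESSION events in one pass
// per (id, name, 60-second tumbling window) and derives the click ratio.
class CtrPerWindow {
    static final long WINDOW_MS = 60_000L;

    // Hypothetical event type; the fields mirror the columns in the question.
    static final class Event {
        final int id;
        final String name;
        final String eventType;
        final long eventTimeMs;

        Event(int id, String name, String eventType, long eventTimeMs) {
            this.id = id;
            this.name = name;
            this.eventType = eventType;
            this.eventTimeMs = eventTimeMs;
        }
    }

    // Start of the tumbling window that contains the timestamp
    // (assumes a non-negative timestamp).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Maps "id_name@windowStart" to {clickCount, impressionCount}.
    // Other event types (for example PLAY) are ignored.
    static Map<String, long[]> countPerWindow(List<Event> events) {
        Map<String, long[]> counts = new TreeMap<>();
        for (Event e : events) {
            int slot;
            if ("CLICK".equals(e.eventType)) {
                slot = 0;
            } else if ("IMPRESSION".equals(e.eventType)) {
                slot = 1;
            } else {
                continue;
            }
            String key = e.id + "_" + e.name + "@" + windowStart(e.eventTimeMs);
            counts.computeIfAbsent(key, k -> new long[2])[slot]++;
        }
        return counts;
    }

    // Ratio in percent; NaN when the window has no impressions.
    static double ratioPercent(long[] c) {
        return c[1] == 0 ? Double.NaN : 100.0 * c[0] / c[1];
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1, "A", "IMPRESSION", 1_000L),
                new Event(1, "A", "IMPRESSION", 2_000L),
                new Event(1, "A", "CLICK", 3_000L),
                new Event(1, "A", "IMPRESSION", 61_000L),
                new Event(2, "A", "PLAY", 5_000L));
        for (Map.Entry<String, long[]> en : countPerWindow(events).entrySet()) {
            System.out.println(en.getKey() + " -> " + ratioPercent(en.getValue()) + " %");
        }
    }
}
```

Multiplying by `100.0` before dividing keeps the arithmetic in floating point, so the ratio is not truncated to an integer.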

In plain SQL it would look like this:

SELECT d.id, d.name, d.impressionCount, d.clickCount,  d.clickCount / d.impressionCount * 100.0 FROM
( SELECT i.id, i.name, count(*) as clickCount, c.impressionCount from events as i
LEFT JOIN
(
SELECT id, name, count(*) as impressionCount from events WHERE event_type = 'IMPRESSION' GROUP BY id,name
) as c
ON i.id = c.id and i.name = c.name
WHERE event_type = 'CLICK' 
GROUP BY i.id, i.name
) as d

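So I first need to create one column with the click count and another column with the impression count, and then use that table for the division.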

My question is: what is the best way to do this with the Flink APIs? I tried the following:

Table clickCountTable = eventsTable
.where("eventType = 'CLICK'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as id, eventType.count as clickCount, minuteWindow.rowtime as minute");

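I do the same for impressions and then join the two tables. However, I am not getting the right results, and I am not sure this is the best way to achieve what I want with tumbling windows.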

EDIT

This is how I convert the stream into a table:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
[.....]
DataStream<EventWithCount> eventStreamWithTime = eventStream
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventWithCount>() {
@Override
public long extractAscendingTimestamp(EventWithCount element) {
try {
DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Date parsedDate = df1.parse(element.eventTime);
Timestamp timestamp = new java.sql.Timestamp(parsedDate.getTime());
return timestamp.getTime();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}});
Table eventsTable = tEnv.fromDataStream(eventStreamWithTime, "id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
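One thing worth checking in the extractor above is the `SSSSSS` pattern. `SimpleDateFormat` interprets `S` as a millisecond count, so a six-digit fraction such as `.123456` is read as 123456 ms instead of 123.456 ms, which can shift events into a different one-minute window. A minimal sketch using `java.time` follows; it assumes that `eventTime` is an ISO-8601 local date-time expressed in UTC, which the question does not state, and `EventTimeParser` is a hypothetical helper name.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Sketch: parse an ISO-8601 local date-time with up to nine fractional digits
// and convert it to epoch milliseconds.
class EventTimeParser {
    // Assumption: the string carries no zone information and is meant as UTC.
    static long toEpochMillis(String eventTime) {
        return LocalDateTime.parse(eventTime)   // uses ISO_LOCAL_DATE_TIME
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();                // sub-millisecond digits are dropped
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2018-01-01T00:00:00.123456"));
    }
}
```

The body of `extractAscendingTimestamp` could then return `toEpochMillis(element.eventTime)`, provided the UTC assumption holds for the data.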

Your Table API query for counting the CLICK events per id and name per minute looks good.

Table clickCountTable = eventsTable
.where("eventType = 'CLICK'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as clickId, eventType.count as clickCount, minuteWindow.rowtime as clickMin");

Do the same for IMPRESSION:

Table impressionCountTable = eventsTable
.where("eventType = 'IMPRESSION'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as impId, eventType.count as impCount, minuteWindow.rowtime as impMin");

Finally, you have to join both tables:

Table result = impressionCountTable
.leftOuterJoin(clickCountTable, "impId = clickId && impMin = clickMin")
.select("impId as id, impMin as minute, clickCount / impCount as ratio");

Note the join condition impMin = clickMin. It turns the join into a time-windowed join with a minimal window size of 1 millisecond (milliseconds are the granularity of time in Flink SQL).
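What the join is meant to produce can be sketched in plain Java without Flink: every impression window is kept, click counts are looked up by the same id and window key, and a missing click count is treated as zero. The `WindowedRatioJoin` class and the `id_name@windowStart` key format are hypothetical. Treating a missing match as zero is a choice made for this sketch; in the Table API a left outer join would leave `clickCount` as null for such rows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of a left outer join between per-window impression counts
// and per-window click counts, followed by the ratio computation.
class WindowedRatioJoin {
    // Keys are assumed to identify (id, name, window), e.g. "1_A@60000".
    static Map<String, Double> joinRatios(Map<String, Long> impCounts,
                                          Map<String, Long> clickCounts) {
        Map<String, Double> ratios = new LinkedHashMap<>();
        for (Map.Entry<String, Long> imp : impCounts.entrySet()) {
            // Left outer join: keep the impression row even without clicks.
            long clicks = clickCounts.getOrDefault(imp.getKey(), 0L);
            // Cast before dividing so the ratio is not truncated to an integer.
            ratios.put(imp.getKey(), (double) clicks / imp.getValue());
        }
        return ratios;
    }

    public static void main(String[] args) {
        Map<String, Long> imps = new LinkedHashMap<>();
        imps.put("1_A@0", 4L);
        imps.put("2_A@0", 3L);
        Map<String, Long> clicks = new LinkedHashMap<>();
        clicks.put("1_A@0", 1L);
        System.out.println(joinRatios(imps, clicks));
    }
}
```

Both counts are integers, so depending on how the division is evaluated the ratio in the query may be truncated as well. Casting one side to a floating-point type before dividing avoids that.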

You said that the query does not behave as you expected. Can you be more specific about your expected and actual results?
