小于 'size' 'expires'参数的翻转窗口不起作用



复制

的步骤我试图在滚动窗口中聚合一些数据,然后将过程函数应用于窗口中的数据。我使用expires参数来处理延迟事件(假设我们可以获得一个属于n的事件)n+1分钟)。

def parse_millis(ms):
return datetime.fromtimestamp(int(ms) / 1000)

def process_window_function(window_info, values: list):
logger.info(f"Processing window with "
f"start = {datetime.fromtimestamp(window_info[1][0])}, "
f"end = {datetime.fromtimestamp(window_info[1][1])}")
logger.info(values)

class InputClass(faust.Record, coerce=True):
id: str
timestamp: datetime = DatetimeField(date_parser=parse_millis)
value: int

tumbling_window_table = (
app.Table(
'tumbling_window_table',
default=list,
on_window_close=process_window_function,
)
.tumbling(size=60, expires=timedelta(seconds=10))
.relative_to_field(InputClass.timestamp)
)
input_topic = app.topic("input.topic", value_type=InputClass)

@app.agent(input_topic)
async def process_input(stream):
event: InputClass
async for event in stream:
logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
list_of_values = tumbling_window_table[event.id].value()
list_of_values.append(event.value)
tumbling_window_table[event.id] = list_of_values

预期行为我期望process_window_function调用n窗口只当10秒的n+1窗口将被传递来处理延迟事件

实际行为窗口的process_window_functionn将在windown+1的第一个事件之后立即调用。如果expires参数小于size参数。看起来Faust只是忽略了过期。有了这样的行为,可以晚一点到达的事件将被跳过。

如果expires参数等于或大于size,将正确处理延迟事件,但我不希望延迟超过10秒。

输入

卡夫卡
{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}

日志
[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready 
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state 
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state 
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state 
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000 
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2] 
<标题>
  • Python版本3.7.9
  • 浮士德版faust-streaming==0.6.1
  • RocksDB versionpython-rocksdb

我有可能在Flink中实现这样的行为,但在Faust中面临这个问题。我做错了什么?

这可能是我遇到的相同问题,如果是这样,这可能是解决方案。我必须手动设置clean_up_interval,因为它默认为30秒。此属性表示检查过期表数据所需的时间。

你可以通过在用典型的app = faust.App()方式定义你的应用程序后设置app.conf.table_clean_up_interval = <time as int or float>来实现。

您可以在settings.py文件中找到这个方法,并在这里找到一个工作示例(最近可能更改了?)

唯一的问题似乎是on_window_close在应用程序崩溃(或重新平衡??)时没有正确触发,这就像在没有工作人员监视的情况下到期的窗口永远消失了,你永远不会知道它们。但我还没有使用RocksDB,只是在内存中,所以也许有更多的东西,你可以帮助?

我仍然试图得到我的头周围晚的事件,因为我使用相同的过程来执行很长时间的聚合(如3个月在15的间隔),但我无法弄清楚是否数据是非常旧的是放在一个窗口,匹配它的时间戳或只是当前窗口。我认为它会根据时间戳与窗口的关系将其放在正确的窗口中,但无法确认。

最新更新