Apache Beam with Python :如何在会话窗口中计算最小值,并将其应用于所有相关的 PCollecti



>我正在使用Apache Beam的Python SDK来处理字典,这些字典代表流分析命中。由于会话窗口,点击被聚合。我的 DataFlow 真正要做的就是应用这些会话窗口,并为所有相关命中分配会话 ID。

作为会话 ID,我已经发现我会使用第一次点击的时间戳(与每个用户的 cookie ID 相结合(。这是我的管道:

msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
id_label='hit_id',
timestamp_attribute='time')
hits = msgs | 'Parse' >> beam.Map(my_parser)
windowed_hits = hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))
visit_id = (windowed_hits | 'ExtractTimestamp' >> beam.Map(my_extracter)
| 'GetMinimum' >> beam.CombineGlobally(my_min).without_defaults())
windowed_hits | 'SetVisitId' >>
beam.Map(set_visit_id, visit_id=beam.pvalue.AsSingleton(visit_id))

my_parser正在应用 literal_eval 将字符串转换为字典。 my_extracter正在从命中中取出时间戳。 set_visit_id只是接受一个参数并将其分配给键visit_id。

这似乎行不通。调试时,我的visit_id分支似乎正常工作,并且在计算最小值之前等待会话结束。但是当用作侧面输入时,我只得到一个 pvalue。空边输入。我怎样才能得到我想要的结果,为什么我的代码返回一个空的边输入?

编辑:我已经用AsIter替换了AsSingleton,以了解这里出了什么问题。我得到的是一个_FilteringIterable:

  • _iterable包含一个WindowedWalue。该值是我发送的唯一命中的时间戳(我们称之为 TS1(。它与一个窗口相关联,从 TS1 到 TS1 + 60。奇怪的是,这个 WindowedValue 的时间戳属性是 TS1 + 60(.238(,但我想这是因为计算最小值的分支在计算最小值之前等待会话完成。
  • _target_window包含一个窗口,从TS + 60(.24(到TS + 120(.24(。

所以我想问题是这个_target_window,但我不明白为什么它的范围从 TS + 60 到 TS + 120。可能是因为窗口值的时间戳吗?这似乎是可能的,因为_target_window的边界似乎来自其四舍五入的值。

我最终通过扔掉任何组合并用 GroupByKey 替换它来管理我想做的事情。

def my_parser(msg):
result = literal_eval(msg)
return result
def set_key(hit):
return (hit['cid'], hit)
def set_vid2(keyed_hits):
k, hits = keyed_hits
visit_id = min([h['time'] for h in hits])
for h in hits:
h['visit_id'] = visit_id
return hits
def unpack_list(l):
for d in l:
yield d
msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
id_label='hit_id',
timestamp_attribute='time')
hits = msgs | 'Parse' >> beam.Map(my_parser)
keyed_hits = hits | 'SetKey' >> beam.Map(set_key)
windowed_hits = (keyed_hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))
| 'Grouping' >> beam.GroupByKey())
clean_hits = windowed_hits | 'ComputeMin' >> beam.Map(set_vid2)
clean_hits | 'Unpack' >> beam.FlatMap(unpack_list)

在 GroupByKey 之后,我有一个包含命中列表的 PCollection(按 cookie ID + 会话窗口分组(。然后,一旦计算了访问 ID 并在每次点击时设置,我就会将命中列表的 PCollection 转换为具有unpack_list的命中 PColction。

我不确定这是正确的方法,以及它是否对性能有任何影响。

最新更新