如何将CSV作为流式表源加载到PyFlink中



我正在尝试设置一个简单的游乐场环境,以使用Flink Python Table API。我最终要写的乔布斯将来自卡夫卡或凯尼斯的队列,但这使得玩弄想法(和测试(变得非常困难。

我可以很高兴地从CSV加载并在批处理模式下处理它。但我无法让它在流媒体模式下工作。如果不是在StreamingExecutionEnvironment中,我该如何做类似的事情(主要是为了让我可以玩windows(。

我知道我需要让系统使用EventTime(因为ProcTime会同时出现(,但我无论如何都找不到设置它的方法。原则上,我应该能够将CSV的其中一列设置为事件时间,但文档中不清楚如何做到这一点(或者是否可能(。

为了运行批处理执行测试,我使用了以下代码,它从input.csv读取并输出到output.csv

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
TableConfig,
DataTypes,
BatchTableEnvironment,
StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"
try:
out_path.unlink()
except:
pass
from pyflink.table.window import Tumble
(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
)
.create_temporary_table("mySource")
)
(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)
(
t_env.from_path("mySource")
.group_by("word")
.select("word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
t_env.execute("tutorial_job")

input.csv为

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

所以我的问题是,我如何设置它,使它从同一个CSV中读取,但使用第一列作为事件时间,并允许我编写如下代码:

(
t_env.from_path("mySource")
.window(Tumble.over("10.minutes").on("time").alias("w"))
.group_by("w, word")
.select("w, word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)

任何帮助都将不胜感激,我无法从医生那里解决这个问题。我正在使用python 3.7flink 1.11.1

如果您使用描述符API,您可以通过模式指定一个字段为事件时间字段:

.with_schema(  # declare the schema of the table
Schema()
.field("rowtime", DataTypes.TIMESTAMP())
.rowtime(
Rowtime()
.timestamps_from_field("time")
.watermarks_periodic_bounded(60000))
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
)

但我还是建议你使用DDL,一方面它更容易使用,另一方面现有的描述符API存在一些bug,社区正在讨论重构描述符API

您尝试过使用水印策略吗?如前所述,您需要使用水印策略来使用事件时间。对于pyflink的情况,我个人认为用这样的ddl格式声明它更容易。

相关内容

  • 没有找到相关文章

最新更新