Flink table sink doesn't work with debezium-avro-confluent source



I am using Flink SQL to read Debezium Avro data from Kafka and store it as Parquet files in S3. Here is my code:

import os
from pyflink.datastream import StreamExecutionEnvironment, FsStateBackend
from pyflink.table import (TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment,
                           ScalarFunction)
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
# start a checkpoint every 12 s
exec_env.enable_checkpointing(12000)
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)
INPUT_TABLE = 'source'
KAFKA_TOPIC = os.environ['KAFKA_TOPIC']
KAFKA_BOOTSTRAP_SERVER = os.environ['KAFKA_BOOTSTRAP_SERVER']
OUTPUT_TABLE = 'sink'
S3_BUCKET = os.environ['S3_BUCKET']
OUTPUT_S3_LOCATION = os.environ['OUTPUT_S3_LOCATION']
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
`id` BIGINT,
`price` DOUBLE,
`type` INT,
`is_reinvite` INT
) WITH (
'connector' = 'kafka',
'topic' = '{KAFKA_TOPIC}',
'properties.bootstrap.servers' = '{KAFKA_BOOTSTRAP_SERVER}',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://kafka-production-schema-registry:8081'
)
"""
ddl_sink = f"""
CREATE TABLE {OUTPUT_TABLE} (
`event_time` TIMESTAMP,
`id` BIGINT,
`price` DOUBLE,
`type` INT,
`is_reinvite` INT
) WITH (
'connector' = 'filesystem',
'path' = 's3://{S3_BUCKET}/{OUTPUT_S3_LOCATION}',
'format' = 'parquet'
)
"""
t_env.sql_update(ddl_source)
t_env.sql_update(ddl_sink)
t_env.execute_sql(f"""
INSERT INTO {OUTPUT_TABLE}
SELECT * 
FROM {INPUT_TABLE}
""")

When I submit the job, I get the following error message:

pyflink.util.exceptions.TableException: Table sink 'default_catalog.default_database.sink' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, price, type, is_reinvite, timestamp])

I am using Flink 1.12.1. The source works fine; I have tested it with the 'print' connector as the sink. Below is a sample dataset, extracted from the task manager logs when the 'print' connector is used as the table sink:

-D(2021-02-20T17:07:27.298,14091764,26.0,9,0)
-D(2021-02-20T17:07:27.298,14099765,26.0,9,0)
-D(2021-02-20T17:07:27.299,14189806,16.0,9,0)
-D(2021-02-20T17:07:27.299,14189838,37.0,9,0)
-D(2021-02-20T17:07:27.299,14089840,26.0,9,0)
-D(2021-02-20T17:07:27.299,14089847,26.0,9,0)
-D(2021-02-20T17:07:27.300,14189859,26.0,9,0)
-D(2021-02-20T17:07:27.301,14091808,37.0,9,0)
-D(2021-02-20T17:07:27.301,14089911,37.0,9,0)
-D(2021-02-20T17:07:27.301,14099937,26.0,9,0)
-D(2021-02-20T17:07:27.302,14091851,37.0,9,0)

How can I make the table sink work with the filesystem connector?

What is happening is the following:

  • When Debezium records come in, Flink updates a logical table by adding, updating and removing Flink rows, based on their primary key.
  • The only sinks that can handle that kind of information are those that have a concept of updating by key. JDBC is a typical example: there it is straightforward for Flink to translate "the Flink row with key foo has been updated to bar" into "the JDBC row with key foo should be updated to value bar", or something along those lines (a sketch of such a sink follows this list). The filesystem sink does not support this kind of operation, since files are append-only.
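
For illustration only (this is not from the original question or answer): a minimal Java sketch of what an update-capable JDBC sink could look like for the fields from the question. The database URL and target table name are made up; the important part is the PRIMARY KEY, which is what allows Flink to translate the Debezium changelog into per-key updates and deletes.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical JDBC sink: the PRIMARY KEY lets Flink turn the Debezium
// changelog into UPDATE/DELETE statements on the target table.
TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.executeSql(
        "CREATE TABLE jdbc_sink (" +
        "  `id` BIGINT," +
        "  `price` DOUBLE," +
        "  `type` INT," +
        "  `is_reinvite` INT," +
        "  PRIMARY KEY (`id`) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:postgresql://db-host:5432/mydb'," +
        "  'table-name' = 'items'" +
        ")");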

See also Flink's documentation on append and update queries.

In practice, in order to do the conversion, we first have to decide what exactly we want to end up with in this append-only file.

If what we want is to have, in the file, the latest version of each item whenever its id is updated, then to my knowledge the table should first be converted to a retract stream and then written out with a FileSink. Note that in that case the result contains a boolean flag saying whether each row was updated or deleted, and we have to decide how we want that information to appear in the resulting file.
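
Applied to the source table from the question, that conversion step would look roughly like the following Java sketch (it assumes the Kafka table with 'format' = 'debezium-avro-confluent' has already been registered under the name source, as in the question's DDL):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// ... register the 'source' table here with the same CREATE TABLE ... WITH (
//     'connector' = 'kafka', 'format' = 'debezium-avro-confluent', ...) DDL as in the question ...
Table source = tableEnv.from("source");
// Each element is a (flag, row) pair: true for an insert/update, false for a retraction/delete.
DataStream<Tuple2<Boolean, Row>> changes = tableEnv.toRetractStream(source, Row.class);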

Note: I used another CDC example, from the Flink SQL Cookbook, to reproduce a similar setup:


// assuming a Flink retract table of claims built from a CDC stream:
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

tableEnv.executeSql("" +
" CREATE TABLE accident_claims (\n" +
"    claim_id INT,\n" +
"    claim_total FLOAT,\n" +
"    claim_total_receipt VARCHAR(50),\n" +
"    claim_currency VARCHAR(3),\n" +
"    member_id INT,\n" +
"    accident_date VARCHAR(20),\n" +
"    accident_type VARCHAR(20),\n" +
"    accident_detail VARCHAR(20),\n" +
"    claim_date VARCHAR(20),\n" +
"    claim_status VARCHAR(10),\n" +
"    ts_created VARCHAR(20),\n" +
"    ts_updated VARCHAR(20)" +
") WITH (\n" +
"  'connector' = 'postgres-cdc',\n" +
"  'hostname' = 'localhost',\n" +
"  'port' = '5432',\n" +
"  'username' = 'postgres',\n" +
"  'password' = 'postgres',\n" +
"  'database-name' = 'postgres',\n" +
"  'schema-name' = 'claims',\n" +
"  'table-name' = 'accident_claims'\n" +
" )"
);
// convert it to a retract stream of (change flag, row) pairs
Table accidentClaims = tableEnv.from("accident_claims");
DataStream<Tuple2<Boolean, Row>> accidentClaimsStream = tableEnv
        .toRetractStream(accidentClaims, Row.class);
// and write to file
final FileSink<Tuple2<Boolean, Row>> sink = FileSink
        // TODO: adapt the output format here:
        .forRowFormat(new Path("/tmp/flink-demo"),
                (Encoder<Tuple2<Boolean, Row>>) (element, stream) -> stream.write((element.toString() + "\n").getBytes(StandardCharsets.UTF_8)))
        .build();
accidentClaimsStream.sinkTo(sink);
streamEnv.execute();

Note that during the conversion you obtain a boolean telling you whether each row is a new value for that accident claim or a deletion of that claim. My basic FileSink configuration simply includes that boolean in the output; how to handle deletions has to be decided case by case (one way to drop them entirely is sketched after the sample output below).

The results in the file then look like this:

head /tmp/flink-demo/2021-03-09--09/.part-c7cdb74e-893c-4b0e-8f69-1e8f02505199-0.inprogress.f0f7263e-ec24-4474-b953-4d8ef4641998
(true,1,4153.92,null,AUD,412,2020-06-18 18:49:19,Permanent Injury,Saltwater Crocodile,2020-06-06 03:42:25,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,2,8940.53,IpsumPrimis.tiff,AUD,323,2019-03-18 15:48:16,Collision,Blue Ringed Octopus,2020-05-26 14:59:19,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,3,9406.86,null,USD,39,2019-04-28 21:15:09,Death,Great White Shark,2020-03-06 11:20:54,INITIAL,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,4,3997.9,null,AUD,315,2019-10-26 21:24:04,Permanent Injury,Saltwater Crocodile,2020-06-25 20:43:32,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,5,2647.35,null,AUD,74,2019-12-07 04:21:37,Light Injury,Cassowary,2020-07-30 10:28:53,REIMBURSED,2021-03-09 06:39:28,2021-03-09 06:39:28)
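
If deleted claims should not appear in the output at all, one option (a sketch, not part of the original answer) is to drop the retracted rows using that boolean flag before handing the stream to the FileSink, instead of writing the full retract stream:

// Sketch: keep only inserts/updates (flag == true) and drop retractions,
// so deleted rows never reach the file. 'accidentClaimsStream' and 'sink'
// are the variables from the example above.
accidentClaimsStream
        .filter(change -> change.f0)
        .sinkTo(sink);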
