在我的管道中,我使用pyflink加载&将数据从RDS和接收器转换为MYSQL。使用FLINK CDC,我可以从RDS获得我想要的数据,并使用JDBC库将其汇接到MYSQL。我的目标是在1个作业中读取1个表,并使用下面的代码示例创建10个其他表(基本上是将一个巨大的表分解成更小的表(。我面临的问题是,尽管使用RocksDB作为状态后端,并在flink cdc中使用scan.incremental.snapshot.chunk.size
、scan.snapshot.fetch.size
和debezium.min.row. count.to.stream.result
等选项,但使用内存仍在不断增长,导致具有2GB内存的Taskmanager失败。我的直觉是,一个简单的select-insert查询无论如何都会加载内存中的所有表!如果是这样,我能以某种方式避免吗?表的大小约为50万行。
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
stmt_set = t_env.create_statement_set()
create_kafka_source= (
"""
CREATE TABLE somethin(
bla INT,
bla1 DOUBLE,
bla2 TIMESTAMP(3),
PRIMARY KEY(bla2) NOT ENFORCED
) WITH (
'connector'='mysql-cdc',
'server-id'='1000',
'debezium.snapshot.mode' = 'when_needed',
'debezium.poll.interval.ms'='5000',
'hostname'= 'som2',
'port' ='som2',
'database-name'='som3',
'username'='som4',
'password'='somepass',
'table-name' = 'atable'
)
"""
)
create_kafka_dest = (
"""CREATE TABLE IF NOT EXISTS atable(
time1 TIMESTAMP(3),
blah2 DOUBLE,
PRIMARY KEY(time_stamp) NOT ENFORCED
) WITH ( 'connector'= 'jdbc',
'url' = 'jdbc:mysql://name1:3306/name1',
'table-name' = 't1','username' = 'user123',
'password' = '123'
)"""
)
t_env.execute_sql(create_kafka_source)
t_env.execute_sql(create_kafka_dest)
stmt_set.add_insert_sql(
"INSERT INTO atable SELECT DISTINCT bla2,bla1,"
"FROM somethin"
)
在流式查询中使用DISTINCT
是非常昂贵的,尤其是在对显著性没有任何时间限制的情况下(例如,每天计算唯一访问者(。我想这就是为什么您的查询需要大量状态的原因。
然而,你应该能够让它发挥作用。RocksDB并不总是表现良好;有时它会消耗比分配的内存更多的内存。
你用的是哪个版本的Flink?Flink 1.11(通过切换到jemalloc(进行了改进,Flink 1.14(通过升级到RocksDB的新版本(进行了进一步改进。所以升级Flink可能会解决这个问题。否则,你可能需要撒谎,告诉Flink它的内存比实际内存少一些,这样当RocksDB越界时,就不会导致内存不足的错误。