如何应用CDC?



请如何应用CDC(更改数据捕获)数据库,我使用spark读取,然后将其保存为parquet到HADOOP HDFS。这是代码:

spark = SparkSession 
.builder 
.appName("Ingest") 
.master("local[*]") 
.config("spark.driver.extraClassPath", "/home.../mysql-connector-java-5.1.30.jar") 
.getOrCreate()
df = spark.read
.format("jdbc") 
.option("url", "jdbc:mysql://localhost:3306/classicmodels") 
.option("driver", "com.mysql.jdbc.Driver") 
.option("dbtable", "employees") 
.option("user", "...") 
.option("password", "...").load()
print(df.show())
dataframe_mysql.write.parquet("hdfs://localhost:9000/...")

代码返回从数据框中读取的数据。

Spark不做更改数据捕获。实际上,任何批处理/轮询JDBC客户机也不需要,因为您总是查询after状态,而不是实际的更改事件

对此,经常使用Debezium+Kafka,尽管,也有其他选择

一旦数据存储在Kafka中,它就可以被Kafka Connect, Spark等消费,并写入Parquet

最新更新