请如何应用CDC(更改数据捕获)数据库,我使用spark读取,然后将其保存为parquet到HADOOP HDFS。这是代码:
spark = SparkSession
.builder
.appName("Ingest")
.master("local[*]")
.config("spark.driver.extraClassPath", "/home.../mysql-connector-java-5.1.30.jar")
.getOrCreate()
df = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/classicmodels")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "employees")
.option("user", "...")
.option("password", "...").load()
print(df.show())
dataframe_mysql.write.parquet("hdfs://localhost:9000/...")
代码返回从数据框中读取的数据。
Spark不做更改数据捕获。实际上,任何批处理/轮询JDBC客户机也不需要,因为您总是查询after状态,而不是实际的更改事件
对此,经常使用Debezium+Kafka,尽管,也有其他选择
一旦数据存储在Kafka中,它就可以被Kafka Connect, Spark等消费,并写入Parquet