我想在我的应用程序 Spark 和 Kafka 中添加一些参数,以便将数据帧写入主题 kafka。
我在 spark-kafka 文档中找不到 acks 和 compression.codec
.write
.format("kafka")
.option("kafka.sasl.mechanism", Config.KAFKA_SASL_MECHANISM)
.option("kafka.security.protocol", Config.KAFKA_SECURITY_PROTOCOL)
.option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
.option("fetchOffset.numRetries", 6)
.option("acks","all")
.option("compression.codec","lz4")
.option("kafka.request.timeout.ms", 120000)
.option("topic", topic)
.save()```
您可以使用此特定属性来定义序列化程序:default.value.serde
对于序列化程序,创建一个案例类或其他一到三列数据帧,该数据帧仅包含key
和value
的Array[Byte]
字段(字符串也可以使用(。然后topic
字符串字段。如果只需要 Kafka 值,则只需要一列数据帧
在写入 Kafka 之前,您需要映射当前数据以序列化所有数据。
然后,文档确实说任何其他生产者属性都
只是以kafka.
更多信息请点击此处 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
对于 SASL 属性,我认为您需要在提交期间使用 spark.executor.options
并使用--files
传递关键选项卡或 jaas 文件