添加配置参数 - spark & Kafka:确认和压缩



我想在我的应用程序 Spark 和 Kafka 中添加一些参数,以便将数据帧写入主题 kafka。

我在 spark-kafka 文档中找不到 acks 和 compression.codec

   .write
   .format("kafka")
   .option("kafka.sasl.mechanism", Config.KAFKA_SASL_MECHANISM)
   .option("kafka.security.protocol", Config.KAFKA_SECURITY_PROTOCOL)
   .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
   .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
   .option("fetchOffset.numRetries", 6)
   .option("acks","all")
   .option("compression.codec","lz4")
   .option("kafka.request.timeout.ms", 120000)
   .option("topic", topic)
   .save()```

您可以使用此特定属性来定义序列化程序:default.value.serde

对于序列化程序,创建一个案例类或其他一到三列数据帧,该数据帧仅包含keyvalueArray[Byte]字段(字符串也可以使用(。然后topic字符串字段。如果只需要 Kafka 值,则只需要一列数据帧

在写入 Kafka 之前,您需要映射当前数据以序列化所有数据。

然后,文档确实说任何其他生产者属性都

只是以kafka.

更多信息请点击此处 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

对于 SASL 属性,我认为您需要在提交期间使用 spark.executor.options 并使用--files传递关键选项卡或 jaas 文件

最新更新