在阅读了如下的配置单元表后,我正试图将数据写入Kafka主题。
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
final_df.write.format("kafka")
.option("kafka.bootstrap.servers", kafka_broker)
.option("kafka.batch.size", 51200)
.option("retries", 3)
.option("kafka.max.request.size", 500000)
.option("kafka.max.block.ms", 120000)
.option("kafka.metadata.max.age.ms", 120000)
.option("kafka.request.timeout.ms", 120000)
.option("kafka.linger.ms", 0)
.option("kafka.delivery.timeout.ms", 130000)
.option("acks", "1")
.option("kafka.compression.type", "snappy")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", oauth_config)
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")
.option("kafka.sasl.mechanism", "OAUTHBEARER")
.option("topic", 'topic_name')
.save()
在成功写入(记录数为29000)后,我正在另一个文件中读取与以下主题相同的数据:read_kafka_data.py:
# SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])
# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"'
+ " oauth.client.id=" + '"' + "client_id" + '"'
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'
stream_df = spark.readStream
.format('kafka')
.option('kafka.bootstrap.servers', kafka_broker)
.option('subscribe', 'topic_name')
.option('kafka.security.protocol', 'SASL_SSL')
.option('kafka.sasl.mechanism', 'OAUTHBEARER')
.option('kafka.sasl.jaas.config', jass_config)
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")
.option('startingOffsets', 'latest')
.option('group.id', 'group_id')
.option('maxOffsetsPerTrigger', 200)
.option('fetchOffset.retryIntervalMs', 200)
.option('fetchOffset.numRetries', 3)
.load()
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()
我是Kafka的初学者,一直在阅读Kafka性能优化技术,并遇到了这两种技术。
spark.streaming.backpressure.enabled
和spark.streaming.kafka.maxRatePerPartition
启用第一个参数:
sparkConf.set("spark.streaming.backpressure.enabled",”true”)
官方文件中对上述参数的解释如下:
启用或禁用Spark Streaming的内部背压机制(自1.5起)。这使Spark Streaming能够控制接收基于当前批处理调度延迟和处理时间的速率使得系统接收的速度仅与系统能够处理的速度一样快。在内部,这会动态设置接收器。此速率的上限值为
spark.streaming.receiver.maxRate
和spark.streaming.kafka.maxRatePerPartition
既然我是第一次运行应用程序,而且之前没有微批处理,我应该为spark.streaming.backpressure.initialRate
指定一些值吗
如果是,我应该如何确定spark.streaming.backpressure.initialRate
的值。该文档还指出,如果spark.streaming.backpressure.enabled
被设置为true
,则最大接收速率被动态设置。如果是这种情况,我们是否还需要配置:spark.streaming.receiver.maxRate
和spark.streaming.kafka.maxRatePerPartition
如果CCD_ 11被设置为CCD_?
该链接表示,当施加背压时,使用spark.streaming.backpressure.initialRate
没有影响。
如有任何帮助,我们将不胜感激。
您所指的配置spark.streaming.[...]
属于直接流式传输(又名Spark Streaming),不属于结构化流式传输。
如果您没有意识到差异,我建议您查看单独的编程指南:
- 结构化数据流:使用关系查询处理结构化数据流(使用数据集和数据帧,比DStreams更新的API)
- Spark Streaming:使用DStreams(旧API)处理数据流
结构化流不提供背压机制。当您从Kafka消费时,您可以使用(正如您已经在做的那样)选项maxOffsetsPerTrigger
来设置每个触发器上读取消息的限制。该选项在《结构化流媒体和Kafka集成指南》中记录为:
"每个触发间隔处理的最大偏移数的速率限制。指定的偏移总数将按比例分配到不同卷的topicPartitions中">
如果您仍然对标题问题感兴趣
在使用Kafka进行火花流的情况下,
spark.streaming.kafka.maxRatePerPartition
与spark.streaming.backpressure.enabled
有何关联?
Spark的配置文件中解释了这种关系:
"启用或禁用Spark Streaming的内部背压机制(自1.5起)。这使Spark Streamling能够根据当前的批调度延迟和处理时间来控制接收速率,从而使系统只能以系统能够处理的速度接收。在内部,这动态地设置接收器的最大接收速率如果设置了值
spark.streaming.receiver.maxRate
和spark.streaming.kafka.maxRatePerPartition
,则此速率为上限(请参见下文)";
关于Spark Streaming(DStream,而非结构化流媒体)中可用的背压机制的所有详细信息都在博客中解释,您已经链接了Enable Back Pressure To Make Your Spark StreamingApplication Production Ready。
通常,如果启用背压,则会将spark.streaming.kafka.maxRatePerPartition
设置为最佳估计速率的150%~200%。
PID控制器的精确计算可以在PIDRateEstimator类的代码中找到。
带有火花流的背压示例
正如你所要求的一个例子,这里是我在一个富有成效的应用程序中做的一个:
设置
- Kafka主题有16个分区
- Spark运行时有16个工作核心,因此每个分区都可以并行使用
- 使用Spark流(非结构化流)
- 批处理间隔为10秒
spark.streaming.backpressure.enabled
设置为truespark.streaming.kafka.maxRatePerPartition
设置为10000spark.streaming.backpressure.pid.minRate
保持默认值100- 该作业每秒可以处理大约5000条消息每个分区
- 在启动流作业之前,Kafka主题在每个分区中包含数百万条消息
观察
- 在第一批中,流作业获取16000条消息(=10秒*16个分区*100 pid.minRate)
- 该作业处理这16000条消息的速度相当快,因此PID控制器估计的最佳速率大于10000的maxRatePerPartition
- 因此,在第二批中,流作业获取1600000(=10秒*16个分区*10000个maxRatePerPartition)消息
- 现在,第二批大约需要22秒才能完成
- 因为我们的批处理间隔被设置为10秒,10秒后,流式作业已经安排了第三个微批处理,再次为1600000。原因是PID控制器只能使用完成微批次的性能信息
- 只有在第六个或第七个微批中,PID控制器才能找到每个分区每秒约5000条消息的最佳处理速率