使用Spark和Kafka API从Azure Databricks到Azure Event Hubs的连续数据生成器,



我正在尝试实现从Databricks到Event Hub的连续数据生成器。

我的想法是在.csv文件中生成一些数据,然后用这些数据创建一个数据框架。在循环中,我调用一个函数,该函数执行查询以将数据流式传输到Event Hub。不确定是否这个想法是好的,或者如果火花可以处理写入从相同的数据框架,或者如果我理解正确查询如何工作。

代码如下:

def write_to_event_hub(
df: DataFrame,
topic: str,
bootstrap_servers: str,
config: str,
checkpoint_path: str,
):
return (
df.writeStream.format("kafka")
.option("topic", topic)
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", config)
.option("checkpointLocation", checkpoint_path)
.trigger(once=True)
.start()
)

while True:
query = write_to_event_hub(
streaming_df,
topic,
bootstrap_servers,
sasl_jaas_config,
"/checkpoint",
)
query.awaitTermination()
print("Wrote once")
time.sleep(5)

我想提一下,这就是我如何从CSV文件中读取数据(我在DBFS中有它),我也有它的模式:

streaming_df = (
spark.readStream.format("csv")
.option("header", "true")
.schema(location_schema)
.load(f"{path}")
)

看起来没有数据写入事件,虽然我有消息"写了一次"打印出来。有什么办法吗?谢谢你!

问题是您正在使用readStream来获取CSV数据,因此它将等待新数据被推送到带有CSV文件的目录。但实际上,你不需要使用readStream/writeStream- Kafka连接器在批处理模式下工作得很好,所以你的代码应该是:

df = read_csv_file()
while True:
write_to_kafka(df)
sleep(5)