我试图每120秒将从Kafka提取的数据写入Bigquery表。我想做一些额外的操作,根据文档,这些操作应该可以在.foreach()
或foreachBatch()
方法中进行。
作为一个测试,我想在每次从kafka提取数据并写入BigQuery时打印一条简单的消息。
batch_job=df_alarmsFromKafka.writeStream
.trigger(processingTime='120 seconds')
.foreachBatch(print("do i get printed every batch?"))
.format("bigquery").outputMode("append")
.option("temporaryGcsBucket",path1)
.option("checkpointLocation",path2)
.option("table", table_kafka)
.start()
batch_job.awaitTermination()
我希望这个消息每120秒在jupyter实验室的输出单元格上打印一次,但它只打印一次并一直写入BigQuery。
如果我尝试使用.foreach()
而不是foreachBatch()
batch_job=df_alarmsFromKafka.writeStream
.trigger(processingTime='120 seconds')
.foreach(print("do i get printed every batch?"))
.format("bigquery").outputMode("append")
.option("temporaryGcsBucket",path1)
.option("checkpointLocation",path2)
.option("table", table_kafka)
.start()
batch_job.awaitTermination()
它打印一次消息,然后立即给出以下错误,我无法调试/理解:
/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)
1335
1336 if not hasattr(f, 'process'):
-> 1337 raise Exception("Provided object does not have a 'process' method")
1338
1339 if not callable(getattr(f, 'process')):
Exception: Provided object does not have a 'process' method
我做错什么了吗?除了直接在评估的数据帧df_alarmsFromKafka
上执行的操作之外,我如何简单地每120秒执行一些操作?
允许进行其他操作,但仅限于流式查询的输出数据。但在这里,您试图打印一些与输出数据本身无关的字符串。它只能打印一次。
例如,如果您编写如下foreachbatch函数:
def write_to_cassandra(target_df, batch_id):
target_df.write
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "tweet_db")
.option("table", "tweet2")
.mode("append")
.save()
target_df.show()
由于.show((函数与输出数据本身有关,因此它将在每个批处理中产生target_df。
对于您的第二个问题:
Foreach函数希望您通过实现打开、处理和关闭方法来扩展类ForeachWriter,而您没有实现这些方法。