带有Spark Streaming错误的foreach()方法



我试图每120秒将从Kafka提取的数据写入Bigquery表。我想做一些额外的操作,根据文档,这些操作应该可以在.foreach()foreachBatch()方法中进行。

作为一个测试,我想在每次从kafka提取数据并写入BigQuery时打印一条简单的消息。

batch_job=df_alarmsFromKafka.writeStream
.trigger(processingTime='120 seconds') 
.foreachBatch(print("do i get printed every batch?"))
.format("bigquery").outputMode("append") 
.option("temporaryGcsBucket",path1) 
.option("checkpointLocation",path2) 
.option("table", table_kafka) 
.start()
batch_job.awaitTermination()

我希望这个消息每120秒在jupyter实验室的输出单元格上打印一次,但它只打印一次并一直写入BigQuery。

如果我尝试使用.foreach()而不是foreachBatch()

batch_job=df_alarmsFromKafka.writeStream
.trigger(processingTime='120 seconds') 
.foreach(print("do i get printed every batch?"))
.format("bigquery").outputMode("append") 
.option("temporaryGcsBucket",path1) 
.option("checkpointLocation",path2) 
.option("table", table_kafka) 
.start()
batch_job.awaitTermination()

它打印一次消息,然后立即给出以下错误,我无法调试/理解:

/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)
1335 
1336             if not hasattr(f, 'process'):
-> 1337                 raise Exception("Provided object does not have a 'process' method")
1338 
1339             if not callable(getattr(f, 'process')):
Exception: Provided object does not have a 'process' method

我做错什么了吗?除了直接在评估的数据帧df_alarmsFromKafka上执行的操作之外,我如何简单地每120秒执行一些操作?

允许进行其他操作,但仅限于流式查询的输出数据。但在这里,您试图打印一些与输出数据本身无关的字符串。它只能打印一次。

例如,如果您编写如下foreachbatch函数:

def write_to_cassandra(target_df, batch_id):
target_df.write 
.format("org.apache.spark.sql.cassandra") 
.option("keyspace", "tweet_db") 
.option("table", "tweet2") 
.mode("append") 
.save()
target_df.show()

由于.show((函数与输出数据本身有关,因此它将在每个批处理中产生target_df。

对于您的第二个问题:

Foreach函数希望您通过实现打开、处理和关闭方法来扩展类ForeachWriter,而您没有实现这些方法。

相关内容

  • 没有找到相关文章

最新更新