添加新行时,如何将csv文件作为接收器写入控制台



我尝试每5分钟读取一次csv文件,并将不到5分钟前添加的任何新行输出到另一个csv文件。我认为一个好的垫脚石是,每当我手动将一行新数据附加到csv文件中时,读取一个csv文件并将Stream写入控制台以打印出文件内容。问题是,只有当我将一个全新的csv文件插入目录时,控制台才会打印。

每当我的新行添加到csv文件时,我如何打印?

这是我的目录布局:

  • stream_script2.py(下面的代码(
  • csv_files
    • unit_testing.csv
from pyspark.sql import SparkSession
from pyspark.sql.types import (  
StringType,
IntegerType,
StructType,
StructField,
TimestampType,
BooleanType,
DateType,
)
# Create SparkSession
spark = (
SparkSession.builder.appName("streaming")
.master("local[*]")
.getOrCreate()
)
# Made Schema
schema = StructType(
[
StructField("Drug_Name", StringType(), True),
StructField("Count", IntegerType(), True),
StructField("Faulty", BooleanType(), True),
]
)
# Read Stream
df = (
spark.readStream.option("sep", ";")
.schema(schema)
.format("csv")
.load("csv_files/unit_testing*.csv")
)
# Write Stream
query = df.writeStream
.format("console")
.outputMode("append")
.queryName("test1")
.start()
query.awaitTermination()
# Ctrl-C to stop stream

我几乎可以肯定,Spark结构化流不直接支持在添加新行时通知我的功能。相反,解决方法是以某种方式保留对旧文件的引用,并将行数与当前文件进行比较。如果存在差异,则必须进行比较和提取。否则什么也不做。

最新更新