Is 'distinct' an unsupported operation on streaming Datasets in Structured Streaming?



In the Spark Structured Streaming documentation, the list of unsupported operations includes:

"Distinct operations on streaming Datasets are not supported."

However, there is a distinct() method in the API, and I can call distinct() on a streaming Dataset:

import java.sql.Timestamp;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public final class JavaStructuredNetworkWordDistinct {
  public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "C://hadoop");
    SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredNetworkWordDistinct")
            .config("spark.master", "local[*]")
            .getOrCreate();
    spark.sparkContext().setLogLevel("ERROR");
    spark.conf().set("spark.sql.shuffle.partitions", 4);
    // Read all the csv files written atomically in a directory
    StructType userSchema = new StructType().add("event_time", "string").add("id", "string");
    Dataset<Tuple2<Timestamp, String>> dropStream = spark
            .readStream()
            .option("sep", ",")
            .schema(userSchema)      // Specify schema of the csv files
            .csv("D:\\deduplication")
            .selectExpr("to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time", "id as id")
            .as(Encoders.tuple(Encoders.TIMESTAMP(), Encoders.STRING()));
    StreamingQuery outerQuery = execDeduplicationDistinct(spark, dropStream);
    outerQuery.awaitTermination();
  }

  private static StreamingQuery execDeduplicationDistinct(SparkSession spark, Dataset<Tuple2<Timestamp, String>> dropStream) {
    Dataset<Tuple2<Timestamp, String>> dropDuplicatesStream = dropStream.distinct();
    // Start running the query that prints the deduplicated rows to the console
    StreamingQuery query = dropDuplicatesStream.writeStream()
            .outputMode("append")
            .format("console")
            .start();
    return query;
  }
}

The folder D:\deduplication contains only one file, with the following contents:

event_time,word
2022-04-10 11:44:00,word1
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2

Finally, the result was:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-----+
|         event_time|   id|
+-------------------+-----+
|2022-04-10 11:44:00|word1|
|2022-04-10 11:45:00|word2|
|               null| word|
+-------------------+-----+

What is wrong with my understanding of distinct?

In addition, I also ran a socket stream. The code is:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object StructuredNetworkWordCountDistinct {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val spark: SparkSession = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", 4)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    // Deduplicate the words
    val wordCounts = words.distinct()
    // Start running the query that prints the deduplicated words to the console
    val query = wordCounts.writeStream
      .trigger(Trigger.ProcessingTime("1 second"))  // only change in query
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}

and started netcat with nc -L -p 9999. I first typed v1, and the output batches were:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|   v1|
+-----+
After typing v1 again, the new output batch was:

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
+-----+

Spark seems to have remembered the first v1 (from the earlier batch) when deduplicating the second batch's result.
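
One way to check whether Spark really keeps such state (a minimal sketch, assuming the running socket query above is available as query): StreamingQueryProgress exposes per-operator state metrics, so a non-empty stateOperators entry whose numRowsTotal keeps growing would indicate that distinct is backed by a stateful deduplication operator.

// Minimal sketch: inspect the most recent progress of the running query.
// A stateful operator behind distinct() would show up here with its row counts.
val progress = query.lastProgress
if (progress != null) {
  progress.stateOperators.foreach { op =>
    println(s"rows kept in state: ${op.numRowsTotal}, updated in this batch: ${op.numRowsUpdated}")
  }
}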

"Spark不支持某些Dataframe操作结构化流,例如,区分,排序等转换Spark需要将整个数据存储在内存中。…">

严重解释道。

A query like this in complete mode cannot use distinct-like operations:

val mappedDF = jsonEdits.groupBy($"user").agg(countDistinct($"userURL").as("cnt"))  

It returns:

org.apache.spark.sql.AnalysisException: Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider using approx_count_distinct() instead.
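
A minimal sketch of the workaround that the error message itself suggests, reusing the jsonEdits DataFrame and column names from the snippet above (with spark.implicits._ assumed to be in scope):

import org.apache.spark.sql.functions.approx_count_distinct

// Replace the exact distinct aggregation with the approximate variant,
// which is allowed on streaming DataFrames.
val mappedDF = jsonEdits
  .groupBy($"user")
  .agg(approx_count_distinct($"userURL").as("cnt"))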

If append mode is used, it also requires a watermark.
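
For example, streaming deduplication in append mode is usually expressed with dropDuplicates plus a watermark rather than distinct. A minimal sketch, assuming a streaming Dataset like dropStream from the first program, with an event_time timestamp column and an id column (the 10-minute delay is just an illustrative value):

// Bound the deduplication state with a watermark so old entries can eventually be dropped.
val deduped = dropStream
  .withWatermark("event_time", "10 minutes")
  .dropDuplicates("id", "event_time")

deduped.writeStream
  .outputMode("append")
  .format("console")
  .start()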

Your query is a simple append with a distinct and no aggregation. That is a small matter for Spark at the micro-batch level: it does not need to worry about state or aggregations, it only needs to process the current micro-batch, as you have seen. The docs are a bit weak on this point.
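
If deduplication only within each micro-batch is what is actually wanted, foreachBatch makes that explicit: it hands every micro-batch over as a regular Dataset. A minimal sketch, reusing the words Dataset from the socket example (the function is assigned to a typed val to avoid the Scala 2.12 overload ambiguity with the Java foreachBatch signature):

import org.apache.spark.sql.Dataset

// distinct() inside foreachBatch only deduplicates within the current micro-batch;
// no streaming state is kept between batches.
val dedupePerBatch: (Dataset[String], Long) => Unit =
  (batchDS, batchId) => batchDS.distinct().show()

val query = words.writeStream
  .foreachBatch(dedupePerBatch)
  .start()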
