In the Spark Structured Streaming documentation, the list of unsupported operations includes: "Distinct operations on streaming Datasets are not supported." However, the API does have a distinct() method, and I can also call distinct() on a streaming Dataset.
import java.sql.Timestamp;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public final class JavaStructuredNetworkWordDistinct {
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "C://hadoop");
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordDistinct")
                .config("spark.master", "local[*]")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");
        spark.conf().set("spark.sql.shuffle.partitions", 4);
        // Read all the csv files written atomically in a directory
        StructType userSchema = new StructType().add("event_time", "string").add("id", "string");
        Dataset<Tuple2<Timestamp, String>> dropStream = spark
                .readStream()
                .option("sep", ",")
                .schema(userSchema) // Specify schema of the csv files
                .csv("D:\\deduplication")
                .selectExpr("to_timestamp(event_time,'yyyy-MM-dd HH:mm:ss') as event_time", "id as id")
                .as(Encoders.tuple(Encoders.TIMESTAMP(), Encoders.STRING()));
        StreamingQuery outerQuery = execDeduplicationDistinct(spark, dropStream);
        outerQuery.awaitTermination();
    }

    private static StreamingQuery execDeduplicationDistinct(SparkSession spark, Dataset<Tuple2<Timestamp, String>> dropStream) {
        // Deduplicate by calling distinct() directly on the streaming Dataset
        Dataset<Tuple2<Timestamp, String>> dropDuplicatesStream = dropStream.distinct();
        // Start running the query that prints the deduplicated rows to the console
        StreamingQuery query = dropDuplicatesStream.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        return query;
    }
}
The folder D:\deduplication contains only one file, with the following content:
event_time,word
2022-04-10 11:44:00,word1
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
2022-04-10 11:45:00,word2
Finally, the result is:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-----+
| event_time| id|
+-------------------+-----+
|2022-04-10 11:44:00|word1|
|2022-04-10 11:45:00|word2|
| null| word|
+-------------------+-----+
What is wrong with my understanding of distinct?
In addition, I also ran a socket-stream version. The code is:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object StructuredNetworkWordCountDistinct {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val spark: SparkSession = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", 4)
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    // Keep only distinct words
    val wordCounts = words.distinct()
    // Start running the query that prints the distinct words to the console
    val query = wordCounts.writeStream
      .trigger(Trigger.ProcessingTime("1 second")) // only change in query
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
and started netcat with nc -L -p 9999. First I typed v1, and the output batches were:
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
| v1|
+-----+
Typing v1 again, the new output batch was:
-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
+-----+
Spark seems to have remembered the v1 from the first batch in order to deduplicate it out of the second batch's result.
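One way to check whether state really is kept across micro-batches is to inspect the streaming query progress. A minimal sketch (not part of my original code; it would have to run on another thread, since awaitTermination() blocks):
// Hypothetical check: print the state-store metrics reported for recent micro-batches.
// numRowsTotal > 0 would indicate that distinct() keeps rows in a state store across batches.
query.recentProgress.foreach { p =>
  p.stateOperators.foreach { s =>
    println(s"batch=${p.batchId} stateRowsTotal=${s.numRowsTotal} rowsUpdated=${s.numRowsUpdated}")
  }
}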
"Spark不支持某些Dataframe操作结构化流,例如,区分,排序等转换Spark需要将整个数据存储在内存中。…">
严重解释道。
A query like this in complete output mode cannot use distinct-like aggregations:
val mappedDF = jsonEdits.groupBy($"user").agg(countDistinct($"userURL").as("cnt"))
It returns:
org.apache.spark.sql.AnalysisException: Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider using approx_count_distinct() instead.
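A minimal sketch of the variant the error message suggests (same jsonEdits DataFrame, swapping countDistinct for the approximate version), which is accepted on streaming DataFrames:
import org.apache.spark.sql.functions.approx_count_distinct

// Approximate distinct count is allowed on streaming DataFrames, at the cost of an
// approximate (HyperLogLog-based) result. Assumes import spark.implicits._ is in scope,
// as in the snippet above.
val mappedDF = jsonEdits.groupBy($"user").agg(approx_count_distinct($"userURL").as("cnt"))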
If append output mode is used instead, the aggregation also requires a watermark.
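A minimal sketch of what that looks like, assuming jsonEdits also carries an event-time column (here called ts, which is my assumption, not something from the original query):
import org.apache.spark.sql.functions.{approx_count_distinct, window}

// Sketch only: "ts" is an assumed event-time column. In append mode a streaming
// aggregation must declare a watermark; a window's result is emitted only after
// the watermark passes the end of that window, and its state is then dropped.
val appendDF = jsonEdits
  .withWatermark("ts", "10 minutes")
  .groupBy(window($"ts", "5 minutes"), $"user")
  .agg(approx_count_distinct($"userURL").as("cnt"))

appendDF.writeStream
  .outputMode("append")
  .format("console")
  .start()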
Your query is a plain append with a distinct and no aggregation. That is the crux: Spark here works at the micro-batch level; it does not need to deal with state or aggregation concerns, it only has to process the current micro-batch, as you have seen. The docs are a bit thin on this point.
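Separately, if the goal is deduplication with state that does not grow without bound, the approach documented in the Structured Streaming programming guide is dropDuplicates together with a watermark. A minimal Scala sketch, reusing the event_time/id column names from your first example (the stream itself is assumed to be defined as in that code):
// Sketch: bounded-state streaming deduplication as described in the
// Structured Streaming guide. Rows older than the watermark are eventually
// dropped from the deduplication state.
val deduped = dropStream
  .withWatermark("event_time", "10 minutes")
  .dropDuplicates("id", "event_time")

deduped.writeStream
  .outputMode("append")
  .format("console")
  .start()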