嗨,我想做增量数据查询。
df = spark .read.csv('csvFile', header=True) #1000 Rows
df.persist() #Assume it takes 5 min
df.registerTempTable('data_table') #or createOrReplaceTempView
result = spark.sql('select * from data_table where column1 > 10') #100 rows
df_incremental = spark.read.csv('incremental.csv') #200 Rows
df_combined = df.unionAll(df_incremental)
df_combined.persist() #It will take morethan 5 mins, I want to avoid this, because other queries might be running at this time
df_combined.registerTempTable("data_table")
result = spark.sql('select * from data_table where column1 > 10') # 105 Rows.
将CSV/MySQL表数据读取到Spark DataFrame。
坚持只有内存中的数据框(原因:我需要性能&我的数据集可以适合内存)
注册为温度表并运行Spark SQL查询。#till这是我的火花工作启动并运行。
第二天我将收到一个增量数据集(在temp_mysql_table或csv文件中)。现在,我想在总计I:e Persisted_prevdata erase_read_incrementaldata上运行相同的查询。我称之为混合dataset。***不能确定,当增量数据到达系统时,每天可能会出现30次。
直到这里我也不希望火花应用掉落。它应该永远起来。我需要以同样的时间度量来查询混合数据的性能。
我的担忧:
- 在P4中,我是否需要取消prev_data,并再次持续存在prev&增量数据的联合数据?
- 我最重要的担心是我不想重新启动Spark-Job加载/从更新的数据开始(只有服务器掉落时,我当然必须重新启动)。
因此,在高级别上,我需要查询(更快的性能)数据集 recremnatal_data_if_yandy。
目前,我正在通过为所有数据创建文件夹来进行此练习,而增量文件也放在同一目录中。每2-3个小时,我都在重新启动服务器,而SparkApp开始读取该系统中存在的所有CSV文件。然后查询在它们上运行。
并尝试探索Hive持久性和火花流,如果找到任何结果,将在此处进行更新。
请建议我一种实现这一目标的方法/架构。
请发表评论,如果没有任何问题,没有降低投票的问题:)
谢谢。
尝试流式传输,因为会话已经在运行,并且每当您将某些内容放在文件夹中时会触发,它将触发。
df_incremental = spark
.readStream
.option("sep", ",")
.schema(input_schema)
.csv(input_path)
df_incremental.where("column1 > 10")
.writeStream
.queryName("data_table")
.format("memory")
.start()
spark.sql("SELECT * FROM data_table).show()