在不重新启动Spark作业的情况下,在Pyspark中加载和查询



嗨,我想做增量数据查询。

  df = spark .read.csv('csvFile', header=True)  #1000 Rows
  df.persist() #Assume it takes 5 min
  df.registerTempTable('data_table') #or createOrReplaceTempView
  result = spark.sql('select * from data_table where column1 > 10') #100 rows
  df_incremental  = spark.read.csv('incremental.csv') #200 Rows
  df_combined = df.unionAll(df_incremental)
  df_combined.persist() #It will take morethan 5 mins, I want to avoid this, because other queries might be running at this time
  df_combined.registerTempTable("data_table")
  result = spark.sql('select * from data_table where column1 > 10') # 105 Rows.
  1. 将CSV/MySQL表数据读取到Spark DataFrame。

  2. 坚持只有内存中的数据框(原因:我需要性能&我的数据集可以适合内存)

  3. 注册为温度表并运行Spark SQL查询。#till这是我的火花工作启动并运行。

  4. 第二天我将收到一个增量数据集(在temp_mysql_table或csv文件中)。现在,我想在总计I:e Persisted_prevdata erase_read_incrementaldata上运行相同的查询。我称之为混合dataset。***不能确定,当增量数据到达系统时,每天可能会出现30次。

  5. 直到这里我也不希望火花应用掉落。它应该永远起来。我需要以同样的时间度量来查询混合数据的性能。

我的担忧:

  1. 在P4中,我是否需要取消prev_data,并再次持续存在prev&增量数据的联合数据?
  2. 我最重要的担心是我不想重新启动Spark-Job加载/从更新的数据开始(只有服务器掉落时,我当然必须重新启动)。

因此,在高级别上,我需要查询(更快的性能)数据集 recremnatal_data_if_yandy。

目前,我正在通过为所有数据创建文件夹来进行此练习,而增量文件也放在同一目录中。每2-3个小时,我都在重新启动服务器,而SparkApp开始读取该系统中存在的所有CSV文件。然后查询在它们上运行。

并尝试探索Hive持久性和火花流,如果找到任何结果,将在此处进行更新。

请建议我一种实现这一目标的方法/架构。

请发表评论,如果没有任何问题,没有降低投票的问题:)

谢谢。

尝试流式传输,因为会话已经在运行,并且每当您将某些内容放在文件夹中时会触发,它将触发。

df_incremental = spark 
    .readStream 
    .option("sep", ",") 
    .schema(input_schema) 
    .csv(input_path)
df_incremental.where("column1 > 10") 
    .writeStream 
    .queryName("data_table") 
    .format("memory") 
    .start()
spark.sql("SELECT * FROM data_table).show()

相关内容

  • 没有找到相关文章