Spark -使用Firehose从分区文件夹读取JSON



Kinesis firehose管理文件的持久性,在本例中是时间序列JSON,将其放入按YYYY/MM/DD/HH(以24编号的小时为单位)划分的文件夹层次结构中…非常棒。

如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹,并从所有叶子json文件创建一个静态数据框架?数据帧读取器是否有"选项"?

我的下一个目标是让它成为一个流DF,其中由Firehose持久化到s3的新文件使用Spark 2.0中新的结构化流自然地成为流数据框架的一部分。我知道这一切都是实验性的——希望有人以前使用过S3作为流文件源,其中的数据如上所述被分区到文件夹中。当然,我更喜欢直接使用Kinesis流,但是这个连接器没有2.0的日期,所以Firehose->S3是临时的。

ND:我正在使用databricks,它将S3挂载到DBFS中,但当然也可以很容易地成为EMR或其他Spark提供商。如果一个笔记本是可共享的,并给出一个例子,那就太好了。

干杯!

我可以读取嵌套子文件夹并从所有叶子JSON文件创建静态DataFrame吗?是否有一个数据框架阅读器的选项?

是的,因为你的目录结构是常规的(YYYY/MM/DD/HH),你可以用通配符给路径直到叶节点,像下面的

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val jsonDf = spark.read.format("json").json("base/path/*/*/*/*/*.json")
// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json 

当然,我更喜欢直接使用Kinesis流,但是这个连接器没有2.0的日期,所以Firehose->S3是临时的。

我可以看到有一个库Kinesis与Spark Streaming集成。因此,您可以直接读取流数据并对其执行SQL操作,而无需从S3读取。

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0

Spark Streaming和SQL的示例代码

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val kinesisStream = KinesisUtils.createStream(
 streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
 [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
kinesisStream.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  // Convert RDD[String] to DataFrame
  val jsonDf = rdd.toDF() // or rdd.toDF("specify schema/columns here")
  // Create a temporary view with DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")
  //As we have DataFrame and SparkSession object we can perform most 
  //of the Spark SQL stuff here
}

充分披露:我为Databricks工作,但我不代表他们Stack Overflow。

如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹,并从所有叶子json文件创建一个静态数据框架?数据帧读取器是否有"选项"?

DataFrameReader支持加载序列。参见def的文档json(paths: String*): DataFrame。您可以指定序列,使用globbing模式或以编程方式构建它(推荐):

val inputPathSeq = Seq[String]("/mnt/myles/structured-streaming/2016/12/18/02", "/mnt/myles/structured-streaming/2016/12/18/03")
val inputPathGlob = "/mnt/myles/structured-streaming/2016/12/18/*"
val basePath = "/mnt/myles/structured-streaming/2016/12/18/0"
val inputPathList = (2 to 4).toList.map(basePath+_+"/*.json")

我知道这都是实验性的-希望有人以前使用S3作为流文件源,其中数据如上所述分区到文件夹中。当然,我更喜欢直接使用Kinesis流,但是这个连接器没有2.0的日期,所以Firehose->S3是临时的。

由于您使用的是DBFS,我将假设从Firehose传输数据的S3桶已经挂载到DBFS。如果需要帮助将S3存储桶挂载到DBFS,请查看Databricks文档。一旦有了上面描述的输入路径,就可以简单地将文件加载到静态或流数据帧中:

静态

val staticInputDF = 
  spark
    .read
    .schema(jsonSchema)
    .json(inputPathSeq : _*)
staticInputDF.isStreaming
res: Boolean = false

val streamingInputDF = 
  spark
    .readStream                       // `readStream` instead of `read` for creating streaming DataFrame
    .schema(jsonSchema)               // Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  // Treat a sequence of files as a stream by picking one file at a time
    .json(inputPathSeq : _*)
streamingCountsDF.isStreaming
res: Boolean = true

其中大部分直接取自Databricks关于结构化流的文档。甚至还有一个笔记本的例子可以直接导入到Databricks中。

相关内容

  • 没有找到相关文章