Kinesis firehose管理文件的持久性,在本例中是时间序列JSON,将其放入按YYYY/MM/DD/HH(以24编号的小时为单位)划分的文件夹层次结构中…非常棒。
如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹,并从所有叶子json文件创建一个静态数据框架?数据帧读取器是否有"选项"?
我的下一个目标是让它成为一个流DF,其中由Firehose持久化到s3的新文件使用Spark 2.0中新的结构化流自然地成为流数据框架的一部分。我知道这一切都是实验性的——希望有人以前使用过S3作为流文件源,其中的数据如上所述被分区到文件夹中。当然,我更喜欢直接使用Kinesis流,但是这个连接器没有2.0的日期,所以Firehose->S3是临时的。
ND:我正在使用databricks,它将S3挂载到DBFS中,但当然也可以很容易地成为EMR或其他Spark提供商。如果一个笔记本是可共享的,并给出一个例子,那就太好了。
干杯!
我可以读取嵌套子文件夹并从所有叶子JSON文件创建静态DataFrame吗?是否有一个数据框架阅读器的选项?
是的,因为你的目录结构是常规的(YYYY/MM/DD/HH
),你可以用通配符给路径直到叶节点,像下面的
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val jsonDf = spark.read.format("json").json("base/path/*/*/*/*/*.json")
// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json
当然,我更喜欢直接使用Kinesis流,但是这个连接器没有2.0的日期,所以Firehose->S3是临时的。
我可以看到有一个库Kinesis与Spark Streaming集成。因此,您可以直接读取流数据并对其执行SQL操作,而无需从S3读取。
groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
Spark Streaming和SQL的示例代码
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
kinesisStream.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val jsonDf = rdd.toDF() // or rdd.toDF("specify schema/columns here")
// Create a temporary view with DataFrame
jsonDf.createOrReplaceTempView("json_data_tbl")
//As we have DataFrame and SparkSession object we can perform most
//of the Spark SQL stuff here
}
充分披露:我为Databricks工作,但我不代表他们Stack Overflow。
如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹,并从所有叶子json文件创建一个静态数据框架?数据帧读取器是否有"选项"?
DataFrameReader支持加载序列。参见def的文档json(paths: String*): DataFrame。您可以指定序列,使用globbing模式或以编程方式构建它(推荐):
val inputPathSeq = Seq[String]("/mnt/myles/structured-streaming/2016/12/18/02", "/mnt/myles/structured-streaming/2016/12/18/03")
val inputPathGlob = "/mnt/myles/structured-streaming/2016/12/18/*"
val basePath = "/mnt/myles/structured-streaming/2016/12/18/0"
val inputPathList = (2 to 4).toList.map(basePath+_+"/*.json")
我知道这都是实验性的-希望有人以前使用S3作为流文件源,其中数据如上所述分区到文件夹中。当然,我更喜欢直接使用Kinesis流,但是这个连接器没有2.0的日期,所以Firehose->S3是临时的。
由于您使用的是DBFS,我将假设从Firehose传输数据的S3桶已经挂载到DBFS。如果需要帮助将S3存储桶挂载到DBFS,请查看Databricks文档。一旦有了上面描述的输入路径,就可以简单地将文件加载到静态或流数据帧中:
静态
val staticInputDF =
spark
.read
.schema(jsonSchema)
.json(inputPathSeq : _*)
staticInputDF.isStreaming
res: Boolean = false
流val streamingInputDF =
spark
.readStream // `readStream` instead of `read` for creating streaming DataFrame
.schema(jsonSchema) // Set the schema of the JSON data
.option("maxFilesPerTrigger", 1) // Treat a sequence of files as a stream by picking one file at a time
.json(inputPathSeq : _*)
streamingCountsDF.isStreaming
res: Boolean = true
其中大部分直接取自Databricks关于结构化流的文档。甚至还有一个笔记本的例子可以直接导入到Databricks中。