是否可以将 Kafka Streaming 源支持的数据帧写入 AWS Redshift,我们过去曾使用 spark-redshift 写入 Redshift,但我认为它不适用于DataFrame##writeStream
。考虑到 Redshift 的工作方式,使用 JDBC 连接器和 ForeachWriter
编写也可能不是一个好主意。
我从Yelp博客中遇到的一种可能的方法是将文件写入S3,然后使用具有S3对象路径的清单文件调用Redshift COPY
,在结构化流的情况下,我如何控制写入S3的文件? 并且还有一个单独的触发器,用于在将 5 个文件写入 S3 后创建清单文件。
任何其他可能的解决方案也值得赞赏。提前谢谢。
有一种方法可以在结构化流中使用 spark-redshift,但您必须在自己的分支中实现一些额外的类。首先,您需要一个应该实现org.apache.spark.sql.execution.streaming.Sink
接口的 RedshiftSink:
private[redshift] class RedshiftSink(
sqlContext: SQLContext,
parameters: MergedParameters,
redshiftWriter: RedshiftWriter) extends Sink {
private val log = LoggerFactory.getLogger(getClass)
@volatile private var latestBatchId = -1L
override def toString(): String = "RedshiftSink"
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= latestBatchId) {
log.info(s"Skipping already committed batch $batchId")
} else {
val mode = if (parameters.overwrite) SaveMode.Overwrite else SaveMode.Append
redshiftWriter.saveToRedshift(sqlContext, data, mode, parameters)
latestBatchId = batchId
}
}
}
然后com.databricks.spark.redshift.DefaultSource
应该通过实现org.apache.spark.sql.sources.StreamSinkProvider
来扩展:
/**
* Creates a Sink instance
*/
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new RedshiftSink(sqlContext, Parameters.mergeParameters(parameters), new RedshiftWriter(jdbcWrapper, s3ClientFactory))
}
现在您应该能够在结构化流中使用它:
dataset.writeStream()
.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
.format("com.databricks.spark.redshift")
.outputMode(OutputMode.Append())
.queryName("redshift-stream")
.start()
更新
要解决向 StreamExecution报告指标的问题,必须将RedshiftWriter.unloadData()
更改为使用 data.queryExecution.toRdd.mapPartitions
而不是data.rdd.mapPartitions
,因为data.rdd
会创建一个对 StreamExecution 不可见的新计划(使用现有计划收集指标(。它还需要将转换函数更改为:
val conversionFunctions: Array[(InternalRow, Int) => Any] = data.schema.fields.map { field =>
field.dataType match {
case DateType =>
val dateFormat = Conversions.createRedshiftDateFormat()
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else dateFormat.format(
DateTimeUtils.toJavaDate(row.getInt(ordinal)))
}
case TimestampType =>
val timestampFormat = Conversions.createRedshiftTimestampFormat()
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else timestampFormat.format(
DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
}
case StringType =>
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else row.getString(ordinal)
}
case dt: DataType =>
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else row.get(ordinal, dt)
}
}
}
非常有效地将普通数据帧加载到Redshift,但我还没有在Spark中使用流。
如果可以连续将流输出写入标准 df,则可以在指定的时间间隔将该 df 加载到 Redshift 并将其清空。
另一种选择是将流发送到Kinesis,并使用Kinesis Firehose将其加载到Redshift。不过,向堆栈中添加另一个流层似乎有些过分。