使用Spark结构化流从s3读取avro文件



我想使用 Spark 结构化流 API 从 s3 读取 Avro 文件。您可以找到有关使用 Kafka 执行此操作的信息,但我找不到有关 s3 的任何内容。这里的问题是我不知道要设置什么格式。这是我的简单代码:

 Dataset<Row> baseDataSet = sparkSession            
    .readStream()                              
    .format("?") //What this format should be?                            
    .schema(new StructType()                   
            .add("value", "binary"))           
    .load("s3://path/to/streaming/key")    
    .select(col("value"))
    .map(value -> {//do avro deserialization},Encoders.kryo(//deserialization class))                                    
    .writeStream() 
    .trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    .format("console")
    .outputMode("update")
    .start();

我知道 avro 仍然没有在结构化流 API 中实现。但是我应该采用什么格式来读取二进制数据,然后以我想要的任何方式(在 map 函数中)对其进行反序列化。

avro 有一个第三方软件包。您可以下载 jar 并将其与 Spark 一起使用,通过指定 format("com.databricks.spark.avro") 直接加载 avro 文件。

目前无法在结构化流式处理中读取整个文件以稍后应用反序列化。

但是,如果您仍希望使用自定义反序列化程序,则可以通过实现 trait DataSourceRegister 来开发自定义数据源。例如,您可能想要检查 spark-avro 包。

如果您需要将输入数据转换为字节数组,您可以使用这样的东西:

session
    .readStream()
    .textFile("path-to-folder")
    .as(Encoders.BINARY())
    .map(bytesToStringMapper, Encoders.STRING())
    .writeStream()
    .outputMode(OutputMode.Append())
    .format("text")
    .option("path", "path-to-folder")
    .option("checkpointLocation", "path-to-folder")
    .queryName("test-query")
    .start();

不过,当前的方法将文件逐行加载为文本。这意味着bytesToStringMapper接收单行作为字节数组并将其转换为字符串。

相关内容

  • 没有找到相关文章

最新更新