如何使用Apache Spark读/写协议缓冲区消息?



我想使用 Apache Spark 从 HDFS 读取/写入协议缓冲区消息。我找到了这些建议的方法:

1(使用Google的Gson库将protobuf消息转换为Json,然后通过SparkSql读取/写入它们。此链接中解释了此解决方案,但我认为这样做(转换为 json(是一项额外的任务。

2(转换为镶木地板文件。有parquet-mrsparksql-protobufgithub 项目用于这种方式,但我不想要 parquet 文件,因为我总是使用所有列(而不是某些列(,这样 Parquet 格式不会给我任何收益(至少我认为(。

3(ScalaPB。也许这就是我正在寻找的。但在斯卡拉语言中,我对此一无所知。我正在寻找一个基于 Java 的解决方案。这个YouTube视频介绍了scalaPB并解释了如何使用它(对于scala开发人员(。

4(通过使用序列文件,这就是我想要的,但一无所获。所以,我的问题是:如何将 protobuf 消息写入 HDFS 上的序列文件并从中?任何其他建议都将是有用的。

5(通过推特的象鸟图书馆。

虽然在点之间有点隐藏,但您似乎在询问如何在 Spark 中写入序列文件。我在这里找到了一个例子。

// Importing org.apache.hadoop.io package
import org.apache.hadoop.io._
// As we need data in sequence file format to read. Let us see how to write first
// Reading data from text file format
val dataRDD = sc.textFile("/public/retail_db/orders")
// Using null as key and value will be of type Text while saving in sequence file format
// By Int and String, we do not need to convert types into IntWritable and Text
// But for others we need to convert to writable object
// For example, if the key/value is of type Long, we might have to 
// type cast by saying new LongWritable(object)
dataRDD.
map(x => (NullWritable.get(), x)).
saveAsSequenceFile("/user/`whoami`/orders_seq")
// Make sure to replace `whoami` with the appropriate OS user id
// Saving in sequence file with key of type Int and value of type String
dataRDD.
map(x => (x.split(",")(0).toInt, x.split(",")(1))).
saveAsSequenceFile("/user/`whoami`/orders_seq")
// Make sure to replace `whoami` with the appropriate OS user id

最新更新