我需要使用 spark-cassandra-connector 读取 spark 中的 cassandra blob
类型,并根据 blob 字段比较两个数据集。
作为示例,以下代码显示了我的意思:
// Cassandra Table
CREATE TABLE keyspace.test (
id bigint,
info blob,
PRIMARY KEY (id)
)
case class Test(
id: Long,
info: java.nio.ByteBuffer
)
session.read
.format("org.apache.spark.sql.cassandra")
.options(Map(
"table" -> tableName,
"keyspace" -> keySpaceName,
"cluster" -> clusterName
)).load().map(i => Test(i.getLong(0), i.get???(1)))
我需要方法而不是i.get???(1)
将 blob 读取为字节缓冲区。我已经尝试了row.getAs[Array[Byte]](i)
但它不能满足我的需求,因为无法比较两个对象。
>据我了解ByteBuffer
这不是解决方案,因为Spark没有为它提供默认编码器,我必须先为它开发编码器才能阅读和使用它。无论如何,ByteBuffer
围绕内容实现了equals
方法。
但是为了简单用法,我将blob
读取为Array[Byte]
并将其转换为具有默认编码器的Seq[Byte]
。
case class Test(
id: Long,
info: Seq[Byte]
)
session.read
.format("org.apache.spark.sql.cassandra")
.options(Map(
"table" -> tableName,
"keyspace" -> keySpaceName,
"cluster" -> clusterName
)).load().map(i => Test(i.getLong(0), (i.getAs[Array[Byte]](1)).toSeq ))
该方法是getBytes,它返回一个ByteBuffer