在Spark中读取带有空值的UDT的cassandra表,并映射到Scala案例类



错误显示:

由以下原因引起:java.lang.NullPointerException:请求的GettableToMappedTypeConverter由于Scala 2.10 TypeTag限制。它们作为null返回,因此你可以看到这个NPE。

grade.build

dependencies {
implementation group: 'org.scala-lang', name: 'scala-library', version: '2.12.11'
implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.5'
implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.5'
implementation group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.12', version: '2.5.0'
implementation group: 'org.apache.spark', name: 'spark-mllib_2.12', version: '2.4.5'
implementation group: 'log4j', name: 'log4j', version: '1.2.17'
implementation group: 'org.scalaj', name: 'scalaj-http_2.12', version: '2.4.2'
}

Scala对象

object SparkModule {
case class UDTCaseClass(a: Int = 0, b: Float = 0f, c: Int = 0, d: Int = 0)
case class TableCaseClass(id: UUID, col1: Boolean, list: List[UDTCaseClass])
val spark = SparkSession.builder
.master("local[2]")
.appName("App")
.config("spark.cassandra.connection.host", "127.0.0.1")
.config("spark.cassandra.connection.port", "9042")
.config("spark.executor.cores", "1")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
val cassandraRDD = sc.cassandraTable[TableCaseClass](
"keyspace", "table"
).limit(20)
println(cassandraRDD.count())
}

起初,有时会显示错误,有时不会,直到我缩小范围,意识到当UDT的任何字段为null时,它都会显示,否则它工作得很好。例如,如果表包含以下任何一行,则会引发错误:

f39b5201-1e96-44a8-946c-d959c217f174|错误|[{a:123,b:2.3,c:33,d:null}]
f39b5201-1e96-44a8-946c-d959c217f174|错误|[{a:123,b:2.3,c:null,d:34}]
f39b5201-1e96-44a8-946c-d959c217-f174|虚假|[{a:123、b:null、c:33,d:12}]
f39b5201-1e 96-44a8-946c-d959c217f174|错误|[{a:null,b:2.3,c:33,d:22}]

而,例如,这个:

f39b5201-1e96-44a8-946c-d959c217f174|错误|

cassandraTable读取得很好。

我试着像这样使用Optioncase class UDTCaseClass(a: Option[Int] = None, b: Option[Float] = None, c: Option[Int] = None, d: Option[Int] = None),但出现了相同的错误。

我总是可以插入0而不是nulls,但是,这可以避免吗?

感谢

适用于Spark 2.4.2/Scala 2.12和SCC 2.5.0。

以下UDT/表格和数据:

CREATE TYPE test.udt (
id int,
t1 int,
t2 int,
a2 int
);
CREATE TABLE test.u3 (
id int PRIMARY KEY,
u list<frozen<udt>>
);
insert into test.u3(id, u) values (5, [{id: 1, t1: 3}]);

以下Scala代码运行良好:

case class UDT(id: Int, t1: Int, t2: Option[Int], a2: Option[Int])
case class U3(id: Int, u: List[UDT])
import com.datastax.spark.connector._
val d = sc.cassandraTable[U3]("test", "u3")
d.collect

则如预期的那样返回:CCD_ 6。

您的错误可能是由于您可能没有重新编译代码,或者它以某种方式被缓存。。。

另外,正如我在评论中指出的,如果你刚开始,你更喜欢使用Dataframe API——它完全受SCC支持。

最新更新