火花作业 (scala) 将类型日期写入 Cassandra



我正在使用DSE 5.1(spark 2.0.2.6和cassandra 3.10.0.1652)

我的卡桑德拉桌子:

CREATE TABLE ks.tbl (
dk int,
date date,
ck int,
val int,
PRIMARY KEY (dk, date, ck)
) WITH CLUSTERING ORDER BY (date DESC, ck ASC);

具有以下数据:

dk | date       | ck | val
----+------------+----+-----
1 | 2017-01-01 |  1 | 100
1 | 2017-01-01 |  2 | 200

我的代码必须读取此数据并写入相同内容,但使用昨天的日期(它编译成功):

package com.datastax.spark.example
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._
object test extends App {
val conf = new SparkConf().setAppName("DSE calculus app TEST")
val sc = new SparkContext(conf)
val yesterday= (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))
val tbl = sc.cassandraTable("ks","tbl").select("dk","date","ck","val").where("dk=1")
tbl.map(row => (row.getInt("dk"),yesterday,row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")
sc.stop()
sys.exit(0)
}

当我运行此应用程序时:

dse spark-submit --class com.datastax.spark.example.test test-assembly-0.1.jar

它无法正确写入 Cassandra。日期变量似乎未正确插入地图中。 我得到的错误是:

Error:
WARN  2017-05-08 22:23:16,472 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <IP of one of my nodes>): java.io.IOException: Failed to write statements to ks.tbl.
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

但是,当我直接在map语句中插入日期(字符串)时,如下所示,代码确实正确插入了数据:

tbl.map(row => (row.getInt("dk"),"2017-02-02",row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

如果我将昨天设置为整数(自纪元以来的天数),它也会正确插入数据。这将是最佳的,但不能让"昨天"以这种方式运行

编辑:实际上,这不会正确插入数据。无论我将"昨天"设置为 1 还是 100,000,000,它总是插入纪元 ('1970-01-01)

失败的代码行为正确,正如我在 DSE Spark 控制台中所期望的那样。

我只是想不通我做错了什么。欢迎任何帮助。

EDIT2:执行器 0 stderr 日志确实显示它试图在列日期中插入 Null 值,这显然是不可能的,因为它是聚类列。

在为Spark 作业编写代码时,了解何时设置特定变量以及何时序列化它们非常重要。让我们来看看App特质文档中的注释

警告

应该注意的是,此特征是使用 延迟初始化功能,这意味着对象的字段将 在执行 main 方法之前尚未初始化。

这意味着在实际运行代码时,对App正文中使用的变量的引用可能未在执行程序上初始化。

我的猜测是,您编写的 lambda 包含对 val 的引用,该 val 在 App 类的延迟初始化部分中初始化。这意味着不运行Main方法的执行器上代码的序列化版本将获取值的未初始化版本 (null)。

将常量切换到lazy val(或将其移动到单独的对象或类中)可以通过确保值远程初始化(lazy val)或简单地序列化初始化(单独的类/对象)来解决此问题。

我想我知道你的问题是什么。
您可能会看到完整的日志文件。您只需附加其中的一部分...
今天有类似的错误,当我只有一个 cassandra 实例时,使用 replication_factor:3 创建密钥空间。

所以我改变了它,问题就消失了。

ALTER KEYSPACE "some_keyspace_name" WITH REPLICATION =
{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

这是我的错误.log文件

日志的重要部分:

Logging.scala[logError]:72) - Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@4746499f
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)

最新更新