Saving struct-type data in Cassandra

> I have an RDD in Spark:

[((u'imdb.com/title/tt1389137', '2009-04-18'), 1),
 ((u'imdb.com/title/tt0829482', '2010-09-02'), 1),
 ((u'imdb.com/title/tt0167260', '2010-04-12'), 1),
 ((u'imdb.com/title/tt1682180', '2009-11-24'), 1),
 ((u'imdb.com/title/tt1124035', '2011-02-24'), 1),
 ((u'imdb.com/title/tt0056058', '2009-02-17'), 1),
 ((u'imdb.com/title/tt0308644', '2011-06-27'), 1),
...]

I converted it to a DataFrame. The schema:

root
 |-- url_date: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |-- count: long (nullable = true)
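
A minimal sketch of how that conversion might look, assuming the RDD above is bound to a hypothetical variable named rdd; Spark infers a nested tuple as a struct with fields _1 and _2, which reproduces this schema:

# `rdd` is assumed to hold the ((url, date), count) tuples shown earlier
movies_tweets_per_day = rdd.toDF(["url_date", "count"])
movies_tweets_per_day.printSchema()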

I want to save this data to Cassandra. I created a table as follows:

create table movies_tweets_per_day (url_date frozen<set<text>>,count int, PRIMARY KEY (url_date));

and tried to write the data to the Cassandra table:

(movies_tweets_per_day.select("url_date", "count")
    .write.format("org.apache.spark.sql.cassandra")
    .options(table="movies_tweets_per_day", keyspace="mykeyspace")
    .save(mode="overwrite"))

This is the error I get:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-24-5ff2c90cdbe3> in <module>()
      6 movies_tweets_per_day.printSchema()
      7 
----> 8 movies_tweets_per_day.select("url_date", "count").write.format("org.apache.spark.sql.cassandra").options(table="movies_tweets_per_day", keyspace="assignment2").save(mode="overwrite")
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
    546             self.format(format)
    547         if path is None:
--> 548             self._jwrite.save()
    549         else:
    550             self._jwrite.save(path)
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(
Py4JJavaError: An error occurred while calling o686.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 64.0 failed 1 times, most recent failure: Lost task 2.0 in stage 64.0 (TID 296, localhost, executor driver): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [imdb.com/title/tt1772341,2010-12-14] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to Set[AnyRef].
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$30.applyOrElse(TypeConverter.scala:608)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:596)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$31.applyOrElse(TypeConverter.scala:812)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
    at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:65)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [imdb.com/title/tt1772341,2010-12-14] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to Set[AnyRef].
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$30.applyOrElse(TypeConverter.scala:608)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:596)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$31.applyOrElse(TypeConverter.scala:812)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

I also tried creating the table in Cassandra as:

create table movies_tweets_per_day (url_date frozen<map<text,text>>,count int, PRIMARY KEY (url_date));
create table movies_tweets_per_day (url_date list<frozen<text>>,count int, PRIMARY KEY (url_date));

Neither of them worked. How can I solve this problem? Thanks in advance!

Convert the struct to an ArrayType. Example:

// `sc` here is a SparkSession; its implicits are needed for toDF
import sc.implicits._

val df = sc.sparkContext.parallelize(List(
      (List("imdb.com/title/tt1389137", "2009-04-18"), 1),
      (List("imdb.com/title/tt0829482", "2010-09-02"), 1),
      (List("imdb.com/title/tt0167260", "2010-04-12"), 1),
      (List("imdb.com/title/tt1682180", "2009-11-24"), 1),
      (List("imdb.com/title/tt1124035", "2011-02-24"), 1),
      (List("imdb.com/title/tt0056058", "2009-02-17"), 1),
      (List("imdb.com/title/tt0308644", "2011-06-27"), 1)
    )).toDF("url_date", "count")

The above gives the following schema:

root
 |-- url_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- count: integer (nullable = true)

This can easily be saved to the Cassandra table.
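
For illustration, a minimal PySpark sketch of the save, mirroring the write call from the question (the keyspace name is assumed; append mode is used here because overwrite may require an extra truncate confirmation in the connector):

# `movies_tweets_per_day` is assumed to be the array-typed DataFrame
(movies_tweets_per_day
    .write.format("org.apache.spark.sql.cassandra")
    .options(table="movies_tweets_per_day", keyspace="mykeyspace")
    .save(mode="append"))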

For the conversion, you can use the map method on the RDD, as sketched below.
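
A minimal sketch of that conversion in PySpark, assuming rdd is the original RDD of ((url, date), count) tuples; flattening the key tuple into a list yields the ArrayType column shown above:

# Turning the key tuple into a Python list makes Spark infer an ArrayType
# column instead of a struct, which the connector can write to Cassandra.
movies_tweets_per_day = rdd.map(lambda kv: (list(kv[0]), kv[1])) \
                           .toDF(["url_date", "count"])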
