I have an RDD in Spark:
[((u'imdb.com/title/tt1389137', '2009-04-18'), 1),
((u'imdb.com/title/tt0829482', '2010-09-02'), 1),
((u'imdb.com/title/tt0167260', '2010-04-12'), 1),
((u'imdb.com/title/tt1682180', '2009-11-24'), 1),
((u'imdb.com/title/tt1124035', '2011-02-24'), 1),
((u'imdb.com/title/tt0056058', '2009-02-17'), 1),
((u'imdb.com/title/tt0308644', '2011-06-27'), 1),
...]
I converted it into a DataFrame. The schema is:
root
|-- url_date: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: string (nullable = true)
|-- count: long (nullable = true)
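For reference, a minimal sketch of that conversion, assuming rdd holds the pairs above and a SparkSession is active:

# Tuples are inferred as a struct column with fields _1 and _2
movies_tweets_per_day = rdd.toDF(["url_date", "count"])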
I want to save this data into Cassandra, so I created a table as follows:
create table movies_tweets_per_day (url_date frozen<set<text>>,count int, PRIMARY KEY (url_date));
and tried to write the data into the Cassandra table:
movies_tweets_per_day.select("url_date", "count") \
    .write.format("org.apache.spark.sql.cassandra") \
    .options(table="movies_tweets_per_day", keyspace="mykeyspace") \
    .save(mode="overwrite")
This is the error I get:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-24-5ff2c90cdbe3> in <module>()
6 movies_tweets_per_day.printSchema()
7
----> 8 movies_tweets_per_day.select("url_date", "count").write.format("org.apache.spark.sql.cassandra").options(table="movies_tweets_per_day", keyspace="assignment2").save(mode="overwrite")
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
546 self.format(format)
547 if path is None:
--> 548 self._jwrite.save()
549 else:
550 self._jwrite.save(path)
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318                 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o686.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 64.0 failed 1 times, most recent failure: Lost task 2.0 in stage 64.0 (TID 296, localhost, executor driver): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [imdb.com/title/tt1772341,2010-12-14] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to Set[AnyRef].
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$30.applyOrElse(TypeConverter.scala:608)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:596)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$31.applyOrElse(TypeConverter.scala:812)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:795)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:795)
at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:65)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [imdb.com/title/tt1772341,2010-12-14] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to Set[AnyRef].
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$30.applyOrElse(TypeConverter.scala:608)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:596)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$31.applyOrElse(TypeConverter.scala:812)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:795)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:795)
at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
I also tried creating the table in Cassandra as:
create table movies_tweets_per_day (url_date frozen<map<text,text>>,count int, PRIMARY KEY (url_date));
create table movies_tweets_per_day (url_date list<frozen<text>>,count int, PRIMARY KEY (url_date));
Neither of them worked. How can I solve this problem? Thanks in advance!
The connector cannot convert a Spark SQL struct (a GenericRowWithSchema, as the error message shows) into a Cassandra collection, so convert the struct into an ArrayType instead. Example:
// Assuming the spark-shell SparkSession named `spark`; toDF needs its implicits.
import spark.implicits._

val df = spark.sparkContext.parallelize(List(
    (List("imdb.com/title/tt1389137", "2009-04-18"), 1),
    (List("imdb.com/title/tt0829482", "2010-09-02"), 1),
    (List("imdb.com/title/tt0167260", "2010-04-12"), 1),
    (List("imdb.com/title/tt1682180", "2009-11-24"), 1),
    (List("imdb.com/title/tt1124035", "2011-02-24"), 1),
    (List("imdb.com/title/tt0056058", "2009-02-17"), 1),
    (List("imdb.com/title/tt0308644", "2011-06-27"), 1)
  )).toDF("url_date", "count")
The above gives the following schema:
root
|-- url_date: array (nullable = true)
| |-- element: string (containsNull = true)
|-- count: integer (nullable = true)
This can easily be saved into a Cassandra table, for example one whose key column is declared as frozen<list<text>> rather than a set.
For the conversion, you can use the map method on the RDD, as in the sketch below.
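A minimal PySpark sketch, assuming rdd is the original pair RDD from the question and that the table's key column is declared as frozen<list<text>> (the keyspace name is a placeholder):

# Turn each ((url, date), count) tuple into ([url, date], count) so the
# key column is inferred as array<string> instead of a struct.
converted = rdd.map(lambda kv: (list(kv[0]), kv[1]))
df = converted.toDF(["url_date", "count"])

# Matching table, e.g.:
#   create table movies_tweets_per_day (url_date frozen<list<text>>, count int, PRIMARY KEY (url_date));
df.write.format("org.apache.spark.sql.cassandra") \
    .options(table="movies_tweets_per_day", keyspace="mykeyspace") \
    .save(mode="append")  # append upserts by primary key in Cassandra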