无法从 pyspark 的 cassandra 数据库加载信息



我有这个代码:

import os
from pyspark import SparkContext,SparkFiles,SQLContext,SparkFiles
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import col
secure_bundle_file=os.getcwd()+'\secure-connect-dbtest.zip'
sparkSession =SparkSession.builder.appName('SparkCassandraApp')
.config('spark.cassandra.connection.config.cloud.path',secure_bundle_file)
.config('spark.cassandra.auth.username', 'test')
.config('spark.cassandra.auth.password','testquart')
.config('spark.dse.continuousPagingEnabled',False)
.master('local[*]').getOrCreate()
data = sparkSession.read.format("org.apache.spark.sql.cassandra")
.options(table="tbthesis", keyspace="test").load()
data.count()

我尝试做的是连接到我的数据库并检索我的数据。代码与数据库连接良好,但一旦到达读取行,就会显示:

Exception has occurred: Py4JJavaError
An error occurred while calling o48.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. 
Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:674)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)

有人能帮我吗?

此外,我想添加一些关于这个代码的更多细节:

我想做的是测试spark从我的数据库中读取200万条记录的持续时间,正常的python cassandra驱动程序在大约1小时内读取200万个记录(使用SimpleStatement(,所以在这里我想知道使用spark对这200万条记录的持续时间。

感谢

类路径中没有Spark Cassandra Connector包,因此找不到相应的类。

你需要用--packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1开始你的工作(spark-submitpyspark(。

如果你真的只想从python代码中完成,那么你可以在创建SparkSession时尝试添加.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.11:2.5.1"),但如果类路径已经实例化,它可能并不总是有效的。

p.S.Spark通常应该优于SimpleStatement,即使在本地模式下也是如此,尽管Spark在分布式模式下确实很出色。您真的不应该使用SimpleStatement来重复执行仅在参数上不同的查询——您应该为此使用准备好的语句。请阅读《使用DataStax驱动程序开发应用程序》指南。DataStax还提供了Cassandra的第三版。《最终指南》这本书刚刚出版,我建议你读一读。

我的问题解决了。

问题不在于java、hadoop或spark,而是连接器的下载过程,但我无法下载任何东西,因为我的这个jar的缓存文件夹上有东西。

我的spark下载外部jar的文件夹是C:\Users\UlysesRico.ivy2\jars。它的缓存是C:\Users\UlysesRico.ivy2\cache

我只是删除了缓存和罐子文件夹,然后我做了:

pyspark--packages com.datatax.spark:spark-cassandra-connector_2.11:2.5.1瞧,我下载了所有的罐子和缓存信息。

问题终于解决了。

相关内容

  • 没有找到相关文章

最新更新