Error retrieving data from Cassandra with the Apache Spark connector in Java



I'm having trouble retrieving data from a Cassandra table with the Spark Cassandra connector. In Cassandra I created a keyspace named "ks" and a table "student". The table looks like this:

 id | name
----+-----------
 10 | Catherine

I started Spark locally by running start-all.sh.

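Then I created the class "SparkCassandraConnector", which contains the code that connects Spark to Cassandra. What I want to do is retrieve the data from the student table and print it to the screen.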

The error I get is:

java.lang.ClassNotFoundException: SparkCassandraConnector$Student
    java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    java.security.AccessController.doPrivileged(Native Method)
    java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    java.lang.Class.forName0(Native Method)
    java.lang.Class.forName(Class.java:340)
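The `$` in `SparkCassandraConnector$Student` is the binary name of the nested `Student` class. The trace shows a class loader failing to resolve that name; presumably this happens on an executor, whose loader can only see Spark's own classpath plus the jars the driver shipped. As a stdlib-only illustration (the `ClassLoaderDemo` class and its nested `Student` are hypothetical stand-ins, not Spark code), a loader that was never told where the class lives fails in the same way:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderDemo {

    // Hypothetical stand-in for SparkCassandraConnector.Student; its binary
    // name has the same "Outer$Inner" form as the one in the stack trace.
    public static class Student implements java.io.Serializable {
    }

    // Returns true if the given loader can resolve the class by binary name.
    static boolean canLoad(ClassLoader loader, String binaryName) {
        try {
            Class.forName(binaryName, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = Student.class.getName(); // e.g. "ClassLoaderDemo$Student"

        // The loader that launched this program knows where the class lives.
        boolean appLoaderSees = canLoad(ClassLoaderDemo.class.getClassLoader(), name);

        // No URLs and only the bootstrap loader as parent: this models an
        // executor that was never given the jar containing the class.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            boolean isolatedSees = canLoad(isolated, name);
            System.out.println(name + " app=" + appLoaderSees + " isolated=" + isolatedSees);
        }
    }
}
```

The isolated loader reports `false`, which is the same situation as the `ClassNotFoundException` above: the class exists, but not on any path that loader was given.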

Here is my program:

import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

public class SparkCassandraConnector implements Serializable {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.setMaster("spark://127.0.0.1:7077");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");

        // Jars shipped to the executors. Java does not expand "~", so the local
        // Maven repository path is built from the user.home system property.
        String repo = System.getProperty("user.home") + "/.m2/repository";
        String[] jars = {
            repo + "/com/datastax/spark/spark-cassandra-connector-java_2.10/1.1.0-alpha4/spark-cassandra-connector-java_2.10-1.1.0-alpha4.jar",
            repo + "/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha4/spark-cassandra-connector_2.10-1.1.0-alpha4.jar",
            repo + "/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar",
            repo + "/org/apache/cassandra/cassandra-thrift/2.1.0/cassandra-thrift-2.1.0.jar",
            repo + "/org/apache/cassandra/cassandra-clientutil/2.1.0/cassandra-clientutil-2.1.0.jar"
        };
        conf = conf.setJars(jars);

        JavaSparkContext sc = new JavaSparkContext(conf);
        // Map each row of ks.student onto a Student bean, then turn it into a string.
        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "student", Student.class)
                .map(new Function<Student, String>() {
                    @Override
                    public String call(Student person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans:\n" + StringUtils.join(rdd.collect(), "\n"));
        sc.stop();
    }

    // Bean whose properties (id, name) match the columns of ks.student.
    public static class Student implements Serializable {
        private Integer id;
        private String name;

        public Student() {
        }

        public Student(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // Without this override, the printed rows would only show class names and hash codes.
        @Override
        public String toString() {
            return "Student{id=" + id + ", name=" + name + "}";
        }
    }
}
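A shell expands `~` to the home directory, but Java does not, so a path such as `~/.m2/repository/...` is taken literally. One way to avoid hand-typing the long jar paths is to derive them from Maven coordinates. The sketch below assumes the default local repository (`~/.m2/repository`, no custom `<localRepository>` in `settings.xml`) and the standard Maven layout; the `LocalMavenJars` class is hypothetical:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class LocalMavenJars {

    // Default local repository; assumes no custom <localRepository> setting.
    static Path defaultRepository() {
        return Paths.get(System.getProperty("user.home"), ".m2", "repository");
    }

    // Maps Maven coordinates to a jar path in the standard repository layout:
    // <repo>/<groupId split on dots>/<artifactId>/<version>/<artifactId>-<version>.jar
    static Path jarPath(Path repo, String groupId, String artifactId, String version) {
        Path dir = repo;
        for (String part : groupId.split("\\.")) {
            dir = dir.resolve(part);
        }
        return dir.resolve(artifactId).resolve(version)
                  .resolve(artifactId + "-" + version + ".jar");
    }

    public static void main(String[] args) {
        Path repo = defaultRepository();
        String[] jars = {
            jarPath(repo, "com.datastax.spark", "spark-cassandra-connector_2.10", "1.1.0-alpha4").toString(),
            jarPath(repo, "com.datastax.spark", "spark-cassandra-connector-java_2.10", "1.1.0-alpha4").toString(),
            jarPath(repo, "com.datastax.cassandra", "cassandra-driver-core", "2.1.0").toString(),
            jarPath(repo, "org.apache.cassandra", "cassandra-thrift", "2.1.0").toString(),
            jarPath(repo, "org.apache.cassandra", "cassandra-clientutil", "2.1.0").toString()
        };
        // These absolute paths are what would be handed to SparkConf.setJars(...).
        for (String jar : jars) {
            System.out.println(jar);
        }
    }
}
```

Note that this only fixes the dependency paths; it does not add the jar containing `SparkCassandraConnector` itself.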

Here is my POM file:

<dependencies>

    <!--Spark-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>

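The jars you provide do not include the jar that contains the job itself, so Student.class is missing on the executors. A quick fix is to add the jar built in your project's ./target folder to that list.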
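To find the path of the jar that holds the job's classes without hard-coding it, the class's `CodeSource` can be queried. This is a stdlib-only sketch with a hypothetical `JobJarLocator` class; Spark also provides `JavaSparkContext.jarOfClass(Class)` for a similar purpose, which would be the more direct choice inside the job. Either way, the result is only useful for `setJars` when the program runs from a packaged jar (for example after `mvn package`); from an IDE it usually points at `target/classes`, a directory.

```java
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.CodeSource;

public class JobJarLocator {

    // Returns the jar (or classes directory) a class was loaded from, or null
    // when no code source is reported (e.g. JDK bootstrap classes).
    // Assumes a file: URL, which is the usual case for application classes.
    static Path locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null;
        }
        try {
            return Paths.get(source.getLocation().toURI());
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unexpected code source URL: " + source.getLocation(), e);
        }
    }

    public static void main(String[] args) {
        // Packaged: target/<artifact>.jar. From an IDE: usually target/classes.
        System.out.println("job classes loaded from: " + locationOf(JobJarLocator.class));
        // Bootstrap classes report no code source, so this prints null.
        System.out.println("String loaded from: " + locationOf(String.class));
    }
}
```

In the job itself, the path returned for `SparkCassandraConnector.class` would be appended to the array passed to `conf.setJars(...)`.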

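Another option is to package the job and all of its dependencies into a single "uber jar" and declare that uber jar as the only jar. Have a look at the Maven Shade plugin.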
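Once an uber jar is built, it is worth checking that it really contains both the job classes (including the nested `SparkCassandraConnector$Student`) and the connector classes. A minimal stdlib sketch using `java.util.jar.JarFile`; the `UberJarCheck` class and the list of required classes are illustrative assumptions:

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class UberJarCheck {

    // Converts a binary class name such as "SparkCassandraConnector$Student"
    // or "com.example.Foo" into the entry name used inside a jar.
    static String entryName(String binaryName) {
        return binaryName.replace('.', '/') + ".class";
    }

    // Returns true if the jar at jarPath contains the given class.
    static boolean containsClass(String jarPath, String binaryName) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(binaryName)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java UberJarCheck <path-to-uber-jar>");
            return;
        }
        String[] required = {
            "SparkCassandraConnector",
            "SparkCassandraConnector$Student",
            "com.datastax.spark.connector.CassandraJavaUtil"
        };
        for (String cls : required) {
            System.out.println(cls + " -> " + containsClass(args[0], cls));
        }
    }
}
```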

Jars can also be supplied on the command line with the --jars option of spark-submit.
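`spark-submit` takes `--jars` as a single comma-separated list, and the application jar itself is passed positionally after the options (spark-submit distributes that application jar, so the job's own classes are covered). The sketch below only assembles such a command line; `SparkSubmitArgs` and all file paths are hypothetical placeholders:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SparkSubmitArgs {

    // Builds a spark-submit command line. --jars takes one comma-separated
    // list; the application jar comes last, after all options.
    static List<String> command(String mainClass, String master, List<String> extraJars, String appJar) {
        List<String> cmd = new ArrayList<>();
        cmd.add("spark-submit");
        cmd.add("--class");
        cmd.add(mainClass);
        cmd.add("--master");
        cmd.add(master);
        if (!extraJars.isEmpty()) {
            cmd.add("--jars");
            cmd.add(String.join(",", extraJars));
        }
        cmd.add(appJar);
        return cmd;
    }

    public static void main(String[] args) {
        // Placeholder paths; substitute the real dependency jars and the jar
        // produced by "mvn package".
        List<String> cmd = command(
            "SparkCassandraConnector",
            "spark://127.0.0.1:7077",
            Arrays.asList("/path/to/spark-cassandra-connector_2.10-1.1.0-alpha4.jar",
                          "/path/to/cassandra-driver-core-2.1.0.jar"),
            "target/your-job.jar");
        System.out.println(String.join(" ", cmd));
    }
}
```

The resulting list could be run with `new ProcessBuilder(cmd).inheritIO().start()`, or the printed line can simply be pasted into a shell.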
