Submitting an application to a Spark cluster: error "local class incompatible"



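I wrote a Java web application that runs fine against the standalone Spark installation on my own computer. But when I submit it to the Spark cluster server, an error occurs.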

Server-side cluster environment:
Java: 1.8.0_74
Hadoop: 2.6.0
Spark: 1.6.1
Scala: 2.10.5

My own computer's environment:
Java: 1.8.0_20
Hadoop: 2.6.0
Spark: 1.6.1
Scala: 2.10.5

The error is as follows:

16/04/23 22:57:58 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 0, student80-x2): java.io.InvalidClassException: javax.servlet.GenericServlet; local class incompatible: stream classdesc serialVersionUID = 1, local class serialVersionUID = -8592279577370996712
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

My Maven pom.xml is:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>javax.servlet-api</artifactId>
        <version>3.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>
<build>
    <finalName>spark-demo-web</finalName>
</build>

My code is as follows:

protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    long t1,t2;
    t1=System.currentTimeMillis();
    //Spark Context
    //String input = "hdfs://localhost:9000/plane/2008c.csv";
    String input = "hdfs://10.42.1.61:9000/plane/2008c.csv";
    SparkConf conf = new SparkConf().setAppName("org.spark.flight").setMaster("spark://10.42.1.61:7077").set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext context = new JavaSparkContext(conf);
    SQLContext sqlContext=new org.apache.spark.sql.SQLContext(context);
    //Create DataFrame
    JavaRDD<Flight> flights=context.textFile(input).map(
            new Function<String, Flight>(){
                public Flight call(String line) throws Exception{
                    String[] parts=line.split(",");
                    Flight flight=new Flight();
                    flight.setaYear(Double.parseDouble(parts[0]));
                    flight.setbMonth(Double.parseDouble(parts[1]));
                    flight.setcDayofMonth(Double.parseDouble(parts[2]));
                    flight.seteUniqueCarrier(parts[8]);
                    flight.setdFlightNum(parts[9]);
                    flight.setfOrigin(parts[16]);
                    flight.setgDest(parts[17]);
                    if(parts[4].toString().equals("NA")) flight.setiDepTime(0.0);
                    else flight.setiDepTime(Double.parseDouble(parts[5]));
                    if(parts[6].toString().equals("NA")) flight.setjArrTime(0.0);
                    else flight.setjArrTime(Double.parseDouble(parts[6]));
                    if(parts[15].toString().equals("NA")) flight.setkDepDelay(0.0);
                    else flight.setkDepDelay(Double.parseDouble(parts[15]));
                    return flight;
                }
            });
    DataFrame schemaFlight=sqlContext.createDataFrame(flights, Flight.class);
    schemaFlight.registerTempTable("flights");
    //Select using SQL
    //String name = request.getParameter("name");  
    //String password = request.getParameter("password");
    int year=Integer.parseInt(req.getParameter("year")), todayYear=2008;
    int month=Integer.parseInt(req.getParameter("month")), todayMonth=3;
    int dayofmonth=Integer.parseInt(req.getParameter("dayofmonth")), todayDayOfMonth=15;
    String from=req.getParameter("from");
    String to=req.getParameter("to");
    //Select flights according to the users requirement
    //String SQL="Select * from flights where year="+year+" AND month="+month+" AND dayofMonth="+dayofmonth+" AND origin='"+from+"' AND dest='"+to+"'";
    String SQL2="Select * from flights WHERE fOrigin='"+ from + "' AND gDest='" + to +"' AND aYear="+ year +" AND bMonth=" + month + " AND cDayofMonth="+dayofmonth+" order by iDepTime";
    DataFrame selectFlights=sqlContext.sql(SQL2);
    selectFlights.registerTempTable("sf");
    selectFlights.cache();
    selectFlights.show();
    //Map flights data to POJO
    List<Flight> result=selectFlights.javaRDD().map(new Function<Row,Flight>(){
        public Flight call(Row row){
            Flight flight=new Flight();
            flight.setaYear(row.getDouble(0));
            flight.setbMonth(row.getDouble(1));
            flight.setcDayofMonth(row.getDouble(2));
            flight.seteUniqueCarrier(row.getString(4));
            flight.setdFlightNum(row.getString(3));
            flight.setfOrigin(row.getString(5));
            flight.setgDest(row.getString(6));
            flight.setiDepTime(row.getDouble(8));
            flight.setjArrTime(row.getDouble(9));
            return flight;
        }
    }).collect();
    //Select out the UniqueCarriers
    String SQL3="Select distinct eUniqueCarrier from sf";
    DataFrame uc=sqlContext.sql(SQL3);
    List<String> ucList=uc.javaRDD().map(new Function<Row, String>(){
        public String call(Row row){
            return row.getString(0);
        }
    }).collect();
    //Build Linear Regression for each Unique Carrier
    Iterator<String> ucIterator=ucList.iterator();
    while(ucIterator.hasNext()){
        String oneuc=ucIterator.next();
        //Select out flight history of the Unique Carrier "oneuc".
        System.out.println(oneuc);
        String SQL4="Select * from flights WHERE fOrigin='" + from +"' AND gDest='" + to +"' AND (aYear<"+todayYear+" OR (aYear="+todayYear+" and bMonth<"+ todayMonth+") OR (aYear="+todayYear+" AND bMonth="+todayMonth+" AND cDayofMonth<"+todayDayOfMonth+")) AND eUniqueCarrier='"+oneuc+"'";
        DataFrame history=sqlContext.sql(SQL4);
        //Parse the data
        JavaRDD<LabeledPoint> parsedData=history.javaRDD().map(
            new Function<Row,LabeledPoint> (){
            public LabeledPoint call(Row r) {
                double[] v=new double[4];
                v[0]=r.getDouble(0);
                v[1]=r.getDouble(1);
                v[2]=r.getDouble(2);
                v[3]=r.getDouble(8);
                System.out.println(r.getDouble(10)+ ": "+ v[0] + " "  + v[1] + " " + v[2] + " " + v[3]);
                return new LabeledPoint(r.getDouble(10), Vectors.dense(v));
            }
        });
        parsedData.cache();
        System.out.println("**********"+parsedData.count()+"***********");
        // Build the model
         int numIterations = 100;
         double stepSize = 0.00000001;
         //parsedData.rdd()
         final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations,stepSize);
         System.out.println("***********Prediction*************");
         //Predict
         Iterator<Flight> resultIterator=result.iterator();
            Flight tempf;
            while(resultIterator.hasNext()){
                tempf=resultIterator.next();
                if(tempf.geteUniqueCarrier().equals(oneuc)){
                    double[] v=new double[4];
                    v[0]=tempf.getaYear();
                    v[1]=tempf.getbMonth();
                    v[2]=tempf.getcDayofMonth();
                    v[3]=tempf.getiDepTime();
                    double prediction=model.predict( Vectors.dense(v));
                    tempf.sethPredictDelay(prediction);
                }
            }
    }
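
The servlet API version bundled with Spark 1.6.1 differs from the one packaged into your jar (your pom declares javax.servlet:javax.servlet-api 3.0.1). The two copies of javax.servlet.GenericServlet have different serialVersionUID values, which is what the InvalidClassException reports. Spark uses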

org.eclipse.jetty.orbit » javax.servlet 3.0.0.v201112011016

http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.6.1

If possible, do not package the Servlet API into your jar. Declare the version Spark uses with provided scope instead:

<dependency>
    <groupId>org.eclipse.jetty.orbit</groupId>
    <artifactId>javax.servlet</artifactId>
    <version>3.0.0.v201112011016</version>
    <scope>provided</scope>
</dependency>
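
To check which copy of a class each side actually loads, a small diagnostic like the one below may help. It is only a sketch: the names SerialUidCheck, Sample, uidOf and originOf are made up for illustration, and it relies solely on the standard java.io.ObjectStreamClass API. Run on both the driver machine and a worker node with the same classpath the application uses, it prints the serialVersionUID the local JVM would use and the jar the class came from.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.security.CodeSource;

public class SerialUidCheck {

    /** Hypothetical stand-in class with an explicitly declared serialVersionUID. */
    public static class Sample implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Returns the serialVersionUID the local JVM would use for the given class. */
    public static long uidOf(Class<?> cls) {
        // lookup() returns null when the class does not implement Serializable
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
        if (desc == null) {
            throw new IllegalArgumentException(cls.getName() + " is not Serializable");
        }
        return desc.getSerialVersionUID();
    }

    /** Returns the jar or directory the class was loaded from, or a note for JDK classes. */
    public static String originOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap classpath)" : String.valueOf(src.getLocation());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name as the first argument,
        // for example javax.servlet.GenericServlet; defaults to the Sample class.
        Class<?> cls = args.length > 0 ? Class.forName(args[0]) : Sample.class;
        System.out.println(cls.getName() + " serialVersionUID = " + uidOf(cls));
        System.out.println("loaded from " + originOf(cls));
    }
}
```

If the two machines print different values for javax.servlet.GenericServlet, the jar locations show which servlet API artifact each side is picking up, and that is the one to exclude or mark as provided.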
