Problem submitting a Python application to YARN from Java code



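I am currently using Java (with the YARN Client) to submit jobs to a YARN cluster in an Ubuntu/Linux environment. Submitting a Java program works fine. When I submit a Python program, it appears to stall in the ACCEPTED state and eventually fails with an error.

Here is the code I use to submit the program: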

import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
/**
 * This class submits a SparkPi to a YARN from a Java client (as opposed 
 * to submitting a Spark job from a shell command line using spark-submit).
 * 
 * To accomplish submitting a Spark job from a Java client, we use 
 * the org.apache.spark.deploy.yarn.Client class described below:
 * 
 Usage: org.apache.spark.deploy.yarn.Client [options]
 Options:
  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster mode)
  --class CLASS_NAME       Name of your application's main class (required)
  --primary-py-file        A main Python file
  --arg ARG                Argument to be passed to your application's main class. 
                   Multiple invocations are possible, each will be passed in order.
  --num-executors NUM      Number of executors to start (Default: 2)
  --executor-cores NUM     Number of cores per executor (Default: 1).
  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
  --driver-cores NUM       Number of cores used by the driver (Default: 1).
  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
  --name NAME              The name of your application (Default: Spark)
  --queue QUEUE            The hadoop queue to use for allocation requests (Default: 'default')
  --addJars jars           Comma separated list of local jars that want SparkContext.addJar to work with.
  --py-files PY_FILES      Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
  --files files            Comma separated list of files to be distributed with the job.
  --archives archives      Comma separated list of archives to be distributed with the job.
  How to call this program example:
     export SPARK_HOME="/opt/spark/spark-1.6.0"
     java -DSPARK_HOME="$SPARK_HOME" org.dataalgorithms.client.SubmitYARNJobFromJava 10
*/
public class SubmitPyYARNJobFromJava {
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        // this is passed to SparkPi program
        String slices = args[0];
        // String slices = "15";

        // String SPARK_HOME = System.getProperty("SPARK_HOME");
        String SPARK_HOME = "/opt/spark/spark-1.6.0";
        //
        pi(SPARK_HOME, slices); // ... the code being measured ...
        //
        long elapsedTime = System.currentTimeMillis() - startTime;
    }
    static void pi(String SPARK_HOME, String slices) throws Exception {
        //
        String[] args = new String[]{
            // application name
            "--name",
            "SparkPi-Python",
            // Python Program
            "--primary-py-file",
            SPARK_HOME + "/examples/src/main/python/pi.py",
            // number of executors
            "--num-executors",
            "2",
            // driver memory
            "--driver-memory",
            "512m",
            // executor memory
            "--executor-memory",
            "512m",
            // executor cores
            "--executor-cores",
            "2",
            // argument 1 to my Spark program
            "--arg",
            slices,
            // argument 2 to my Spark program (helper argument to create a proper JavaSparkContext object)
            "--arg",
            "yarn-cluster"
        };
        Configuration config = new Configuration();
        //
        System.setProperty("SPARK_YARN_MODE", "true");
        //
        SparkConf sparkConf = new SparkConf();
        ClientArguments clientArgs = new ClientArguments(args, sparkConf);
        Client client = new Client(clientArgs, config, sparkConf);
        client.run();
        // done!
    }
}

I invoke the code from the command line as follows:

java -cp *:. SubmitPyYARNJobFromJava 10

The pi.py program is the standard example that ships with Spark 1.6.0 built for Hadoop 2.6.0.

from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from random import random
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions
    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0
    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))
    sc.stop()

After I submit the job, it appears to be submitted correctly. It reaches the ACCEPTED state and then stalls.

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/02/12 16:15:24 INFO Client: Requesting a new application from cluster with 1 NodeManagers
16/02/12 16:15:24 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container)
16/02/12 16:15:24 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
16/02/12 16:15:24 INFO Client: Setting up container launch context for our AM
16/02/12 16:15:24 INFO Client: Setting up the launch environment for our AM container
16/02/12 16:15:24 INFO Client: Preparing resources for our AM container
16/02/12 16:15:25 INFO Client: Source and destination file systems are the same. Not copying file:/home/shunley/workspace/rabbitmq_java_rpc/spark-assembly-1.6.0-hadoop2.6.0.jar
16/02/12 16:15:25 INFO Client: Source and destination file systems are the same. Not copying file:/tmp/spark-7dbbb73f-e5bc-4fc1-a535-02a60cb68b16/__spark_conf__6244658246692860568.zip
16/02/12 16:15:25 INFO SecurityManager: Changing view acls to: shunley
16/02/12 16:15:25 INFO SecurityManager: Changing modify acls to: shunley
16/02/12 16:15:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shunley); users with modify permissions: Set(shunley)
16/02/12 16:15:26 INFO Client: Submitting application 8 to ResourceManager
16/02/12 16:15:26 INFO YarnClientImpl: Submitted application application_1455307995259_0008
16/02/12 16:15:27 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:27 INFO Client: 
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1455311726233
     final status: UNDEFINED
     tracking URL: http://shunley-VirtualBox:8088/proxy/application_1455307995259_0008/
     user: shunley
16/02/12 16:15:28 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:29 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:30 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:31 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:32 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:33 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)
16/02/12 16:15:34 INFO Client: Application report for application_1455307995259_0008 (state: ACCEPTED)

It then eventually fails with the following error:

16/02/12 16:43:56 INFO Client: Application report for application_1455307995259_0009 (state: FAILED)
16/02/12 16:43:56 INFO Client: 
     client token: N/A
     diagnostics: Application application_1455307995259_0009 failed 2 times due to AM Container for appattempt_1455307995259_0009_000002 exited with  exitCode: 10
For more detailed output, check application tracking page:http://shunley-VirtualBox:8088/proxy/application_1455307995259_0009/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1455307995259_0009_02_000001
Exit code: 10
Stack trace: ExitCodeException exitCode=10: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 10
Failing this attempt. Failing the application.
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1455313224060
     final status: FAILED
     tracking URL: http://shunley-VirtualBox:8088/cluster/app/application_1455307995259_0009
     user: shunley
Exception in thread "main" org.apache.spark.SparkException: Application application_1455307995259_0009 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1029)
    at SubmitPyYARNJobFromJava.pi(SubmitPyYARNJobFromJava.java:101)
    at SubmitPyYARNJobFromJava.main(SubmitPyYARNJobFromJava.java:52)
16/02/12 16:43:56 INFO ShutdownHookManager: Shutdown hook called
16/02/12 16:43:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-f5f15d4f-7383-4a97-b2ff-5734148d8a29

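I have searched all over Google for something like this, to no avail. Has anyone seen this before? I need to be able to submit both Python and Java applications to YARN from code. So far, Python is the only one that does not work. I can submit Java and Scala (I have not tried R yet), but Python, which our data scientists use for machine learning, does not work.

Any help or pointers would be greatly appreciated!

Thanks.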

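The client arguments are missing "--class" and "--py-files".

To submit a Python script, the class should be "org.apache.spark.deploy.PythonRunner". In addition, the pyspark library and py4j should be attached so that the driver can import Spark correctly.

So your client arguments should look like this: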

String[] args = new String[]{
    // application name
    "--name",
    "SparkPi-Python",
    "--class",
    "org.apache.spark.deploy.PythonRunner",
    "--py-files",
    SPARK_HOME + "/python/lib/pyspark.zip,"+ SPARK_HOME +"/python/lib/py4j-0.9-src.zip",
    // Python Program
    "--primary-py-file",
    SPARK_HOME + "/examples/src/main/python/pi.py",
    // number of executors
    "--num-executors",  
    "2",
    // driver memory
    "--driver-memory",
    "512m",
    // executor memory
    "--executor-memory", 
    "512m",
    // executor cores
    "--executor-cores", 
    "2",
    // argument 1 to my Spark program
    "--arg",
    slices,
    // argument 2 to my Spark program (helper argument to create a proper JavaSparkContext object)
    "--arg",
    "yarn-cluster"
};
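
The two extra arguments described above could also be added by a small helper, so that the same base argument list serves both Java and Python submissions. This is only a sketch: `PythonClientArgs`, `pyFiles`, and `withPythonSupport` are hypothetical names that are not part of Spark, and the py4j file name is passed in because it depends on the Spark release (the answer uses py4j-0.9-src.zip with Spark 1.6.0).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Spark): assembles the extra arguments
// that the answer says a Python submission needs.
class PythonClientArgs {

    // Class that launches the Python driver, as named in the answer.
    static final String PYTHON_RUNNER = "org.apache.spark.deploy.PythonRunner";

    // Builds the comma-separated --py-files value from SPARK_HOME.
    // The py4j zip name is version specific, so it is a parameter.
    static String pyFiles(String sparkHome, String py4jZipName) {
        String lib = sparkHome + "/python/lib/";
        return lib + "pyspark.zip," + lib + py4jZipName;
    }

    // Returns the base arguments with --class and --py-files prepended.
    static String[] withPythonSupport(String sparkHome, String py4jZipName,
                                      String... baseArgs) {
        List<String> args = new ArrayList<>();
        args.add("--class");
        args.add(PYTHON_RUNNER);
        args.add("--py-files");
        args.add(pyFiles(sparkHome, py4jZipName));
        args.addAll(Arrays.asList(baseArgs));
        return args.toArray(new String[0]);
    }

    public static void main(String[] unused) {
        // Example values taken from the question and the answer.
        String[] args = withPythonSupport(
                "/opt/spark/spark-1.6.0", "py4j-0.9-src.zip",
                "--name", "SparkPi-Python");
        System.out.println(String.join(" ", args));
    }
}
```

The resulting array can be passed to `ClientArguments` in place of the hand-written one, with the remaining options ("--primary-py-file", "--num-executors", and so on) supplied as the base arguments.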

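Is anything printed to STDERR? Check by running yarn logs -applicationId appid, where appid is the application ID shown in the client output (for example application_1455307995259_0009).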
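
If the log check is to be run from the same Java client rather than from a shell, a sketch along these lines might work. It assumes the `yarn` command is on the PATH of the machine running the client and that the container logs are retrievable through the CLI; `YarnLogsCommand` and `build` are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: runs "yarn logs -applicationId <appid>" as a child process.
class YarnLogsCommand {

    // Builds the command line for fetching the logs of one application.
    static List<String> build(String applicationId) {
        return Arrays.asList("yarn", "logs", "-applicationId", applicationId);
    }

    public static void main(String[] args) throws Exception {
        // Application ID copied from the failed run in the question.
        List<String> command = build("application_1455307995259_0009");
        // inheritIO() forwards the child's stdout and stderr to this console.
        Process process = new ProcessBuilder(command).inheritIO().start();
        System.exit(process.waitFor());
    }
}
```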

Oozie now supports a Spark action, so you could try using Oozie instead of writing your own submitter.
