使用 Py4J 调用一个获取 JavaSparkContext 并返回 JavaRDD <Integer>的方法



我正在寻找一些帮助或示例代码来说明pyspark调用用户在spark本身之外编写的Java代码,该代码从Python获取Spark上下文,然后返回用Java构建的RDD。

为了完整起见,我使用的是Py4J 0.81,Java 8,Python 2.7和spark 1.3.1

这是我用于Python一半的内容:

import pyspark
sc = pyspark.SparkContext(master='local[4]',
                          appName='HelloWorld')
print "version", sc._jsc.version()
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()
print gateway.entry_point.getRDDFromSC(sc._jsc)

Java 部分是:

import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import py4j.GatewayServer;
public class HelloWorld 
{
   public JavaRDD<Integer> getRDDFromSC(JavaSparkContext jsc)
   {
      JavaRDD<Integer> result = null;
      if (jsc == null)
      {
         System.out.println("XXX Bad mojo XXX");
         return result;
      }
      int n = 10;
      List<Integer> l = new ArrayList<Integer>(n);
      for (int i = 0; i < n; i++) 
      {
         l.add(i);
      }
      result = jsc.parallelize(l);
      return result;
   }
   public static void main(String[] args)
   {
      HelloWorld app = new HelloWorld();
      GatewayServer server = new GatewayServer(app);
      server.start();
   }
}

运行在 Python 端产生:

$ spark-1.3.1-bin-hadoop1/bin/spark-submit main.py
version 1.3.1
sc._jsc <class 'py4j.java_gateway.JavaObject'>
org.apache.spark.api.java.JavaSparkContext@50418105
None

Java 端报告:

$ spark-1.3.1-bin-hadoop1/bin/spark-submit --class "HelloWorld" --master local[4] target/hello-world-1.0.jar
XXX Bad mojo XXX

问题似乎是我没有正确地将JavaSparkContext从 Python 传递到 Java。 当我从python sc._scj.sc()使用时,JavaRDD null的相同故障。

调用使用 Python 中的 Spark 的用户定义的 Java 代码的正确方法是什么?

所以我

在我正在为闪闪发光的熊猫工作的分支中有一个例子 分支住在 https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support,PR 在 https://github.com/sparklingpandas/sparklingpandas/pull/90。

就目前而言,看起来您有两个不同的网关服务器,这似乎可能会导致一些问题,相反,您可以使用现有的网关服务器并执行以下操作:

sc._jvm.what.ever.your.class.package.is.HelloWorld.getRDDFromSC(sc._jsc)

假设您也将其设置为静态方法。

相关内容

  • 没有找到相关文章

最新更新