Spark: Two SparkContexts in a single application, best practice



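I think I have an interesting question for all of you today. In the code below you will notice that I have two SparkContexts: one for Spark Streaming and one that is a normal SparkContext. According to best practice, a Spark application should have only one SparkContext, even though it is possible to get around this with the allowMultipleContexts setting (spark.driver.allowMultipleContexts) in the configuration.

The problem is that I need to retrieve data from Hive and from a Kafka topic to run some logic, and whenever I submit the application it, unsurprisingly, fails with "Cannot have 2 Spark Contexts running on JVM".

My question: is there a more correct way to do this than what I am doing now?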

public class MainApp {
private final String logFile= Properties.getString("SparkLogFileDir");
private static final String KAFKA_GROUPID = Properties.getString("KafkaGroupId");
private static final String ZOOKEEPER_URL = Properties.getString("ZookeeperURL");
private static final String KAFKA_BROKER = Properties.getString("KafkaBroker");
private static final String KAFKA_TOPIC = Properties.getString("KafkaTopic");
private static final String Database = Properties.getString("HiveDatabase");
private static final Integer KAFKA_PARA = Properties.getInt("KafkaParrallel");
public static void main(String[] args){
    //set settings
    String sql="";
    //START APP
    System.out.println("Starting NPI_TWITTERAPP...." + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    System.out.println("Configuring Settings...."+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    SparkConf conf = new SparkConf()
            .setAppName(Properties.getString("SparkAppName"))
            .setMaster(Properties.getString("SparkMasterUrl"));
    //Set Spark/hive/sql Context
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));
    JavaHiveContext HiveSqlContext = new JavaHiveContext(sc);
    //Check if Twitter Hive Table Exists
    try {
        HiveSqlContext.sql("DROP TABLE IF EXISTS "+Database+".TWITTERSTORE");
        HiveSqlContext.sql("CREATE TABLE IF NOT EXISTS "+Database+".TWITTERSTORE "
        +" (created_at String, id String, id_str String, text String, source String, truncated String, in_reply_to_user_id String, processed_at String, lon String, lat String)"
        +" STORED AS TEXTFILE");
    }catch(Exception e){
        System.out.println(e);
    }
    //Check if Ivapp Table Exists
    sql ="CREATE TABLE IF NOT EXISTS "+Database+".IVAPPGEO AS SELECT DISTINCT a.LATITUDE, a.LONGITUDE, b.ODNCIRCUIT_OLT_CLLI, b.ODNCIRCUIT_OLT_TID, a.CITY, a.STATE, a.ZIP FROM "
            +Database+".T_PONNMS_SERVICE B, "
            +Database+".CLLI_LATLON_MSTR A WHERE a.BID_CLLI = substr(b.ODNCIRCUIT_OLT_CLLI,0,8)";
    try {
        System.out.println(sql + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
        HiveSqlContext.sql(sql);
        sql = "SELECT LATITUDE, LONGITUDE, ODNCIRCUIT_OLT_CLLI, ODNCIRCUIT_OLT_TID, CITY, STATE, ZIP FROM "+Database+".IVAPPGEO";
        JavaSchemaRDD RDD_IVAPPGEO = HiveSqlContext.sql(sql).cache();
    }catch(Exception e){
        System.out.println(sql + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    }
    //JavaHiveContext hc = new JavaHiveContext();
    System.out.println("Retrieve Data from Kafka Topic: "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(KAFKA_TOPIC,KAFKA_PARA);
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
                jssc, KAFKA_GROUPID, ZOOKEEPER_URL, topicMap);
    JavaDStream<String> json = messages.map(
            new Function<Tuple2<String, String>, String>() {
                private static final long serialVersionUID = 42l;
                @Override
                public String call(Tuple2<String, String> message) {
                    return message._2();
                }
            }
    );
    System.out.println("Completed Kafka Messages... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));

    System.out.println("Filtering Resultset... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    JavaPairDStream<Long, String> tweets = json.mapToPair(
            new TwitterFilterFunction());
    JavaPairDStream<Long, String> filtered = tweets.filter(
            new Function<Tuple2<Long, String>, Boolean>() {
                private static final long serialVersionUID = 42l;
                @Override
                public Boolean call(Tuple2<Long, String> tweet) {
                    return tweet != null;
                }
            }
    );
    JavaDStream<Tuple2<Long, String>> tweetsFiltered = filtered.map(
            new TextFilterFunction());
    tweetsFiltered = tweetsFiltered.map(
            new StemmingFunction());
    System.out.println("Finished Filtering Resultset... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));

    System.out.println("Processing Sentiment Data... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    //calculate postive tweets
    JavaPairDStream<Tuple2<Long, String>, Float> positiveTweets =
            tweetsFiltered.mapToPair(new PositiveScoreFunction());
    //calculate negative tweets
    JavaPairDStream<Tuple2<Long, String>, Float> negativeTweets =
            tweetsFiltered.mapToPair(new NegativeScoreFunction());
    JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
            positiveTweets.join(negativeTweets);
    //Score tweets
    JavaDStream<Tuple4<Long, String, Float, Float>> scoredTweets =
            joined.map(new Function<Tuple2<Tuple2<Long, String>,
                    Tuple2<Float, Float>>,
                    Tuple4<Long, String, Float, Float>>() {
                private static final long serialVersionUID = 42l;
                @Override
                public Tuple4<Long, String, Float, Float> call(
                        Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
                {
                    return new Tuple4<Long, String, Float, Float>(
                            tweet._1()._1(),
                            tweet._1()._2(),
                            tweet._2()._1(),
                            tweet._2()._2());
                }
            });
    System.out.println("Finished Processing Sentiment Data... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    System.out.println("Outputting Tweets Data to flat file "+Properties.getString("HdfsOutput")+" ... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    JavaDStream<Tuple5<Long, String, Float, Float, String>> result =
            scoredTweets.map(new ScoreTweetsFunction());
    result.foreachRDD(new FileWriter());
    System.out.println("Outputting Sentiment Data to Hive... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));

    jssc.start();
    jssc.awaitTermination();
}

}

Creating a SparkContext

You can create a SparkContext instance with or without first creating a SparkConf object.

Getting an existing SparkContext or creating a new one (getOrCreate methods)

getOrCreate(): SparkContext
getOrCreate(conf: SparkConf): SparkContext

The SparkContext.getOrCreate methods let you get the existing SparkContext or create a new one if none exists.

import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")
// Named differently from sc only so that both calls compile in one scope
val scFromConf = SparkContext.getOrCreate(conf)

Reference: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sparkcontext.html
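Since the question's code is Java, a rough Java counterpart of the Scala snippet may help. This is only a sketch: it assumes spark-core is on the classpath and a Spark release that provides SparkContext.getOrCreate (the question's JavaHiveContext suggests an older release, where it may not be available). The class name GetOrCreateExample is made up for illustration.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;

public class GetOrCreateExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("SparkMe App");

        // Returns the context already active in this JVM, or creates one from conf.
        SparkContext sc = SparkContext.getOrCreate(conf);

        // A second call hands back the active context instead of failing.
        SparkContext again = SparkContext.getOrCreate(conf);
        System.out.println("same instance: " + (sc == again));

        // Wrap the Scala context for use with the Java API, then shut it down.
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
        jsc.stop();
    }
}
```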

Apparently, if I close the original SparkContext with sc.close() before creating the JavaStreamingContext, it works perfectly, without errors or issues.

You can use a singleton ContextManager object to decide which context to hand out.

public class ContextManager {
    private static JavaSparkContext context;
    private static String currentType;

    private ContextManager() {}

    public static JavaSparkContext getContext(String type) {
        // Compare strings with equals(); == only compares references
        if (type.equals(currentType) && context != null) {
            return context;
        } else if ("streaming".equals(type)) {
            // .. clean up the current context ..
            // .. initialize the context to streaming context ..
            currentType = type;
        } else {
            // .. clean up the current context ..
            // .. initialize the context to normal context ..
            currentType = type;
        }
        return context;
    }
}

This approach has drawbacks: for example, in a project that switches contexts frequently, the overhead would be very high.
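To make that switching cost concrete, here is a runnable toy version of such a manager. It is a sketch, not Spark code: FakeContext is a hypothetical stand-in that only counts how often a context is built and closed, and the type names "batch" and "streaming" are arbitrary labels. In a real application each build would start a full Spark context.

```java
class ContextManagerDemo {
    public static void main(String[] args) {
        ContextManager.getContext("batch");
        ContextManager.getContext("batch");      // same type: reused, nothing rebuilt
        ContextManager.getContext("streaming");  // closes batch, builds streaming
        ContextManager.getContext("batch");      // closes streaming, builds batch again
        // prints created=3, closed=2
        System.out.println("created=" + FakeContext.created + ", closed=" + FakeContext.closed);
    }
}

/** Hypothetical stand-in for a Spark context; it only records its lifecycle. */
class FakeContext implements AutoCloseable {
    static int created = 0;
    static int closed = 0;
    final String type;

    FakeContext(String type) {
        this.type = type;
        created++;
    }

    @Override
    public void close() {
        closed++;
    }
}

/** Keeps at most one context alive and rebuilds it when the requested type changes. */
final class ContextManager {
    private static FakeContext context;
    private static String currentType;

    private ContextManager() {}

    static synchronized FakeContext getContext(String type) {
        // equals(), not ==, so that equal strings from different sources match.
        if (type.equals(currentType) && context != null) {
            return context;
        }
        if (context != null) {
            context.close(); // clean up the context being replaced
        }
        context = new FakeContext(type); // real code would build the matching Spark context here
        currentType = type;
        return context;
    }
}
```

Every change of type pays for one teardown and one startup, which is why alternating between the two types on each request would be expensive.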

You can access the SparkContext from the JavaStreamingContext and use that reference when creating the other contexts.

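SparkConf sparkConfig = new SparkConf().setAppName("foo");
// Batch interval of 30 seconds, given in milliseconds
JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, new Duration(30000));
// Reuse the streaming context's SparkContext instead of creating a second one
SQLContext sqlContext = new SQLContext(jssc.sparkContext());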
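Applied to the question's setup, the same idea also works in the other direction: create the JavaSparkContext first and pass it, rather than the SparkConf, to the streaming context, so that the Hive queries and the Kafka stream share one SparkContext. A minimal sketch, assuming the Spark 1.x Java API used in the question (JavaHiveContext does not exist in later releases); the class name, app name, master URL and query are placeholders.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SingleContextApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SingleContextApp")  // placeholder
                .setMaster("local[2]");          // placeholder; a receiver needs more than one thread

        // The only SparkContext in this JVM.
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Pass the existing context instead of the SparkConf,
        // so the streaming context does not create a second SparkContext.
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));

        // The Hive context is built on the same SparkContext.
        JavaHiveContext hiveContext = new JavaHiveContext(sc);
        hiveContext.sql("SHOW TABLES");          // placeholder query

        // Define the Kafka DStream and its output operations on jssc here,
        // then call jssc.start() and jssc.awaitTermination() as in the question.
    }
}
```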
