How can I integrate Spark-Cassandra with Spring Boot 2?

I generated the application with JHipster and am trying to integrate Spark into Spring Boot, but somehow it does not work. I am new to both Spring Boot and Spark. I get no exception or error, but I also get no output. If I use Spark with Cassandra from plain Java, it works fine. Can someone tell me what is wrong with my code?
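For comparison, here is roughly what the working standalone (non-Spring) version looks like; the host, keyspace, and table values below are placeholders, not the asker's actual configuration:

//StandaloneSparkCassandra.java -- minimal standalone sketch, no Spring involved
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StandaloneSparkCassandra {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .appName("StandaloneSparkCassandra")
            .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder host
            .getOrCreate();

        // Read one Cassandra table via the spark-cassandra-connector data source.
        Dataset<Row> df = spark.read()
            .format("org.apache.spark.sql.cassandra")
            .option("keyspace", "mykeyspace")   // placeholder keyspace
            .option("table", "instruments")     // placeholder table
            .load();
        df.show();

        spark.stop();
    }
}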

//SparkService.java
package com.celeritio.sparkgateway.spark;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class SparkService {

    @Autowired
    private JavaSparkContext javaSparkContext;

    @Autowired
    private SparkSession sparkSession;

    @Value("${spring.data.cassandra.keyspace-name}")
    private String cassandraKeyspace;

    @Value("${cassandra.table}")
    private String cassandraTable;

    public void getAllOrders() {
        try {
            // Register the Cassandra table as a temp view named after the configured table.
            Map<String, String> options = new HashMap<>();
            options.put("keyspace", cassandraKeyspace);
            options.put("table", cassandraTable);
            sparkSession
                .read()
                .format("org.apache.spark.sql.cassandra")
                .options(options)
                .load()
                .createOrReplaceTempView(cassandraTable);

            // Query the temp view by the configured name. The original code hardcoded
            // "instruments" here, which only matches the view if cassandra.table=instruments.
            Dataset<Row> df = sparkSession.sql("select * from " + cassandraTable);
            df.show();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
//SparkServiceImpl.java
package com.celeritio.sparkgateway.spark;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class SparkServiceImpl {

    @Autowired
    SparkService sparkService;

    // Fires once the Spring application context has finished starting,
    // so the Spark query runs on startup.
    @EventListener(ContextRefreshedEvent.class)
    public void myMain() {
        System.out.println("myMain");
        sparkService.getAllOrders();
    }
}
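An alternative trigger that is often easier to reason about in a Spring Boot app is a CommandLineRunner, which runs after the context is fully ready; this is a sketch, and the class name is mine, not from the original code:

//SparkStartupRunner.java -- hypothetical alternative to the ContextRefreshedEvent listener
package com.celeritio.sparkgateway.spark;

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class SparkStartupRunner implements CommandLineRunner {

    private final SparkService sparkService;

    public SparkStartupRunner(SparkService sparkService) {
        this.sparkService = sparkService;
    }

    @Override
    public void run(String... args) {
        // Runs once the application has started, like the event listener above.
        sparkService.getAllOrders();
    }
}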
//SparkConfiguration.java
package com.celeritio.sparkgateway.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource("classpath:sparkconfig.properties")
public class SparkConfiguration {

    private static final Logger log = LoggerFactory.getLogger(SparkConfiguration.class);
    @Value("${spark.master}")
    private String sparkMaster;
    @Value("${spring.data.cassandra.keyspace-name}")
    private String cassandraKeyspace;
    @Value("${cassandra.table}")
    private String cassandraTable;
    @Value("${spring.data.cassandra.contact-points}")
    private String cassandraHost;
    @Value("${spring.data.cassandra.port}")
    private String cassandraPort;
    @Bean
    public SparkConf sparkConf() {
        SparkConf conf = new SparkConf(true)
            .set("spark.cassandra.connection.host",cassandraHost)
            .set("spark.cassandra.connection.port", cassandraPort)
            .setMaster(sparkMaster)
            .setAppName("SparkConfiguration");
        System.out.println("SparkConf"+conf.isTraceEnabled());
        return conf;
    }
    @Bean
    public JavaSparkContext javaSparkContext() {
        log.info("Connecting to spark with master Url: {}, and cassandra host: {}",
            sparkMaster, cassandraHost);
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf());
        log.debug("spark context created");
        return javaSparkContext;
    }
    @Bean
    public SparkSession sparkSession() {
        // Reuse the SparkContext created above; getOrCreate() will not start a second one.
        return SparkSession
            .builder()
            .config(sparkConf())
            .sparkContext(javaSparkContext().sc())
            .appName("SparkConfiguration")
            .getOrCreate();
    }
}
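For reference, the @Value placeholders above expect properties along these lines. The values are examples only; in a JHipster app the spring.data.cassandra.* entries normally live in application.yml rather than in sparkconfig.properties:

//sparkconfig.properties (example values)
spark.master=local[*]
cassandra.table=instruments

//application.yml (JHipster-managed; example values)
spring:
  data:
    cassandra:
      keyspace-name: mykeyspace
      contact-points: 127.0.0.1
      port: 9042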

I think the problem is just a logging-framework conflict: Spark uses Log4j, while Spring Boot uses Logback by default.

As far as I remember, when integrating with Spark you need to exclude Spring Boot's default logging dependency so the two frameworks don't clash.
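A minimal sketch of that exclusion, assuming a Maven build; which starter you exclude from, and whether you also need to exclude Log4j bindings from the Spark artifacts, depends on your actual dependency tree:

<!-- pom.xml (fragment) -- exclude Spring Boot's Logback starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>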
