使用Apache Spark向Cassandra批量插入时作业挂起,且通过Web Service触发时Spark上下文未关闭



我是Spark新手。我正尝试使用spark-cassandra连接器将csv文件插入到cassandra表中,做法如下:文件位于HDFS中,我先获取所有文件的路径,再对每个路径调用一个方法,该方法将csv数据转换为相应的cassandra数据类型,创建一个预处理语句(prepared statement),把数据绑定到该语句上,并将其添加到批处理中。最后,当批处理达到1000条时,我执行该批处理。要点:1. 我使用的是Apache Cassandra 2.1.8和Spark 1.5;2. 我使用Spark Context读取Csv文件;3. 我使用com.datastax.spark.connector.cql.CassandraConnector创建与Cassandra的会话。

我有9个文件,每个文件的数据都写入Cassandra中对应的一张表。当我直接用spark-submit提交jar时,一切正常:所有插入都按预期完成,作业也能正常结束。

我面临的问题是,当通过web服务调用同一个Jar时(web服务调用脚本来调用Jar),其中一个文件的数据没有被插入,spark上下文也没有停止,因此作业永远在运行。

当我只插入4个或5个文件时,即使通过web服务调用,一切也都正常。但一次插入全部文件时,作业会挂起,其中一张表少了10条记录,而且上下文也没有停止。

这很奇怪,因为直接用spark-submit提交jar时一切正常,而通过web服务就会遇到这个问题;更奇怪的是,web服务最终也是通过同一个spark-submit来提交作业的。

这是我的代码

package com.pz.loadtocassandra;

 import java.io.File;
import java.io.IOException;
 import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
 import java.util.List;
import java.util.Map;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.pz.shared.UnicodeBOMInputStream;
import com.pz.shared.fileformat.Header;
import com.pz.shared.mr.fileformat.MRFileFormats.CSVInputFormat;
import com.pz.shared.mr.fileformat.MRFileFormats.TextArrayWritable;

 public class LoadToCassandra {
public static final String STUDYID = "STUDYID";
public static final String PROJECTNAME = "PROJECTNAME";
public static final String FILEID = "FILEID";
public static int count = 0;
public static final String FILE_SERPERATOR = "/";
public static Logger log = Logger.getLogger(LoadToCassandra.class.getName());
public static void main(String[] args) {
        String propFileLoc = args[0];
        String hdfsHome = args[1];
        String hdfs_DtdXmlPath = args[2];
        String hdfs_NormalizedDataPath = args[3];
        run(propFileLoc, hdfsHome,     hdfs_DtdXmlPath,hdfs_NormalizedDataPath);
    } catch (IOException exception) {
        log.log(Level.SEVERE, "Error occur in FileHandler.", exception);
    }
}
/**
 * Drives one complete load: initialises the configuration, creates the Spark context,
 * parses the DTD XML, lists the entries under {@code hdfs_NormalizedDataPath} and calls
 * {@code loadToCassandra} once per listed path.
 *
 * <p>{@code IOException} and {@code URISyntaxException} are logged here rather than
 * rethrown. The Spark context and the Hadoop file system are released in the
 * {@code finally} block whether or not the load succeeded.
 *
 * @param propFileLoc             location of the properties file
 * @param hdfsHome                HDFS home, used as the file system URI
 * @param hdfs_DtdXmlPath         HDFS path handed to {@code ParseDtdXml.parseDTDXML}
 * @param hdfs_NormalizedDataPath HDFS directory whose entries are loaded
 */
public static void run(String propFileLoc, String hdfsHome,
        String hdfs_DtdXmlPath, String hdfs_NormalizedDataPath) {
    JavaSparkContext sparkCtx = null;
    FileSystem fs = null;
    try {
        PropInitialize.initailizeConfig(propFileLoc);
        sparkCtx = setSparkContext(propFileLoc);
        ParseDtdXml.parseDTDXML(hdfsHome, hdfs_DtdXmlPath);

        Configuration hadoopConf = setHadoopConf();
        fs = getHadoopFs(hdfsHome, hadoopConf);

        // every entry directly under the normalized-data directory is treated as a csv file
        Path normalizedDir = new Path(hdfs_NormalizedDataPath);
        Path[] csvPaths = FileUtil.stat2Paths(fs.listStatus(normalizedDir));
        log.info("PATH.size - " + csvPaths.length);

        for (int idx = 0; idx < csvPaths.length; idx++) {
            Path csvPath = csvPaths[idx];
            log.info("path is : "+csvPath.toString());
            loadToCassandra(propFileLoc, csvPath, hadoopConf, hdfsHome, sparkCtx);
        }
    } catch (IOException | URISyntaxException e) {
        log.log(Level.SEVERE, "run method", e);
        e.printStackTrace();
    } finally {
        log.info("finally ");
        // stop the context first, then release the file system handle
        if (sparkCtx != null) {
            sparkCtx.stop();
            System.out.println("SC Stopped");
        }
        if (fs != null) {
            try {
                fs.close();
            } catch (IOException closeFailure) {
                log.log(Level.SEVERE, "run method", closeFailure);
            }
        }
    }
}

// Looks up the Hadoop file system for the given HDFS home.
// input   : hdfsHome      - string parsed as the file system URI
//           configuration - Hadoop configuration passed to FileSystem.get
// returns : the FileSystem returned by FileSystem.get
// throws  : URISyntaxException when hdfsHome is not a valid URI,
//           IOException when FileSystem.get fails
private static FileSystem getHadoopFs(String hdfsHome,
        Configuration configuration) throws IOException, URISyntaxException {
    URI fsUri = new URI(hdfsHome);
    return FileSystem.get(fsUri, configuration);
}
// Creates the Hadoop configuration used when reading the csv files.
// input   : none
// process : sets "csvFileFormat.encoded.flag" to true and
//           "csvinputformat.token.delimiter" to ","
// returns : the populated Configuration
private static Configuration setHadoopConf() throws IOException,
        URISyntaxException {
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("csvinputformat.token.delimiter", ",");
    hadoopConf.setBoolean("csvFileFormat.encoded.flag", true);
    return hadoopConf;
}
// Builds the Spark context for the load job.
// input   : propFileLoc - properties file location, passed to PropInitialize
// process : initialises PropInitialize, then sets the Kryo serializer, the app name,
//           the master, the Cassandra connection host and the external jars
// returns : a new JavaSparkContext created from that configuration
private static JavaSparkContext setSparkContext(String propFileLoc) {
    PropInitialize.initailizeConfig(propFileLoc);
    SparkConf sparkConf = new SparkConf()
            .set("spark.serializer",
                    "org.apache.spark.serializer.KryoSerializer")
            .setAppName("Loading Data")
            .setMaster(PropInitialize.spark_master)
            .set("spark.cassandra.connection.host",
                    PropInitialize.cassandra_hostname)
            .setJars(PropInitialize.external_jars);
    return new JavaSparkContext(sparkConf);
}
private static void loadToCassandra(String propFileLoc, Path sourceFileHdfsPath,
        Configuration hadoopConf, String hdfsHome,JavaSparkContext ctx) {
    System.out.println("File :: " + sourceFileHdfsPath.toString());
    FileSystem hadoopFs = null;
    PropInitialize.initailizeConfig(propFileLoc);
    String cassKeyspaceName = PropInitialize.cass_keyspace_name;
    log.info("entered here for file "+sourceFileHdfsPath.toString());
    final String strInputFileName = StringUtils.split(
            sourceFileHdfsPath.getName(), "#")[0].toLowerCase();
    final String strTableNameInCass = StringUtils.split(
            sourceFileHdfsPath.getName(), "-")[0].split("#")[1]
            .toLowerCase();
    final String strSourceFilePath = sourceFileHdfsPath.toString();
    try {
        hadoopFs = getHadoopFs(hdfsHome, hadoopConf);
        //getting the cassandra connection using spark conf
        final CassandraConnector connector = getCassandraConnection(ctx);
         final JavaRDD<CassandraRow> cassTableObj=getCassTableObj(ctx,cassKeyspaceName,strTableNameInCass);
        final Map<String, List<String>> tabColMapWithColTypes1 = ParseDtdXml.tabColMapWithColTypes;
        final String headersUpdated;
        final String headers;
        UnicodeBOMInputStream ubis = new UnicodeBOMInputStream(
                hadoopFs.open(sourceFileHdfsPath));
        Header CsvHeader = Header.getCSVHeader(ubis, ",");
        if (!strTableNameInCass.equalsIgnoreCase("PCMASTER")) {
            String fString = "";
            for (int i = 0; i < CsvHeader.size() - 1; i++) {
                fString = fString + CsvHeader.get(i).ColumnName + ",";
            }
            fString = fString
                    + CsvHeader.get(CsvHeader.size() - 1).ColumnName;
            headers = fString; // StringUtils.join(stringArr.toString(),",");
            headersUpdated = strTableNameInCass.toUpperCase() + "ID,"
                    + headers;
        } else {
            String[] stringArr = new String[CsvHeader.size()];
            String fString = "";
            for (int i = 0; i < CsvHeader.size() - 1; i++) {
                // stringArr[i] = CsvHeader.get(i).ColumnName;
                fString = fString + CsvHeader.get(i).ColumnName + ",";
            }
            fString = fString
                    + CsvHeader.get(CsvHeader.size() - 1).ColumnName;
            headers = StringUtils.join(stringArr.toString(), ",");
            headersUpdated = fString;
        }
        ubis.close();

        //Reading the file using spark context
        JavaPairRDD<LongWritable, TextArrayWritable> fileRdd = ctx
                .newAPIHadoopFile(strSourceFilePath, CSVInputFormat.class,
                        LongWritable.class, TextArrayWritable.class,
                        hadoopConf);

        final long recCount = fileRdd.count();

        final String[] actCols = headersUpdated.split(",");
        final LinkedHashMap<Object, String> mapOfColNameAndType = new LinkedHashMap<Object, String>();
        final List<String> colNameAndType = tabColMapWithColTypes1
                .get(strTableNameInCass.toUpperCase());
        for (int i = 0; i < actCols.length; i++) {
            if (colNameAndType.contains(actCols[i] + " " + "text")) {
                int indexOfColName = colNameAndType.indexOf(actCols[i]
                        + " " + "text");
                mapOfColNameAndType.put(i,
                        colNameAndType.get(indexOfColName).split(" ")[1]);
            } else if (colNameAndType
                    .contains(actCols[i] + " " + "decimal")) {
                int indexOfColName = colNameAndType.indexOf(actCols[i]
                        + " " + "decimal");
                mapOfColNameAndType.put(i,
                        colNameAndType.get(indexOfColName).split(" ")[1]);
            } else {
                continue;
            }
        }
        //creates the query for prepared statement
        final String makeStatement = makeSt(cassKeyspaceName,
                strTableNameInCass, actCols);
        final long seqId1 = cassTableObj.count();

        //calling map on the fileRdd 
        JavaRDD<String> data = fileRdd.values().map(
                new Function<TextArrayWritable, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;
                    Session session;
                    boolean isssession = false;
                    PreparedStatement statement;
                    BatchStatement batch;
                    int lineCount = 0;
                    long seqId = seqId1;
                    /*for each line returned as an TextArrayWritable convert each cell the corresponding
                     * bind the data to prepared statement
                     * add it to batch
                     */
                    @Override
                    public String call(TextArrayWritable tup)
                            throws Exception {
                        seqId++;
                        lineCount++;
                        log.info("entered here 3 for file "+strSourceFilePath.toString());
                        String[] part = tup.toStrings();

                        Object[] parts = getDataWithUniqueId(
                                strTableNameInCass, part);

                        //For each file
                        //Creates the session
                        //creates the PreparedStatement
                        if (!isssession) {
                            session = connector.openSession();
                            statement = session.prepare(makeStatement);
                            log.info("entered here 4 for file "+strSourceFilePath.toString());
                            // System.out.println("statement :" +
                            // statement);
                            isssession = true;
                            batch = new BatchStatement();
                        }
                        List<Object> typeConvData = new ArrayList<Object>();
                        for (int i = 0; i < parts.length; i++) {
                            String type = mapOfColNameAndType.get(i);
                            try {
                                if (type.equalsIgnoreCase("text")) {
                                    typeConvData.add(parts[i]);
                                } else {
                                    // parts[i] =
                                    // parts[i].toString().replace(""",
                                    // "");
                                    // check if the String the has to
                                    // converted to a BigDecimal is any
                                    // positive or negetive integer or not.
                                    // if its not a positive integer or
                                    // negative forcefully convert it to
                                    // zero (avoiding NumberFormatException)
                                    if (!((String) parts[i])
                                            .matches("-?\d+")) {
                                        parts[i] = "0";
                                    }
                                    long s = Long
                                            .valueOf((String) parts[i]);
                                    typeConvData.add(BigDecimal.valueOf(s));
                                }
                            } catch (NullPointerException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);

                            } catch (NumberFormatException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);
                            } catch (InvalidTypeException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);
                            }
                        }
                                                    List<Object> data = typeConvData;
                        //bind data to query
                        final BoundStatement query = statement.bind(data
                                .toArray(new Object[data.size()]));
                        //add query to batch
                        batch.add(query);
                        int count = LoadToCassandra.count;
                        //when count is 1k execute batch
                        if (count == 1000) {
                            log.info("entered here 5 for file "+strSourceFilePath.toString());
                            log.info("batch done");
                            session.execute(batch);
                            LoadToCassandra.count = 0;
                            batch = new BatchStatement();
                            return StringUtils.join(tup.toStrings());
                        }
                        //if its the last batch and its not of size 1k
                        if (lineCount == (recCount))
                            {
                            log.info("Last Batch");
                            session.executeAsync(batch);
                            log.info("entered here 6 for file "+strSourceFilePath.toString());
                            //session.execute(batch);
                            session.close();
                            log.info("Session closed");
                        }
                        LoadToCassandra.count++;
                        return StringUtils.join(tup.toStrings());
                    }
                    private Object[] getDataWithUniqueId(
                            String strTableNameInCass, String[] part) {
                        Object[] parts = null;
                        ArrayList<String> tempArraylist = new ArrayList<String>();
                        if (!strTableNameInCass
                                .equalsIgnoreCase("PCMASTER")) {
                            for (int i = 0; i < part.length; i++) {
                                if (i == 0) {
                                    tempArraylist.add(0,
                                            String.valueOf(seqId));
                                }
                                tempArraylist.add(part[i]);
                            }
                            parts = tempArraylist.toArray();
                        } else {
                            parts = part;
                        }
                        return parts;
                    }
                });
        data.count();
        hadoopFs.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
// Returns the connector RDD for one Cassandra table.
// input   : ctx                - Spark context
//           cassKeyspaceName   - keyspace holding the table
//           strTableNameInCass - table name; lower-cased before the lookup
// returns : the RDD produced by cassandraTable for that keyspace/table
private static JavaRDD<CassandraRow> getCassTableObj(
        JavaSparkContext ctx, String cassKeyspaceName,
        String strTableNameInCass) {
    final String lowerCaseTable = strTableNameInCass.toLowerCase();
    return javaFunctions(ctx).cassandraTable(cassKeyspaceName, lowerCaseTable);
}
// Creates a CassandraConnector from the configuration of the given Spark context.
private static CassandraConnector getCassandraConnection(
        JavaSparkContext ctx) {
    SparkConf sparkConf = ctx.getConf();
    return CassandraConnector.apply(sparkConf);
}
// Builds the CQL text of the prepared INSERT.
// input   : keyspace - keyspace name
//           tabName  - table name
//           colNames - column names, in the order the values will be bound
// returns : "INSERT INTO <keyspace>.<tabName> ( c1,c2,... ) values ( ?,?,... ) "
//           with one "?" per column
private static String makeSt(String keyspace, String tabName,
        String[] colNames) {
    StringBuilder cql = new StringBuilder();
    cql.append("INSERT INTO ").append(keyspace).append(".").append(tabName)
            .append(" ( ");

    final int lastIdx = colNames.length - 1;
    List<String> placeholders = new ArrayList<>();
    int col = 0;
    while (col < lastIdx) {
        cql.append(colNames[col]).append(",");
        placeholders.add("?");
        col++;
    }
    // the last column takes no trailing comma but still needs its placeholder
    placeholders.add("?");
    cql.append(colNames[lastIdx]);
    cql.append(" ) values ( ");
    cql.append(StringUtils.join(placeholders, ","));
    cql.append(" ) ");
    return cql.toString();
}
}

有人能告诉我造成这个问题的原因是什么,以及该如何解决吗?谢谢。

将数据插入cassandra后,调用ctx.stop()方法,它将停止spark上下文。

相关内容

最新更新