0

Я новый, чтобы искра. Я пытаюсь вставить файлы csv в таблицу cassandra с помощью разъема spark-cassandra, как показано ниже: Файлы находятся в Hdf, и я получаю Пути всех файлов и для каждого пути, который я называю методом, который преобразует данные csv в corressponding cassandra типы данных и создает подготовленный оператор, привязывает данные к подготовленному оператору и добавляет их в пакет. Наконец, я выполняю пакет, когда его 1000. Ключевые моменты 1. Я использую Apache Cassandra 2.1.8 и Spark 1.5 2. Я прочитал файлы Csv, используя Spark Context 3. Я использую com.datastax.spark. connector.cql.CassandraConnector для создания сеанса с Cassandra.Batch Insert In Cassandra с использованием Apache Spark висит и контекст не закрывается при запуске из Web Ser

У меня есть 9 файлов, каждый файл данных идет в таблицу в cassandra. Каждая вещь прекрасно работает. Все вставки происходят так, как ожидалось, и работа заканчивается, когда я отправляю банку на искре submit.

Проблема, с которой я столкнулся, - это когда один и тот же Jar вызывается через веб-службу (веб-служба вызывает скрипт для вызова банку). Один из файлов не вставлен и контекст искры не останавливается из-за чего рабочие места навсегда работают.

Когда я вставляю 4 файла или 5 файлов, все работает нормально даже через веб-сервис. Но все вместе он висит, и я получаю 10 записей меньше в одной из таблиц, и контекст не останавливается.

Его странно, потому что, когда я представляю банку на искре, прямо все работает нормально, и через веб-сервис я сталкиваюсь с этой проблемой, ее странный bcz даже веб-сервис подчиняет работу той же самой отправке.

Вот мой код

package com.pz.loadtocassandra; 



import java.io.File; 
import java.io.IOException; 
import java.math.BigDecimal; 
import java.net.URI; 
import java.net.URISyntaxException; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.LinkedHashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.logging.ConsoleHandler; 
import java.util.logging.FileHandler; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.FileUtil; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 

import com.datastax.driver.core.BatchStatement; 
import com.datastax.driver.core.BoundStatement; 
import com.datastax.driver.core.PreparedStatement; 
import com.datastax.driver.core.Session; 
import com.datastax.driver.core.exceptions.InvalidTypeException; 
import com.datastax.spark.connector.cql.CassandraConnector; 
import com.datastax.spark.connector.japi.CassandraRow; 
import com.pz.shared.UnicodeBOMInputStream; 
import com.pz.shared.fileformat.Header; 
import com.pz.shared.mr.fileformat.MRFileFormats.CSVInputFormat; 
import com.pz.shared.mr.fileformat.MRFileFormats.TextArrayWritable; 


public class LoadToCassandra { 

public static final String STUDYID = "STUDYID"; 
public static final String PROJECTNAME = "PROJECTNAME"; 
public static final String FILEID = "FILEID"; 
public static int count = 0; 
public static final String FILE_SERPERATOR = "/"; 
public static Logger log = Logger.getLogger(LoadToCassandra.class.getName()); 
public static void main(String[] args) { 
     String propFileLoc = args[0]; 
     String hdfsHome = args[1]; 
     String hdfs_DtdXmlPath = args[2]; 
     String hdfs_NormalizedDataPath = args[3]; 

     run(propFileLoc, hdfsHome,  hdfs_DtdXmlPath,hdfs_NormalizedDataPath); 
    } catch (IOException exception) { 
     log.log(Level.SEVERE, "Error occur in FileHandler.", exception); 
    } 
} 

public static void run(String propFileLoc, String hdfsHome, 
     String hdfs_DtdXmlPath, String hdfs_NormalizedDataPath) { 
    JavaSparkContext ctx = null; 
    FileSystem hadoopFs = null; 
    try { 

     PropInitialize.initailizeConfig(propFileLoc); 
     //setting spark context 
     ctx = setSparkContext(propFileLoc); 
     ParseDtdXml.parseDTDXML(hdfsHome, hdfs_DtdXmlPath); 
     Configuration configuration = setHadoopConf(); 
     hadoopFs = getHadoopFs(hdfsHome, configuration); 
     FileStatus[] fstat = hadoopFs.listStatus(new Path(hdfs_NormalizedDataPath)); 
     //Getting the csv paths 
     Path[] paths = FileUtil.stat2Paths(fstat); 
     log.info("PATH.size - " + paths.length); 
     for (Path path : paths) { 
      log.info("path is : "+path.toString()); 
      loadToCassandra(propFileLoc, path, configuration,hdfsHome, ctx); 
     } 


    } catch (IOException | URISyntaxException e) { 
     log.log(Level.SEVERE, "run method", e); 
     e.printStackTrace(); 
    } finally { 
     log.info("finally "); 
     if (ctx!= null) { 
      ctx.stop(); 
      System.out.println("SC Stopped"); 
     } 
     if (hadoopFs != null) { 
      try { 
       hadoopFs.close(); 
      } catch (IOException e) { 
       log.log(Level.SEVERE, "run method", e); 
      } 
     } 
    } 
} 



// input : 1. String hdfs home , 
// 2. Configuration hadoop conf object 
// returns : hadoop File System object 
private static FileSystem getHadoopFs(String hdfsHome, 
     Configuration configuration) throws IOException, URISyntaxException { 
    return FileSystem.get(new URI(hdfsHome), configuration); 

} 

// input : no inputs 
// process : sets hadoop config parameters 
// output : retuns hadoop conf object 
private static Configuration setHadoopConf() throws IOException, 
     URISyntaxException { 
    Configuration configuration = new Configuration(); 
    configuration.setBoolean("csvFileFormat.encoded.flag", true); 
    configuration.set("csvinputformat.token.delimiter", ","); 
    return configuration; 

} 

// input : string Properties File Location 
// process : creates and sets the configurations of spark context 
// retuns : JavaSparkContext object with configurations set to it. 
private static JavaSparkContext setSparkContext(String propFileLoc) { 
    PropInitialize.initailizeConfig(propFileLoc); 
    SparkConf conf = new SparkConf(); 
    conf.set("spark.serializer", 
      "org.apache.spark.serializer.KryoSerializer"); 
    conf.setAppName("Loading Data"); 
    conf.setMaster(PropInitialize.spark_master); 
    conf.set("spark.cassandra.connection.host", 
      PropInitialize.cassandra_hostname); 
    conf.setJars(PropInitialize.external_jars); 
    return new JavaSparkContext(conf); 

} 

private static void loadToCassandra(String propFileLoc, Path sourceFileHdfsPath, 
     Configuration hadoopConf, String hdfsHome,JavaSparkContext ctx) { 
    System.out.println("File :: " + sourceFileHdfsPath.toString()); 
    FileSystem hadoopFs = null; 
    PropInitialize.initailizeConfig(propFileLoc); 
    String cassKeyspaceName = PropInitialize.cass_keyspace_name; 
    log.info("entered here for file "+sourceFileHdfsPath.toString()); 

    final String strInputFileName = StringUtils.split(
      sourceFileHdfsPath.getName(), "#")[0].toLowerCase(); 
    final String strTableNameInCass = StringUtils.split(
      sourceFileHdfsPath.getName(), "-")[0].split("#")[1] 
      .toLowerCase(); 

    final String strSourceFilePath = sourceFileHdfsPath.toString(); 

    try { 
     hadoopFs = getHadoopFs(hdfsHome, hadoopConf); 

     //getting the cassandra connection using spark conf 
     final CassandraConnector connector = getCassandraConnection(ctx); 

     final JavaRDD<CassandraRow> cassTableObj=getCassTableObj(ctx,cassKeyspaceName,strTableNameInCass); 

     final Map<String, List<String>> tabColMapWithColTypes1 = ParseDtdXml.tabColMapWithColTypes; 

     final String headersUpdated; 
     final String headers; 

     UnicodeBOMInputStream ubis = new UnicodeBOMInputStream(
       hadoopFs.open(sourceFileHdfsPath)); 
     Header CsvHeader = Header.getCSVHeader(ubis, ","); 
     if (!strTableNameInCass.equalsIgnoreCase("PCMASTER")) { 

      String fString = ""; 
      for (int i = 0; i < CsvHeader.size() - 1; i++) { 
       fString = fString + CsvHeader.get(i).ColumnName + ","; 
      } 
      fString = fString 
        + CsvHeader.get(CsvHeader.size() - 1).ColumnName; 

      headers = fString; // StringUtils.join(stringArr.toString(),","); 

      headersUpdated = strTableNameInCass.toUpperCase() + "ID," 
        + headers; 

     } else { 

      String[] stringArr = new String[CsvHeader.size()]; 
      String fString = ""; 
      for (int i = 0; i < CsvHeader.size() - 1; i++) { 
       // stringArr[i] = CsvHeader.get(i).ColumnName; 
       fString = fString + CsvHeader.get(i).ColumnName + ","; 
      } 
      fString = fString 
        + CsvHeader.get(CsvHeader.size() - 1).ColumnName; 
      headers = StringUtils.join(stringArr.toString(), ","); 
      headersUpdated = fString; 

     } 

     ubis.close(); 


     //Reading the file using spark context 
     JavaPairRDD<LongWritable, TextArrayWritable> fileRdd = ctx 
       .newAPIHadoopFile(strSourceFilePath, CSVInputFormat.class, 
         LongWritable.class, TextArrayWritable.class, 
         hadoopConf); 


     final long recCount = fileRdd.count(); 



     final String[] actCols = headersUpdated.split(","); 

     final LinkedHashMap<Object, String> mapOfColNameAndType = new LinkedHashMap<Object, String>(); 
     final List<String> colNameAndType = tabColMapWithColTypes1 
       .get(strTableNameInCass.toUpperCase()); 

     for (int i = 0; i < actCols.length; i++) { 

      if (colNameAndType.contains(actCols[i] + " " + "text")) { 
       int indexOfColName = colNameAndType.indexOf(actCols[i] 
         + " " + "text"); 

       mapOfColNameAndType.put(i, 
         colNameAndType.get(indexOfColName).split(" ")[1]); 

      } else if (colNameAndType 
        .contains(actCols[i] + " " + "decimal")) { 
       int indexOfColName = colNameAndType.indexOf(actCols[i] 
         + " " + "decimal"); 
       mapOfColNameAndType.put(i, 
         colNameAndType.get(indexOfColName).split(" ")[1]); 
      } else { 
       continue; 
      } 

     } 

     //creates the query for prepared statement 
     final String makeStatement = makeSt(cassKeyspaceName, 
       strTableNameInCass, actCols); 
     final long seqId1 = cassTableObj.count(); 


     //calling map on the fileRdd 
     JavaRDD<String> data = fileRdd.values().map(
       new Function<TextArrayWritable, String>() { 
        /** 
        * 
        */ 
        private static final long serialVersionUID = 1L; 
        Session session; 
        boolean isssession = false; 
        PreparedStatement statement; 
        BatchStatement batch; 
        int lineCount = 0; 

        long seqId = seqId1; 

        /*for each line returned as an TextArrayWritable convert each cell the corresponding 
        * bind the data to prepared statement 
        * add it to batch 
        */ 
        @Override 
        public String call(TextArrayWritable tup) 
          throws Exception { 
         seqId++; 
         lineCount++; 

         log.info("entered here 3 for file "+strSourceFilePath.toString()); 
         String[] part = tup.toStrings(); 


         Object[] parts = getDataWithUniqueId(
           strTableNameInCass, part); 


         //For each file 
         //Creates the session 
         //creates the PreparedStatement 
         if (!isssession) { 
          session = connector.openSession(); 
          statement = session.prepare(makeStatement); 
          log.info("entered here 4 for file "+strSourceFilePath.toString()); 
          // System.out.println("statement :" + 
          // statement); 
          isssession = true; 
          batch = new BatchStatement(); 
         } 

         List<Object> typeConvData = new ArrayList<Object>(); 

         for (int i = 0; i < parts.length; i++) { 
          String type = mapOfColNameAndType.get(i); 
          try { 
           if (type.equalsIgnoreCase("text")) { 

            typeConvData.add(parts[i]); 
           } else { 

            // parts[i] = 
            // parts[i].toString().replace("\"", 
            // ""); 
            // check if the String the has to 
            // converted to a BigDecimal is any 
            // positive or negetive integer or not. 
            // if its not a positive integer or 
            // negative forcefully convert it to 
            // zero (avoiding NumberFormatException) 
            if (!((String) parts[i]) 
              .matches("-?\\d+")) { 
             parts[i] = "0"; 
            } 
            long s = Long 
              .valueOf((String) parts[i]); 
            typeConvData.add(BigDecimal.valueOf(s)); 

           } 
          } catch (NullPointerException e) { 
           log.log(Level.SEVERE, "loadToCass method", e); 


          } catch (NumberFormatException e) { 
           log.log(Level.SEVERE, "loadToCass method", e); 
          } catch (InvalidTypeException e) { 
           log.log(Level.SEVERE, "loadToCass method", e); 
          } 
         } 

                List<Object> data = typeConvData; 

         //bind data to query 
         final BoundStatement query = statement.bind(data 
           .toArray(new Object[data.size()])); 

         //add query to batch 
         batch.add(query); 
         int count = LoadToCassandra.count; 

         //when count is 1k execute batch 
         if (count == 1000) { 

          log.info("entered here 5 for file "+strSourceFilePath.toString()); 
          log.info("batch done"); 
          session.execute(batch); 
          LoadToCassandra.count = 0; 
          batch = new BatchStatement(); 
          return StringUtils.join(tup.toStrings()); 
         } 

         //if its the last batch and its not of size 1k 
         if (lineCount == (recCount)) 
          { 
          log.info("Last Batch"); 
          session.executeAsync(batch); 
          log.info("entered here 6 for file "+strSourceFilePath.toString()); 
          //session.execute(batch); 
          session.close(); 
          log.info("Session closed"); 
         } 

         LoadToCassandra.count++; 
         return StringUtils.join(tup.toStrings()); 
        } 

        private Object[] getDataWithUniqueId(
          String strTableNameInCass, String[] part) { 
         Object[] parts = null; 
         ArrayList<String> tempArraylist = new ArrayList<String>(); 
         if (!strTableNameInCass 
           .equalsIgnoreCase("PCMASTER")) { 
          for (int i = 0; i < part.length; i++) { 
           if (i == 0) { 
            tempArraylist.add(0, 
              String.valueOf(seqId)); 
           } 
           tempArraylist.add(part[i]); 
          } 
          parts = tempArraylist.toArray(); 
         } else { 
          parts = part; 
         } 

         return parts; 
        } 

       }); 

     data.count(); 
     hadoopFs.close(); 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

private static JavaRDD<CassandraRow> getCassTableObj(
     JavaSparkContext ctx, String cassKeyspaceName, 
     String strTableNameInCass) { 
    return javaFunctions(ctx) 
      .cassandraTable(cassKeyspaceName, 
        strTableNameInCass.toLowerCase()); 
} 

private static CassandraConnector getCassandraConnection(
     JavaSparkContext ctx) { 
    return CassandraConnector.apply(ctx.getConf()); 

} 

private static String makeSt(String keyspace, String tabName, 
     String[] colNames) { 
    StringBuilder sb = new StringBuilder(); 
    sb.append("INSERT INTO " + keyspace + "." + tabName + " ("); 
    List<String> vars = new ArrayList<>(); 
    for (int i = 0; i < (colNames.length - 1); i++) { 
     sb.append(colNames[i] + ","); 
     vars.add("?"); 
    } 
    vars.add("?"); 
    sb.append(colNames[colNames.length - 1] + ") values (" 
      + StringUtils.join(vars, ",") + ") "); 

    return sb.toString(); 
    }} 

Может кто-нибудь сказать мне, что может причиной того, что вызывает эту проблему и как она может быть решена. Спасибо

ответ

0

Как только вы вставили свои данные в cassandra, вызовите метод ctx.stop(), он остановит контекст искры.

+0

Я делаю это в блоке finally метода run. Но, похоже, когда последний файл вызывает метод loadToCassandra, он становится где-то повесившимся, из-за чего около 10 записей этого последнего файла не вставлены в определенную таблицу и из-за которых контекст не останавливается ... Это oli, когда jar вызывается веб-службой. –

+0

Вы получаете какое-то исключение? Вы можете поделиться им? – Kaushal

+0

Я не получаю исключения, но я могу поделиться с журналами –