2017-02-01 10 views
2

Itry для объединения данных из файла в HDFS. Мне нужно добавить некоторые данные из этих данных со значением на определенной таблице в hbase.Получить строку на Spark на карте Call

, но у меня есть исключение:

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:286) 
    at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46) 
    at ...... 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation 
Serialization stack: 

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 

Я знаю, что проблема возникла при попытке получить доступ к HBase во время функции карты.

Мой вопрос: как заполнить мои RDD со значением, содержащимся в таблице hbase.

, например: файл в HDFS являются CSV:

Name;Number1;Number2 
toto;1;2 

в HBase мы имеем связать данные с именем Тото.

Мне нужно получить сумму Number1 и Number 2 (это самая простая часть) и объединить с данными в таблице. например:

ключ для редуктора будет tata и может быть получен путем получения rowkey toto в таблице hbase.

Любые предложения?

+0

, пожалуйста, проверьте мой ответ, похоже, что это [аналогичная проблема] (http://stackoverflow.com/a/41759525/647053). переместить вещи, такие как htable и т. д. ... в закрытие будет исправлять в этом случае –

+0

, насколько велико количество отображений, в основном количество ключей в Hbase для ваших данных? –

+0

Спасибо за ваш ответ, я попробую этот Рам Гадиярам. В таблице могут быть миллионы ключей, и каждый ключ может иметь тысячи столбцов – okitas

ответ

1

Наконец коллега сделал это, благодаря советам Yours:

так это код карты, который позволяет агрегировать файл с DATAS из таблицы Hbase.

private final Logger LOGGER = LoggerFactory.getLogger(AbtractGetSDMapFunction.class); 




/** 
* Namespace name 
*/ 
public static final String NAMESPACE = "NameSpace"; 
private static final String ID = "id"; 
private Connection connection = null; 
private static final String LINEID = "l"; 
private static final String CHANGE_LINE_ID = "clid"; 
private static final String CHANGE_LINE_DATE = "cld"; 
private String constClientPortHBase; 
private String constQuorumHBase; 
private int constTimeOutHBase; 
private String constZnodeHBase; 
public void initConnection() { 
    Configuration conf = HBaseConfiguration.create(); 
    conf.setInt("timeout", constTimeOutHBase); 
    conf.set("hbase.zookeeper.quorum", constQuorumHBase); 
    conf.set("hbase.zookeeper.property.clientPort", constClientPortHBase); 
    conf.set("zookeeper.znode.parent", constZnodeHBase); 
    try { 
     connection = HConnectionManager.createConnection(conf); 
    } catch (Exception e) { 
     LOGGER.error("Error in the configuration of the connection with HBase.", e); 
    } 
} 

public Tuple2<String, myInput> call(String row) throws Exception { 
//this is where you need to init the connection for hbase to avoid serialization problem 
    initConnection(); 

....do your work 
State state = getCurrentState(myInput.getKey()); 
....do your work 
} 

public AbtractGetSDMapFunction(String constClientPortHBase, String constQuorumHBase, String constZnodeHBase, int constTimeOutHBase) { 
    this.constClientPortHBase = constClientPortHBase; 
    this.constQuorumHBase = constQuorumHBase; 
    this.constZnodeHBase = constZnodeHBase; 
    this.constTimeOutHBase = constTimeOutHBase; 
} 

/***************************************************************************/ 
/** 
* Table Name 
*/ 
public static final String TABLE_NAME = "Table"; 

public state getCurrentState(String key) throws TechnicalException { 
    LOGGER.debug("start key {}", key); 
    String buildRowKey = buildRowKey(key); 
    State currentState = new State(); 
    String columnFamily = State.getColumnFamily(); 
    if (!StringUtils.isEmpty(buildRowKey) && null != columnFamily) { 
     try { 
      Get scan = new Get(Bytes.toBytes(buildRowKey)); 
      scan.addFamily(Bytes.toBytes(columnFamily)); 
      addColumnsToScan(scan, columnFamily, ID);     
      Result result = getTable().get(scan); 
      currentState.setCurrentId(getLong(result, columnFamily, ID));    
     } catch (IOException ex) { 
      throw new TechnicalException(ex); 
     } 
     LOGGER.debug("end "); 
    } 
    return currentState; 
} 

/***********************************************************/ 

private Table getTable() throws IOException, TechnicalException { 
    Connection connection = getConnection(); 
    // Table retrieve 
    if (connection != null) { 
     Table table = connection.getTable(TableName.valueOf(NAMESPACE, TABLE_NAME)); 


     return table; 
    } else { 
     throw new TechnicalException("Connection to Hbase not available"); 
    } 
} 

/****************************************************************/ 




private Long getLong(Result result, String columnFamily, String qualifier) { 
    Long toLong = null; 
    if (null != columnFamily && null != qualifier) { 
     byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); 
     toLong = (value != null ? Bytes.toLong(value) : null); 
    } 
    return toLong; 
} 

private String getString(Result result, String columnFamily, String qualifier) { 
    String toString = null; 
    if (null != columnFamily && null != qualifier) { 
     byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); 
     toString = (value != null ? Bytes.toString(value) : null); 
    } 
    return toString; 
} 


public Connection getConnection() { 
    return connection; 
} 

public void setConnection(Connection connection) { 
    this.connection = connection; 
} 



private void addColumnsToScan(Get scan, String family, String qualifier) { 
    if (org.apache.commons.lang.StringUtils.isNotEmpty(family) && org.apache.commons.lang.StringUtils.isNotEmpty(qualifier)) { 
     scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 
    } 
} 

private String buildRowKey(String key) throws TechnicalException { 
    StringBuilder rowKeyBuilder = new StringBuilder(); 
    rowKeyBuilder.append(HashFunction.makeSHA1Hash(key)); 
    return rowKeyBuilder.toString(); 
}