2015-06-09 1 views
0

Мне нужно сопоставить таблицу, в которой написана история использования приложения. Таблица получила эти кортежи:Карта таблицы базы данных cassandra с использованием искры и RDD

<AppId,date,cpuUsage,memoryUsage> 
<AppId,date,cpuUsage,memoryUsage> 
<AppId,date,cpuUsage,memoryUsage> 
<AppId,date,cpuUsage,memoryUsage> 
<AppId,date,cpuUsage,memoryUsage> 

AppId всегда отличается, потому что упоминается у многих приложений, date выражается в этом формате dd/mm/yyyy hh/mmcpuUsage и memoryUsage выражаются в % так, например:

<3ghffh3t482age20304,230720142245,0.2,3,5> 

Я извлек данные из кассандры таким образом (небольшой фрагмент):

public static void main(String[] args) { 
     Cluster cluster; 
     Session session; 
     cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); 
     session = cluster.connect(); 
     session.execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication " 
       + "= {'class':'SimpleStrategy', 'replication_factor':3};"); 
     String createTableAppUsage = "CREATE TABLE IF NOT EXISTS foo.appusage" 
       + "(appid text,date text, cpuusage double, memoryusage double, " 
       + "PRIMARY KEY(appid,date) " + "WITH CLUSTERING ORDER BY (time ASC);"; 
     session.execute(createTableAppUsage); 
     // Use select to get the appusage's table rows 
     ResultSet resultForAppUsage = session.execute("SELECT appid,cpuusage FROM foo.appusage"); 
     for (Row row: resultForAppUsage) 
      System.out.println("appid :" + row.getString("appid") +" "+ "cpuusage"+row.getString("cpuusage")); 
     // Clean up the connection by closing it 
     cluster.close(); 
    } 

Итак, моя задача теперь сопоставить данные по key value и создать кортеж интегрирующую этот код (фрагмент кода, который не работает):

 <AppId,cpuusage> 

     JavaPairRDD<String, Integer> saveTupleKeyValue =someStructureFromTakeData.mapToPair(new PairFunction<String, String, Integer>() { 
      public Tuple2<String, Integer> call(String x) { 
       return new Tuple2(x, y); 
      } 

как я могу отобразить APPID и cpuusage используя RDD и уменьшить eg. cpuusage >50?

Любая помощь?

благодарит заранее.

+0

Не уверен, что понял вопрос. Вы хотели бы заменить 'session.execute (« SELECT appid, cpuusage FROM foo.appusage »);' эквивалентным выражением API-интерфейса spark-cassandra connection? – maasg

+0

@maasg привет, моя проблема заключается в том, что после извлечения данных из cassandra, как показано выше, я хочу создать RDD набора данных для сопоставления и сделать операцию с уменьшением на этом .. например. Уменьшите использование процессора> 50 .. и так далее. Как я могу это сделать? – OiRc

ответ

1

Если предположить, что у вас есть действительная SparkContext sparkContext уже создали, добавила зависимости коннекторов искровой Кассандры к вашему проекту и настроит искровое приложение, чтобы поговорить с вашей Кассандрой кластера (см docs для этого), то мы можем загрузить данные в РДУ, как это:

val data = sparkContext.cassandraTable("foo", "appusage").select("appid", "cpuusage") 

в Java, идея та же, но она требует немного больше сантехники, описанного here

+0

спасибо за ответ, я сконфигурировал все .. я могу выполнять запросы и т. Д. Таким образом, можно вернуть карту, в этом примере «val data» является rdd-картой? – OiRc

+0

@OiRc RDD - это коллекции и не ограничивают дублирование этих ключей, поэтому они не соответствуют контракту «Карта» (как в структуре данных). Какую функциональность вам требуется от Карты? Вероятно, в Spark есть путь. – maasg

+0

нет нет, я не хочу использовать java-карту .. с термином map i подразумевал, что я хочу включить все пары , а затем уменьшить их. вы можете увидеть это [вопрос, связанный с этим вопросом] (http://stackoverflow.com/questions/30758877/making-operations-before-returning-value-inisde-reducebykey), можете ли вы дать мне ответ на это? Что я хочу знать, если можно выполнить операцию над функцией «reduceByKey», прежде чем что-то вернуть. – OiRc

 Смежные вопросы

  • Нет связанных вопросов^_^