2015-12-09 4 views
0
настроения

У меня есть вопрос о Apache Spark и JavaСпарк Анализ уменьшал (Twitter)

Я делаю приложение, которое потоки данных из Twitter (Twitter4J). И я также создаю приложение, которое анализирует данные. txt-файл с твитами JSON.

StreamingApp: выход tweet.txt: пример: одна линия Json:

{"id":674534622903054336,"user":"twitter","tweet":"a tweet from twitter #twitter.","date":"2015-12-09T11:22:41CET"} 

AnalyzerApp:

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TwitterAnalyzerBigData"); 
final JavaSparkContext sc = new JavaSparkContext(conf); 
JavaRDD<String> jsonFile = sc.textFile("whateverpath/tweets.txt"); 
JavaPairRDD<Long, String> tweetsFiltered = jsonFile.mapToPair(new TwitterFilterFunction()); 

tweetsFiltered является JavaPairRDD: твит ID : Длинные и твит: String

Теперь я использую некоторые функции карты, чтобы получить что-то вроде этого:

(1,a tweet from twitter #twitter.,0.0,0.055555556,negative, TWITTER) 

(Это данные случайных испытаний)

  • , являющуюся ID
  • a tweet from twitter #twitter: Твитнуть
  • 0,0: положительная оценка
  • 0,0566: Отрицательная оценка
  • отрицательный: категория настроения (положительный или отрицательный)
  • TWITTER: категория чирикать (категория на основе хэштегов)

Вопрос: Как уменьшить это RDD, так что я та результат, как это:

TWITTER, 1, 0 
  • TWITTER: категория твит
  • : общее количество твитов Твиттера КАТЕГОРИИ
  • : Сумма положительных твиты КАТЕГОРИЯ TWITTER

После ответа Джеймса я сделал красный uceByKey в Java.

JavaRDD<Tuple3<String, Float, Float>> categoryEntryRDD = categoryResult.map(new Function<Tuple4<Long, String, String, String>, Tuple3<String, Float, Float>>() { 
      @Override 
      public Tuple3<String, Float, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception { 
       if(tuple4._3().equals("positive")){ 
        return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 1F); 
       } else { 
        return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 0F); 
       } 

      } 
     }); 


    Tuple3<String, Float, Float> reducedRDD = categoryEntryRDD.reduce(new Function2<Tuple3<String, Float, Float>, Tuple3<String, Float, Float>, Tuple3<String, Float, Float>>() { 
     @Override 
     public Tuple3<String, Float, Float> call(Tuple3<String, Float, Float> tuple31, Tuple3<String, Float, Float> tuple32) throws Exception { 
      System.out.println(tuple31.toString()); 

      return new Tuple3<String, Float, Float>(tuple31._1(), tuple31._2()+tuple32._2(), tuple31._3()+tuple32._3()); 
     } 
    }); 

Но метод уменьшения - это не то же самое, что и reduceByKey, как я могу это исправить?

Мой выход: {TWITTER, 1000, 400} Но у меня также есть категория: FACEBOOK с 1000 твитами.

ответ

0

Это хорошая каноническая-свертке проблема:

  1. Карта записи твита кортеж, обозначающая категорию, и счетчик 1
  2. Снизить кортежи категории подытожить количество для каждого catetory

т.е. псевдокод:

+ map the RDD you have (id, tweet, pos score... 
- map to a tuple that looks like (category, 1, 1) if the tweet is positive 
- map to a tuple that looks like (category, 1, 0) if the tweet is negative 

+ reduceByKey where our key is the category using summation 
- we end up with an RDD of tuples in the form you want 

Вот некоторые Scala код для достижения этой цели - Java аналогично

val categoryEntryRDD = tweetsFiltered.map(mappedTuple => 
    if mappedTuple._5 == "positive" { 
     (mappedTuple._6, 1, 1) 
    } else { 
     (mappedTyple._6, 1, 0) 
    } 
} 

val reducedRDD = categoryEntryRDD.reduceByKey(x, y => (x._1 + y._1, x._2 + y._2)) 

На данный момент reducedRDD держит кортежи, которые выглядят как (категория, общее количество твитов в категории, всего положительные твиты категории).

+0

Спасибо, это выглядит здорово. Но я использую Java на данный момент, и вы не можете использовать метод reduceByKey на JavaRDD, какие-либо идеи? –

+0

Я думаю, что это работает, но у меня есть вопрос, что делать, если у меня есть больше категорий? теперь он дает мне: {TWITTER, 1000, 400}, но на самом деле у меня также есть категория FACEBOOK, и ваше решение просто все подсчитывает? : s –

+0

Да, конечным результатом является RDD, у которого есть все уникальные категории и их агрегированная информация. Первая запись может быть {TWITTER, 1000, 400}, а вторая запись может быть {FACEBOOK, 400, 22}, если есть данные с категорией как Facebook. – James

0

Наконец-то я понял! С Java

JavaPairRDD<String, Float> categoryPositiveTweets = categoryResult.mapToPair(new PairFunction<Tuple4<Long, String, String, String>, String, Float>() { 
     @Override 
     public Tuple2<String, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception { 
      if(tuple4._3().equals("positive")){ 
       return new Tuple2<String, Float>(tuple4._4(), 1F); 

      } else { 
       return new Tuple2<String, Float>(tuple4._4(), 0F); 
      } 
     } 
    }).reduceByKey(new Function2<Float, Float, Float>() { 
     @Override 
     public Float call(Float aFloat, Float aFloat2) throws Exception { 
      return aFloat+aFloat2; 
     } 
    }); 

    JavaPairRDD<String, Float> categoryTotalTweets = categoryResult.mapToPair(new PairFunction<Tuple4<Long, String, String, String>, String, Float>() { 
     @Override 
     public Tuple2<String, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception { 
      return new Tuple2<String, Float>(tuple4._4(), 1F); 
     } 
    }).reduceByKey(new Function2<Float, Float, Float>() { 
     @Override 
     public Float call(Float aFloat, Float aFloat2) throws Exception { 
      return aFloat+aFloat2; 
     } 
    }); 

    JavaPairRDD<String, Tuple2<Float, Float>> joinedCategorizedTweets = categoryTotalTweets.join(categoryPositiveTweets); 

    JavaRDD<Tuple3<String, Float, Float>> categorizedScoredTweets = joinedCategorizedTweets.map(new Function<Tuple2<String, Tuple2<Float, Float>>, Tuple3<String, Float, Float>>() { 
     @Override 
     public Tuple3<String, Float, Float> call(Tuple2<String, Tuple2<Float, Float>> tweet) throws Exception { 
      return new Tuple3<String, Float, Float>(
        tweet._1(), 
        tweet._2()._1(), 
        tweet._2()._2()); 
     } 
    }); 

Спасибо за помощь!

результат:

(TWITTER, 100, 40) (FACEBOOK, 80, 20)

 Смежные вопросы

  • Нет связанных вопросов^_^