У меня есть вопрос о Apache Spark и JavaСпарк Анализ уменьшал (Twitter)
Я делаю приложение, которое потоки данных из Twitter (Twitter4J). И я также создаю приложение, которое анализирует данные. txt-файл с твитами JSON.
StreamingApp: выход tweet.txt: пример: одна линия Json:
{"id":674534622903054336,"user":"twitter","tweet":"a tweet from twitter #twitter.","date":"2015-12-09T11:22:41CET"}
AnalyzerApp:
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TwitterAnalyzerBigData");
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> jsonFile = sc.textFile("whateverpath/tweets.txt");
JavaPairRDD<Long, String> tweetsFiltered = jsonFile.mapToPair(new TwitterFilterFunction());
tweetsFiltered является JavaPairRDD: твит ID : Длинные и твит: String
Теперь я использую некоторые функции карты, чтобы получить что-то вроде этого:
(1,a tweet from twitter #twitter.,0.0,0.055555556,negative, TWITTER)
(Это данные случайных испытаний)
- , являющуюся ID
- a tweet from twitter #twitter: Твитнуть
- 0,0: положительная оценка
- 0,0566: Отрицательная оценка
- отрицательный: категория настроения (положительный или отрицательный)
- TWITTER: категория чирикать (категория на основе хэштегов)
Вопрос: Как уменьшить это RDD, так что я та результат, как это:
TWITTER, 1, 0
- TWITTER: категория твит
- : общее количество твитов Твиттера КАТЕГОРИИ
- : Сумма положительных твиты КАТЕГОРИЯ TWITTER
После ответа Джеймса я сделал красный uceByKey в Java.
JavaRDD<Tuple3<String, Float, Float>> categoryEntryRDD = categoryResult.map(new Function<Tuple4<Long, String, String, String>, Tuple3<String, Float, Float>>() {
@Override
public Tuple3<String, Float, Float> call(Tuple4<Long, String, String, String> tuple4) throws Exception {
if(tuple4._3().equals("positive")){
return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 1F);
} else {
return new Tuple3<String, Float, Float>(tuple4._4(), 1F, 0F);
}
}
});
Tuple3<String, Float, Float> reducedRDD = categoryEntryRDD.reduce(new Function2<Tuple3<String, Float, Float>, Tuple3<String, Float, Float>, Tuple3<String, Float, Float>>() {
@Override
public Tuple3<String, Float, Float> call(Tuple3<String, Float, Float> tuple31, Tuple3<String, Float, Float> tuple32) throws Exception {
System.out.println(tuple31.toString());
return new Tuple3<String, Float, Float>(tuple31._1(), tuple31._2()+tuple32._2(), tuple31._3()+tuple32._3());
}
});
Но метод уменьшения - это не то же самое, что и reduceByKey, как я могу это исправить?
Мой выход: {TWITTER, 1000, 400} Но у меня также есть категория: FACEBOOK с 1000 твитами.
Спасибо, это выглядит здорово. Но я использую Java на данный момент, и вы не можете использовать метод reduceByKey на JavaRDD, какие-либо идеи? –
Я думаю, что это работает, но у меня есть вопрос, что делать, если у меня есть больше категорий? теперь он дает мне: {TWITTER, 1000, 400}, но на самом деле у меня также есть категория FACEBOOK, и ваше решение просто все подсчитывает? : s –
Да, конечным результатом является RDD, у которого есть все уникальные категории и их агрегированная информация. Первая запись может быть {TWITTER, 1000, 400}, а вторая запись может быть {FACEBOOK, 400, 22}, если есть данные с категорией как Facebook. – James