2014-11-24 4 views
1

Мне интересно, можно ли каскадировать раздвижные окна друг к другу с помощью Sparks Streaming.Можете ли вы каскадировать раздвижные окна в искровом потоке

Так, например, у меня есть счет, поступающий через каждые 1 секунду. Я хочу суммировать их для окон от 5,15 до 30 секунд. Мне интересно, можно ли повторно использовать 5-секундные оконные результаты за 15 секунд, а 15 секунд - за 30 секунд.

Цель состоит в том, чтобы избежать сохранения 1-го обновления для всех входов, для длины самого длинного окна (поскольку зернистость здесь не имеет значения). Вместо этого мы повторно используем Dstream с частотой, которая соответствует той, которая нам нужна.

Вот и пример:

JavaPairDStream< String, Double > test = input; 
    JavaPairDStream< String, Double > test1 = input; 
    // 5s: 
    test = test.reduceByKeyAndWindow(new SumReducer(), new Duration(5000), new Duration(1000)); 
    test1 = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(5000), new Duration(5000)); 
    // 15s 
    test = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(15000), new Duration(5000)); 
    test1 = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(15000), new Duration(15000)); 
    // 30s 
    test = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(30000), new Duration(15000)); 
    test.print(); 

Я попытался это, но ничего не получить печатные.

ответ

1

Пакетный интервал

Длина окна и скользящего интервала должно быть кратно пакетного интервала. Чтобы избежать условий гонки (например, вычитая три 5-секундных суммы в окне за 10 секунд), интервал между партиями должен быть больше, чем время вычисления. Я возьму здесь пакетный интервал 1000 мс.

Пример

JavaPairDStream<String, Double> stream = input; 

// A: 5s sum every 5s 
stream5sCount = stream.reduceByKeyAndWindow(
    new SumReducer(), new Duration(5000), new Duration(5000)); 

// B: 15s sum every 5s 
stream15sCount = stream5sCount.reduceByKeyAndWindow(
    new SumReducer(), new Duration(15000), new Duration(5000)); 

// C: 30s sum every 15s 
stream30sCount = stream15sCount 
    .reduceByKeyAndWindow(new SumReducer(), new Duration(30000), new Duration(15000)) 
    .map(new DivideBy(3)); 

stream30sCount.print(); 

Объяснение

(на два действия А и В, где В уменьшает: windowLength из В/slideInterval А = число входных наборов для B .)

  1. Каждые 5 секунд подытожает 5 кортежей.
  2. Каждые 5 секунд B суммирует результаты последнего (15/5 =) 3 на основе (3 * 5 =) 15 исходных кортежей.
  3. Каждые 30 секунд C суммирует последние результаты последнего (30/5 =) 6 на основе (6 * 3 * 5 =) оригинальных кортежей! Кортежи будут суммироваться несколько раз, так как интервал времени B больше, чем его скользящий интервал.
  4. Картовер исправляет ошибку расчета.

Коррекция Шаг

Я полагаю, ваше реальное приложение не так просто, как количество слов. Вам понадобится обратная функция для исправления ошибки дублирования. Вы также можете попытаться исправить проблему до C (в примере с примером слова можно разделить раньше). Другим решением было бы отслеживать уже обработанные кортежи и только совокупные дизъюнктивные кортежи. Это зависит от вашего варианта использования.

+0

Основываясь на указанном вами ограничении, я не понимаю, почему не было бы возможности исчислять 15 секунд каждые 5 секунд (и 30 секунд каждые 15 секунд). Не могли бы вы объяснить это? – Arthur

+0

@jules: В вашем вопросе не было ясно, как часто вы хотите испускать. Я буду обновлять свой ответ после работы. –

+0

@jules: Я обновил свой ответ, надеюсь, это поможет. –