2016-11-25 9 views
1

Есть ли пример где-нибудь или кто-то может объяснить, как использовать Kinesis Analytics для создания сеансов реального времени. (например, сессия)Использование Kinesis Analytics для построения сеансов реального времени

Упомявлено, что это возможно здесь: https://aws.amazon.com/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/ в обсуждении пользовательских окон, но не дает примера.

Обычно это выполняется в SQL с использованием функции LAG, поэтому вы можете вычислить разницу во времени между последовательными строками. Этот пост: https://blog.modeanalytics.com/finding-user-sessions-sql/ описывает, как это сделать с помощью обычного (не потокового) SQL. Тем не менее, я не вижу поддержки функции LAG в Kinesis Analytics.

В частности, мне бы понравились два примера. Предположим, что оба принимают в качестве потока поток, состоящий из user_id и метки времени. Определите сеанс последовательности событий от того же пользователя, разделенных менее чем на 5 минут.

1) Первый выдает поток, который имеет дополнительные столбцы event_count session_start_timestamp. Каждый раз, когда приходит событие, это должно выводить событие с этими двумя дополнительными столбцами.

2) Второй пример - поток, который выводит одно событие на сеанс после завершения сеанса (т. Е. Прошло 5 минут без данных от пользователя). Это событие будет иметь userId, start_timestamp, end_timestamp и event_count

Возможно ли это с помощью Kinesis Analytics?

Вот пример делает это с Apache Спарк: https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/Applications/01%20Sessionization.html

Но я хотел бы сделать это с одним (или два) Kinesis аналитика потоков.

ответ

0

Вы можете сделать это с помощью Drools, создавая следующую логику:

Типы:

package com.test; 

import java.util.List; 

declare EventA 
    @role(event) 
    userId:String; 
    seen:boolean; 
end 

declare SessionStart 
    userId: String; 
    timestamp: long; 
    events: List; 
end 

declare SessionEnd 
    userId: String; 
    timestamp: long; 
    numOfEvents: int; 
end 

declare SessionNotification 
    userId: String; 
    currentNumOfEvents: int; 
end 

Правила:

package com.test; 

import java.util.List; 
import java.util.ArrayList; 

rule "Start session" 
when 
    // for any EventA 
    $a : EventA() from entry-point events 
    // check session is not started for this userId 
    not (exists(SessionStart(userId == $a.userId))) 
then 
    modify($a){setSeen(true);} 
    List events = new ArrayList(); 
    events.add($a); 
    insert(new SessionStart($a.getUserId(), System.currentTimeMillis(), events)); 
end 

rule "join session" 
when 
    // for every new EventA 
    $a : EventA(seen == false) from entry-point events 
    // get event's session 
    $session: SessionStart(userId == $a.userId) 
then 
    $session.getEvents().add($a); 
    insertLogical(new SessionNotification($a.getUserId(), $session.getEvents().size())); 
    modify($a) {setSeen(true);} 

end 

rule "End session" 
// if session timed out, clean up first 
salience 5 
when 
    // for any EventA 
    $a : EventA() from entry-point events 
    // check there is no following EventA with same userId within 30 seconds 
    not (exists(EventA(this != $a, userId == $a.userId, this after[0, 30s] $a) 
     from entry-point events)) 
    // get event's session 
    $session: SessionStart(userId == $a.userId) 
then 
    insertLogical(new SessionEnd($a.getUserId(), System.currentTimeMillis(), 
     $session.getEvents().size())); 

    // cleanup 
    for (Object $x : $session.getEvents()) 
     delete($x); 
    delete($session); 
end 

Вы можете автор Drools KINESIS Analytics с this service