2016-07-07 9 views
7

Я пытаюсь реализовать сторону чтения в своей архитектуре ES-CQRS. Скажем, у меня есть постоянный актер, как это:Поток событий Query Persistence Query и CQRS

object UserWrite { 

    sealed trait UserEvent 
    sealed trait State 
    case object Uninitialized extends State 
    case class User(username: String, password: String) extends State 
    case class AddUser(user: User) 
    case class UserAdded(user: User) extends UserEvent 
    case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed]) 
    case class UsersStream(fromSeqNo: Long) 
    case object GetCurrentUser 

    def props = Props(new UserWrite) 
} 

class UserWrite extends PersistentActor { 

    import UserWrite._ 

    private var currentUser: State = Uninitialized 

    override def persistenceId: String = "user-write" 

    override def receiveRecover: Receive = { 
    case UserAdded(user) => currentUser = user 
    } 

    override def receiveCommand: Receive = { 
    case AddUser(user: User) => persist(UserAdded(user)) { 
     case UserAdded(`user`) => currentUser = user 
    } 
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo) 
    case GetCurrentUser => sender() ! currentUser 
    } 

    def publishUserEvents(fromSeqNo: Long) = { 
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    val userEvents = readJournal 
     .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue) 
     .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event } 
    sender() ! UserEvents(userEvents) 
    } 
} 

Насколько я понимаю, каждый раз, когда событие становится сохранялось, мы можем опубликовать его через Akka Persistence Query. Теперь я не уверен, что будет подходящим способом подписаться на эти события, чтобы я мог сохранить его в своей базе данных с поддержкой чтения? Одна из идей состоит в том, чтобы изначально отправить сообщение UsersStream от моего читающего актера до UserWrite актера и «потокового» события в этом читаемом актере.

EDIT

По предложению @cmbaxter, я внедрено прочитать стороны, это так:

object UserRead { 

    case object GetUsers 
    case class GetUserByUsername(username: String) 
    case class LastProcessedEventOffset(seqNo: Long) 
    case object StreamCompleted 

    def props = Props(new UserRead) 
} 

class UserRead extends PersistentActor { 
    import UserRead._ 

    var inMemoryUsers = Set.empty[User] 
    var offset  = 0L 

    override val persistenceId: String = "user-read" 

    override def receiveRecover: Receive = { 
    // Recovery from snapshot will always give us last sequence number 
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo 
    case RecoveryCompleted         => recoveryCompleted() 
    } 

    // After recovery is being completed, events will be projected to UserRead actor 
    def recoveryCompleted(): Unit = { 
    implicit val materializer = ActorMaterializer() 
    PersistenceQuery(context.system) 
     .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
     .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue) 
     .map { 
     case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event 
     } 
     .runWith(Sink.actorRef(self, StreamCompleted)) 
    } 

    override def receiveCommand: Receive = { 
    case GetUsers     => sender() ! inMemoryUsers 
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username) 
    // Match projected event and update offset 
    case (seqNo: Long, UserAdded(user)) => 
     saveSnapshot(LastProcessedEventOffset(seqNo)) 
     inMemoryUsers += user 
    } 
} 

Есть некоторые вопросы, как: поток событий кажется медленным. То есть UserRead Актер может отвечать множеством пользователей до того, как новый пользователь будет сохранен.

EDIT 2

Я увеличил интервал обновления журнала запросов Кассандры, который более менее решена проблема с медленным потоком событий. Похоже, что журнал событий Cassandra по умолчанию считается опросом каждые 3 секунды. В моем application.conf я добавил:

cassandra-query-journal { 
    refresh-interval = 20ms 
} 

EDIT 3

На самом деле, не убывают Интервал обновления. Это увеличит использование памяти, но это не опасно, ни точка. В целом концепция CQRS заключается в том, что сторона записи и чтения является асинхронной. Поэтому после того, как вы будете писать данные, они никогда не будут доступны для чтения. Работа с пользовательским интерфейсом? Я просто открываю поток и передаю данные через отправленные сервером события после того, как прочитанная сторона подтверждает их.

+2

Я бы просто переместить код, основанный журнал для чтения в вашей стороне чтения проекционной актера вместо отправки ему сообщения с 'Source' на нем. Затем обработайте этот поток в этом стороннем проекторе с проекцией и проецируйте эту информацию в Elasticsearch. – cmbaxter

+0

@cmbaxter Я сделал это. Кажется, это очень хорошая идея. Я обновил свой вопрос и все еще принимаю предложения, так как у меня все еще есть некоторые сомнения. –

ответ

4

Есть несколько способов сделать это. Например, в моем приложении у меня есть актер в моей стороне запроса, у которого есть PersistenceQuery, который постоянно ищет изменения, но вы можете иметь поток с тем же запросом. Дело в том, чтобы поддерживать поток открытым, чтобы иметь возможность прочитать сохраненное событие, как только это происходит

val readJournal = 
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
    CassandraReadJournal.Identifier) 

// issue query to journal 
val source: Source[EventEnvelope, NotUsed] = 
    readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue) 

// materialize stream, consuming events 
implicit val mat = ActorMaterializer() 
source.map(_.event).runForeach{ 
    case userEvent: UserEvent => { 
    doSomething(userEvent) 
    } 
} 

Вместо этого, вы можете иметь таймер, который поднимает PersistenceQuery и сохраняет новые события, но я думаю, что имея поток открытым является лучшим способом

2

Хотя решение с PersistenceQuery только был утвержден, он содержит следующие проблемы:

  1. это частичное, есть только способ чтения EventEnvelopes представлены.
  2. Он не может работать с моментальными снимками состояния, и, как результат, часть чтения CQRS должна превышать , все сохраненные события сохраняются.

Первое решение лучше, но имеет следующие вопросы:

  1. Это слишком сложно. Это приводит к тому, что пользователю не нужно иметь дело с порядковыми номерами.
  2. Код относится к состоянию (запрос/обновление), также связанному с реализацией Actors.

Существует существует проще один:

import akka.NotUsed 
import akka.actor.{Actor, ActorLogging} 
import akka.persistence.query.{EventEnvelope, PersistenceQuery} 
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal} 
import akka.persistence._ 
import akka.stream.ActorMaterializer 
import akka.stream.javadsl.Source 

/** 
    * Created by alexv on 4/26/2017. 
    */ 
class CQRSTest { 

    // User Command, will be transformed to User Event 
    sealed trait UserCommand 
    // User Event 
    // let's assume some conversion from Command to event here 
    case class PersistedEvent(command: UserCommand) extends Serializable 
    // User State, for simplicity assumed that all State will be snapshotted 
    sealed trait State extends Serializable{ 
    def clear(): Unit 
    def updateState(event: PersistedEvent): Unit 
    def validateCommand(command:UserCommand): Boolean 
    def applyShapshot(newState: State): Unit 
    def getShapshot() : State 
    } 
    case class SaveSnapshot() 

    /** 
    * Common code for Both reader and writer 
    * @param state - State 
    */ 
    abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging { 
    override def persistenceId: String = "CQRSPersistenceId" 

    override def preStart(): Unit = { 
     // Since the state is external and not depends to Actor's failure or restarts it should be cleared. 
     state.clear() 
    } 

    override def receiveRecover: Receive = { 
     case event : PersistedEvent => state.updateState(event) 
     case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot) 
     case RecoveryCompleted => onRecoveryCompleted(super.lastSequenceNr) 
    } 

    abstract def onRecoveryCompleted(lastSequenceNr:Long) 
    } 

    class CQRSWriter(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSWriter Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed") 
    } 

    override def receiveCommand: Receive = { 
     case command: UserCommand => 
     if(state.validateCommand(command)) { 
      // Persist events and call state.updateState with each persisted event 
      persistAll(List(PersistedEvent(command)))(state.updateState) 
     } 
     else { 
      log.error("Validation Failed for Command: {}", command) 
     } 
     case SaveSnapshot => saveSnapshot(state.getShapshot()) 
     case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata) 
     case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason) 
    } 
    } 

    class CQRSReader(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSReader Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed, Starting QueryStream") 

     // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests) 
     val readJournal = PersistenceQuery(context.system).readJournalFor(
     context.system.settings.config.getString("akka.persistence.query.my-read-journal")) 
     .asInstanceOf[ReadJournal 
     with EventsByPersistenceIdQuery] 
     val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
     OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue) 
     source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer()) 

    } 

    // Nothing received since it is Reader only 
    override def receiveCommand: Receive = Actor.emptyBehavior 
    } 
} 
+0

Предполагаемая часть CQRSRead должна запрашиваться непосредственно через ее состояние. CQRSReader гарантирует, что состояние похоже на CQRSWriter. Я не реализовал состояние Concrete здесь, но это может быть что угодно: от простой Hash Map до In-memory Graph DB –