Я пытаюсь реализовать сторону чтения в своей архитектуре ES-CQRS. Скажем, у меня есть постоянный актер, как это:Поток событий Query Persistence Query и CQRS
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
Насколько я понимаю, каждый раз, когда событие становится сохранялось, мы можем опубликовать его через Akka Persistence Query
. Теперь я не уверен, что будет подходящим способом подписаться на эти события, чтобы я мог сохранить его в своей базе данных с поддержкой чтения? Одна из идей состоит в том, чтобы изначально отправить сообщение UsersStream
от моего читающего актера до UserWrite
актера и «потокового» события в этом читаемом актере.
EDIT
По предложению @cmbaxter, я внедрено прочитать стороны, это так:
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
Есть некоторые вопросы, как: поток событий кажется медленным. То есть UserRead
Актер может отвечать множеством пользователей до того, как новый пользователь будет сохранен.
EDIT 2
Я увеличил интервал обновления журнала запросов Кассандры, который более менее решена проблема с медленным потоком событий. Похоже, что журнал событий Cassandra по умолчанию считается опросом каждые 3 секунды. В моем application.conf
я добавил:
cassandra-query-journal {
refresh-interval = 20ms
}
EDIT 3
На самом деле, не убывают Интервал обновления. Это увеличит использование памяти, но это не опасно, ни точка. В целом концепция CQRS заключается в том, что сторона записи и чтения является асинхронной. Поэтому после того, как вы будете писать данные, они никогда не будут доступны для чтения. Работа с пользовательским интерфейсом? Я просто открываю поток и передаю данные через отправленные сервером события после того, как прочитанная сторона подтверждает их.
Я бы просто переместить код, основанный журнал для чтения в вашей стороне чтения проекционной актера вместо отправки ему сообщения с 'Source' на нем. Затем обработайте этот поток в этом стороннем проекторе с проекцией и проецируйте эту информацию в Elasticsearch. – cmbaxter
@cmbaxter Я сделал это. Кажется, это очень хорошая идея. Я обновил свой вопрос и все еще принимаю предложения, так как у меня все еще есть некоторые сомнения. –