a software geek,
an FP promoter
and a pragmatic purist
Sarunas Valaskevicius @ Inviqa
Review elements of CQRS/ES;
look at how eventflow can help;
everyone is invited to participate!
...
It happened, no way around it.
A message that a change has occurred. Used to model the state of an aggregate as a sequence of deltas.
Make it happen!
A request from outside the aggregate (e.g. from a user) to change the current state.
Can this happen right now?
Checks if the requested changes comply with the consistency rules for the aggregate. Ensures strong consistency within its bounds.
[E] -> C -> [E] Xor Error
Executes command, given aggregate's past events, and either results in new events, or an error.
E.g. in the examples later, it ensures that a counter doesn't go below 0.
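The signature above can be sketched in plain Scala, without eventflow. This is only an illustration under stated assumptions: `CounterSketch`, `replay` and `handle` are hypothetical names, the events are simplified (no aggregate id), and the standard library's `Either` stands in for Xor.

```scala
object CounterSketch {
  sealed trait Event
  final case class Created(start: Int) extends Event
  case object Incremented extends Event
  case object Decremented extends Event

  sealed trait Command
  final case class Create(start: Int) extends Command
  case object Increment extends Command
  case object Decrement extends Command

  // Rebuild the current state from past events; None means "not created yet".
  def replay(past: List[Event]): Option[Int] =
    past.foldLeft(Option.empty[Int]) {
      case (_, Created(start))  => Some(start)
      case (state, Incremented) => state.map(_ + 1)
      case (state, Decremented) => state.map(_ - 1)
    }

  // [E] -> C -> [E] Xor Error (Either is an assumption standing in for Xor).
  def handle(past: List[Event], command: Command): Either[String, List[Event]] =
    (replay(past), command) match {
      case (None, Create(start))         => Right(List(Created(start)))
      case (None, _)                     => Left("Counter does not exist")
      case (Some(_), Create(_))          => Left("Counter already exists")
      case (Some(_), Increment)          => Right(List(Incremented))
      case (Some(c), Decrement) if c > 0 => Right(List(Decremented))
      case (Some(_), Decrement)          => Left("Counter cannot be decremented")
    }
}
```

The handler never mutates anything: it only decides which events, if any, should be appended to the aggregate's history.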
D -> E -> D
Maintains its own read model (D), and updates it when new events are emitted.
Can "listen" for multiple aggregates, or even aggregate types.
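As a rough sketch of D -> E -> D in plain Scala: a projection is a function that folds each new event into the read model. `ProjectionSketch`, `accept` and `project` are hypothetical names, and the events here carry their aggregate id directly, which is a simplification made for this example.

```scala
object ProjectionSketch {
  sealed trait Event
  final case class Created(id: String, start: Int) extends Event
  final case class Incremented(id: String) extends Event
  final case class Decremented(id: String) extends Event

  // The read model (D): current value per counter id.
  type Data = Map[String, Int]

  // D -> E -> D: apply one event to the read model.
  def accept(d: Data, e: Event): Data = e match {
    case Created(id, start) => d + (id -> start)
    case Incremented(id)    => d + (id -> (d.getOrElse(id, 0) + 1))
    case Decremented(id)    => d + (id -> (d.getOrElse(id, 0) - 1))
  }

  // Replaying a stream of events is a left fold over accept.
  def project(events: List[Event]): Data =
    events.foldLeft(Map.empty[String, Int])(accept)
}
```

Because events from several aggregates can be interleaved in one stream, the same fold covers the "multiple aggregates" case.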
object Counter {
  sealed trait Event
  final case class Created(id: AggregateId, start: Int) extends Event
  case object Incremented extends Event
  case object Decremented extends Event
  sealed trait Command
  final case class Create(id: AggregateId, start: Int) extends Command
  case object Increment extends Command
  case object Decrement extends Command
}
object CounterAggregate extends EventFlow[Event, Command] {
  val counting: RegisteredFlowStateAux[Int] = ref('counter, c => handler(
    when(Increment).emit(Incremented).switch(c + 1 -> counting),
    when(Decrement).guard(_ => c > 0, "Counter cannot be decremented").
      emit(Decremented).switch(c - 1 -> counting)
  ))
  val aggregateLogic: Flow[Unit] = handler(
    when[Create].emit[Created].switchByEvent(evt => evt.start -> counting)
  )
  val snapshottableStates: FlowStates = List(counting)
}
object Door {
  sealed trait Event
  final case class Registered(id: AggregateId) extends Event
  case object Opened extends Event
  case object Closed extends Event
  final case class Locked(key: String) extends Event
  final case class Unlocked(key: String) extends Event
  sealed trait Command
  final case class Register(id: AggregateId) extends Command
  case object Open extends Command
  case object Close extends Command
  final case class Lock(key: String) extends Command
  final case class Unlock(key: String) extends Command
}
val openDoors: RegisteredFlowStateAux[Unit] =
  ref('open, handler(
    when(Close).emit(Closed).switch(closedDoors)
  ))
val closedDoors: RegisteredFlowStateAux[Unit] =
  ref('closed, handler(
    when(Open).emit(Opened).switch(openDoors),
    when[Lock].emit[Locked].switchByEvent(ev => ev.key -> lockedDoors)
  ))
val lockedDoors: RegisteredFlowStateAux[String] =
  ref('locked, key => handler(
    when(Unlock(key)).emit[Unlocked].switch(closedDoors),
    when[Unlock].failWithMessage("Attempted unlock key is invalid"),
    anyOther.failWithMessage("Locked door can only be unlocked.")
  ))
(generated using http://www.bottlecaps.de/rr/ui)
As we keep using an aggregate, it grows.
The more events an aggregate has, the slower it is to restore its working state.
Snapshots are caches of the working state.
Every ref'ed state function is registered by a symbol (reference).
When a snapshot is taken, it only stores the reference of the state function and its parameters.
Restoring a snapshot simply applies the saved parameters to the referred state function.
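The idea can be sketched in plain Scala as a registry of state functions keyed by reference. This is not how eventflow implements it; `SnapshotSketch`, `Handler`, `registry` and `restore` are hypothetical names, and commands are reduced to strings to keep the example short.

```scala
object SnapshotSketch {
  // A handler answers a command with the next counter value or an error.
  type Handler = String => Either[String, Int]

  // A state function: its parameter is the current count.
  def counting(c: Int): Handler = {
    case "Increment"          => Right(c + 1)
    case "Decrement" if c > 0 => Right(c - 1)
    case "Decrement"          => Left("Counter cannot be decremented")
    case other                => Left(s"Unknown command: $other")
  }

  // State functions registered by reference.
  val registry: Map[String, Int => Handler] =
    Map("counter" -> ((c: Int) => counting(c)))

  // A snapshot stores only the reference and the parameter, not the events.
  final case class Snapshot(ref: String, param: Int)

  // Restoring applies the saved parameter to the referred state function.
  def restore(s: Snapshot): Option[Handler] =
    registry.get(s.ref).map(_(s.param))
}
```

Restoring from `Snapshot("counter", 2)` yields the same handler that replaying the full event history up to a count of 2 would, without reading those events.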
Say we need to...
object CounterProjection extends Projection[TreeMap[AggregateId, Int]] {
  def initialData = TreeMap.empty
  val listeningFor = List(CounterAggregate.tag)
  def accept[E](d: Data) = {
    case EventData(_, id, _, Created(_, start)) =>
      d + (id -> start)
    case EventData(_, id, _, Incremented) =>
      d + (id -> d.get(id).fold(1)(_ + 1))
    case EventData(_, id, _, Decremented) =>
      d + (id -> d.get(id).fold(-1)(_ - 1))
  }
}
Or the same for doors, with a custom projection target structure:
sealed trait DoorState
object DoorState {
  case object Open extends DoorState
  case object Closed extends DoorState
  case object Locked extends DoorState
}
object DoorProjection extends Projection[TreeMap[AggregateId, DoorState]] {
  def initialData = TreeMap.empty
  val listeningFor = List(DoorAggregate.tag)
  def accept[E](d: Data) = {
    case EventData(_, id, _, Registered(_)) => d + (id -> DoorState.Open)
    case EventData(_, id, _, Closed) => d + (id -> DoorState.Closed)
    case EventData(_, id, _, Opened) => d + (id -> DoorState.Open)
    case EventData(_, id, _, Locked(_)) => d + (id -> DoorState.Locked)
    case EventData(_, id, _, Unlocked(_)) => d + (id -> DoorState.Closed)
  }
}
val listeningFor = List(CounterAggregate.tag, DoorAggregate.tag)
def accept[E](d: Data) = {
  case EventData(_, id, _, Door.Registered(_)) => ...
  case EventData(_, id, _, Door.Closed) => ...
  case EventData(_, id, _, Door.Opened) => ...
  case EventData(_, id, _, Counter.Incremented) => ...
  case EventData(_, id, _, Counter.Decremented) => ...
}
Initialise the events relevant for the test case.
Specify the command to execute.
Assert the expected results - are the new events resulting from the command execution correct?
"Locked door" should "allow unlocking with the same key" in {
  given {
    newDb.withEvents[Event](tag, "door",
      Registered("door"), Closed, Locked("key")
    )
  } when {
    _.command(DoorAggregate, "door", Unlock("key"))
  } thenCheck {
    _.newEvents[Event](tag, "door") should be(List(Unlocked("key")))
  }
}
In-memory database;
GetEventStore integration;
more to come on demand.
http hemisu-mindpowered.rhcloud.com/counters
http hemisu-mindpowered.rhcloud.com/counter/a Create:='{"id":"a", "start":10}'
http hemisu-mindpowered.rhcloud.com/counter/a Increment=true
http hemisu-mindpowered.rhcloud.com/counter/a Decrement=true
http hemisu-mindpowered.rhcloud.com/counter/a
http hemisu-mindpowered.rhcloud.com/counters
http hemisu-mindpowered.rhcloud.com/door/a Register:='{"id":"golden"}'
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/doors
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a Close:=true
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a Lock:='{"key":"golden"}'
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a Unlock:='{"key":"rusty"}'
HTTP/1.1 412 Precondition Failed
http hemisu-mindpowered.rhcloud.com/door/a Unlock:='{"key":"golden"}'
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a
HTTP/1.1 200 OK
What events happen in the business?
What has to be consistent at all times?
A good tool for this is eventstorming
What do clients expect to see?
Can become quite complex too
(e.g. pushing data to customers as it changes)
not supported in eventflow yet.
We've reviewed the basic components of a CQRS/ES system,
the way eventflow models aggregates,
and a few implementation examples.
But first you need to answer a question :)