Eventflow

About me

a software geek,

an FP promoter

and a pragmatic purist


Sarunas Valaskevicius @ Inviqa

What to expect

Review elements of cqrs/es;

look at how can eventflow help;

everyone is invited to participate!

Eventsourcing elements

We'll talk about

  • event
  • command
  • aggregate
  • command handler

...

Events

It happened, no way around it.


A message that a change has occurred. Used to model the state of an aggregate as a sequence of deltas.

Commands

Make it happen!


A request from outside the aggregate (e.g. from a user) to change the current state.

Aggregate

Can this happen right now?


Checks if the requested changes comply with the consistency rules for the aggregate. Ensures strong consistency within its bounds.

Command handler

[E] -> C -> [E] Xor Error


Executes command, given aggregate's past events, and either results in new events, or an error.

E.g. in the examples later, it ensures that a counter doesn't go below 0.

Projections

D -> E -> D


Maintains its own read model (D), and updates it when new events are emitted.

Can "listen" for multiple aggregates, or even aggregate types.

The sum of the parts

The system as a collection of strongly consistent components.
The system as a collection of strongly consistent components.

Eventflow usage

The counter example

object Counter {
  sealed trait Event
  final case class Created(id: AggregateId, start: Int) extends Event
  case object Incremented extends Event
  case object Decremented extends Event

  sealed trait Command
  final case class Create(id: AggregateId, start: Int) extends Command
  case object Increment extends Command
  case object Decrement extends Command
}

The counter aggregate model

object CounterAggregate extends EventFlow[Event, Command] {

  val counting: RegisteredFlowStateAux[Int] = ref('counter, c => handler(
    when(Increment).emit(Incremented).switch(c + 1 -> counting),
    when(Decrement).guard(_ => c > 0, "Counter cannot be decremented").
        emit(Decremented).switch(c - 1 -> counting)
  ))

  val aggregateLogic: Flow[Unit] = handler(
    when[Create].emit[Created].switchByEvent(evt => evt.start -> counting)
  )

  val snapshottableStates: FlowStates = List(counting)
}

The doors example

object Door {
  sealed trait Event
  final case class Registered(id: AggregateId) extends Event
  case object Opened extends Event
  case object Closed extends Event
  final case class Locked(key: String) extends Event
  final case class Unlocked(key: String) extends Event

  sealed trait Command
  final case class Register(id: AggregateId) extends Command
  case object Open extends Command
  case object Close extends Command
  final case class Lock(key: String) extends Command
  final case class Unlock(key: String) extends Command
}

The door aggregate

val openDoors: RegisteredFlowStateAux[Unit] = 
  ref('open, handler(
    when(Close).emit(Closed).switch(closedDoors)
  ))

val closedDoors: RegisteredFlowStateAux[Unit] = 
  ref('closed, handler(
    when(Open).emit(Opened).switch(openDoors),
    when[Lock].emit[Locked].switchByEvent(ev => ev.key -> lockedDoors)
  ))

val lockedDoors: RegisteredFlowStateAux[String] =
  ref('locked, key => handler(
    when(Unlock(key)).emit[Unlocked].switch(closedDoors),
    when[Unlock].failWithMessage("Attempted unlock key is invalid"),
    anyOther.failWithMessage("Locked door can only be unlocked.")
  ))

DSL overview

(generated using http://www.bottlecaps.de/rr/ui)

DSL for command handler

DSL for event listener

DSL for event handler

Snapshots

Their purpose

As we keep using an aggregate, it grows.

The more events an aggregate has, the slower it is to restore its working state.

Snapshots are caches of the working state.

How do they work in the Eventflow

Every ref'ed state function is registered by a symbol (reference).

When snapshot is taken, it only stores the reference of the state function and its parameters.

Restoring a snapshot simply applies the saved parameters to the referred state function.

Projections

Counter example

Say we need to...

  • read the current state of a particular counter
  • find all counters in the system

Counter example code

object CounterProjection extends Projection[TreeMap[AggregateId, Int]] {
  def initialData = TreeMap.empty

  val listeningFor = List(CounterAggregate.tag)

  def accept[E](d: Data) = {
    case EventData(_, id, _, Created(_, start)) => 
        d + (id -> start)
    case EventData(_, id, _, Incremented) =>
        d + (id -> d.get(id).fold(1)(_ + 1))
    case EventData(_, id, _, Decremented) => 
        d + (id -> d.get(id).fold(-1)(_ - 1))
  }
}

Doors example

Or the same for doors, with a custom projection target structure

sealed trait DoorState
object DoorState {
  case object Open extends DoorState
  case object Closed extends DoorState
  case object Locked extends DoorState
}

Doors example continued

object DoorProjection extends Projection[TreeMap[AggregateId, DoorState]] {

  def initialData = TreeMap.empty

  val listeningFor = List(DoorAggregate.tag)

  def accept[E](d: Data) = {
    case EventData(_, id, _, Registered(_)) => d + (id -> DoorState.Open)
    case EventData(_, id, _, Closed)        => d + (id -> DoorState.Closed)
    case EventData(_, id, _, Opened)        => d + (id -> DoorState.Open)
    case EventData(_, id, _, Locked(_))     => d + (id -> DoorState.Locked)
    case EventData(_, id, _, Unlocked(_))   => d + (id -> DoorState.Closed)
  }
}

Reading from multiple aggregate types

val listeningFor = List(CounterAggregate.tag, DoorAggregate.tag)

def accept[E](d: Data) = {
  case EventData(_, id, _, Door.Registered(_)) => ...
  case EventData(_, id, _, Door.Closed)        => ...
  case EventData(_, id, _, Door.Opened)        => ...
  case EventData(_, id, _, Counter.Incremented) => ...
  case EventData(_, id, _, Counter.Decremented) => ...
}

Testing aggregates

Given

Initialise the events relevant for the test case.

When

Specify the command to execute.

ThenCheck

Assert the expected results - are the new events resulting from the commmand execution correct?

Exampe tests for counter

"Locked door" should "allow to unclock with the same key" in {
  given {
    newDb.withEvents[Event](tag, "door", 
        Registered("door"), Closed, Locked("key")
    )
  } when {
    _.command(DoorAggregate, "door", Unlock("key"))
  } thenCheck {
    _.newEvents[Event](tag, "door") should be(List(Unlocked("key")))
  }
}

Current state of Eventflow

Database backends

In memory database;

GetEventStore integration;

more to come on demand.

Frontend example

Code

Demo / counter

http hemisu-mindpowered.rhcloud.com/counters
http hemisu-mindpowered.rhcloud.com/counter/a Create:='{"id":"a", "start":10}'
http hemisu-mindpowered.rhcloud.com/counter/a Increment=true
http hemisu-mindpowered.rhcloud.com/counter/a Decrement=true
http hemisu-mindpowered.rhcloud.com/counter/a
http hemisu-mindpowered.rhcloud.com/counters

Demo / doors

Recorded session

http hemisu-mindpowered.rhcloud.com/door/a Register:='{"id":"golden"}'
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/doors
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a Close:=true
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a Lock:='{"key":"golden"}'
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a Unlock:='{"key":"rusty"}'
HTTP/1.1 412 Precondition Failed
http hemisu-mindpowered.rhcloud.com/door/a Unlock:='{"key":"golden"}'
HTTP/1.1 200 OK
http hemisu-mindpowered.rhcloud.com/door/a
HTTP/1.1 200 OK

Future

Implementing a domain model from scratch

Define aggregates

What events happen in the business?

What has to be consistent at all times?

A good tool for this is eventstorming

Add projections

What do clients expect to see?


Can become quite complex too

(e.g. pushing data to customers as it changes)

Sagas

not supported in the eventflow yet.

Summary

You've seen

We've reviewed the basic components of CQRS/ES system,

the way eventflow models aggregates,

and a few implementation examples.

Where to next?

github.com/svalaskevicius/eventflow/

gitter.im/svalaskevicius/eventflow

We have a T-shirt

But first your need to answer a question :)

Thanks - any questions?