hgrsd

Exploring stream stores

stroming github repo

The what and why of stream stores

A stream store is a kind of database that stores messages - or events. These messages live within a given stream; hence the name. Stream stores are designed to work well as a backing data store for event sourcing, especially in combination with patterns such as aggregates and asynchronous read models.

Streams and deciders

The team I work in at the moment is quite heavily bought into this approach. We have adopted a mostly functional style of event sourcing, using Jérémie Chassaing's excellent decider pattern. The core of our domain logic lives in a set of conceptually simple decider functions. Each decider forms a consistency boundary around the events that it "owns" and enforces invariants that apply within this boundary. A stream store - which stores messages on a by-stream basis and allows querying by these streams - is the perfect infrastructural complement to this pattern. Using a stream store, handling a command in an imaginary Invoicing Service would go something like this:
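A minimal Rust sketch of that flow: read the stream, fold its events into state, let the decider decide, and append the resulting events. The event, command and state types are invented for illustration, and a plain Vec stands in for the stream store; none of this is taken from a real codebase.

```rust
// Hypothetical events and commands for an imaginary Invoicing Service.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    LineItemAdded { amount: u32 },
    InvoiceFinalised,
}

enum Command {
    AddLineItem { amount: u32 },
    Finalise,
}

// The state a decider needs in order to enforce its invariants.
#[derive(Debug, Default, PartialEq)]
struct State {
    total: u32,
    finalised: bool,
}

// evolve: fold a single event into the current state.
fn evolve(state: State, event: &Event) -> State {
    match event {
        Event::LineItemAdded { amount } => State { total: state.total + amount, ..state },
        Event::InvoiceFinalised => State { finalised: true, ..state },
    }
}

// decide: check invariants and return the events that should be written.
// Invariant (assumed for this sketch): a finalised invoice accepts no more commands.
fn decide(state: &State, command: Command) -> Result<Vec<Event>, String> {
    match command {
        _ if state.finalised => Err("invoice is already finalised".to_string()),
        Command::AddLineItem { amount } => Ok(vec![Event::LineItemAdded { amount }]),
        Command::Finalise => Ok(vec![Event::InvoiceFinalised]),
    }
}

// handle: read the stream, rebuild state, decide, append.
// `stream` is a plain Vec standing in for e.g. the "Invoice-1" stream.
fn handle(stream: &mut Vec<Event>, command: Command) -> Result<(), String> {
    let state = stream.iter().fold(State::default(), evolve);
    let new_events = decide(&state, command)?;
    stream.extend(new_events);
    Ok(())
}
```

Calling handle with AddLineItem and then Finalise appends two events; a further AddLineItem is rejected because the folded state says the invoice is finalised.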

Read models and categories

In good CQRS fashion, the decider pattern described above only describes the write side of an application: handling commands and emitting events. What if we want to surface some state to a user, or another subsystem? Imagine, for instance, that we want to display a list of invoices and line items to our customers in some kind of web-based UI.

This is where read models come in. Whereas deciders care only about the events that are in their specific stream (e.g., "Invoice-1"), our read model cares about events from all relevant streams (e.g., all "Invoice-{id}" streams). This notion of "all relevant streams" can be referred to as a "category". Creating a list of invoices can be achieved by reading the "Invoice" category, folding the events over the read model's aggregation function, and persisting or surfacing the resulting state. Given that they are designed with this kind of access pattern in mind, stream stores provide a performant way to read categories. Once we have written events to the "Invoice-1", "Invoice-2" … "Invoice-n" streams, a stream store should allow us to read the "Invoice" category and receive events across all "Invoice-{id}" streams.
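To make the category idea concrete, here is a rough sketch of a read model that totals invoice amounts across all "Invoice-{id}" streams. The Message shape, the "category is everything before the first dash" convention and the linear scan are assumptions made for the example; a real stream store would serve category reads far more efficiently.

```rust
use std::collections::BTreeMap;

// Hypothetical message shape: owning stream, global position, and a payload.
#[derive(Debug, Clone)]
struct Message {
    stream: String,
    global_position: u64,
    amount: u32,
}

// Assumed naming convention: "<category>-<id>", so "Invoice-1" is in "Invoice".
fn category_of(stream: &str) -> &str {
    stream.split('-').next().unwrap_or(stream)
}

// Naive category read: scan the whole log and keep matching messages
// at or after the given global position.
fn read_category<'a>(log: &'a [Message], category: &str, from: u64) -> Vec<&'a Message> {
    log.iter()
        .filter(|m| category_of(&m.stream) == category && m.global_position >= from)
        .collect()
}

// Read model: fold the category's messages into a total per invoice stream.
fn invoice_totals(messages: &[&Message]) -> BTreeMap<String, u32> {
    let mut totals = BTreeMap::new();
    for m in messages {
        *totals.entry(m.stream.clone()).or_insert(0) += m.amount;
    }
    totals
}
```

Because the read takes an offset, a read model can remember the last position it processed and only fetch what is new.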

Building "Stroming"

Despite having used two stream stores in my day job (EventStoreDB and MessageDB), I have little grasp of the internals of a stream store. I also had a free Saturday and hadn't written any Rust - everyone's favourite side project language - for a while. So a plan started to hatch: what if I tried to build one? This is what led to stroming: a tiny project with three traits, and a clumsy attempt at building an in-memory stream store that implements them.

Three traits for a stream store

What should a stream store do? An excellent statement of expected behaviour has been formulated by ylorph, of EventStoreDB fame. Yet clearly this exceeds what is possible on a Saturday afternoon. So, lazily, I decided to settle on three behaviours that I thought capture the core:

ReadFromStream (read a stream, either forwards or backwards)
ReadFromCategory (read a category, from a given offset)
WriteToStream (write a batch of messages to a stream)
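As a rough idea of what these three behaviours could look like as Rust traits, consider the sketch below. The method names, parameters and the Message and WriteError types are guesses made for illustration; stroming's actual trait signatures may well differ.

```rust
// Hypothetical message shape; not taken from the stroming repo.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub stream: String,
    pub stream_version: u64,  // position within its own stream
    pub global_position: u64, // position within the whole store
    pub data: Vec<u8>,
}

pub enum Direction {
    Forwards,
    Backwards,
}

// Returned when the stream was modified since the caller last read it.
#[derive(Debug, PartialEq)]
pub struct WrongExpectedVersion {
    pub expected: u64,
    pub actual: u64,
}

pub trait ReadFromStream {
    // Read all messages of one stream, oldest-first or newest-first.
    fn read_from_stream(&self, stream: &str, direction: Direction) -> Vec<Message>;
}

pub trait ReadFromCategory {
    // Read messages across all streams of a category, starting at `offset`.
    fn read_from_category(&self, category: &str, offset: u64) -> Vec<Message>;
}

pub trait WriteToStream {
    // Append a batch to a stream if it is still at `expected_version`;
    // returns the new stream version on success.
    fn write_to_stream(
        &mut self,
        stream: &str,
        expected_version: u64,
        batch: Vec<Vec<u8>>,
    ) -> Result<u64, WrongExpectedVersion>;
}
```

Keeping the behaviours as separate traits means a consumer such as a read model can depend on ReadFromCategory alone, without gaining the ability to write.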

In what follows I'll discuss three key challenges I had in mind while implementing this behaviour.

Challenge 1: Ordering constraints

The order of a stream of events matters. In our Invoice example, it matters whether a line item is added or removed first: the meaning of the invoice changes if the order of events changes. This applies not just to streams, but to categories as well. In order to be able to receive a category's events from a given offset, we need an ordering between events from different streams. The two event stores that I have used before both have global ordering: a deterministic order of events for the entire store, based on when they were inserted. It'd be great if stroming could achieve the same thing.

Challenge 2: Concurrency control

A decider makes a decision based on its current state, which in turn is derived from the events that exist in the stream at the time of decision-making. A correct decision, one which enforces all invariants, depends on the decider having accurate knowledge of what has happened before. Concurrency is a threat to this accurate knowledge. Imagine the following sequence of events:

server receives command 1 for Invoice 1
server receives command 2 for Invoice 1
server reads events from Invoice 1 so as to handle command 1
server reads events from Invoice 1 so as to handle command 2
server makes decision for command 1 and writes events
server makes decision for command 2, based on events that *do not include* the new events written to the stream

The upshot of this is that a decider - in this case, the one in the lifecycle of the second request - operates on stale data. This can lead to incorrect decisions. Stream stores need to have a mechanism that prevents this from happening.
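The interleaving above can be replayed deterministically in a few lines. This is only an illustration: the invariant (at most two line items per invoice) and the string-encoded events are made up, and the "store" is a Vec with no version check at all.

```rust
// Hypothetical invariant: an invoice may hold at most two line items.
const MAX_LINE_ITEMS: usize = 2;

// Decide whether a line item may be added, based on the events the handler saw.
fn decide_add_line_item(events_seen: &[String], item: &str) -> Option<String> {
    if events_seen.len() < MAX_LINE_ITEMS {
        Some(format!("LineItemAdded({item})"))
    } else {
        None
    }
}

// Replays the problematic order: both handlers read before either one writes.
fn interleaved_without_version_check() -> Vec<String> {
    let mut stream = vec!["LineItemAdded(a)".to_string()];

    let snapshot_1 = stream.clone(); // handler 1 reads Invoice 1
    let snapshot_2 = stream.clone(); // handler 2 reads Invoice 1

    // Handler 1 decides on an up-to-date snapshot and writes.
    if let Some(event) = decide_add_line_item(&snapshot_1, "b") {
        stream.push(event);
    }
    // Handler 2 decides on a snapshot that is now stale, and writes anyway.
    if let Some(event) = decide_add_line_item(&snapshot_2, "c") {
        stream.push(event);
    }
    stream
}
```

Each decision looks valid in isolation, yet the stream ends up with three line items, one more than the invariant allows.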

Challenge 3: Access patterns

The third challenge I faced was related to the data structure that could underpin the store. We want to be able to:

Have a global ordering of events, across streams and categories
Query events by stream, either forwards or backwards
Query events by category, from a given offset onwards

It was not immediately obvious to me how this could be achieved, and one of the most enjoyable parts of building stroming was trying to figure this out.

A Vector and some Indices

A global append-only log

An event store is often described as an append-only log. Once an event has been appended to the log, it can neither be amended nor deleted. Each event is immutable. One clear candidate for such an append-only log, which gives us fast random access (provided you know the position of the item you want) and fast appends, is the humble Vector. Clearly, there are reasons why this would not be a wise choice in a production system. For instance, while appending to a Vector takes constant time, that constant is amortised. Every so often the Vector will need to be resized, and the larger it is at that point, the bigger the performance hit, because all existing items have to be moved to the new allocation. But for my toy project, does this really matter? I didn't think so - so my global log is a Vector.
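A Vec-backed append-only log can be sketched in a handful of lines. The GlobalLog and Message names are placeholders for this example, not necessarily what stroming uses. The log exposes only an append and a read-by-position, so nothing can be amended or deleted through its API.

```rust
// Placeholder message type for the sketch.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    stream: String,
    data: String,
}

// The global log: a Vec that is only ever pushed to.
#[derive(Default)]
struct GlobalLog {
    entries: Vec<Message>,
}

impl GlobalLog {
    // Append a message and return its global position.
    // Amortised O(1): occasionally the Vec reallocates and moves its contents.
    fn append(&mut self, message: Message) -> usize {
        self.entries.push(message);
        self.entries.len() - 1
    }

    // O(1) random access by global position; None if out of range.
    fn get(&self, position: usize) -> Option<&Message> {
        self.entries.get(position)
    }
}
```

The position returned by append doubles as the global ordering: a message with a lower position was written earlier.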

Reading streams and categories

Having a global log is great for the write side. It allows us to append to a single Vector, which gives us global ordering. Excellent. However, this does not help us very much for reads. We don't really care about reading a global log - we want access to streams and categories. In addition to the infeasible approach of scanning through the entire log, I could think of two ways in which I could achieve this:

Option 1: Keep a separate log for every stream and every category, and copy each message into them on write
Option 2: Keep only the global log, and give every stream and category a list of positions pointing into it

Both of these approaches have their benefits and drawbacks.

Option 1 allows categories and streams to be their own logs. This will make it easier for them to be in contiguous memory, preventing the need to jump around the heap trying to collect all relevant messages for a stream or category.

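Option 2 has the benefit of using much less memory and therefore being more scalable. Because streams and categories are views into the global log, there is no need for any duplication of messages. A naive implementation of Option 1 stores every message three times (in the global log, in its stream and in its category), so Option 2 needs roughly a third of the memory for the messages themselves, plus a small overhead for the indices.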

I ended up implementing the second option; not because I am convinced that it is the better approach, but because it allowed me to have more fun fighting the Rust borrow checker. The data structure I created to keep track of categories and streams is the LogPositionIndex: a HashMap that keeps track of references (to items in the global log) for a category or stream. It felt like somewhat of an epiphany when I realised that there was no need to store actual references, but that a Vec containing indices into the log would do perfectly fine. Take that, borrow checker!

So in stroming, reading a stream or category is a matter of an (on average) constant-time lookup into a HashMap to get the positions into the global log, and then resolving each position in constant time too. (Of course, it will likely involve jumping around the heap quite a bit.)
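Here is a small sketch of the index idea: one global Vec, plus two HashMaps from stream or category name to a Vec of positions. It is written from the description above rather than from the stroming source, so the Store type, its method names, the "category is the part before the first dash" rule and the interpretation of a category offset as a global position are all assumptions.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
struct Message {
    stream: String,
    data: String,
}

#[derive(Default)]
struct Store {
    log: Vec<Message>,                        // the global append-only log
    streams: HashMap<String, Vec<usize>>,     // stream name -> positions in `log`
    categories: HashMap<String, Vec<usize>>,  // category name -> positions in `log`
}

// Assumed convention: "Invoice-1" belongs to category "Invoice".
fn category_of(stream: &str) -> &str {
    stream.split('-').next().unwrap_or(stream)
}

impl Store {
    // Append to the global log and record the position in both indices.
    fn write(&mut self, stream: &str, data: &str) -> usize {
        let position = self.log.len();
        self.log.push(Message { stream: stream.to_string(), data: data.to_string() });
        self.streams.entry(stream.to_string()).or_default().push(position);
        self.categories.entry(category_of(stream).to_string()).or_default().push(position);
        position
    }

    // Look up the stream's positions, then resolve each one against the log.
    fn read_stream(&self, stream: &str, backwards: bool) -> Vec<&Message> {
        let positions = self.streams.get(stream).map(Vec::as_slice).unwrap_or(&[]);
        let mut messages: Vec<&Message> = positions.iter().map(|&p| &self.log[p]).collect();
        if backwards {
            messages.reverse();
        }
        messages
    }

    // Positions are pushed in ascending order, so a binary search finds
    // the first position at or after `offset`.
    fn read_category(&self, category: &str, offset: usize) -> Vec<&Message> {
        let positions = self.categories.get(category).map(Vec::as_slice).unwrap_or(&[]);
        let start = positions.partition_point(|&p| p < offset);
        positions[start..].iter().map(|&p| &self.log[p]).collect()
    }
}
```

Storing plain usize positions instead of references is what keeps the borrow checker out of the picture: the indices own nothing and borrow nothing, and they stay valid because the log is never reordered or truncated.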

Concurrency

The last of the three challenges is concurrency management. How can we make sure that a decider never uses stale events when making its decision? A pattern commonly used to achieve this is optimistic concurrency control. Optimistic concurrency control is an alternative to locking. In essence, it works by ensuring that when a write is made, the version of the stream is unchanged from when the stream was read.

This is what stroming does too. Each attempt to write into a stream is accompanied by an expected version of that stream. When the write transaction begins, the global log is locked, the version of the stream is compared to the expected version, the write is made, and then the lock is released. This guarantees that a conflict will be detected.

The type of lock I used for this is a RwLock - one of the locks available in the Rust standard library. This kind of lock provides thread safety, allows any number of concurrent readers, and blocks all other access (both reads and writes) while a write lock is held. It is probably not the most performant way of doing this, but it seems to do the trick for my purposes.
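The check-then-write step could look roughly like the sketch below, which wraps the log and a stream index in a std::sync::RwLock. The types and method names are invented for the example, and the stream version is assumed to be simply the number of messages in the stream; stroming may define these differently.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Returned when the stream has moved on since the caller read it.
#[derive(Debug, PartialEq)]
struct WrongExpectedVersion {
    expected: usize,
    actual: usize,
}

#[derive(Default)]
struct Inner {
    log: Vec<(String, String)>,            // global log of (stream, data)
    streams: HashMap<String, Vec<usize>>,  // stream name -> positions in `log`
}

#[derive(Default)]
struct Store {
    inner: RwLock<Inner>,
}

impl Store {
    // Read a stream's data together with its current version (a read lock suffices).
    fn read_stream(&self, stream: &str) -> (Vec<String>, usize) {
        let inner = self.inner.read().unwrap();
        let positions = inner.streams.get(stream).map(Vec::as_slice).unwrap_or(&[]);
        let data: Vec<String> = positions.iter().map(|&p| inner.log[p].1.clone()).collect();
        let version = data.len();
        (data, version)
    }

    // Take the write lock, compare versions, append, release (on drop).
    fn write_to_stream(
        &self,
        stream: &str,
        expected: usize,
        batch: &[&str],
    ) -> Result<usize, WrongExpectedVersion> {
        let mut guard = self.inner.write().unwrap();
        let inner = &mut *guard;
        let actual = inner.streams.get(stream).map_or(0, Vec::len);
        if actual != expected {
            return Err(WrongExpectedVersion { expected, actual });
        }
        let positions = inner.streams.entry(stream.to_string()).or_default();
        for data in batch {
            positions.push(inner.log.len());
            inner.log.push((stream.to_string(), data.to_string()));
        }
        Ok(positions.len())
    }
}
```

If two handlers both read the stream at version 0, the first write succeeds and moves the stream to version 1; the second write, still expecting 0, gets a WrongExpectedVersion error and can re-read and decide again. Because the comparison and the append happen under the same write lock, no other write can slip in between them.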

But why?

There are several things that I wanted to get out of building stroming. Building a production-grade stream store was not one of them. Instead, I wanted to create an opportunity to:

So please get in touch or leave a comment in the repo and point out the flaws :)

Tags: #rust #events