AWS white paper

By Steen Larsen/sla@keycore.dk

Event sourcing with AWS Serverless - Part 1

1.1       Introduction

Here at KeyCore, we seek to give you the latest updates on tech trends within AWS services, and event sourcing is no exception. In this two-part blog entry, we walk through the benefits of event sourcing and demonstrate an event-sourced system with a CQRS-based API. We will use AWS serverless services such as API Gateway, Lambda, and DynamoDB to show how this can be done in a flexible, extensible manner that requires a minimum of code.

Before we get started, we will need to define some terms, detailed in the following section.

1.2       Definitions

Event sourcing: A paradigm of saving events in an event store for future processing. The processing typically involves aggregating the events into some predefined data structure. This is also sometimes called projection, as in: we are projecting the events onto a data structure. One of the advantages of event sourcing is that you have an immutable source of events that can be projected into new data formats without changing the data store. You also get the ability to query back in time and retrieve a full log of the events that took place for a given ID, which could be an order, a user, a case, or anything else. This also goes a long way towards meeting the logging requirements of GDPR, while at the same time isolating data by partition key.
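As a small, hypothetical illustration of the projection idea (a sketch only; the event names follow the ones used later in this post, and the payload shape is made up), the current view of an order could be built by folding over its events in transaction order:

;; Projection sketch: fold events (sorted by tx) into a current view of an order.
(defn apply-event [order {:keys [type payload]}]
  (case type
    "order-created" (assoc order :items [])
    "item-added"    (update order :items conj payload)
    "item-removed"  (update order :items
                            (fn [items] (remove #(= (:item-id %) (:item-id payload)) items)))
    order))

(defn project-order [events]
  (reduce apply-event {} (sort-by :tx events)))

Calling project-order on the events for a given order ID returns a map describing how the order looks after all of its events have been applied.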

Event store: The event store is where events are stored. The store is immutable and must be able to hold the events in a way that makes it possible to read them in the order they happened. In this example we will use DynamoDB as the event store.

Events: Events have a type that defines which command they are the result of. They also have an ID, a transaction key, and a payload. In this case the ID will be the partition key and the transaction key the sort key in DynamoDB. Events are named in the past tense to emphasize that they describe something that has happened; it cannot be “unhappened” or changed.

Commands: Commands are the actions an application wants to execute. The result can be actions in other systems, or simply the storage of the payload together with an event type, but ultimately the result is always one or more events.

CQRS: CQRS stands for Command Query Responsibility Segregation and often goes hand in hand with event sourcing, although we could also have used a REST interface. CQRS basically has just two endpoints, command and query, each of which takes a payload (in this example JSON) containing an action and a data payload. Besides being simpler than REST, it is also useful when REST’s notion of a resource does not fit the problem space. We will implement this interface with API Gateway and Lambda, and we will use API Gateway models to implement input validation as JSON Schemas, with commands defined as JSON Schema definitions. It is common in CQRS to use different data stores for Command and Query respectively. In part 2, we will demonstrate how to use S3 for reading the most current version, and DynamoDB if we want to see how something looked in the past.

Figure 1 Overall event sourcing architecture.

1.3       The Event Store

Let’s start by defining the tables in the event store. We will have one table per logical business domain; this could be orders and customer cases, for instance. Besides that, we will have one table that holds the last transaction key for each domain table, much like a sequence in SQL.

The transaction table will have a partition key that is the name of the domain, for instance orders. It will not have a sort key, so it is very simple and will look something like this:

 

type      seq
orders    72828
cases     3488

Table 1 Transaction table schema example

The event tables will have an ID and a sort key that contains the tx value, and will look like this:

order-id   tx    type         created          payload
929292     105   item-added   1540896542612    { "item-id": { "N": "92292" }, "price": { "N": "2999" } }

Table 2 Event table schema example

Here order-id (or whatever the ID is called; the name will be specified in a separate config file later) is the partition key and tx is the sort key.
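For completeness, the two tables could be created along the lines of the sketch below. This assumes the amazonica DynamoDB client (which the handler code shown later appears to use); the attribute types and throughput values are illustrative only.

(require '[amazonica.aws.dynamodbv2 :as ddb])

;; Transaction counter table: partition key "type" only, one item per domain.
(ddb/create-table :table-name "tx-counter"
                  :attribute-definitions [{:attribute-name "type" :attribute-type "S"}]
                  :key-schema [{:attribute-name "type" :key-type "HASH"}]
                  :provisioned-throughput {:read-capacity-units 1 :write-capacity-units 1})

;; Event table for the order domain: "order-id" as partition key, "tx" as sort key.
(ddb/create-table :table-name "order-events"
                  :attribute-definitions [{:attribute-name "order-id" :attribute-type "S"}
                                          {:attribute-name "tx" :attribute-type "N"}]
                  :key-schema [{:attribute-name "order-id" :key-type "HASH"}
                               {:attribute-name "tx" :key-type "RANGE"}]
                  :provisioned-throughput {:read-capacity-units 5 :write-capacity-units 5})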

 

1.4       The API

The API will, as mentioned, have just two endpoints:

 

POST /command

 

PUT /query

 

PUT is chosen for query since it should be idempotent for the same body. A query is also safe, but since GET doesn’t take a body, PUT is chosen to keep the interface generic, instead of having a REST endpoint for each different action.

They will both take a payload of the form

 

{"action": "some-action",
 "data": {"key1": "val1",
          "key2": "val2"}}

 

Setting up an API like the above is a matter of a few minutes in the AWS console. You can enable CORS on the API with the click of a button and deploy the API to a unique URI. In the real world you would probably map a domain name to this URI, so you never have to change your client code.
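As an illustration, a client call to the Command endpoint could look like the sketch below, assuming the clj-http and cheshire libraries and a placeholder URL (substitute the deployed stage URI or your mapped domain):

(require '[clj-http.client :as http]
         '[cheshire.core :as json])

;; Placeholder URL; replace with the deployed API Gateway stage URI or a custom domain.
(def api-url "https://example.execute-api.eu-west-1.amazonaws.com/prod")

;; Send an add-item command; the payload matches the model defined below.
(http/post (str api-url "/command")
           {:content-type :json
            :body (json/generate-string
                   {:action "add-item"
                    :data   {:order-id "929292" :item-id 92292 :price 2999}})})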

Actions need to be known and well defined and have a schema for their payload. We will express this in JSON Schema by using the API Gateway model feature, and enable validation of the request body in the API.

An example of a model for the Command endpoint, validating four different definitions, is shown below. Details of three of them have been removed for brevity, but they follow the same format. This could probably be streamlined with more time spent on the JSON Schema; for instance, the action could probably be isolated to a single definition, but we would then need some conditionals for the action and the data part to be validated together.

 

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "definitions": {
    "order_data1": {
      "type": "object",
      "additionalProperties": false,
      "required": ["action", "data"],
      "properties": {
        "action": {"enum": ["add-item"]},
        "data": {
          "type": "object",
          "additionalProperties": false,
          "required": ["order-id", "item-id", "price"],
          "properties": {
            "order-id": {"type": "string"},
            "item-id": {"type": "integer"},
            "price": {"type": "integer"}
          }
        }
      }
    },
    "order_data2": {
      ---- REMOVED ----
    },
    "order_data3": {
      ---- REMOVED ----
    },
    "case_data1": {
      ---- REMOVED ----
    }
  },
  "type": "object",
  "oneOf": [
    {"$ref": "#/definitions/order_data1"},
    {"$ref": "#/definitions/order_data2"},
    {"$ref": "#/definitions/order_data3"},
    {"$ref": "#/definitions/case_data1"}
  ]
}

 

The action has been given the type ‘enum’, since API Gateway does not support ‘const’ yet. Adding a new input is now just a matter of extending the model and verifying that the validation happens as expected.

Besides handling input validation, API Gateway can manage access to the API in different ways, for instance with simple API keys or via integration with IAM roles, so very detailed control of who can exercise a given endpoint can be achieved. When implementing the Query API, we could also choose to enable the caching functionality of API Gateway, if our application does a lot of identical queries.

1.5       The Lambda function and config file

The Lambda function that does the work with the input will, however, need some knowledge about that input. To externalize this from the code, we have created a config text file that contains information about how the Lambda should react to the input. For this POC, the config file has simply been placed in the root of the Lambda zip file, but it could also have been placed in S3. In this example, the Lambda function is written in Clojure, so the data structure in the config file is a Clojure map, since reading files containing data and code is very simple in Clojure. Other language implementations would probably use JSON, but would then not be able to have actual code in the map, just data, as is also the case in this example. The format of the file is not really important, but it is shown below:

 

{:approve-payment {:func-name "PaymentService" :type :Lambda}
 :add-item        {:type :passthrough :event-name "item-added"}
 :remove-item     {:type :passthrough :event-name "item-removed"}
 :create-order    {:type :passthrough :event-name "order-created"}
 :create-case     {:type :passthrough :event-name "case-created"}
 :close-case      {:type :passthrough :event-name "case-closed"}
 :command-map     {:order {:commands ["add-item" "remove-item" "create-order" "approve-payment"]
                           :table-definition ["order-events" "orders" :order-id]}
                   :case  {:commands ["create-case" "close-case"]
                           :table-definition ["case-events" "cases" :case-id]}}}

 

There are two kinds of specifications in the above: the command-map and the individual commands. The command-map defines which commands belong to which domain; in this example we have the domains order and case. For each domain it lists the commands and a table definition consisting of the event store table, the transaction key identifier, and the partition key for the table, in that order. For example, ["order-events" "orders" :order-id] means that events are written to the order-events table, the transaction counter is kept under the type orders, and the partition key is order-id.

The individual commands have a type that specifies what the Lambda should do with them. In the case of :passthrough, the event is simply stored in the event store with the event name defined in :event-name. If the type is :Lambda, a function name is given; that function is called, and its return value is expected to be the event name to store. We could extend this scheme to include HTTP endpoints, or have function definitions inline in the config file, as in the example below.

 

:some-command {:type :inline :func (fn [data] (do-something data) "event-name")}

 

The point is that we can support many different ways of wiring code together for a specific command, without touching or redeploying the existing code, which in this example is about 40 lines in all (see below).

 

;; Namespace requires and small helpers (the l, c and t aliases, encode/decode, in? and mk-req-handler)
;; are omitted for brevity.

;; Read the config file once, lazily.
(def func-mapping (delay (read-string (slurp "config.txt"))))

;; Find the [event-table tx-type partition-key] triple for the domain that owns this action.
(defn find-data-info [action]
  (let [command-map (:command-map @func-mapping)
        type (first (filter #(in? (get-in command-map [% :commands]) action) (keys command-map)))]
    (get-in command-map [type :table-definition])))

;; Invoke another Lambda function and decode its JSON response.
(defn call-fn [fname payload]
  (let [res (:payload (l/invoke :function-name fname :payload (encode payload)))]
    (decode (String. (. res (array)) "UTF-8"))))

;; Atomically increment and return the sequence number for the given domain.
(defn get-tx [type]
  (get-in (update-item
            :table-name "tx-counter"
            :key {:type type}
            :update-expression "SET seq = seq + :incr"
            :expression-attribute-values {":incr" 1}
            :return-values "UPDATED_NEW")
          [:attributes :seq]))

;; Build the event item: partition key, event type, tx (sort key), timestamp and the remaining payload.
(defn create-event [type id tx data]
  {id (data id)
   :type type
   :tx tx
   :created (c/to-long (t/now))
   :payload (dissoc data id)})

;; Turn a command into an event, either directly (:passthrough) or via another Lambda (:Lambda).
(defn command2event [action id tx data]
  (let [func-map (@func-mapping (keyword action))]
    (cond
      (= (:type func-map) :passthrough) (create-event (:event-name func-map) id tx data)
      (= (:type func-map) :Lambda) (create-event (call-fn (:func-name func-map) data) id tx data))))

;; Look up the table definition, fetch a new tx and write the resulting event to the event table.
(defn write-event [action data]
  (let [data-info (find-data-info action)
        tx (get-tx (data-info 1))
        item (command2event action (data-info 2) tx data)]
    (put-item :table-name (data-info 0)
              :return-consumed-capacity "TOTAL"
              :return-item-collection-metrics "SIZE"
              :item item)))

;; Entry point: extract action and data from the incoming command.
(defn handle-events [command]
  (write-event (command :action) (command :data)))

(def -handleRequest (mk-req-handler handle-events))

 

That’s all there is to it. In the above, we can see that the get-tx function fetches a new transaction ID from DynamoDB, and the write-event function writes the event to DynamoDB. The rest is basically extracting data from the config file and reacting to it in the command2event function, which would be the one to extend to handle inline functions or HTTP calls, by adding a clause to the cond expression.
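As a sketch, supporting the :inline type from the earlier config example would only require one extra clause in command2event, calling the function stored in the config and using its return value as the event name:

;; command2event extended with an :inline clause (a sketch, not part of the code above).
(defn command2event [action id tx data]
  (let [func-map (@func-mapping (keyword action))]
    (cond
      (= (:type func-map) :passthrough) (create-event (:event-name func-map) id tx data)
      (= (:type func-map) :Lambda)      (create-event (call-fn (:func-name func-map) data) id tx data)
      (= (:type func-map) :inline)      (create-event ((:func func-map) data) id tx data))))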

By using Lambda we also get automatic retry functionality for free, along with the ability to defer failed executions to a DLQ (Dead Letter Queue). We are not doing this here, as the API call is synchronous, but if you have an asynchronously invoked Lambda function, it is very easy to set up (a few clicks).

1.6       Conclusion

In this first part we have shown how you can build an event-sourced system with a CQRS API with very little code, and move validation and function assembly to a declarative form. We have only focused on the Command part; the Query implementation will follow in the next part.

A valid question is of course whether it is worth it to externalize functionality like this, and that is pretty much up to the individual use case. Some of the benefits are that by defining models you get detailed Swagger documentation for free, which will be an asset to most teams, especially if they have external users of the API, and you don’t need to code input validation in your Lambda. Whether you externalize the table definition and function wiring as in this example, or simply move it into code, will also depend on the use case and deployment strategy of your business.

These details aside, we have seen that by using standard serverless services we can very easily create an event-sourced API. In the next part we will look at the Query side, see how we aggregate events, and use S3 as the data store for the most recent snapshot of the event stream.