AWS white paper

By Steen Larsen/sla@keycore.dk

Event sourcing with AWS Serverless - Part 2

1.1       Introduction

In part one, we created the CQRS Command API and the event store based on DynamoDB, and we made it possible to add new commands without changing code. In this last part we will look into querying the event store. We will also demonstrate how we can use S3 to store the newest snapshot of a projection over the events, which makes it possible to do analysis on the data, for instance via AWS Athena, or, if the number of events per partition key is high, to simply query S3 instead of DynamoDB when we want the newest version. Finally, we will demonstrate how we can travel back in time and see how a projection looked at a given transaction time.

The query API has the form:

PUT /query

 

and the payload has the same form as for the command. The example we will use here is defined as an API Gateway model below, in JSON Schema. The tx parameter is optional, but if specified, we will only collect events up to and including that point. This can be changed, if needed, in the get-events function shown in the code later on.

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "definitions": {
    "order_data1": {
      "type": "object",
      "additionalProperties": false,
      "required": ["action", "data"],
      "properties": {
        "action": {"enum": ["view-order"]},
        "data": {
          "type": "object",
          "required": ["order-id"],
          "properties": {
            "order-id": {"type": "string"},
            "tx": {"type": "integer"}
          }
        }
      }
    }
  },
  "type": "object",
  "oneOf": [
    {"$ref": "#/definitions/order_data1"}
  ]
}

 

As we can see, we have only one definition, but we can easily add more, just as in the Command example.

The Lambda function for Query is shown below. You will notice that the get-events and save functions are simple calls to DynamoDB and S3, respectively. To make it a little more interesting, this Lambda is triggered both by DynamoDB streams and by API Gateway. In this case there is only one DynamoDB table, but the code is generic enough to handle more, so we can hook it up to as many tables as we want.

;; Mapping from command/query name to its definition, read from the config file.
(def func-mapping (delay (read-string (slurp "config.txt"))))

(def bucket (System/getenv "BUCKET_NAME"))

;; Reads the events for a partition id, optionally only up to and including tx.
(defn get-events [id tx table id-key]
  (:items
   (query
    :table-name table
    :select "ALL_ATTRIBUTES"
    :scan-index-forward true
    :expression-attribute-values (if tx {":id" id ":tx" tx} {":id" id})
    :expression-attribute-names {"#id" id-key}
    :key-condition-expression (str "#id = :id" (when tx " AND tx <= :tx")))))

;; Applies a single event e to the aggregate m.
(defn handle-event [m e]
  (cond
    (= (:type e) "order-created") (assoc m :order-id (:order-id e) :order-started (:created e))
    (= (:type e) "item-added") (assoc m :items (vec (conj (:items m) (:payload e))))
    (= (:type e) "item-removed") (assoc m :items (vec (filter #(not= (:item-id %) (:item-id (:payload e))) (:items m))))
    (= (:type e) "payment-approved") (assoc m :payment-status "approved" :order-finished (:created e) :amount (get-in e [:payload :amount]) :address (get-in e [:payload :address]) :customer-name (get-in e [:payload :customer-name]))))

;; Folds the events for an id into a projection.
(defn create-aggregate [data table id-key]
  (let [e (get-events (data id-key) (data :tx) table (name id-key))]
    (reduce handle-event {} e)))

;; Stores an object as JSON in S3 under prefix/id.
(defn save [object prefix id]
  (put-object :bucket-name bucket
              :key (str prefix "/" id)
              :input-stream (ByteArrayInputStream. (.getBytes (encode object)))))

;; From a batch of DynamoDB stream records, finds the distinct (id, table) pairs,
;; rebuilds the aggregate for each and stores it in S3.
(defn save-to-s3 [records]
  (let [key-table-id-list (->> records
                               (map #(vector (dissoc (get-in % [:dynamodb :Keys]) :tx) (% :eventSourceARN)))
                               set
                               (map #(vector (get-in (first %) [(first (keys (first %))) :S])
                                             (second (re-matches #"^arn:aws:dynamodb:.*:.*:table/(.*)/stream.*$" (second %)))
                                             (first (keys (first %))))))]
    (map #(save (create-aggregate (hash-map (% 2) (% 0)) (% 1) (% 2)) (% 1) (% 0)) key-table-id-list)))

(defn -handler [q]
  (let [query (as-clj-map q)
        records (query :Records)]
    (if records
      ;; Triggered by a DynamoDB stream: store fresh projections in S3.
      (encode (save-to-s3 records))
      ;; Triggered by API Gateway: look up the query definition and run it.
      (let [data-info (find-data-info (query :action) func-mapping)
            func-map (@func-mapping (keyword (query :action)))]
        (encode
         (cond
           (= (:type func-map) :inline) ((eval (:func func-map)) query data-info)))))))

 

We have also introduced inline functions in the config file, so it now looks like this:

{:approve-payment {:func-name "PaymentService" :type :Lambda}
 :add-item {:type :passthrough :event-name "item-added"}
 :remove-item {:type :passthrough :event-name "item-removed"}
 :create-order {:type :passthrough :event-name "order-created"}
 :create-case {:type :passthrough :event-name "case-created"}
 :close-case {:type :passthrough :event-name "case-closed"}
 :view-order {:type :inline :func (fn [query data-info] (cqrs.query/create-aggregate (query :data) (data-info 0) (data-info 2)))}
 :command-map {:order {:commands ["add-item" "remove-item" "create-order" "approve-payment" "view-order"]
                       :table-definition ["order-events" "orders" :order-id]}
               :case  {:commands ["create-case" "close-case"]
                       :table-definition ["case-events" "cases" :case-id]}}}

 

You can see that the view-order command has been introduced. It is defined as an inline function with a function definition in :func, which will be read and executed just like a function in the Lambda itself; look at the last line in the -handler function for this. This is mainly a demonstration of the possibility of externalizing function code if needed, since it just calls the create-aggregate function. That function is what creates the aggregate, or projection, based on the events read. It is just a reduce (a fold in some languages) over the event stream, where each event type needs some specific processing. This processing is kept in the Lambda code here, but could in principle be externalized too if needed.
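To make the mechanics concrete, here is a small REPL-style sketch, using made-up event maps in the shape handle-event expects, showing that the projection is nothing more than a reduce over the events:

;; Made-up events in the shape handle-event expects (not read from DynamoDB here).
(def sample-events
  [{:type "order-created" :order-id "822928" :created 1540987251315}
   {:type "item-added" :payload {:item-id 72727 :price 1000}}
   {:type "item-added" :payload {:item-id 82727 :price 1500}}
   {:type "item-removed" :payload {:item-id 72727}}
   {:type "payment-approved" :created 1540987292914
    :payload {:amount 1500 :customer-name "Jens Jensen" :address "Testvej 4, 2000 Kbh"}}])

(reduce handle-event {} sample-events)
;; => {:order-id "822928", :order-started 1540987251315,
;;     :items [{:item-id 82727, :price 1500}],
;;     :payment-status "approved", :order-finished 1540987292914,
;;     :amount 1500, :address "Testvej 4, 2000 Kbh", :customer-name "Jens Jensen"}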

1.2       Reacting to DynamoDB streams

To store a projection in S3, we can hook a Lambda up to the DynamoDB stream and thus let it react to all events happening on a table. Since our event table is supposed to be immutable, only inserts should be allowed, so the newest insert mandates the creation of a new projection to be stored. The Lambda receives one or more events on the stream, and if several have happened on the same table for the same partition id, we only want to create one new aggregate to store. This is what the set operation in save-to-s3 does. We could create a specific Lambda to store to S3, but in this case we reuse the Query Lambda and check on the input whether it was triggered from DynamoDB streams or from API Gateway. If the records variable is not nil, we know the input came from DynamoDB, based on its format. In that case we loop through the events, extract the table name and partition id, run the aggregation for that id, and store the result in S3.
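For reference, the part of a DynamoDB stream record that save-to-s3 actually touches looks roughly like this (a trimmed sketch; real records contain more fields, and the ARN and key values here are made up):

;; Trimmed sketch of a stream record as seen after as-clj-map (made-up values).
;; save-to-s3 drops :tx from :Keys, de-duplicates on (id, table) and pulls the
;; table name out of :eventSourceARN with the regex shown above.
{:eventName "INSERT"
 :eventSourceARN "arn:aws:dynamodb:eu-west-1:123456789012:table/order-events/stream/2018-10-31T10:00:00.000"
 :dynamodb {:Keys {:order-id {:S "822928"}
                   :tx {:N "127"}}}}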

 

1.3       An example

In the following, we will show an example of some events for an order. As we can see from the handle-event function code, orders are the only thing that is currently handled, but other domains could easily be added. So, we will run the following five command bodies through the POST /command API.

Create the order. The order id could be a UUID, or we could refrain from sending it and have a backend service (Lambda) generate it. For ease, we specify it directly here.

{
  "action": "create-order",
  "data": {
    "order-id": "822928"
  }
}
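As an aside, if we instead wanted the backend to generate the order id, as mentioned above, a one-line helper (a sketch, not part of the code shown here) could look like this:

;; Hypothetical helper: generate the order id server-side instead of taking it from the client.
(defn new-order-id []
  (str (java.util.UUID/randomUUID)))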

 

We add item 72727, with a price of 1000, to the basket

{
  "action": "add-item",
  "data": {
    "order-id": "822928",
    "item-id": 72727,
    "price": 1000
  }
}

 

We add item 82727, with a price of 1500, to the basket

{
  "action": "add-item",
  "data": {
    "order-id": "822928",
    "item-id": 82727,
    "price": 1500
  }
}

 

We remove item 72727 from the basket

 

{
  "action": "remove-item",
  "data": {
    "order-id": "822928",
    "item-id": 72727
  }
}

 

We finalize the order by approving payment

{
  "action": "approve-payment",
  "data": {
    "order-id": "822928",
    "user": "xxx",
    "amount": 1500,
    "customer-name": "Jens Jensen",
    "address": "Testvej 4, 2000 Kbh"
  }
}

 

We can now PUT the following to /query

{
  "action": "view-order",
  "data": {
    "order-id": "822928"
  }
}

 

This will return the following

{"order-id": "822928",
 "order-started": 1540987251315,
 "items": [{"item-id": 82727, "price": 1500}],
 "payment-status": "approved",
 "order-finished": 1540987292914,
 "amount": 1500,
 "address": "Testvej 4, 2000 Kbh",
 "customer-name": "Jens Jensen"}

 

If we want to travel back in time and see the order at an earlier point, we can look at the tx values stored in DynamoDB. In a real scenario, we would probably rather use a timestamp, as in the created column; however, we cannot guarantee that several events won't happen in the same second or millisecond, so some scheme to translate between time and tx, or a combination of the two, would be needed to achieve this. For simplicity we just use tx here.
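One simple such scheme, sketched below, is to scan the order's events and pick the highest tx created at or before the requested time; this assumes the items returned by get-events expose :created and :tx as numbers:

;; Sketch: translate a wall-clock timestamp into a tx value for time travel.
;; Assumes the event items carry :created and :tx as numbers.
(defn tx-at-time [id table id-key timestamp]
  (->> (get-events id nil table id-key)
       (filter #(<= (:created %) timestamp))
       (map :tx)
       (reduce max 0)))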

 

If we want to see the order at tx = 127, we would PUT the following to /query

{
  "action": "view-order",
  "data": {
    "order-id": "822928",
    "tx": 127
  }
}

 

The result of this would be

{"order-id": "822928",
 "order-started": 1540987251315,
 "items": [{"item-id": 82727, "price": 1500}]}

 

as expected. If we had chosen to see the order at tx = 126, we would have had two items in our list, like this

{"order-id": "822928",
 "order-started": 1540987251315,
 "items": [{"item-id": 72727, "price": 1000}, {"item-id": 82727, "price": 1500}]}

1.4       Conclusion

In this part, together with part 1, we have demonstrated a very generic CQRS API on top of an event sourced model, using API Gateway, Lambda and DynamoDB. We have also seen that if we want to store snapshots in S3, for analysis or faster access, we can easily do so by hooking a Lambda function up to DynamoDB streams. To make the API return data from S3 instead of DynamoDB, we would simply add a check on the tx parameter in the API input; if it is not present, we would load the JSON document stored in S3 instead of getting all events from DynamoDB and running the aggregation over them, which can be slow if many events can occur for a partition id.
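A sketch of that read path could look like the following, assuming amazonica's get-object and a JSON decode counterpart to encode are available; the key under which the object content is returned is an assumption here:

;; Sketch: serve the latest snapshot from S3 when no tx is given,
;; otherwise replay the events from DynamoDB as before.
(defn view-order [data table id-key]
  (if (:tx data)
    (create-aggregate data table id-key)
    (-> (get-object :bucket-name bucket :key (str table "/" (data id-key)))
        :object-content   ;; assumed key for the returned input stream
        slurp
        decode)))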

All in all, there are about 100 lines of actual Lambda code to achieve this, along with the JSON Schema definitions necessary to define the API Gateway models. If we want to do automatic deployment of the whole infrastructure, we would need to create a SAM template and use it to deploy buckets, tables, the API definition and the actual Lambdas. The API definition would be expressed via Swagger in a YAML or JSON file, which would also serve as documentation for the API endpoints, as well as for the constraints on inputs.

An API like this can easily be built in a day's time, and if externalizing the definitions of commands etc. is not necessary, you can get up and running in a few hours with a documented, cacheable and, going forward, security optimized API.

It is important to note that a synchronous API like this is bound by the constraints of the component with the least throughput. In this case that will be DynamoDB, which, although extremely scalable, has an upper limit on writes of 40,000 per second per table. If more is needed, or the price of such a high write capacity is too high, an asynchronous model will be better, but also more complicated. In that case we would introduce Kinesis Streams or SQS, depending on whether the goal is higher throughput or lower price. This is a topic for another blog post, though.

1.5      Addendum

Since this code was originally written, DynamoDB has introduced the notion of transactions, which could be used to update the transaction key and do the insert in one operation. This would be a more elegant solution than first getting and updating the transaction key, and then doing the insert.
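A rough sketch of how that could look with amazonica's transact-write-items is shown below; the counter table name, its key layout and the attribute names are assumptions, since they depend on the design from part 1:

;; Sketch: bump the transaction counter and insert the event atomically.
;; Table and attribute names are assumptions, not the actual part 1 design.
(defn insert-event! [table id-key id next-tx event]
  (transact-write-items
   :transact-items
   [{:update {:table-name "tx-counters"                 ;; hypothetical counter table
              :key {:table-id table}
              :update-expression "SET current_tx = :tx"
              :condition-expression "current_tx < :tx"  ;; fail if another writer got there first
              :expression-attribute-values {":tx" next-tx}}}
    {:put {:table-name table
           :item (assoc event id-key id :tx next-tx)}}]))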