Deploying a CQRS application based on the Axon framework on Google App Engine

by Jettro Coenradie, January 4, 2011

The Axon framework is a CQRS framework created by Allard Buijze. From the beginning I have been trying to help him out, mostly by creating samples. This blog post is about a sample I am creating for Google App Engine. Everybody who has worked with Google App Engine (GAE) knows that it imposes some limitations on the JDK. The sample uses Spring, Spring Security, SiteMesh, Objectify and of course Axon. I’ll discuss everything that was needed to deploy a working Axon application.

Introduction

Let us have a high level look at what I had to do to make the sample work.

Axon framework

If you are not familiar with Axon and want to learn more about it, this is not the best blog post to start with. Start by reading the excellent reference manual of Axon. If you want to create your own application with Axon on GAE, you have to use the latest and greatest version of Axon. I am working with the 0.7-SNAPSHOT and I had to patch it a little bit. More on this later on, but if you are interested you can follow this issue.

There are a few things I have created to make Axon run on GAE. For starters, I needed an event store and a query database that make use of the google app engine data store. The other thing I needed was a snapshotter. Like the name says, the snapshotter has the responsibility to initiate creating a snapshot of an aggregate root. The snapshotter makes use of the GAE Task Queues.

By default, Axon makes use of XStream. Although you can configure Axon to use something else, I have chosen to use XStream. XStream and GAE are not best friends, so I had to make some tweaks to get it to work.

Objectify

I started off using JPA to store everything in the GAE datastore. I had tried this path in the past, but it did not feel right. For the event store I decided to use the low level API. For the query database I wanted to make my life easier, so I decided to look for a framework. Objectify gave me what I needed.

Objectify-Appengine provides the thinnest convenient layer which addresses these issues, yet preserves the elegance of get, put, delete, and query.

Because I want to store the EventEntry objects in different collections based on their type, I did not see advantages in using Objectify for the EventStore as well. Therefore I stuck to the low level API of the data store.

Spring security

For the application I am creating I want to implement security. Since I am used to Spring security I wanted to use spring security. I found an excellent blog post and a sample in the trunk for spring security that helped me a lot.

Other tools

Other tools I used that require less attention are:

  • Spring web
  • Gradle – for building the sample
  • Intellij – for writing code and running the local GAE instance
  • Gae SDK – version 1.4 at the time of writing
  • Sitemesh – For templating the pages
  • jQuery – To make the sample look just a little bit better

The sample

I decided to start this sample to automate some of the basic processes around Human Resource Management in a smaller company. I am starting off with a CV creation tool. I am far from finished. Actually I have just started, and I focused on getting it to work. If you want to have a look at the code, you can find it on GitHub.

https://github.com/jettro/CompanyHr

I use Gradle to build the sample. I tried to find a good GAE plugin, but did not really succeed. Therefore I build with Gradle, generate the IntelliJ project files with Gradle and run the developer version of GAE from IntelliJ. Uploading the sample is done using the command line utilities provided by the Google App Engine SDK. If you are interested in Gradle or want to build the sample yourself, have a look at the next section. If you are not interested in building the sample, it is safe to skip that section.

Gradle

First you need to be able to find all the dependencies. The section that configures the repositories shows a few Gradle tricks. The first repository to use is your local Maven repository. Beware: if you do not keep your Maven repository in your home folder under .m2/repository, this will not work. The second trick is about finding the Google App Engine jars. I use the feature that lets a flat directory act as a repository. You can configure the location of Google App Engine in the gradle.properties file. Finally we use the central Maven repository and some additional repositories for Axon snapshots and Objectify. The following code block shows the configuration.

repositories {
    mavenRepo urls: ["file://" + System.getProperty('user.home') + "/.m2/repository/"]
    flatDir(dirs: "$gaeHome/lib/")
    mavenCentral()
    mavenRepo urls: [
            "http://oss.sonatype.org/content/groups/public",
            "http://objectify-appengine.googlecode.com/svn/maven"]
}

The other bit of gradle I want to mention is the exploded war file. The google upload utility needs this. We unzip the war to an external folder as a last stage of the war creation. The following code block shows the configuration of the exploded directory.

war.doLast {
    ant.unzip src: war.archivePath, dest: explodedWar
}

Now you can build the project using gradle: gradle clean build. Or you do gradle idea and run everything from intellij.

XStream hacks

As I mentioned before, Axon uses XStream to serialize and deserialize objects. A lot of reflection power is used to re-instantiate serialized objects. With GAE there are some limitations in this area. The most important change is in the XStream class, which is the most used class of the XStream library. Within this class Converters are registered. Some of these converters do not work on Google App Engine. Therefore I have created a subclass of XStream called GaeXStream. I removed the dynamic loading of converters as well as the JDK specific configurations. You can find the classes in the package nl.gridshore.companyhr.axongeahacks.

Axon provides a GenericXStreamSerializer; this class initializes an XStream object. At the moment there is no way to provide your own XStream object. Maybe in the future we can provide a constructor for this class that accepts an XStream object. For now I have created my own implementation of this class, GaeGenericXStreamSerializer.

The final class I had to change was the XStreamEventSerializer. This class creates the GenericXStreamSerializer. Again there is no real mechanism to change this construction. I have copied the original class and changed the line where the GenericXStreamSerializer is created into the creation of my GaeGenericXStreamSerializer. My own class is called GaeXStreamEventSerializer.

In the future I’ll try to simplify this and maybe change Axon to make this a little bit easier. For now this is working.
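The constructor idea mentioned above could look roughly like the following. This is only a sketch of the pattern, not Axon code: ObjectMarshaller, DefaultMarshaller, RestrictedMarshaller and SerializerSketch are hypothetical names, and ObjectMarshaller merely stands in for the XStream instance so that the example runs without any library.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the XStream instance; not an Axon or XStream type.
interface ObjectMarshaller {
    String toXml(Object value);
}

// Comparable to the serializer building its own default XStream.
class DefaultMarshaller implements ObjectMarshaller {
    public String toXml(Object value) {
        return "<object>" + value + "</object>";
    }
}

// Comparable to a platform specific subclass such as GaeXStream.
class RestrictedMarshaller implements ObjectMarshaller {
    public String toXml(Object value) {
        return "<restricted>" + value + "</restricted>";
    }
}

// Hypothetical serializer that accepts the marshaller through its constructor.
class SerializerSketch {
    private final ObjectMarshaller marshaller;

    // Keeps the existing behaviour for callers that do not care.
    SerializerSketch() {
        this(new DefaultMarshaller());
    }

    // Lets an application pass in its own marshaller.
    SerializerSketch(ObjectMarshaller marshaller) {
        this.marshaller = marshaller;
    }

    byte[] serialize(Object value) {
        return marshaller.toXml(value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = new SerializerSketch(new RestrictedMarshaller()).serialize("cv");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

With a constructor like this, copying the serializer classes would no longer be necessary; a GAE application would only have to pass in its own instance.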

Axon Event Store

I already created an EventStore that uses MongoDB. Check my previous blog posts on gridshore.

http://www.gridshore.nl/2010/09/20/learning-mongodb/
http://www.gridshore.nl/2010/09/27/still-learning-mongodb/

Allard improved the code for the MongoDB implementation and now I can reuse the ideas for the Google App Engine Event Store.

EventEntry

The EventEntry is the representation of what we store in the Google data store. It has a few fields like aggregateIdentifier and serializedEvent. The serialized event is our actual content serialized using XStream. The EventEntry also contains some methods to help us with the translation to Google data store Entity objects and queries. To give you an idea, the following code block shows the method that creates a query returning all entities for the provided aggregateIdentifier, starting at the provided sequence number. You can find the source code for the complete class here

    static Query forAggregate(String type, String aggregateIdentifier, long firstSequenceNumber) {
        return new Query(type)
                .addFilter(AGGREGATE_IDENTIFIER, Query.FilterOperator.EQUAL, aggregateIdentifier)
                .addFilter(SEQUENCE_NUMBER, Query.FilterOperator.GREATER_THAN_OR_EQUAL, firstSequenceNumber)
                .addSort(SEQUENCE_NUMBER, Query.SortDirection.ASCENDING);
    }
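To make the semantics of this query concrete, here is a small plain Java sketch that applies the same filter and sort to an in-memory list. It does not touch the datastore API; StoredEvent and AggregateQuerySketch are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified view of a stored event entry.
class StoredEvent {
    final String aggregateIdentifier;
    final long sequenceNumber;

    StoredEvent(String aggregateIdentifier, long sequenceNumber) {
        this.aggregateIdentifier = aggregateIdentifier;
        this.sequenceNumber = sequenceNumber;
    }
}

class AggregateQuerySketch {
    // Same conditions as the query: identifier EQUAL, sequence number
    // GREATER_THAN_OR_EQUAL, results sorted ascending by sequence number.
    static List<StoredEvent> forAggregate(List<StoredEvent> all, String aggregateIdentifier,
                                          long firstSequenceNumber) {
        List<StoredEvent> result = new ArrayList<StoredEvent>();
        for (StoredEvent event : all) {
            if (event.aggregateIdentifier.equals(aggregateIdentifier)
                    && event.sequenceNumber >= firstSequenceNumber) {
                result.add(event);
            }
        }
        result.sort(Comparator.comparingLong((StoredEvent event) -> event.sequenceNumber));
        return result;
    }

    public static void main(String[] args) {
        List<StoredEvent> all = Arrays.asList(
                new StoredEvent("a", 0), new StoredEvent("a", 2),
                new StoredEvent("b", 1), new StoredEvent("a", 1));
        for (StoredEvent event : forAggregate(all, "a", 1)) {
            System.out.println(event.aggregateIdentifier + ":" + event.sequenceNumber);
        }
    }
}
```

Note that the event with the requested first sequence number itself is part of the result, because the comparison is inclusive.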
    

EventStore

For the GAE event store I implement the org.axonframework.eventstore.SnapshotEventStore interface. The implementation supports storing the events as well as working with snapshot events. GAE provides the DatastoreServiceFactory. From this factory we obtain the DatastoreService. The event store makes use of the XStream event serializer that was discussed earlier in this post. All aggregate types are stored in separate collections. We could store them all in the same collection, but according to Google's tips you should try to limit the number of items in a collection and especially the number of queries performed on a collection. We also store the snapshots in a separate collection from the normal events. The following image shows the collections my app currently has.

[Screenshot of the datastore collections, 2011-01-03]

As you can see, there are three tables for project: Project, snapshot_Project and ProjectEntry. Project contains the events, snapshot_Project contains the snapshots and ProjectEntry contains the projects from the query database perspective.

Now let us have a look at the implementation. You can find the code here: nl.gridshore.companyhr.app.axon.GaeEventStore. When writing to the collections you have to understand how transactions work on Google App Engine. GAE does support transactions, but only within one aggregate root (an entity group in datastore terms). Therefore we have to create a new transaction for every event that we need to store, because each stored event is its own aggregate root. We could change this and create one document with all its events as an aggregate root, but I did not choose that direction. Appending events now consists of a few steps: loop over all provided DomainEvents, convert each one into an EventEntry object, create a transaction, store the object within the transaction and commit. The following code block shows these steps in code. In the real code base this is refactored, but this is easier to read.

    public void appendEvents(String type, DomainEventStream events) {
        while (events.hasNext()) {
            DomainEvent event = events.next();
            EventEntry entry = new EventEntry(type, event, eventSerializer);
            Transaction transaction = datastoreService.beginTransaction();
            try {
                datastoreService.put(transaction, entry.asEntity());
                transaction.commit();
            } finally {
                if (transaction.isActive()) {
                    transaction.rollback();
                }
            }
        }
    }
    

Storing snapshots is done in exactly the same way. Loading an aggregate root is done using all events, or using all events starting from a snapshot event. To give you an idea about reading objects using the GAE API, I’ll show you how to load the last snapshot event for an aggregate root. The query object is created using the special factory method in EventEntry, then we prepare the query and iterate over its results. Only the most recent snapshot is needed. Check the following code.

    // from GaeEventStore
    private EventEntry loadLastSnapshotEvent(String type, AggregateIdentifier identifier) {
        Query query = EventEntry.forLastSnapshot("snapshot_" + type, identifier.asString());
        PreparedQuery preparedQuery = datastoreService.prepare(query);
        Iterator<Entity> entityIterator = preparedQuery.asIterable().iterator();
        if (entityIterator.hasNext()) {
            Entity lastSnapshot = entityIterator.next();
            return new EventEntry(lastSnapshot);
        }
        return null;
    }
    
    // from EventEntry
    static Query forLastSnapshot(String type, String aggregateIdentifier) {
        return new Query(type)
                .addFilter(AGGREGATE_IDENTIFIER, Query.FilterOperator.EQUAL, aggregateIdentifier)
                // Newest first, so the first result is the most recent snapshot
                .addSort(SEQUENCE_NUMBER, Query.SortDirection.DESCENDING);
    }
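The loading strategy described above (restore the state from a snapshot, then replay only the later events) can be sketched in plain Java. This is not Axon code. CounterSnapshot, Increment and SnapshotReplaySketch are hypothetical names, and the "aggregate" is reduced to a single number to keep the idea visible.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical snapshot: the aggregate state as of a given sequence number.
class CounterSnapshot {
    final long sequenceNumber;
    final int value;

    CounterSnapshot(long sequenceNumber, int value) {
        this.sequenceNumber = sequenceNumber;
        this.value = value;
    }
}

// Hypothetical event: adds an amount to the counter.
class Increment {
    final long sequenceNumber;
    final int amount;

    Increment(long sequenceNumber, int amount) {
        this.sequenceNumber = sequenceNumber;
        this.amount = amount;
    }
}

class SnapshotReplaySketch {
    // Rebuilds the counter value. Pass null as snapshot to replay everything.
    // Events are expected in ascending sequence number order.
    static int load(CounterSnapshot snapshot, List<Increment> eventsInOrder) {
        int value = snapshot == null ? 0 : snapshot.value;
        long lastApplied = snapshot == null ? -1 : snapshot.sequenceNumber;
        for (Increment event : eventsInOrder) {
            // Events already covered by the snapshot are skipped.
            if (event.sequenceNumber > lastApplied) {
                value += event.amount;
                lastApplied = event.sequenceNumber;
            }
        }
        return value;
    }

    public static void main(String[] args) {
        List<Increment> events = Arrays.asList(
                new Increment(0, 5), new Increment(1, 3), new Increment(2, 4));
        System.out.println(load(null, events));                      // full replay
        System.out.println(load(new CounterSnapshot(1, 8), events)); // snapshot plus later events
    }
}
```

Both calls end up with the same state; the second one simply applies fewer events, which is the whole point of taking snapshots.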
    

The other part I want to have a look at is the next step in recovering an aggregate root: reading all events that happened after a snapshot event. The next code block shows the most important part of the implementation.

    private List<DomainEvent> readEventSegmentInternal(String type, AggregateIdentifier identifier,
                                                       long firstSequenceNumber) {
        Query query = EventEntry.forAggregate(type, identifier.asString(), firstSequenceNumber);
        PreparedQuery preparedQuery = datastoreService.prepare(query);
        List<Entity> entities = preparedQuery.asList(FetchOptions.Builder.withDefaults());
    
        List<DomainEvent> events = new ArrayList<DomainEvent>(entities.size());
        for (Entity entity : entities) {
            byte[] bytes = ((Text) entity.getProperty("serializedEvent")).getValue().getBytes(EventEntry.UTF8);
            events.add(eventSerializer.deserialize(bytes));
        }
        return events;
    }
    

Since a serialized event can become big, we store it in a Text field. As you can see, we need a few steps to extract the actual content. Other than that, I hope the code is easy to follow.
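The conversion in that loop is essentially a UTF-8 round trip: the serializer produces bytes, the Text property holds a string, and reading turns the string back into bytes. A minimal sketch without the datastore API, assuming the serializer writes UTF-8 encoded XML; toTextValue and fromTextValue are hypothetical helper names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class TextRoundTripSketch {
    // What would be wrapped in a Text property when storing.
    static String toTextValue(byte[] serializedEvent) {
        return new String(serializedEvent, StandardCharsets.UTF_8);
    }

    // What is handed back to the event serializer when reading.
    static byte[] fromTextValue(String textValue) {
        return textValue.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] original = "<event name=\"caf\u00e9\"/>".getBytes(StandardCharsets.UTF_8);
        byte[] restored = fromTextValue(toTextValue(original));
        System.out.println(Arrays.equals(original, restored));
    }
}
```

Using the same character set in both directions matters; relying on the platform default could corrupt non-ASCII content.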

I have not implemented the org.axonframework.eventstore.EventStoreManagement interface yet. I will do this in the near future.

With the EventStore implementation we can store and read events as well as snapshot events. Axon also has support for deciding when to create a snapshot. Google App Engine provides the task queue API. Therefore I wanted to have an implementation of the org.axonframework.eventsourcing.Snapshotter interface that makes use of that API.

Snapshotter

Creating your own snapshotter is not hard; it is a simple interface with just one method. The next code block shows the implementation of the interface.

    public void scheduleSnapshot(String typeIdentifier, AggregateIdentifier aggregateIdentifier) {
        logger.debug("Schedule a new task to create a snapshot for type {} and aggregate {}",
                typeIdentifier, aggregateIdentifier);
    
        Queue queue = QueueFactory.getQueue("snapshotter");
    
        queue.add(TaskOptions.Builder.withUrl("/task/snapshot")
                .param("typeIdentifier", typeIdentifier)
                .param("aggregateIdentifier", aggregateIdentifier.asString())
                .method(TaskOptions.Method.POST)
        );
    }
    

As you can see, the code is very simple. We do a POST to a URL with the parameters typeIdentifier and aggregateIdentifier. Then in a controller we handle this URL and call the following method.

    public void createSnapshot(String typeIdentifier, String aggregateIdentifier) {
        DomainEventStream eventStream =
                eventStore.readEvents(typeIdentifier, new StringAggregateIdentifier(aggregateIdentifier));
        DomainEvent snapshotEvent = createSnapshot(typeIdentifier, eventStream);
        if (snapshotEvent != null) {
            eventStore.appendSnapshotEvent(typeIdentifier, snapshotEvent);
        }
    }
    
    private DomainEvent createSnapshot(String typeIdentifier, DomainEventStream eventStream) {
        AggregateFactory<?> aggregateFactory = aggregateFactories.get(typeIdentifier);
    
        DomainEvent firstEvent = eventStream.peek();
        AggregateIdentifier aggregateIdentifier = firstEvent.getAggregateIdentifier();
    
        EventSourcedAggregateRoot aggregate = aggregateFactory.createAggregate(aggregateIdentifier, firstEvent);
        aggregate.initializeState(eventStream);
    
        return new AggregateSnapshot<EventSourcedAggregateRoot>(aggregate);
    }
    

That is all not too hard, and now we have an asynchronous snapshotter. An alternative would be to use the standard Axon implementations with the org.axonframework.util.DirectExecutor and the org.axonframework.eventsourcing.SpringAggregateSnapshotter. I leave this as an exercise for you 🙂
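For readers who have not met the term: a direct executor runs each task immediately in the calling thread, so with that alternative the snapshot would be created synchronously instead of through the task queue. The following sketch shows the idea with java.util.concurrent.Executor. It is not the Axon class itself, and DirectExecutorSketch is a hypothetical name.

```java
import java.util.concurrent.Executor;

// Runs every task right away in the thread that calls execute.
class DirectExecutorSketch implements Executor {
    @Override
    public void execute(Runnable task) {
        task.run();
    }

    public static void main(String[] args) {
        final Thread caller = Thread.currentThread();
        new DirectExecutorSketch().execute(new Runnable() {
            public void run() {
                // No thread switch has happened, so this is the calling thread.
                System.out.println(Thread.currentThread() == caller);
            }
        });
    }
}
```

The trade-off is that the request that triggers the snapshot also pays for creating it, which is what the task queue version avoids.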

Concluding

That is it; this is what is required to use Axon on Google App Engine. Now you can configure the CommandBus, the EventBus, repositories, snapshotters, etc. In future posts I’ll have a more detailed look at some of the other aspects of the sample, like Objectify and the Spring Security implementation. If you want more information on certain topics, feel free to ask for it in the comment section. I’ll answer your questions and could even write a new blog post if the answer becomes too big 🙂