Axon from the trenches: how to keep your code compatible with legacy events and Sagas

by Rommert de Bruijn, June 8, 2015

Imagine you’re using Axon to run an event sourcing application. Your production event store might contain millions of events, created in various versions of the application. Wouldn’t it be nice to know for sure that the latest version of your application plays nicely with all your production events and Sagas, including those from previous versions? Well, you can check for that, and it is fairly easy.

Axon is a CQRS framework that offers (among other things) a very nice event sourcing implementation. With event sourcing, the logical state of an Aggregate (a self-contained representation of a domain concept) is built up by applying all the changes that ever occurred for that specific instance. Part of event sourcing is dealing with legacy events. Because event classes tend to be modified over time, it’s common for older events to contain fields that are no longer used by the current code, or to lack fields that the current code requires. These small compatibility issues are usually solved with Upcasters: classes that convert the payload of older events to their current counterpart. Axon applies these Upcasters when reading events from the EventStore, using an Upcaster chain that knows which Upcasters to apply.
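To make the idea of an Upcaster chain a bit more concrete, here is a framework-free sketch. It deliberately does not use Axon’s own Upcaster API: SerializedEvent, EventUpcaster, UpcasterChainSketch and the AddressChangedEvent example are hypothetical names, and payloads are modelled as plain maps of field names to values rather than serialized XML.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a serialized event: type, revision and raw fields.
// This is NOT Axon's API; it only models what an Upcaster chain does.
final class SerializedEvent {
    final String type;
    final String revision;
    final Map<String, Object> fields;

    SerializedEvent(String type, String revision, Map<String, Object> fields) {
        this.type = type;
        this.revision = revision;
        this.fields = fields;
    }
}

// Hypothetical contract: an upcaster converts one (type, revision) into a newer revision.
interface EventUpcaster {
    boolean canUpcast(String type, String revision);

    SerializedEvent upcast(SerializedEvent event);
}

// Keeps applying matching upcasters until none match, so an event stored as revision 1.0
// can pass through 2.0, 3.0, ... on its way to the current revision.
// Upcasters must always move to a newer revision; a cycle would make this loop run forever.
final class UpcasterChainSketch {
    private final List<EventUpcaster> upcasters;

    UpcasterChainSketch(EventUpcaster... upcasters) {
        this.upcasters = new ArrayList<>(Arrays.asList(upcasters));
    }

    SerializedEvent upcast(SerializedEvent event) {
        SerializedEvent current = event;
        boolean applied = true;
        while (applied) {
            applied = false;
            for (EventUpcaster upcaster : upcasters) {
                if (upcaster.canUpcast(current.type, current.revision)) {
                    current = upcaster.upcast(current);
                    applied = true;
                    break;
                }
            }
        }
        return current;
    }
}

// Hypothetical example: revision 1.0 of AddressChangedEvent still had a faxNumber field that the
// current code no longer knows, and lacked the country field that revision 2.0 requires.
final class AddressChangedEventUpcaster implements EventUpcaster {
    @Override
    public boolean canUpcast(String type, String revision) {
        return "AddressChangedEvent".equals(type) && "1.0".equals(revision);
    }

    @Override
    public SerializedEvent upcast(SerializedEvent event) {
        Map<String, Object> fields = new HashMap<>(event.fields);
        fields.remove("faxNumber");          // field no longer used by the current code
        fields.putIfAbsent("country", "NL"); // assumed default for the newly required field
        return new SerializedEvent(event.type, "2.0", fields);
    }
}
```

Events whose type and revision match no upcaster pass through unchanged. A real chain has more to deal with (for example, one stored event turning into several), which this sketch ignores.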

When working with Axon in general, it is good practice to use simple Objects as fields of events, as opposed to Objects that contain complex structures. Complex Objects can hold any number of other classes internally, and whenever the code for one of those classes is modified (say, a field is added or removed), deserializing older events may fail because their payload is no longer compatible with the current code. Such issues are easily solved by adding an Upcaster, but that requires the developer to be aware that they introduced a breaking change. If the modified class seemingly has nothing to do with events, will they remember to write that Upcaster?
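As a rough illustration of the difference, compare the two hypothetical event classes below. Customer stands for any domain class that evolves for reasons unrelated to events; all class and field names are made up for this sketch.

```java
// Hypothetical domain class: it is changed whenever the domain requires it,
// usually without anyone thinking about stored events.
final class Customer {
    final String id;
    final String name;
    final String email;

    Customer(String id, String name, String email) {
        this.id = id;
        this.name = name;
        this.email = email;
    }
}

// Risky: the serialized form of this event contains every field of Customer (and of anything
// Customer refers to), so adding or removing a field there silently changes the event format.
final class CustomerRegisteredWithEntityEvent {
    final Customer customer;

    CustomerRegisteredWithEntityEvent(Customer customer) {
        this.customer = customer;
    }
}

// Safer: the event only carries the simple values it needs. Its serialized form changes only
// when this class itself changes, which makes the need for an Upcaster obvious.
final class CustomerRegisteredEvent {
    final String customerId;
    final String name;
    final String email;

    CustomerRegisteredEvent(String customerId, String name, String email) {
        this.customerId = customerId;
        this.name = name;
        this.email = email;
    }

    // Copies the relevant values at the moment the event is created.
    static CustomerRegisteredEvent from(Customer customer) {
        return new CustomerRegisteredEvent(customer.id, customer.name, customer.email);
    }
}
```

The same reasoning applies to any state you keep inside a Saga.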

The same pitfall applies, sometimes even more implicitly, to Axon Sagas. In general, a Saga’s state is used to decide whether an incoming event requires any further commands to be issued. That might involve notifying other Aggregates, calling a third-party service, or scheduling future commands, but a Saga might also use incoming events to update its own internal state so that a future event can be interpreted properly. If this state contains complex Objects, then any change to the classes contained in those Objects can cause deserialization issues.

When deserialization issues occur, the Aggregate will fail to load. Events after the failing event are never applied, and since the Aggregate never loads successfully, no further commands can be handled for it either. In other words: the Aggregate is frozen until the deserialization issue is solved. Obviously this is not something you’d want to encounter in a production environment, so it’s wise to make sure that any new code you release to production is compatible with every single event in the production event store. The same goes for Sagas.
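The freezing effect follows directly from how an event-sourced Aggregate is loaded. The framework-free sketch below replays stored events in order; BankAccount, Deposited and AggregateLoader are hypothetical, and the deserialize function is a stand-in for Axon’s upcasting and deserialization. A single incompatible event makes the whole load fail.

```java
import java.util.function.Function;

// Hypothetical event: a deposit of a number of cents.
final class Deposited {
    final long amountInCents;

    Deposited(long amountInCents) {
        this.amountInCents = amountInCents;
    }
}

// Hypothetical event-sourced aggregate: its state is nothing more than the result of its events.
final class BankAccount {
    private long balanceInCents;

    void apply(Object event) {
        if (event instanceof Deposited) {
            balanceInCents += ((Deposited) event).amountInCents;
        }
    }

    long balanceInCents() {
        return balanceInCents;
    }
}

final class AggregateLoader {
    // Rebuilds the aggregate by deserializing and applying every stored event in order.
    // If one event cannot be deserialized, the exception propagates: no aggregate is returned,
    // so later events are never applied and no command can be handled for this aggregate.
    static BankAccount load(Function<String, Object> deserialize, String... storedEvents) {
        BankAccount account = new BankAccount();
        for (String storedEvent : storedEvents) {
            account.apply(deserialize.apply(storedEvent));
        }
        return account;
    }
}
```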

To ensure this compatibility, you can set up a nightly job (in Jenkins or similar) that runs a “sanity checking” tool. The tool tries to deserialize each event from the production database using the development version of the event store (including any Upcasters that were added to support legacy events). If all Upcasters are correct and in place in your development code, each production event is upcasted to the format expected by the development version of the application, and deserialization should not be an issue. If a missing Upcaster causes deserialization to fail, the tool logs which event failed. For Sagas you’d use a similar tool.

Here’s a basic implementation of such a tool (its performance and logging can be improved where necessary). The visitEvents method, offered by EventStoreManagement (which the application’s event store implements), is used to go over each DomainEventMessage. These DomainEventMessages internally use LazyDeserializingObjects for payload and metadata, so to force the actual deserialization, we make an explicit call to the event’s getPayload(). If this call succeeds, the original payload was properly converted by all matching Upcasters and successfully deserialized to a Java object. In other words: it’s compatible with the current code base.

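import org.axonframework.domain.DomainEventMessage;
import org.axonframework.eventstore.EventStore;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventSanityChecker {
    private static final Logger logger = LoggerFactory.getLogger(EventSanityChecker.class);

    // A main method creates an EventSanityChecker with:
    // - the EventStore as used in the actual application (including its Upcasters)
    // then calls checkEvents() on that EventSanityChecker
    private final EventStore eventStore;

    public EventSanityChecker(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    public void checkEvents() {
        // visitEvents is part of EventStoreManagement, which the configured
        // event store is assumed to implement
        ((EventStoreManagement) eventStore).visitEvents(this::testEvent);
    }

    private void testEvent(DomainEventMessage domainEventMessage) {
        try {
            // getPayload() forces the lazy upcasting and deserialization
            Object event = domainEventMessage.getPayload();
            if (event == null) {
                throw new IllegalStateException("Payload deserialized to null");
            }
            logger.debug("Successfully deserialized Event with eventIdentifier {}",
                domainEventMessage.getIdentifier());
        } catch (Exception e) {
            logger.error("Error while deserializing Event with eventIdentifier {}",
                domainEventMessage.getIdentifier(), e);
        }
    }
}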
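The explicit getPayload() call in testEvent matters because the payload is only deserialized when it is first requested. The sketch below mimics that lazy behaviour without Axon; LazyPayload is a hypothetical stand-in and not the actual LazyDeserializingObject implementation.

```java
import java.util.function.Supplier;

// Hypothetical lazy holder: the (possibly failing) deserialization only runs on the first get().
final class LazyPayload<T> {
    private final Supplier<T> deserializer;
    private T value;
    private boolean deserialized;

    LazyPayload(Supplier<T> deserializer) {
        this.deserializer = deserializer;
    }

    boolean isDeserialized() {
        return deserialized;
    }

    T get() {
        if (!deserialized) {
            value = deserializer.get(); // an incompatible payload fails here, not at construction
            deserialized = true;        // cache the result so later calls do not deserialize again
        }
        return value;
    }
}
```

Merely visiting a message therefore proves nothing about its payload; only calling the getter surfaces a missing Upcaster.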

For Sagas the code is very similar. An EntityManager is used to fetch the IDs of every Saga in the saga table. For each ID, SagaRepository.load() is called. If load() succeeds (or fails only while injecting Spring beans into the Saga, which happens after the Saga has been deserialized), the Saga was deserialized successfully.

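import java.util.List;

import javax.persistence.EntityManager;

import org.axonframework.saga.SagaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanCreationException;

public class SagaSanityChecker {
    private static final Logger logger = LoggerFactory.getLogger(SagaSanityChecker.class);

    // A main method creates a SagaSanityChecker with:
    // - the SagaRepository as used in the actual application
    // - an EntityManager that can read the Saga table in the database
    // then calls checkSagas() on that SagaSanityChecker
    private final SagaRepository sagaRepository;
    private final EntityManager entityManager;

    public SagaSanityChecker(SagaRepository sagaRepository, EntityManager entityManager) {
        this.sagaRepository = sagaRepository;
        this.entityManager = entityManager;
    }

    public void checkSagas() {
        fetchSagaIds().forEach(this::testSaga);
    }

    private void testSaga(String sagaId) {
        try {
            sagaRepository.load(sagaId);
            logger.debug("Successfully deserialized SagaEntry with id {}", sagaId);
        } catch (BeanCreationException e) {
            // If this exception is thrown, the Saga was deserialized
            // successfully, but Spring failed to wire all its
            // dependencies. Consider this a success.
            logger.debug("Successfully deserialized SagaEntry with id {}, " +
                "ignored Spring wiring", sagaId);
        } catch (Exception e) {
            logger.error("Error while deserializing SagaEntry with id {}",
                sagaId, e);
        }
    }

    private List<String> fetchSagaIds() {
        // SagaEntry is the JPA entity in which Axon's JPA saga repository stores Sagas
        return entityManager.createQuery("SELECT s.sagaId FROM SagaEntry s", String.class)
            .getResultList();
    }
}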

Running these tools regularly against actual production data ensures that all legacy events and Sagas are supported. Any deserialization Exceptions should be self-explanatory, and you’ll know which Upcasters to write. To prevent performance issues, run the tools against a copy of the production database rather than against the production database itself.
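The checkers above only log failures. For a nightly job it can help to also make the run itself fail, so the build turns red when an Upcaster is missing. The sketch below shows that idea without Axon: StoredEvent, NightlySanityCheck and the upcastAndDeserialize function are hypothetical, and that function plays the role of the getPayload() call on each visited DomainEventMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stored event: the identifier to report plus the raw serialized payload.
final class StoredEvent {
    final String identifier;
    final String payload;

    StoredEvent(String identifier, String payload) {
        this.identifier = identifier;
        this.payload = payload;
    }
}

final class NightlySanityCheck {
    // Tries to upcast and deserialize every event and collects the identifiers that failed,
    // instead of stopping at the first failure, so one run reports every missing Upcaster.
    static List<String> findIncompatibleEvents(Iterable<StoredEvent> events,
                                               Function<String, Object> upcastAndDeserialize) {
        List<String> failed = new ArrayList<>();
        for (StoredEvent event : events) {
            try {
                if (upcastAndDeserialize.apply(event.payload) == null) {
                    failed.add(event.identifier); // treat a null payload as a failure too
                }
            } catch (RuntimeException e) {
                System.err.println("Cannot deserialize event " + event.identifier + ": " + e);
                failed.add(event.identifier);
            }
        }
        return failed;
    }

    // A non-zero exit status makes the Jenkins (or similar) job fail.
    static int exitCode(List<String> failed) {
        return failed.isEmpty() ? 0 : 1;
    }
}
```

A main method for the nightly job could end with System.exit(NightlySanityCheck.exitCode(failed)), so that the failing event identifiers show up in the job log and the job itself is marked as failed.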

Note that Axon does not currently offer an upcasting mechanism for Sagas like the one it offers for events. Building such a mechanism yourself is perfectly feasible, though, and the classes Axon uses for event upcasting can serve as a guideline. You will need to add a custom Saga Serializer to the definition of the SagaRepository:

<axon:jpa-saga-repository id="sagaRepository"
    saga-serializer="customSagaUpcastingSerializer"/>

This custom Serializer should contain a delegate Serializer and an implementation of UpcasterChain (an Axon type) that can find all available Upcasters for a given class and revision. When deserializing a Saga, the custom Serializer should first apply all available Upcasters to the serialized Saga (by calling upcasterChain.upcast(serializedSaga)) and only then hand the resulting XML to the delegate Serializer for deserialization.
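A framework-free sketch of that decorator idea follows. SerializedSaga, SagaUpcaster, SagaDeserializer, UpcastingSagaDeserializer and OrderSagaCustomerIdUpcaster are hypothetical names standing in for Axon’s Serializer and UpcasterChain types; the sketch only shows the order of operations: upcast first, then delegate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a stored Saga: its type, revision and serialized XML.
final class SerializedSaga {
    final String type;
    final String revision; // assumed null for Sagas stored before revisions were introduced
    final String xml;

    SerializedSaga(String type, String revision, String xml) {
        this.type = type;
        this.revision = revision;
        this.xml = xml;
    }
}

interface SagaUpcaster {
    boolean canUpcast(String type, String revision);

    SerializedSaga upcast(SerializedSaga saga);
}

interface SagaDeserializer {
    Object deserialize(SerializedSaga saga);
}

// Decorator: applies every matching upcaster, then hands the up-to-date XML to the delegate.
final class UpcastingSagaDeserializer implements SagaDeserializer {
    private final SagaDeserializer delegate;
    private final List<SagaUpcaster> upcasters;

    UpcastingSagaDeserializer(SagaDeserializer delegate, SagaUpcaster... upcasters) {
        this.delegate = delegate;
        this.upcasters = new ArrayList<>(Arrays.asList(upcasters));
    }

    @Override
    public Object deserialize(SerializedSaga saga) {
        SerializedSaga current = saga;
        boolean applied = true;
        while (applied) { // upcasters must always move to a newer revision to avoid looping
            applied = false;
            for (SagaUpcaster upcaster : upcasters) {
                if (upcaster.canUpcast(current.type, current.revision)) {
                    current = upcaster.upcast(current);
                    applied = true;
                    break;
                }
            }
        }
        return delegate.deserialize(current);
    }
}

// Hypothetical example: the unrevisioned OrderSaga stored a customerNo element that
// revision "1" of the class calls customerId.
final class OrderSagaCustomerIdUpcaster implements SagaUpcaster {
    @Override
    public boolean canUpcast(String type, String revision) {
        return "OrderSaga".equals(type) && revision == null;
    }

    @Override
    public SerializedSaga upcast(SerializedSaga saga) {
        String xml = saga.xml.replace("<customerNo>", "<customerId>")
                .replace("</customerNo>", "</customerId>");
        return new SerializedSaga(saga.type, "1", xml);
    }
}
```

Because the decorator only sees serialized data, it can be placed in front of an existing deserializer without touching the Saga classes themselves.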

In our current project at the ANWB, we run these SanityCheck jobs as part of our nightly build. They have been a great help in identifying and solving potential issues with legacy events and Sagas that we would otherwise have encountered in production. Before we introduced the SanityCheck jobs, deploying to production was perceived as a high-risk operation with unpredictable consequences. SanityChecks have changed that for the better.