Migrating content with Spring Integration – A real life example

by Roberto van der Linden, January 13, 2010

In one of our projects we need to migrate content from multiple websites into Hippo CMS. An interesting aspect of this migration is that one of the websites will keep supplying content updates, which makes the migration a continuous process rather than a one-off.

In this post I will explain how we use Spring Integration to migrate content, handle errors, measure performance and deal with the fact that content could contain references to other content that is not imported yet.

Short Spring Integration introduction

As I cannot explain Spring Integration in a few sentences, I will focus on the elements that I use in the example. For a complete overview of all elements I recommend reading the reference manual.

  • Message – Generic container for data. It contains the payload which can be any object.
  • Message Channel – Used for transporting the data between the different elements.
  • ChannelAdapter – Message Endpoint that enables connecting a single sender or receiver to a channel.
  • Transformer – Enables the loose-coupling of Message Producers and Message Consumers by transforming a message to another type.
  • Service activator – The endpoint type for connecting any Spring-managed Object to an input channel so that it may play the role of a service.
  • Router – Decides which channel a message should be sent to next, for example based on its payload or headers.
  • Splitter – Partitions a message into several parts and sends the resulting messages on to be processed independently.

Getting the content

Before you can start importing the content, you of course need to scrape it from the different websites. In our case this is done by the guys from Kapow. They provide us with a lot of content, preformatted as XML files that our import tool can handle. The content is distinguished by document type: for example news, an address, a brochure and many more. The different types of documents contain data like images, videos, PDF files and of course text. Each XML file contains an element that represents a unique id for that file. This id is used to identify any existing document in the CMS.

Connecting to the repository

Hippo CMS is built on top of the Apache Jackrabbit repository. We connect directly to the repository using the Hippo Repository Connector that is provided in the Hippo Site Toolkit. This lets us interact with the repository and perform actions like storing, editing and finding nodes.

My colleague Jettro Coenradie has written a blog post that goes more in-depth about connecting to the Hippo Repository.

Process overview

[Figure: Spring Integration process overview]

Starting point

As you can see in the image, we provide two ways for Kapow to deliver the content. One is via web services and the other is directly via the file system (Kapow runs its tool on the same server as the importer). There are a few reasons for this:

  1. Web services provide more flexibility for other applications to connect with the importer tool.
  2. Using the web service will prevent the problem that Spring Integration tries to read the file before it is completely written to the file system.
  3. When the web service is not available, Kapow is still able to write the XML files directly to the file system.

Webservice

Spring Integration provides us with an inbound web service gateway that takes the message received from a web service invocation and sends it to a channel.

As you can see in the process overview image, messages that are received from the web service are enriched with a filename and then written to the file system. We have to do this because the transformer turns the message into an XML file, which obviously needs a filename. The file outbound gateway places the XML file in the directory that is scanned for new files.

To make this all work, we have to configure an endpoint mapping in the Spring WS servlet that knows which requests need to be mapped to the gateway that is configured in our integration context.

When the file has been written to the file system, and has therefore been correctly received by the importer, a service activator returns a ResponseMessage with the text “OK”. This response message is received by Kapow.

The endpoint configuration in the web service servlet:

    <bean id="endpointMapping" class="org.springframework.ws.server.endpoint.mapping.PayloadRootQNameEndpointMapping">
        <property name="mappings">
            <props>
                <prop key="{http://www.jteam.nl/pons/webservice}AddressRequest">webserviceGateway</prop>
                <prop key="{http://www.jteam.nl/pons/webservice}NewsRequest">webserviceGateway</prop>
            </props>
        </property>
    </bean>

The integration configuration:

    <si-ws:inbound-gateway id="webserviceGateway" marshaller="unmarshaller" request-channel="transformedIncomingRequestBuffer" />

    <si:channel id="transformedIncomingRequestBuffer" />

    <si:transformer ref="headerEnricher" output-channel="enrichedUnmarchalledInputFile" input-channel="transformedIncomingRequestBuffer" />
    <si:channel id="enrichedUnmarchalledInputFile" />

    <si-xml:marshalling-transformer marshaller="unmarshaller" output-channel="unmarchalledInputFile" input-channel="enrichedUnmarchalledInputFile" result-transformer="toStringTransformer" />
    <si:channel id="unmarchalledInputFile" />

    <file:outbound-gateway request-channel="unmarchalledInputFile" reply-channel="handledFile" delete-source-files="false" directory="$si{file.import.path}" />

    <si:channel id="handledFile" />

    <si:service-activator ref="tempHandler" input-channel="handledFile" />

    <bean id="headerEnricher" class="nl.jteam.importer.HeaderEnricher" />
    <bean id="tempHandler" class="nl.jteam.importer.ResponseMessageHandler" />
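
The HeaderEnricher class itself is not shown in this post. As a rough idea of what such an enrichment step does, here is a minimal plain-Java sketch without any Spring Integration types. The class name, the method signature and the "type-id.xml" naming scheme are assumptions made for illustration; only the header key is taken from the monitoring aspect further down in this post.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of a header enricher; not the project's HeaderEnricher class.
final class FileNameEnricherSketch {

    // Header key that the monitoring aspect later in this post reads the file name from.
    static final String FILE_NAME_HEADER = "springintegration_file_name";

    // Returns a copy of the headers with a file name added.
    // The "<type>-<id>.xml" naming scheme is an assumption made for this example.
    static Map<String, Object> enrich(Map<String, Object> headers, String documentType, String uniqueId) {
        Map<String, Object> enriched = new HashMap<>(headers);
        enriched.put(FILE_NAME_HEADER, documentType + "-" + uniqueId + ".xml");
        return enriched;
    }
}
```

Calling enrich(headers, "News", "42") on an empty header map yields a single header whose value is "News-42.xml". The original headers are copied rather than modified, so the incoming message stays untouched.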

Transform file

All the XML files are written to a given directory on the file system. The inbound-channel-adapter element reads the files from that directory. You can provide a filter that only accepts files with a known suffix; in our case the only files we want to read are XML files. The poller configures the maximum number of messages we want per poll and the interval at which the directory is scanned for new files.

The file-to-string-transformer element transforms, as the name already suggests, a file to a string. This string is then used by the unmarshalling-transformer as input for the unmarshalling process to create JAXB objects.

    <file:inbound-channel-adapter id="fileAdapter" directory="$si{file.import.path}" filter="patternMatchingFileListFilter" channel="fileInputChannel">
        <si:poller max-messages-per-poll="$si{file.poller.maxperpoll}">
            <si:interval-trigger time-unit="MILLISECONDS" interval="$si{file.poller.interval}" />
        </si:poller>
    </file:inbound-channel-adapter>
    <si:channel id="fileInputChannel" datatype="java.io.File" />

    <file:file-to-string-transformer output-channel="xmlInputChannel" input-channel="fileInputChannel" charset="UTF-8" delete-files="true" />
    <si:channel id="xmlInputChannel" />

    <si-xml:unmarshalling-transformer id="defaultUnmarshaller" output-channel="docTypeChannel" input-channel="xmlInputChannel" unmarshaller="unmarshaller" />

    <bean id="patternMatchingFileListFilter" class="org.springframework.integration.file.PatternMatchingFileListFilter">
        <constructor-arg value=".*\.xml" />
    </bean>

    <bean id="unmarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="marshallerProperties">
            <map value-type="java.lang.Boolean" key-type="java.lang.String">
                <entry key="jaxb.formatted.output" value="true" />
            </map>
        </property>
        <property name="contextPaths">
            <list>
                <value>nl.jteam.importer.jaxb.news</value>
                <value>nl.jteam.importer.jaxb.address</value>
            </list>
        </property>
        <property name="schemas">
            <list>
                <value>classpath:/xsd/news.xsd</value>
                <value>classpath:/xsd/address.xsd</value>
            </list>
        </property>
    </bean>
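
To make the effect of the filter pattern and the per-poll maximum a bit more tangible, here is a small plain-Java sketch. It is not how Spring Integration implements the file adapter; the class and method names are made up, and it works on file names rather than on a real directory. It only uses the same regular expression as the configuration above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Illustration only: mimics "accept *.xml, at most N files per poll" on a list of names.
final class XmlFileFilterSketch {

    // Same pattern as the constructor argument of patternMatchingFileListFilter.
    private static final Pattern XML_FILE = Pattern.compile(".*\\.xml");

    static List<String> accept(List<String> fileNames, int maxPerPoll) {
        List<String> accepted = new ArrayList<>();
        for (String name : fileNames) {
            if (accepted.size() == maxPerPoll) {
                break; // the remaining files wait for the next poll
            }
            if (XML_FILE.matcher(name).matches()) {
                accepted.add(name);
            }
        }
        return accepted;
    }
}
```

With the names a.xml, b.txt, c.xml and d.xml and a maximum of 2, one poll accepts a.xml and c.xml; b.txt is ignored and d.xml is left for a later poll. Because the whole name has to match the pattern, a name such as news.xml.tmp is not accepted either.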

Document router

To identify which handler class needs to handle the document that is being imported, we created a router. This router takes the simple class name of the payload and returns it. This value is then used as the name of the channel to put the message on. So when the payload is a News object, the message is put on the channel with id News.

The configuration:

    <si:channel id="docTypeChannel" />

    <si:router ref="docTypeRouter" input-channel="docTypeChannel" method="resolveObjectTypeChannel" />
    <si:channel id="News" />
    <si:channel id="Address" />

    <si:service-activator ref="newsMessageHandler" output-channel="newContentItemNotification" input-channel="News" method="handleMessage" />
    <si:service-activator ref="addressMessageHandler" output-channel="newContentItemNotification" input-channel="Address" method="handleMessage" />

    <bean class="nl.jteam.importer.DocumentTypeMessageRouter" name="docTypeRouter" />

The implementation of the router class:

    public class DocumentTypeMessageRouter {

        // The returned name must match the id of a configured channel (News, Address, ...).
        public String resolveObjectTypeChannel(Message<?> message) {
            return message.getPayload().getClass().getSimpleName();
        }
    }
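
The routing rule can be tried out without Spring Integration. The sketch below is a stand-alone illustration: the nested News and Address classes are stand-ins for the JAXB-generated payload classes, and the channels are plain lists, which is an assumption made purely for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration of "simple class name of the payload == channel name".
final class TypeRoutingSketch {

    // Stand-ins for the JAXB-generated payload classes.
    static final class News {}
    static final class Address {}

    // Channel name -> messages waiting on that channel.
    private final Map<String, List<Object>> channels = new HashMap<>();

    TypeRoutingSketch() {
        channels.put("News", new ArrayList<>());
        channels.put("Address", new ArrayList<>());
    }

    static String resolveChannelName(Object payload) {
        return payload.getClass().getSimpleName();
    }

    void route(Object payload) {
        String name = resolveChannelName(payload);
        List<Object> channel = channels.get(name);
        if (channel == null) {
            throw new IllegalArgumentException("No channel named " + name);
        }
        channel.add(payload);
    }

    int queued(String channelName) {
        return channels.get(channelName).size();
    }
}
```

A payload of an unknown type, for instance a plain String, has no matching channel and is rejected here. The practical consequence of this design is that every new document type needs a channel whose id equals the simple name of its payload class.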

Handlers

Each document type has its own handler class. This class is responsible for handling all the properties that are relevant for that document.

A document in Hippo CMS is a JCR node in the repository. The handler will create this node if it does not already exist; if it does exist, the handler reuses it. Because the XML is treated as the authoritative content, all properties and child nodes of an existing node are removed, so any change that was made to that document via the CMS will be lost. Because we keep the node itself, references to it from other nodes in the repository stay intact, but all properties are recreated by the handler.

For each property, the handler either adds it to the node itself or calls another handler that deals with that specific kind of property, such as an image handler or audio handler class.
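
The create-or-reuse behaviour described above can be sketched with an in-memory map instead of a JCR repository. Everything in this example (class names, the Node type, the use of a random UUID) is hypothetical and only meant to show why references survive a re-import while CMS edits do not.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// In-memory stand-in for the repository; not the Hippo or JCR API.
final class UpsertNodeSketch {

    static final class Node {
        final String uuid = UUID.randomUUID().toString(); // stable identity of the node
        final Map<String, String> properties = new HashMap<>();
    }

    // Unique id from the XML file -> node in the "repository".
    private final Map<String, Node> nodesByExternalId = new HashMap<>();

    Node importDocument(String externalId, Map<String, String> xmlProperties) {
        // Reuse the existing node if there is one, otherwise create it.
        Node node = nodesByExternalId.computeIfAbsent(externalId, id -> new Node());
        // The XML wins: drop everything that was on the node, including CMS edits.
        node.properties.clear();
        node.properties.putAll(xmlProperties);
        return node;
    }
}
```

Importing the same external id twice returns a node with the same uuid, so anything that points at that uuid keeps working, while a property that was added in between the two imports is gone after the second one.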

Missing links

It could be that a document has references to another document that is not imported yet. This means that the link cannot be created yet. To solve this problem we store for each link that cannot be created a missing link node in the repository. This node contains the information that is necessary to create the link when that document is imported. I will explain later on how the missing links are used when updating content.

When the handler is finished with creating a document in Hippo CMS, it will execute a query that finds all the documents that contain a reference to the imported document or in other words, it will find all missing links that now can be resolved.
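
As a rough model of this bookkeeping, the sketch below keeps missing links in a list and looks them up by the id of the document that has just been imported. In the real importer the missing links are nodes in the repository and the lookup is a repository query; the class, field and method names here are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of missing link nodes; the real ones live in the repository.
final class MissingLinkSketch {

    static final class MissingLink {
        final String referringDocumentId; // document that holds the unresolved reference
        final String property;            // property on which the link must be created
        final String targetDocumentId;    // document that was not imported yet

        MissingLink(String referringDocumentId, String property, String targetDocumentId) {
            this.referringDocumentId = referringDocumentId;
            this.property = property;
            this.targetDocumentId = targetDocumentId;
        }
    }

    private final List<MissingLink> missingLinks = new ArrayList<>();

    // Called by a handler when a link target does not exist yet.
    void record(String referringDocumentId, String property, String targetDocumentId) {
        missingLinks.add(new MissingLink(referringDocumentId, property, targetDocumentId));
    }

    // Called after a document is imported: which missing links can be resolved now?
    List<MissingLink> findResolvableBy(String importedDocumentId) {
        List<MissingLink> resolvable = new ArrayList<>();
        for (MissingLink link : missingLinks) {
            if (link.targetDocumentId.equals(importedDocumentId)) {
                resolvable.add(link);
            }
        }
        return resolvable;
    }
}
```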

Referred documents splitter

The outcome of the query that looks for references to the imported document could return multiple documents of different types. For example, we have imported an address document and that document is referenced by a news item and a press release item. These two documents will each be represented in a ReferredDefinition object. These ReferredDefinition objects contain the information of the missing link node plus the unique repository id (uuid) of the currently imported document.

Spring Integration provides us with a splitter that can partition a message into several messages. Our splitter implementation receives a list of ReferredDefinition objects. Each resulting message (one ReferredDefinition object) is sent to a router that routes it to its appropriate channel.

The configuration:

    <si:channel id="newContentItemNotification" />

    <si:splitter ref="referredBySplitter" output-channel="referredBy" input-channel="newContentItemNotification" method="splitReferredDefinitions" />

    <bean class="nl.jteam.importer.ReferredByMessageSplitter" name="referredBySplitter" />

The referred by message splitter class:

    public class ReferredByMessageSplitter {

        // Returning the list is enough: the splitter sends each element on as its own message.
        public List<ReferredDefinition> splitReferredDefinitions(List<ReferredDefinition> referredDefinitions) {
            return referredDefinitions;
        }
    }

Referred by router

The splitter provides the router with a single ReferredDefinition item. The ReferredDefinition object contains information about the document type that needs to be updated. The implementation of the router is quite simple. It gets the type of the document that needs to be updated and adds the string “Update” to the document type. This combination gives us the name of the channel that is used to put the message on.

The configuration:

    <si:channel id="referredBy" />
    <si:router ref="referredByRouter" input-channel="referredBy" method="resolveItemChannel" />

    <si:channel id="NewsUpdate" />
    <si:channel id="PressReleaseUpdate" />

    <bean class="nl.jteam.importer.ReferredByMessageRouter" name="referredByRouter" />

The referred by message router class:

    public class ReferredByMessageRouter {

        public String resolveItemChannel(ReferredDefinition referredDefinition) {
            return referredDefinition.getPrimaryTypeReferredBy() + "Update";
        }
    }
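
Taken together, the splitter and this router turn one list of ReferredDefinition objects into individual messages on per-type update channels. The following plain-Java sketch shows that combined effect. The ReferredDefinition class here is a cut-down stand-in with assumed fields; only the getPrimaryTypeReferredBy() accessor and the "Update" suffix come from the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of split-then-route; channels are modelled as lists keyed by name.
final class ReferredByRoutingSketch {

    // Cut-down stand-in for the real ReferredDefinition class.
    static final class ReferredDefinition {
        private final String primaryTypeReferredBy; // type of the document to update
        private final String importedUuid;          // uuid of the document just imported

        ReferredDefinition(String primaryTypeReferredBy, String importedUuid) {
            this.primaryTypeReferredBy = primaryTypeReferredBy;
            this.importedUuid = importedUuid;
        }

        String getPrimaryTypeReferredBy() { return primaryTypeReferredBy; }
        String getImportedUuid() { return importedUuid; }
    }

    static Map<String, List<ReferredDefinition>> splitAndRoute(List<ReferredDefinition> definitions) {
        Map<String, List<ReferredDefinition>> channels = new LinkedHashMap<>();
        for (ReferredDefinition definition : definitions) {           // splitter: one message per element
            String channel = definition.getPrimaryTypeReferredBy() + "Update"; // router rule
            channels.computeIfAbsent(channel, name -> new ArrayList<>()).add(definition);
        }
        return channels;
    }
}
```

For an address that is referenced by one news item and one press release, the result has two entries, NewsUpdate and PressReleaseUpdate, each holding one definition.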

Update handlers

Just like the normal handlers, each document type has its own update handler. The update handlers are used for updating content that has a reference to the imported document. The handler receives a ReferredDefinition object, which contains the information about the document that contains the reference.

[Figure: update]

Suppose we have two documents, A and B. A is an address document and B is a news item that contains a field address which refers to A. Document B has already been imported, but because A was not imported yet at that time, the link could not be created and a missing link node for A exists. Now document A is imported, so document B can be updated with a working link. The content handler of A finds the missing link and creates a ReferredDefinition object with the information from the missing link. The update handler finds document B based on its unique id, which is also stored in the ReferredDefinition object.

Now that document B is found, the update handler can add the correct property (which is specified in the ReferredDefinition object) to document B with the link to document A. Once the link is created, the missing link node is no longer needed and is therefore removed.
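
The A and B scenario can be replayed with a small in-memory sketch of an update handler. This is not the project's handler: documents are plain property maps, missing link nodes are modelled as keys in a set, and all names are assumptions chosen for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory sketch of an update handler: create the link, then drop the missing link node.
final class UpdateHandlerSketch {

    static final class ReferredDefinition {
        final String referringDocumentId; // document B
        final String property;            // property on B that should hold the link
        final String importedUuid;        // uuid of the freshly imported document A

        ReferredDefinition(String referringDocumentId, String property, String importedUuid) {
            this.referringDocumentId = referringDocumentId;
            this.property = property;
            this.importedUuid = importedUuid;
        }
    }

    final Map<String, Map<String, String>> documents = new HashMap<>();
    final Set<String> missingLinkNodes = new HashSet<>();

    static String missingLinkKey(String documentId, String property) {
        return documentId + "/" + property;
    }

    void handleUpdate(ReferredDefinition definition) {
        Map<String, String> document = documents.get(definition.referringDocumentId);
        if (document == null) {
            throw new IllegalStateException("Unknown document " + definition.referringDocumentId);
        }
        // Create the working link on B ...
        document.put(definition.property, definition.importedUuid);
        // ... and remove the missing link node, which is no longer needed.
        missingLinkNodes.remove(missingLinkKey(definition.referringDocumentId, definition.property));
    }
}
```

After handleUpdate runs for document B, its address property holds the uuid of A and the corresponding missing link entry is gone.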

Error handling

Errors can occur while importing content. We distinguish between two types of errors:

  1. Critical errors – Something is wrong with the XML structure or with the connection to the repository.
  2. Non-critical errors – Errors like incorrect content or links that cannot be created.

When a critical error occurs, it means that we cannot import that document at this moment. So we have defined an error channel on which the payload is put and transformed back into an XML file. That file is then written to a different folder, so that it will not be picked up again by Spring Integration.

With non-critical errors it is only necessary to notify a system admin that something went wrong. To do this, we introduced an error collector. This error collector is nothing more than an object that contains a list of errors and a little information about the document. The collector is passed through the whole process of importing a document and collects all errors that occur. At the end of that process the errors are mailed to the system admin. Currently this is done after each document, but this will probably change in the future to some sort of batch mailing.

    <si:channel id="errorChannel" />

    <si:chain input-channel="errorChannel">
        <si:service-activator ref="errorMessageHandler" method="handleMessage" />
        <si:router ref="errorMessageRouter" method="routeByPayloadType" />
    </si:chain>

    <si:channel id="errorOutputChannel" />
    <si-xml:marshalling-transformer id="defaultMarshaller" marshaller="unmarshaller" output-channel="xmlOutputChannel" input-channel="errorOutputChannel" result-transformer="toStringTransformer" />

    <bean id="toStringTransformer" class="org.springframework.integration.xml.transformer.ResultToStringTransformer" />

    <si:channel id="xmlOutputChannel" />

    <file:outbound-channel-adapter directory="$si{file.unhandled.path}" channel="xmlOutputChannel" />

    <bean id="errorMessageHandler" class="nl.jteam.importer.errorhandling.ErrorHandler">
        <property name="errorMailer" ref="errorMailer"></property>
    </bean>

    <bean id="errorMessageRouter" class="nl.jteam.importer.errorhandling.ErrorMessageRouter">
        <property name="objectChannel" ref="errorOutputChannel"></property>
        <property name="xmlChannel" ref="xmlOutputChannel"></property>
    </bean>
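
The error collector for non-critical errors is described above but not shown. A minimal sketch of such an object could look like the following; the class name, the methods and the layout of the mail text are assumptions, not the project's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of an error collector: a list of errors plus a little document information.
final class ErrorCollectorSketch {

    private final String documentId;
    private final List<String> errors = new ArrayList<>();

    ErrorCollectorSketch(String documentId) {
        this.documentId = documentId;
    }

    // Called by handlers whenever something non-critical goes wrong.
    void add(String error) {
        errors.add(error);
    }

    boolean hasErrors() {
        return !errors.isEmpty();
    }

    // Text that could be mailed to the system admin at the end of the import.
    String toMailBody() {
        StringBuilder body = new StringBuilder();
        body.append("Document ").append(documentId).append(": ")
            .append(errors.size()).append(" non-critical error(s)");
        for (String error : errors) {
            body.append("\n- ").append(error);
        }
        return body.toString();
    }
}
```

One collector instance travels with one document through the handlers, and a mail only needs to be sent when hasErrors() returns true. Switching to batch mailing would mostly be a matter of gathering several collectors before building the mail text.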

Performance

We use an aspect to send a message to the performanceMonitor channel. Once you have a reference to a MessageChannel, you can call its send(..) method to put a message on the channel. In our case we create a new StringMessage to carry the performance data. The channel is consumed by the service-activator, which uses a poller to throttle inbound messages.

The service-activator passes the message to the Log4jMonitorPersister, where it logs the message.

The configuration:

    <si:channel id="performanceMonitor">
        <si:queue />
    </si:channel>

    <si:service-activator ref="monitorPersister" input-channel="performanceMonitor" method="persist">
        <si:poller max-messages-per-poll="100">
            <si:interval-trigger time-unit="MILLISECONDS" interval="10000" />
        </si:poller>
    </si:service-activator>

    <bean id="monitorPersister" class="nl.jteam.importer.monitoring.Log4jMonitorPersister" />

    <aop:aspectj-autoproxy />

    <bean id="monitoringHandlersAdvice" class="nl.jteam.importer.monitoring.MonitoringHandlersAdvice">
        <property name="performanceMonitor" ref="performanceMonitor"></property>
    </bean>
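
The throttling effect of a queue channel with a poller can be imitated with a plain BlockingQueue. The sketch below is not Spring Integration code and the class is hypothetical; it only shows the idea that senders never wait for the logger, while the consumer takes at most a fixed number of messages per poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration of a queue-backed channel drained in bounded batches.
final class PolledQueueSketch {

    private final BlockingQueue<String> performanceMonitor = new LinkedBlockingQueue<>();

    // Producer side: returns immediately, the message waits in the queue.
    void send(String message) {
        performanceMonitor.offer(message);
    }

    // Consumer side: called once per interval, takes at most maxMessagesPerPoll messages.
    List<String> poll(int maxMessagesPerPoll) {
        List<String> batch = new ArrayList<>();
        performanceMonitor.drainTo(batch, maxMessagesPerPoll);
        return batch;
    }
}
```

If 150 messages are sent between two polls, a poll with a maximum of 100 returns the first 100 and the next poll returns the remaining 50. In a running application something like a ScheduledExecutorService would call poll at a fixed interval, comparable to the 10000 millisecond trigger in the configuration above.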

The MonitoringHandlersAdvice class:

    @Aspect
    public class MonitoringHandlersAdvice {

        private MessageChannel channel;

        // Wraps every handleMessage(..) call on the handler classes and times it.
        @Around("execution(* nl.jteam.importer.handler.*Handler.handleMessage(..)) && args(message)")
        public Object monitorB(ProceedingJoinPoint proceedingJoinPoint, Message<?> message)
                throws Throwable {
            String fileName = (String) message.getHeaders().get("springintegration_file_name");
            Object payload = message.getPayload();

            StopWatch clock = new StopWatch();
            DateTime start = new DateTime();

            try {
                clock.start(proceedingJoinPoint.toShortString());
                return proceedingJoinPoint.proceed(new Object[]{message});
            } finally {
                clock.stop();
                // Format: <millis>;<payload type>;<file name>;<start time>
                StringBuilder outMessageBuilder = new StringBuilder();
                outMessageBuilder.append(clock.getTotalTimeMillis()).append(";")
                        .append(payload.getClass().getSimpleName()).append(";")
                        .append(fileName).append(";")
                        .append(start.toString("yyyy-MM-dd HH:mm:ss"));
                channel.send(new StringMessage(outMessageBuilder.toString()));
            }
        }

        @Required
        public void setPerformanceMonitor(MessageChannel performanceMonitor) {
            channel = performanceMonitor;
        }
    }

The Log4jMonitorPersister class:

    public class Log4jMonitorPersister implements MonitorPersister {
        private static final Logger log = LoggerFactory.getLogger(MonitorPersister.class);

        @Override
        public void persist(String monitorEvent) {
            log.info(monitorEvent);
        }
    }

Conclusion

I have shown you how we use Spring Integration to migrate XML content to Hippo CMS. As you can see, the configuration is relatively simple, and when we want to import a new content type, all we have to do is create a handler and wire up the configuration. As always, SpringSource provides good documentation that makes it easy to work with Spring Integration.

References

Spring Integration: http://static.springsource.org/spring-integration/reference/htmlsingle/spring-integration-reference.html