Spring-AMQP and payload validation: some notes from the trenches

by Joris KuipersFebruary 29, 2016

It’s been a while since I’ve written one of our from-the-trenches blogs: that’s mostly because I’ve been very busy in those trenches developing systems for our customers.

This week I completed a Spring Boot-based microservice which is responsible for interacting with some 3rd party SOAP service: its own clients communicate with it by sending request message over RabbitMQ, and the service then sends back a response to a response queue after handling the SOAP response message.

Of course I used Spring-AMQP to build this service. Spring-AMQP supports a nice annotation-based listener programming model, based on Spring’s generic Message support.
That allows writing listener methods like this:

@RabbitListener(queues = REQUEST_QUEUE)
public DeclarationResponse submitDeclaration(DeclarationRequest request) {
  // handle the request and return a response
}

The request parameter here is the result of converting the incoming AMQP message using a Spring-AMQP MessageConverter, after which it is considered to be the payload of the message (even when headers are used in the conversion as well).

The request messages that the clients send have some required fields: without those fields, the service can’t make the SOAP calls. While reading the RabbitListener JavaDoc I noticed that Spring-AMQP allows you to apply validation to message payload parameters by annotating it. When using this, you also have to add the @Payload annotation (which is optional without validation if your method doesn’t have any other arguments), so the result looks like this:

@RabbitListener(queues = REQUEST_QUEUE)
public DeclarationResponse submitDeclaration(@Valid @Payload DeclarationRequest request) { … }

By the way, Spring’s own @Validated (even as a meta-annotation) and in fact every annotation whose name starts with “Valid” are supported for this purpose as well.

Now we can add some JSR-303 Bean Validation annotations to the fields in our DeclarationRequest, like @NotNull, to express our validation constraints.

Defining and registering a validator

You might think, as I did at first, that this would suffice to have the parameter validated before the listener method is invoked. However, as it turns out you have to explicitly inject a Validator into a DefaultMessageHandlerMethodFactory which you then have to register as the MessageHandlerMethodFactory to be used by Spring-AMQP per the documentation, even when using the Spring Boot autoconfiguration

To do that, you need a Validator bean. Since I’m using a Spring Boot web application, there should already be a default Validator bean — created by the web auto-configuration — called “mvcValidator”. However, when I tried to autowire that Validator into my configuration I got an unexpected error at startup:

ERROR 10732 --- [ main] o.s.boot.SpringApplication : Application startup failed

org.springframework.beans.factory.BeanCreationException:
  Error creating bean with name 'defaultServletHandlerMapping'
  defined in class path resource
  [org/springframework/boot/autoconfigure/web/WebMvcAutoConfiguration$EnableWebMvcConfiguration.class]:
  Bean instantiation via factory method failed; nested exception is
  org.springframework.beans.BeanInstantiationException:
  Failed to instantiate [org.springframework.web.servlet.HandlerMapping]:
  Factory method 'defaultServletHandlerMapping' threw exception;
  nested exception is java.lang.IllegalArgumentException:
   A ServletContext is required to configure default servlet handling
 at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod
 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod
[...]
 at org.springframework.boot.SpringApplication.run
 at myapp.Application.main
Caused by: org.springframework.beans.BeanInstantiationException:
 Failed to instantiate [org.springframework.web.servlet.HandlerMapping]:
Factory method 'defaultServletHandlerMapping' threw exception;
nested exception is java.lang.IllegalArgumentException:
  A ServletContext is required to configure default servlet handling
 at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate
 at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod
 ... 18 common frames omitted
Caused by: java.lang.IllegalArgumentException:
  A ServletContext is required to configure default servlet handling
 at org.springframework.util.Assert.notNull
 at org.springframework.web.servlet.config.annotation.DefaultServletHandlerConfigurer.<init>;
 at org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport.defaultServletHandlerMapping
[...]
 ... 19 common frames omitted

Oops…

Apparently adding an “@Autowired Validator validator” to your main Spring Boot class triggers some premature initialization causing this error.
Interestingly enough, when you move your AMQP-related Spring configuration (I had everything in my main class first) to a dedicated @Configuration class and autowire the Validator into that, this problem does not happen.

I’m not even going to try to understand the details of that; Spring initialization in Boot is a rather complicated and obviously sometimes also brittle process, so to err on the safe side I simply added a dedicated Validator for Spring-AMQP like this:

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
   registrar.setMessageHandlerMethodFactory(validatingHandlerMethodFactory());
}

@Bean
DefaultMessageHandlerMethodFactory validatingHandlerMethodFactory() {
   DefaultMessageHandlerMethodFactory factory =
       new DefaultMessageHandlerMethodFactory();
   factory.setValidator(amqpValidator());
   return factory;
}

@Bean
Validator amqpValidator() {
   return new OptionalValidatorFactoryBean();
}

Handling validation errors

So, with this in place we now have working payload validation for incoming AMQP-messages!
Hooray, let’s check what happens if we now send an invalid message:

WARN 22764 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException:
Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public myapp.DeclarationResponse myapp.WsClient.submitDeclaration(myapp.DeclarationRequest)]
Bean [myapp.WsClient@e8e0dec]
 at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:120)
[...]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException:
Could not resolve method parameter at index 0 in method:
public myapp.DeclarationResponse myapp.WsClient.submitDeclaration(myapp.DeclarationRequest) ,
with 3 error(s): [Field error in object 'request' on field 'declarationId': rejected value [null]; [...]
 at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.validate(PayloadArgumentResolver.java:178)
[...]
 ... 12 common frames omitted

WARN 22764 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException:
Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public myapp.DeclarationResponse myapp.WsClient.submitDeclaration(myapp.DeclarationRequest)]
Bean [myapp.WsClient@e8e0dec]
 at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:120)
[...]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException:
Could not resolve method parameter at index 0 in method:
public myapp.DeclarationResponse myapp.WsClient.submitDeclaration(myapp.DeclarationRequest) ,
with 3 error(s): [Field error in object 'request' on field 'declarationId': rejected value [null]; [...]
 at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.validate(PayloadArgumentResolver.java:178)
[...]
 ... 12 common frames omitted

WARN 22764 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException:
Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public myapp.DeclarationResponse myapp.WsClient.submitDeclaration(myapp.DeclarationRequest)]
Bean [myapp.WsClient@e8e0dec]
 at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:120)
[...]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException:
Could not resolve method parameter at index 0 in method:
public myapp.DeclarationResponse myapp.WsClient.submitDeclaration(myapp.DeclarationRequest) ,
with 3 error(s): [Field error in object 'request' on field 'declarationId': rejected value [null]; [...]
 at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.validate(PayloadArgumentResolver.java:178)
[...]
 ... 12 common frames omitted

[repeated ad infinitum]

What you see here is that when the message is received and the validation fails, the message is rejected and put back on the queue. However, that only causes the message to be received again, and of course the redelivered message will fail to validate again, causing the same error over and over again.

I found this quite surprising, as I had expected a validation error to be considered as fatal (it’s non-transient in nature), thus causing the message to simply be discarded instead of requeued. Instead, it becomes a ‘poison pill’ which causes this pathological behavior.
BTW, in the case of AMQP a message is put back at the tail of the queue when it’s being requeued: that will at least not prevent other messages from being received. With JMS, where an unacknowledged message will be put back at the head of the queue, this would be even worse: it would prevent the processing of any further messages!

Discarding invalid messages

After a little digging in the Spring-AMQP sources, I found that you remedy this relatively easily: the SimpleRabbitListenerContainerFactory (which is the only supplied implementation of a RabbitListenerContainerFactory) creates a SimpleMessageListenerContainer with a default ErrorHandler of type ConditionalRejectingErrorHandler. This ConditionalRejectingErrorHandler uses a so-called FatalExceptionStrategy to determine if an exception should be considered ‘fatal’, i.e. if it should cause the received message to be discarded rather than requeud.
The standard strategy only considers an exception to be fatal if it was caused by a MessageConversionException. However, when there is a validation error then this doesn’t cause a MessageConversionException, but a MethodArgumentNotValidException. Therefore, you need to implement your own FatalExceptionStrategy that considers that root exception as fatal too. Here’s how you can implement and configure that in your configuration class:

@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory listenerContainerFactory =
      new SimpleRabbitListenerContainerFactory();
  listenerContainerFactory.setConnectionFactory(connectionFactory);
  listenerContainerFactory.setErrorHandler(
      new ConditionalRejectingErrorHandler(
          new InvalidPayloadRejectingFatalExceptionStrategy()));
  listenerContainerFactory.setMessageConverter(messageConverter());
  return listenerContainerFactory;
}

/**
 * Extension of Spring-AMQP's
 * {@link ConditionalRejectingErrorHandler.DefaultExceptionStrategy}
 * which also considers a root cause of {@link MethodArgumentNotValidException}
 * (thrown when payload does not validate) as fatal.
 */
static class InvalidPayloadRejectingFatalExceptionStrategy implements FatalExceptionStrategy {

  private Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public boolean isFatal(Throwable t) {
    if (t instanceof ListenerExecutionFailedException &&
          (t.getCause() instanceof MessageConversionException ||
           t.getCause() instanceof MethodArgumentNotValidException)) {
      logger.warn("Fatal message conversion error; message rejected; it will be dropped: {}",
                  ((ListenerExecutionFailedException) t).getFailedMessage());
      return true;
    }
    return false;
  }
}

This implementation differs from Spring-AMQP’s default in two ways: most importantly, it checks for MethodArgumentNotValidExceptions in addition to MessageConversionExceptions. Also, it does not log a stack trace when returning true: that stack trace will already have been logged when this code is invoked by the ConditionalRejectingErrorHandler, so there’s no need to log it again.
I’d argue that including MethodArgumentNotValidExceptions here should be the default implementation really, so I’m planning to file an issue for that.

And that’s it, then: we now have validation in place that will cause invalid messages to simply be dropped when received after logging the problem in detail (but only once).

Alternatively, you could simply disable requeueing of rejected messages altogether on your SimpleRabbitListenerContainerFactory:

listenerContainerFactory.setDefaultRequeueRejected(false);

However, that would cause message loss if some transient error occurrs while processing the message: in my particular case that’s undesirable.

Conclusion

I hope that post will help some people who start out with validating their AMQP messages and run into the same issues as I did.
This is another great example of why it’s so nice to use good quality open-source software for your applications: it allows you to redefine the behavior of the libraries in an easily controlled manner, and allows you to propose improvements to the libraries themselves as well!