How to write an elasticsearch river plugin

by Luca CavannaJanuary 10, 2013

Up until now I told you why I think elasticsearch is so cool and how you can use it combined with Spring. It’s now time to get to something a little more technical. For example, once you have a search engine running you need to index data; when it comes to indexing data you usually need to choose between the push and the pull approach. This blog entry will detail these approaches and goes into writing a river plugin for elasticsearch.

Implementing the push approach means writing your own indexer using your favourite programming language and pushing data to the search engine through some client library or even sending REST requests to it.

On the other hand, implementing the pull approach with elasticsearch means writing a special type of plugin, also known as river, which will pull data from a data source and index it. The data source can be whatever system you can get data from: the file system, a database, and so on.

While the push approach is the most flexible one, the river is a nice and standard way to distribute an indexer as a plugin. You then run it (at your own risk) in elasticsearch itself, without the need to start up a separate application/process. And if you are a Java developer, it makes even less difference since you are probably going to use the elasticsearch Java API to index data and interact with it, either writing the separate indexer or the river. In the end the code is going to be pretty much the same except for the bootstrap part.

But you may still wonder, what’s better? Push or pull? My answer is: it depends!

If you don’t want to use Java, or if you want to have complete control over the indexing process and write a specific piece of software for your own needs, then the push approach seems to be the way to go.
But if you’re a Java guy and would like to write a generic indexer which other users can benefit from, or if you just don’t want to bother maintaining a separate application for it, the river is a good choice. There are already quite some elasticsearch rivers available. For example, you can give the JDBC river a try if you want to index data taken from a database using JDBC, or you can use the Twitter river to import data from Twitter.

Getting started

I recently wrote the Solr river which I’m going to use as an example for this blogpost in order to show you what the steps needed to write a river are. The Solr river allows you to easily import data from a running Solr instance. Despite the initial concept behind elasticsearch rivers was to handle a stream of constant data, like for example the Twitter river does using the Twitter streaming API, the Solr river is not meant to keep Solr in sync with Elasticsearch (why would you do that?), but only to import data once and start working with elasticsearch in no time.

Elasticsearch uses Google Guice as a dependency injection framework. You don’t need to know a lot about it to write a plugin but I’d suggest to have a look at its Getting Started page if you haven’t heard about it. Also remember that Guice is one of those dependencies that are shared with elasticsearch itself. The version used at the time of writing is 2.0.

Since we are writing a plugin, the first step is to write a class that implements the Plugin interface. That’s even easier if we extend the AbstractPlugin class that handles some boilerplate code for us. We need to implement the name and description methods in order to provide a name and a description for the plugin.

    @Override
    public String name() {
        return "river-solr";
    }

    @Override
    public String description() {
        return "River Solr plugin";
    }

After that we need to register our plugin’s components. What do I mean by this? Every plugin adds some features to the system, and those features need to be registered so that the system knows about them. You can do it through Guice. While loading all plugins, elasticsearch invokes (via reflection) a method called onModule with a parameter that extends the AbstractModule class. The AbstractModule class is actually the base class for any Guice module, which contains a collection of bindings. We are writing a river, therefore we need to write the following method that receives the RiversModule as input, where we can register our river:

public void onModule(RiversModule module) {
    module.registerRiver("solr", SolrRiverModule.class);
}

But the PluginsService gives us as input the module that we need for our plugin, which isn’t always the RiversModule. For example, if we wanted to write a new analyzer, the onModule method would have looked like this:

public void onModule(AnalysisModule module) {
    module.addAnalyzer("new-analyzer", NewAnalyzerProvider.class);
}

while if we wanted to write a plugin that adds a new scripting engine, the onModule would have been like the following:

public void onModule(ScriptModule module) {
    module.addScriptEngine(NewScriptEngineService.class);
}

Finally, if we wanted to write a plugin that adds a new REST endpoint to elasticsearch, here is the needed onModule method:

public void onModule(RestModule module) {
    module.addRestAction(NewRestAction.class);
}

Back to our Solr river, what is the SolrRiverModule that we registered to the RiversModule? That’s our specific Guice module. It contains all the bindings required for our plugin. For the Solr river (and most of the plugins) we just need to load the SolrRiver class which contains, as we’ll see in a moment, the code for our river and implements the River interface.

public class SolrRiverModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(River.class).to(SolrRiver.class).asEagerSingleton();
    }
}

The above line of code just tells Guice that for this module the River class implementation in use will be SolrRiver. We are also asking Guice to eagerly initialize our class. This means that the SolrRiver instance will be created during Guice initialization and not only when needed. As a result, we’ll know immediately if there are problems while creating the object.

We are ready to have a look at the river code, but first I need to show you something else. How does elasticsearch know to load the initial SolrRiverPlugin class? Some kind of component scanning would be nice here, but that’s not available with guice, neither with elasticsearch. What we need to do is put a file called es-plugin.properties on the classpath. It needs to contain the following line, which tells elasticsearch what class to load in order to start the plugin:

plugin=org.elasticsearch.plugin.river.solr.SolrRiverPlugin

A river implements the River interface and usually extends the AbstractRiverComponent too. The River interface contains only two methods: start, called when the river is started (which can be either when you register the river or when the node on which the river is allocated is started) and close, called when the river is closed. The AbstractRiverComponent is just a helper that initializes the logger for the river and stores the river name and the river settings on two instance members.

The plugin constructor is annotated with the Guice @Inject annotation, so that the object will be injected by Guice with all the needed dependencies. The current SolrRiver only depends on Rivername, RiverSettings and Client, where Client is a client pointing to the node where the river is allocated.

@Injected
protected SolrRiver(RiverName riverName,
                    RiverSettings riverSettings,
                    Client client)

The nice thing here is that if you have other dependencies on objects that are already available in elasticsearch and bound through Guice, you can just add them as constructor parameters. For example, to add scripting capabilities to the river (which by the way might be the next feature I’m going to work on) we could simply add a parameter of type ScriptService.

What we basically do in the river constructor is reading the settings used when registering it in order to control the river behaviour. The start method contains what the river really does: it sends a query to a running Solr instance through SolrJ and indexes the result in elasticsearch. It uses pagination (configurable page size) while querying in order to avoid retrieving too many documents at the same time.

In order to use a river you need to do something really similar to what you do when you index a document:

curl -XPUT localhost:9200/_river/solr_river/_meta -d '{
    "type" : "solr"
}'

The JSON above is the very minimum configuration needed to register a river, providing its type. The type must match with the string previously provided when registering the SolrRiverModule to the RiversModule:

module.registerRiver("solr", SolrRiverModule.class);

And…yes In fact you are indexing a document, but on a special index called _river, type solr_river. You document id is _meta. If you provide additional configuration while registering the river like this:

curl -XPUT localhost:9200/_river/solr_river/_meta -d '{
    "type" : "solr",
    "solr" : {
        "url" : "http://localhost:8080/solr/",
        "q" : "*:*"
    }
}'

that extra information is stored in the _meta document as well. You can even consider to keep some kind of river state within this special index if you need to. It gets initialized with one shard and one replica.

After you registered a river on a node, every time you start the node it will start again, eventually trying to import again your data. If that’s not what you want, for example because the river is meant to only import data once, like the Solr river, you can just automatically close it after the data import, like this:

client.admin().indices().prepareDeleteMapping("_river").setType(riverName.name()).execute();

Also, it’s good to know that the river is a singleton in the cluster. It’s allocated on a single node which can change in case of failure. You can also control through configuration the allocation of rivers over the cluster.

Use Bulk API

A little hint that can save you time: when you index data, you might want to have a look at the Bulk API to index more documents at the same time and make the indexing process faster. But then you need to control how often you want to send the index request. Every 100 documents? Every 5 minutes? Both? It all depends on your data source…

And what is the maximum number of concurrent bulks you want to allow? In fact it can happen that your data source is faster than your bulks and you might not want to keep adding concurrent bulks over and over. It’s probably best to wait a little, until some of the bulks are completed to then run the new ones. Since this logic is needed pretty much everywhere when it comes to index data, the elasticsearch team exposed the BulkProcessor that helps you a lot:

bulkProcessor = BulkProcessor.builder(client,
        new BulkProcessor.Listener() {

    @Override
    public void beforeBulk(long executionId,
                           BulkRequest request) {
        logger.info("Going to execute new bulk composed of {} actions",
                request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId,
                          BulkRequest request,
                          BulkResponse response) {
        logger.info("Executed bulk composed of {} actions",
                request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId,
                          BulkRequest request,
                          Throwable failure) {
        logger.warn("Error executing bulk", failure);
    }
}).setBulkActions(100).setFlushInterval(TimeValue.timeValueMinutes(5))
        .setConcurrentRequests(10).build();

The code above creates the BulkProcessor and configures it to execute the bulk when 100 documents are ready to be indexed and when 5 minutes have passed from the last bulk execution. Also, a maximum number of 10 concurrent bulks will be run.

The following code adds an index request to the previously created BulkProcessor

bulkProcessor.add(Requests.indexRequest(indexName).type(typeName)
        .id(id).source(jsonWriter.toString()));

And…don’t forget to close the bulk at the end to index any left documents:

bulkProcessor.close();

After reading this article, you should know in detail how to write an elasticsearch river.

Just to be sure, let’s break it down to a few simple steps:

  • Write your own plugin class, which implements the Plugin interface (extending the AbstractPlugin class)
  • Add the onModule method to your plugin class
  • Write your Guice module for your plugin
  • Add the es-plugin.properties containing the name of your plugin class to load
  • Write your river class, which implements River and extends AbstractRiverComponent

So then hope this insight helps you to write your own plugin and let me know how you get on…