Simulating an Elasticsearch Ingest Node pipeline

by Patrick Kik, February 2, 2017

Indexing documents into your cluster can be done in several ways:

  • using Logstash to read your source and send documents to your cluster;
  • using Filebeat to read a log file and send documents to Kafka, letting Logstash consume from Kafka, transform the log events and send the resulting documents to your cluster;
  • using curl and the Bulk API to index a pre-formatted file (see the sketch after this list);
  • using the Java Transport Client from within a custom application;
  • and many more…
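
As a quick illustration of the third option, here is a minimal sketch. It assumes Elasticsearch runs on localhost:9200 and that a pre-formatted file called bulk.ndjson (a name chosen just for this example) sits in the current directory. The Bulk API expects an action line followed by a source line per document, and the file must end with a newline:

    # bulk.ndjson - one action line followed by one source line per document
    { "index": { "_index": "my-index", "_type": "my-type", "_id": "1" } }
    { "message": "[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {\"value\": 4, \"nextValue\": 8}" }

    # send the file to the Bulk API
    curl -s -H 'Content-Type: application/x-ndjson' -XPOST 'http://localhost:9200/_bulk' --data-binary @bulk.ndjson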

Before version 5, however, there were only two ways to transform your source data into the document you wanted to index: use Logstash filters, or do it yourself.

Elasticsearch 5 introduces the concept of the Ingest Node: a node in your cluster like any other, but with the ability to run a pipeline of processors that modify incoming documents before they are indexed. The most frequently used Logstash filters have been implemented as processors.
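
By default every node in a 5.x cluster acts as an ingest node, so there is nothing to configure to follow along. If you want to keep certain nodes free of ingest work, you can toggle the role in elasticsearch.yml; a minimal example:

    # elasticsearch.yml - disable the ingest role on this node
    node.ingest: false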

For me, the best part of pipelines is that you can simulate them. Especially in Console, simulating a pipeline makes creating it very fast, because the feedback loop while testing is very short. That makes pipelines a very convenient way to index data.

If you execute this in Kibana Console:

    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "My pretty pipeline",
        "processors": []
      },
      "docs": [
        {
          "_index": "my-index",
          "_type": "my-type",
          "_id": "my-id",
          "_source": {
            "message": "[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {\"value\": 4, \"nextValue\": 8}"
          }
        }
      ]
    }

The result will be:

    {
      "docs": [
        {
          "doc": {
            "_index": "my-index",
            "_id": "my-id",
            "_type": "my-type",
            "_source": {
              "message": """[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {"value": 4, "nextValue": 8}"""
            },
            "_ingest": {
              "timestamp": "2017-02-02T08:25:09.550Z"
            }
          }
        }
      ]
    }

The response shows the document exactly as it would be indexed. It is not a very pretty document yet, so let’s first use a grok processor to split up the log message:

    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "My pretty pipeline",
        "processors": [
          {
            "grok": {
              "field": "message",
              "patterns": [
                "\\[%{TIMESTAMP_ISO8601:timestamp}] %{JAVACLASS:class_name} - %{GREEDYDATA:message}"
              ]
            }
          }
        ]
      },
      "docs": [
        {
          "_index": "my-index",
          "_type": "my-type",
          "_id": "my-id",
          "_source": {
            "message": "[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {\"value\": 4, \"nextValue\": 8}"
          }
        }
      ]
    }

Notice the predefined grok patterns (TIMESTAMP_ISO8601, JAVACLASS, GREEDYDATA), the same ones Logstash uses. The resulting document already looks a lot better:

    {
      "docs": [
        {
          "doc": {
            "_index": "my-index",
            "_id": "my-id",
            "_type": "my-type",
            "_source": {
              "message": """{"value": 4, "nextValue": 8}""",
              "class_name": "nl.trifork.blog.MyClass",
              "timestamp": "2017-02-02T09:02:34.234+01:00"
            },
            "_ingest": {
              "timestamp": "2017-02-02T08:30:27.930Z"
            }
          }
        }
      ]
    }

Some fields are extracted now, but more structure can be recovered: the message field actually contains a JSON document. The json processor can parse it and, with add_to_root set to true, merge the parsed fields into the root of the document:

    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "My pretty pipeline",
        "processors": [
          {
            "grok": {
              "field": "message",
              "patterns": [
                "\\[%{TIMESTAMP_ISO8601:timestamp}] %{JAVACLASS:class_name} - %{GREEDYDATA:message}"
              ]
            }
          },
          {
            "json": {
              "field": "message",
              "add_to_root": true
            }
          }
        ]
      },
      "docs": [
        {
          "_index": "my-index",
          "_type": "my-type",
          "_id": "my-id",
          "_source": {
            "message": "[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {\"value\": 4, \"nextValue\": 8}"
          }
        }
      ]
    }

Now the output of the logger in MyClass is easily accessible as well:

    {
      "docs": [
        {
          "doc": {
            "_index": "my-index",
            "_id": "my-id",
            "_type": "my-type",
            "_source": {
              "nextValue": 8,
              "message": """{"value": 4, "nextValue": 8}""",
              "class_name": "nl.trifork.blog.MyClass",
              "value": 4,
              "timestamp": "2017-02-02T09:02:34.234+01:00"
            },
            "_ingest": {
              "timestamp": "2017-02-02T08:33:58.593Z"
            }
          }
        }
      ]
    }

Let’s rename ‘value’ and ‘nextValue’ to ‘value0’ and ‘value1’ with the rename processor. The message field is of no use any more either, so it can be dropped with the remove processor:

    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "My pretty pipeline",
        "processors": [
          {
            "grok": {
              "field": "message",
              "patterns": [
                "\\[%{TIMESTAMP_ISO8601:timestamp}] %{JAVACLASS:class_name} - %{GREEDYDATA:message}"
              ]
            }
          },
          {
            "json": {
              "field": "message",
              "add_to_root": true
            }
          },
          {
            "remove": {
              "field": "message"
            }
          },
          {
            "rename": {
              "field": "value",
              "target_field": "value0"
            }
          },
          {
            "rename": {
              "field": "nextValue",
              "target_field": "value1"
            }
          }
        ]
      },
      "docs": [
        {
          "_index": "my-index",
          "_type": "my-type",
          "_id": "my-id",
          "_source": {
            "message": "[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {\"value\": 4, \"nextValue\": 8}"
          }
        }
      ]
    }

Then the new and very clean document looks like this:

    {
      "docs": [
        {
          "doc": {
            "_index": "my-index",
            "_id": "my-id",
            "_type": "my-type",
            "_source": {
              "value1": 8,
              "value0": 4,
              "class_name": "nl.trifork.blog.MyClass",
              "timestamp": "2017-02-02T09:02:34.234+01:00"
            },
            "_ingest": {
              "timestamp": "2017-02-02T08:37:33.013Z"
            }
          }
        }
      ]
    }

Now let’s add the two values together. With the script processor, the new Painless scripting language can be used to compute the sum of the two values and store it in a new field:

    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "My pretty pipeline",
        "processors": [
          {
            "grok": {
              "field": "message",
              "patterns": [
                "\\[%{TIMESTAMP_ISO8601:timestamp}] %{JAVACLASS:class_name} - %{GREEDYDATA:message}"
              ]
            }
          },
          {
            "json": {
              "field": "message",
              "add_to_root": true
            }
          },
          {
            "remove": {
              "field": "message"
            }
          },
          {
            "rename": {
              "field": "value",
              "target_field": "value0"
            }
          },
          {
            "rename": {
              "field": "nextValue",
              "target_field": "value1"
            }
          },
          {
            "script": {
              "inline": "ctx.total = ctx.value0 + ctx.value1"
            }
          }
        ]
      },
      "docs": [
        {
          "_index": "my-index",
          "_type": "my-type",
          "_id": "my-id",
          "_source": {
            "message": "[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {\"value\": 4, \"nextValue\": 8}"
          }
        }
      ]
    }

The end result looks like this:

    {
      "docs": [
        {
          "doc": {
            "_index": "my-index",
            "_id": "my-id",
            "_type": "my-type",
            "_source": {
              "total": 12,
              "value1": 8,
              "value0": 4,
              "class_name": "nl.trifork.blog.MyClass",
              "timestamp": "2017-02-02T09:02:34.234+01:00"
            },
            "_ingest": {
              "timestamp": "2017-02-02T08:37:33.013Z"
            }
          }
        }
      ]
    }
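
With several processors chained together it can be hard to see which one changed what, or which one failed. The simulate endpoint also accepts a verbose flag; the response then contains the intermediate document after every processor (in a processor_results array). A trimmed-down example with just the two rename processors, to keep the output short:

    POST _ingest/pipeline/_simulate?verbose
    {
      "pipeline": {
        "description": "My pretty pipeline",
        "processors": [
          {
            "rename": {
              "field": "value",
              "target_field": "value0"
            }
          },
          {
            "rename": {
              "field": "nextValue",
              "target_field": "value1"
            }
          }
        ]
      },
      "docs": [
        {
          "_source": {
            "value": 4,
            "nextValue": 8
          }
        }
      ]
    }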

Now that you are happy with your pipeline, you can register it in your Elasticsearch cluster:

    PUT _ingest/pipeline/my-pretty-pipeline
    {
      "description": "My pretty pipeline",
      "processors": [
          {
            "grok": {
              "field": "message",
              "patterns": [
                "\\[%{TIMESTAMP_ISO8601:timestamp}] %{JAVACLASS:class_name} - %{GREEDYDATA:message}"
              ]
            }
          },
          {
            "json": {
              "field": "message",
              "add_to_root": true
            }
          },
          {
            "remove": {
              "field": "message"
            }
          },
          {
            "rename": {
              "field": "value",
              "target_field": "value0"
            }
          },
          {
            "rename": {
              "field": "nextValue",
              "target_field": "value1"
            }
          },
          {
            "script": {
              "inline": "ctx.total = ctx.value0 + ctx.value1"
            }
          }
      ]
    }
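
To verify that the pipeline was stored, you can fetch it back by its id; the response should echo the description and the list of processors:

    GET _ingest/pipeline/my-pretty-pipeline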

Now index a document, using the newly created pretty pipeline:

    POST my-index/my-type/my-id?pipeline=my-pretty-pipeline
    {
      "message": "[2017-02-02T09:02:34.234+01:00] nl.trifork.blog.MyClass - {\"value\": 4, \"nextValue\": 8}"
    }
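
You can then fetch the document back by its id in Console:

    GET my-index/my-type/my-id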

The resulting document looks awesome:

    {
      "_index": "my-index",
      "_type": "my-type",
      "_id": "my-id",
      "_version": 1,
      "found": true,
      "_source": {
        "value0": 3,
        "value1": 11,
        "total": 14,
        "class_name": "nl.trifork.blog.MyClass",
        "timestamp": "2017-02-02T09:02:34.234+01:00"
      }
    }

The GOTO Academy offers a variety of courses, one of which is the Elastic Stack 5 Masterclass. This course teaches you about the new features of version 5 of the Elastic Stack, such as the Ingest Node and the Painless scripting language.