December 12, 2016

Day 12 - Logstash Fundamentals

Written by: Jamie Riedesel (@sysadm1138)
Edited by: Nir Cohen (@thinkops)

Logstash by Elastic Co is more than simply a platform for stashing logs, it’s a data-transformation and shipping pipeline that’s also very well suited for log data. At my company, we use Logstash in at least one data-transformation role for the main applications. In other companies, Logstash isn’t involved in log-aggregation at all and is only used for its data transformation capabilities.

Essentially, Logstash is a specialized form of ETL pipeline. It extracts log data from a variety of sources, transforms it using one of the many filters available, and loads it into data-stores (it’s built to dump into ElasticSearch, but it’s not required) or API endpoints. Quite handy. Here are some of the things you can do with it:

  • Ingest your application’s log-files.
  • Directly act as a central RFC3164-based syslog server.
  • Ship log-files from your central RFC5454-based syslog server.
  • Ingest your Windows Event Log information for long term storage.
  • Ingest your audit.log information.
  • Tee data-streams into multiple backing stores, like OpenTSDB, Graphite, or MongoDB.
  • Send events into queuing systems like Kafka, RabbitMQ, and Redis.
  • Gather and collate metrics from the event-stream, dumping them into a time-series database.
  • Monitor twitter for keywords and send alerts to your marketing team.
  • Open JIRA tickets based on events discovered in the stream.

Logstash can scale from an all-in-one box, useful for testing out your parsing logic, to massive distributed parsing architectures supporting the data load of thousands of nodes. It can scale with you as you grow, so long as you pay attention to the distributed nature of some of what it does. We will get into that.

To understand how it works, first we need to talk about its architecture.

The Pipeline

The Logstash processing pipeline has three stages. Input, Filter, and Output. Inputs and outputs have encoders and decoders that can translate data into or out of common formats such as JSON. Logstash Pipeline Inputs define where you are getting your event data. Each item in the pipeline is called an event. This is almost always a line of text, accompanied by a variety of metadata depending on the plugin used. Events may be polled by Logstash, or pushed to Logstash.

Filters define how you transform and enrich the events in the pipeline.

Outputs define where you send the processed events.

Logstash is written in Jruby, which is an implementation of Ruby inside the Java JVM. As such, it threads very well. Which means understanding how Logstash threads each of the pipeline stages will help you understand its limitations and how it scales.

Each input { } block in the config-file is given its own thread. If you have multiple high-throughput files, or different input types in your config, you may get better performance on multi-core systems by defining each of those in a separate input { } block.

Don’t go nuts with this, be intentional about it. For the file plugin, if you’re dealing with fewer than 200 log-lines a second you will not gain much benefit from splitting. If your application creates very active logs, and you’re also monitoring syslog, you may want to put the application-logs in one input {} and syslog-files in another to ensure syslog rarely gets backlogged behind the application-logs. On machines with low core-counts, premature optimization can hurt.

Inputs queue up micro-batches, which are picked up by the filter stage. You can configure how many workers process the filter queue through the -w or --pipeline.workers command-line parameter in Logstash. By default, it’s set to 8.

The worker threads take the micro-batches off of the queue, and run them through the filters. The same thread takes the transformed events, and then runs through the outputs. On a multi-core system, this leads to highly parallel processing.

Of note, the threads do not share state between each other. Certain plugins, such as the metrics plugin, keep some state internal to the thread. When it flushes its events, a single logstash node will flush, by default, 8 separate metrics events. In the case of metrics, it is very wise to use something like statsd to further aggregate the metrics coming out of logstash. The metrics plugin is the biggest trap for the unwary, as people like their numbers and the squiggly lines on graphs that use them.

The state issue matters if you have a need to correlate event flows, such as marking a start, middle, and end of an event contained on multiple log-lines. This needs to be done at the input, likely with the multiline codec, or in a separate ETL process outside of Logstash entirely.

Warning: If you are on logstash 2.1 or older, you’re running an inefficient pipeline model where they added another queue between the filter stage and the output stage. This way, outputs were single-threaded. It also meant that events processed through the pipeline more slowly. On Logstash 1.5, our 4-core main parsing nodes topped out at 47% CPU no matter how we adjusted worker-count or JVM Heap. When we upgraded to Logstash 2.2 we changed nothing about worker-counts or Heap and the CPU high-water mark reached 75%. The event throughput rate rose at the same rate, all from that redesigned pipeline.

The Config-file

The config-file for logstash is broken up into three sections to match the pipeline model. There are two other files for managing JVM settings and startup options, but those will not be covered by this article.

input {
  # Input plugin declarations go here.
  # There can be more than one plugin per block.
}

input {
  # There can be multiple input blocks, too. Threading, as discussed.
}

filter {
  # Filter plugin declarations go here
}

filter {
  # Like inputs, you can have multiple filters.
  # In fact, you almost definitely will have more than one.
}

output {
  # Output plugin declarations go here.
  # yes, like the other two, there can be multiple declarations.
  # And multiple output plugin declarations.
}

config files are kept at /etc/logstash/conf.d/*, the location of which can be overidden by command-line parameters. It isn’t clear what order Logstash loads those files, so it’s still a good idea to make a single big one to be sure order is preserved. input {} blocks don’t care much about order as they all run in their own threads. Filter blocks are handled by multiple threads and order is important there.

When an event is created by an input plugin, a message field is created with the contents of the text received. The event will also have metadata fields attached to it that depeend on the individual plugins. Use of a codec, such as JSON, will cause fields contained in the incoming text to be applied to the event. Once this metadata tagging is done the event is handed off to the filter workers in micro-batches.

Filter workers then take the marked up event and process them. This can be anything from adding fields, using the grok filter to parse fields for more fields, dropping fields, casting fields to a data type, and many more transforms. For complex filter chains, a field may exist only briefly; created and thrown away as filtering proceeds.

To explain why ordering matters in filters, take a look at this example:

# Parse an authentication header and get details
filter {
  if [message] =~ "Authentication_request" {
    grok {
      match => {
        message => "Authentication_request: %{GREEDYDATA:auth_message}"
      }
    }
    add_field => {
      "sub_type" => "authentication"
    }
  }
}

# Parse the auth_message and populate auth-related fields.
filter {
  if [sub_type] == "authentication" {
    grok {
      match => {
        auth_message => "%{WORD:auth_type} / %{WORD:auth_user} / %{WORD:application}"
      }
    }
  }
}

first filter block creates a field called sub_type, and sets its value to authentication. It also decodes the message field and creates a new field called auth_message.

The second filter block checks to see if the sub_type field is set to authentication and then does some work on the auth_message field that should be there. Due to ordering, if this second block appeared before the first it would never execute.

Now that we’ve discussed order and the overall file, it’s time to go into each section.

Inputs!

As of very recently, there were over 50 available input plugins. That’s quite a lot, so I’m going to focus on the plugins that sysadmins would be most interested in for gathering log and telemetry data:

  • file, because we work with log files.
  • eventlog, because Windows needs stashing too.
  • beats, for reasons you’ll see in a moment.

There are others that are very interesting, but would make this long read even longer:

  • TCP and UDP, send text at a port directly, and do away with your application logfiles entirely.
  • rabbitmq, zeromq, and kafka, receive events off of a message broker.
  • irc, hipchat, and xmpp, to archive chat messages for compliance reasons, allow analytics on support-channel traffic, or use as a message-passing bus.
  • jdbc, for pulling events out of RDBMS.

File

The file input is very straight forward. You give it a path or an array of paths to monitor. That’s it. You can enhance the input’s performance with additional options. Here is a simple example:

input {
  file {
    path => [
      "/var/log/syslog",
      "/var/log/auth.log"
    ]
    type => "syslog"
  }
}

will monitor the local syslog files, and set the event type to syslog. Depending on your requirements, this may be all you need to configure in your input section.

A more detailed example:

input {
  file {
    path => [
      "/var/log/app/*"
    ]
    exclude => "*.gz"
    type => "applog"
    codec => "json"
  }
}

This configures a file input to monitor an entire directory. It has been given an exclude parameter to ensure gzipped files are not stashed after logrotate runs. It uses the JSON codec for events, because this hypothetical application dumps JSON-encoded log-lines. This puts the fields in the JSON into the event directly, without need for parsing them out during the filter stage. Handy!

A warning about multi-line events

Some applications, such as tomcat, are known to produce logs with events spread across many, many, many lines. Logstash needs to be explicitly told about the multiline format for a given input plugin. To handle logs of this type, you will need to specify codec => multiline {} in your file {} declaration. The options in this codec specify how multiple lines are aggregated into a single event. For Tomcat, this could be, for lines that begin with whitespace, aggregate it with the previous line. When this codec is used on an input it runs in a single thread, allowing it to successfully merge multi-line events.

file {
  path => [ "/opt/tomcat/logs/*.log" ]
  type => 'tomcat'
  codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}

pattern and what settings declare that lines starting with whitespace belongs to the previous event in the pipeline.

Multi-line events are not readily assembled if they’re not parsed directly from the file. The distributed nature of logstash ensures that a single filter-thread is not guaranteed to see all of the log-lines for a specific file.

Eventlog

The eventlog plugin is similarly simple. The one caveat here is that Logstash has to run on the node you want to collect Event Logs from. So if you want to collect from a Domain Controller, Logstash has to be on the Domain Controller. Event Log forwarding may be of use here, but setting that up is beyond the scope of this article.

An example:

input {
  eventlog {
    interval => 2000
    logfile => ‘System’
    type => ‘eventlog-system’
  }
}

will read in the System event-log, do so every 2 seconds, and set the event type to eventlog-system. Because this is polling the System log, the user that Logstash runs under will need to be granted the right permissions to see this Event Log.

Beats

The beats plugin opens up a TCP port to listen for traffic generated by Elastic Co’s beats framework of data shippers written in Go. If the idea of putting a JVM on everything fills you with ennui, Beats may be for you. They are intended to run directly on your application hosts, and do no filtering. They can deliver events to the beats plugin here, or proxied through Outputs such as Redis, Kafka, or directly into ElasticSearch. From there, a full Logstash instance running one of those Inputs can filter the events gathered by Beats and output them to their final destination.

input {
  beats {
    port => 5044
  }
}

This tells the beats plugin to listen on the specified port, which is the default port for beats. On the shipper side, you would tell Beats to send events through their beats output to this server

Outputs!

We’ve seen a few examples of what configuration blocks look like now, so the outputs should be no surprise. I’ll go over three different outputs: ElasticSearch, Redis, and PagerDuty.

ElasticSearch

Because Elasic Co. makes Logstash, ElasticSearch is the platforms preferred output; but you do not have to use it. If you do, the ElasticSearch Output is how you do so. This can be very simple:

output {
  elasticsearch {
    hosts => [
      "localhost",
      "logelastic.prod.internal"
    ]
    template_name => "logstash"
    index => "logstash-{+YYYY.MM.dd}"
  }
}

outputs to an ElasticSearch instance on the local box, or to a well-known address on the network somewhere. It outputs to the logstash index, which is the default name, and will create a new index every day (the {+YYYY.MM.dd} part is where the rotation is configured). It’s up to you to clean up the old indexes, though Elastic provides Curator to help with that.

For those who are more comfortable with ElasticSearch, you can definitely get more complicated with it:

output {
  if "audit" in [tags] {
    elasticsearch {
      hosts => [
        "localhost",
        "logelastic.prod.internal"
      ]
      template_name => "audit"
      index => "audit-{+xxxx.ww}"
      manage_template => true
      template => '/etc/logstash/templates/audit.json'
    }
  }
}

outputs to an index named audit, that is rotated weekly instead of daily (xxxx.ww specifies the ISO year and week). What’s more, we are uploading a custom Mapping to the audit indexes to ensure the fields that enter ElasticSearch are typed and mapped correctly.

Redis

The redis plugin is often used in architectures where the Logstash on the application nodes is configured to ship events without filtering, or with Beats which can’t filter. Another set of Logstash nodes use the Redis input to pull events off of the queue for filtering and outputting. This way, CPU and RAM loading is isolated from application load.

output {
  redis {
    host => "logstash-redis.prod.internal"
    data_type => list
    key => "logstash-firehose"
  }
}

sends all events to the redis server at logstash-redis.prod.internal, and will store all events in the key named logstash-firehose. The input plugin is structured similarly to this one. Parsing nodes will pull events out of that key to filter and output them.

PagerDuty

The PagerDuty plugin allows you to wake humans up directly from logstash. It’s a pain, but it can be a very useful part of your monitoring infrastructure, as Logstash is a form of push-monitoring and can react faster than most poll-based monitoring methodlogies. Not every organization will want to do this, but it’s important enough I feel I should mention how it’s done.

output{
  if [type] == "redis_mon" and [size] > 250000 {
    pagerduty {
      service_key => "biglongsecret"
      description => "REDIS queue in production is critically backlogged."
      details => {
        "message" => "REDIS: Redis queue %{queue} size > critical threshold: %{size} (host: %{host})"
        "timestamp" => "%{@timestamp}"
      }
      incident_key => "logstash/%{host}/%{queue}"
      event_type => "trigger"
    }
  } else if [type] == "redis_mon" and [size] < 250000 {
    # Logstash doesn't have state, so we have to rely on PagerDuty's 
    # API for this. (Sorry, PagerDuty! This explains some things...)
    # This will trigger a 'resolve' every time size is small enough.
    pagerduty {
      service_key => "biglongsecret"
      incident_key => "logstash/%{host}/%{queue}"
      event_type => "resolve"
    }
  }
}

setup will automatically trigger, and automatically resolve, the case where a redis queue gets excessively backed up. Remember that incident_key is used by PagerDuty as the unique identifier, so structure that to ensure your triggers and resolves are touching the right incidents.

Logstash can also output to email, but I’m leaving it up to you to figure that out. Our inboxes are already too full.

Now that we’ve handled inputs and outputs, time to filter our events.

Transforms! I mean, Filters!

We’ve learned how to get the data, and dispose of it once we’re done with it. Now for the hard part: transforming it. There are a wide variety of filter plugins available, but I’m going to focus on a few of the most useful for sysadmins.

  • grok: Using Regex to populate additional fields.
  • kv: Using key=value pairs to populate additional fields.
  • json: Using JSON expressions to populate additional fields.
  • mutate: to use logstash conditionals to manipulate events.
  • date: turn the timestamps in your logs into timestamps on the events.

Grok

Grok allows you to turn log statements like this syslog example:

May 19 19:22:06 ip-172-16-2-4 pii-repo-backup[4982]: ALARM Unable to isolate framulator, backup not taken.

And extract meaning from it in ways you can later automate. Want to get an alarm when the PII backup is anything but OK? With Grok, you set it up so you can get that alarm. A full breakdown of how it works would be a SysAdvent post all by itself. Instead, I’m going to cowardly forward you off to an example I used at LISA this year.

KV

This allows you to turn strings full of key=value pairs into fields on an event. Quite powerful, if you have something that logs that way. Take the following log-line:

May 20 19:22:06 ip-172-16-2-4 pii-repo-backup[4982]: STATS objects=491792 size=182837475

If we use Grok to isolate the details of this STATS event for the backup, we can then execute the kv filter over the rest of the line. This would add both objects and size to the event.

filter {
  if 'backup_output' in [tags] AND [backup_state] == 'STATS' {
    kv {
      source => backup_message
      prefix => 'backup_'
    }
  }
}

will take that logline and create backup_objects and backup_size fields, thanks to the prefix declaration. You can use these numbers later on for outputting to statsd, opentsdb, or anything else really.

JSON

Like the KV filter, the JSON filter allows you to parse JSON. If that log line had been formatted like this instead:

May 20 19:22:06 ip-172-16-2-4 pii-repo-backup[4982]: STATS {“objects”: “491792”, “size”: “182837475”}

We could use the json filter much like we did the KV one:

filter {
  if 'backup_output' in [tags] AND [backup_state] == 'STATS' {
    json {
      source => backup_message
      target => 'backup'
    }
  }
}

we’re importing a data-structure instead of a flat namespace, the created fields will be backup.objects and backup.size instead. Where objects and size are sub-fields to the backup field.

Mutate

Mutate allows you to add and remove fields, add and remove tags, upcase or lowercase field contents, join arrays into strings, split strings into arrays, perform regex replace operations, and cast fields into specific data-types. This is mostly used to normalize your events, and clean up fields used for processing that don’t need to be in the final data-stores.

Lets take the backup output we’ve been working with. We have some numbers in there, let’s force them to a numeric type.

filter {
  if 'backup_output' in [tags] AND [backup_state] == 'STATS' {
    mutate {
      convert => {
        "backup.objects" => integer,
        "backup.size" => integer
      }
      remove_field => [ 'backup_message' ]
    }
  }
}

will cast both of those fields into integer, and remove the backup_message field we processed using the JSON filter.

Date

The date filter is what you use to turn the timestamp in your logline into the timestamp of your event. By default, the timestamp on an event is when it is ingested. This can lead to apparent event-spikes if your ingestion pauses and has to catch up.

Given the log-line we’ve been working with:

May 20 19:22:06 ip-172-16-2-4 pii-repo-backup[4982]: STATS {“objects”: “491792”, “size”: “182837475”}

we can turn that timestamp into something useful like so:

filter {
  if [type] == 'syslog' {
    date{
      match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

tells the date filter to look in the timestamp field for a date-string coded like something in the following array. The format string is in joda format.

An Example Config

We’ve gone through a few explanations so far, so here is an example config using some of the sample configs we’ve already spoken about.

input {
  file {
    path => [ '/var/log/syslog' ]
    type => syslog,
  }
}

filter {
  grok {
  # This will create a new field called SYSLOGMESSAGE, that contains the 
  # data part of a syslog line.
  #
  # If given a line like:
  # Sep  9 19:09:50 ip-192-0-2-153 dhclient: bound to 192.0.2.153 -- renewal in 1367 seconds.
  # SYSLOGMESSAGE will equal "bound to 192.0.2.153 -- renewal in 1367 seconds."
  #
  match => {
    "message" => "%{SYSLOGBASE}%{SPACE}%{GREEDYDATA:SYSLOGMESSAGE}"
  }
}

# Get the timestamp pulled out of the syslog field.
filter {
  if [type] == 'syslog' {
    date{
      match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

# Get backup data and parse it into the event.
filter {
  if [program] =~ "-backup$" {
    grok {
      match => {
        "SYSLOGMESSAGE" => "^(?<backup_state>OK|WARN|ALARM|CRIT|STATS) %{GREEDYDATA:backup_message}$"
        "program" => "^%{DATA:backup_name}-backup$"
      }
      add_tag => [ "backup_output" ]
    }
  }
}

filter {
  # Parse backup output, and dig out backup stats
  if 'backup_output' in [tags] AND [backup_state] == 'STATS' {
    json {
      source => backup_message
      target => 'backup'
    }
    
    mutate {
      convert => {
        "backup.objects" => integer,
        "backup.size" => integer
      }
      remove_field => [ 'backup_message' ]
    }
  }
}

output {
  elasticsearch {
    hosts => [
      "localhost",
      "logelastic.prod.internal"
    ]
    template_name => "logstash"
    index => "logstash-{+YYYY.MM.dd}"
  }
}

. This reads the local syslog file (pre-systemd of course). 2. A Grok expression parses the syslog file and populates the usual syslog fields. 3. A date expression pulls the timestamp out of the syslog fields. 4. If the program is a backup program, a second Grok expression populates backup-specific fields. 5. If the event is backup_output and the action is STATS, parse the backup_message to extract details about the backup. 6. Then cast the datatypes for the stat fields to integer, and drop the backup_message field entirely. 7. Finally, output the whole lot to elasticsearch, into an index that rotates daily.

Architectures

At last, here are a couple of architectures for your Logstash setup. It can scale to huge, if you do it right. This should give you the hints needed to see how your own environment can grow.

Short Logstash Architecture

In this architecture, Logstash is running on every node and feeding directly into the backing storage. The filter {} sections on each node will be customized for that particular workflow. The advantage to this design is that there is minimal latency between when events enter the pipeline and when it is queryable in the backing storage.

The disadvantage is that you’re doing potentially CPU and RAM intensive work on a node that may be doing some of the same.

Medium Distributed Logstash Architecture

In this architecture, we’ve added a queueing layer. There is still a Logstash running on the nodes, but all it does is ship events; the filter {} section will be abscent, or very minimal on those. Events enter the queue largely unprocessed, though with sourcing tags. A parsing tier of Logstash instances then services the queue. This parsing tier has complex filter {} sections, as it has to deal with multiple types of services.

The advantage to this architecture is that the Logstash footprint on the nodes doing production work is very small. And can be made smaller if you leverage Beats instead of Logstash (if that makes sense for you). The use of a queue allows you to scale up your parsing tier as event loads change. That queue also allows you a buffer for your events to handle spikes. There is a security boundary here as well; since application nodes don’t have access to where the logs are stored, an attacker can’t hide their tracks as easy as if they had write access to them like the short architecture.

The disadvantage to this architecture is latency; for the most optimal path, there are a few more network round-trip-times for an event to pass through before it is queryable. Also, the parsing tier is very high throughput so mistakes in optimization can lead to unexpected reductions in throughput.

Scaling tips

As you grow your logstash infrastructure, here are some tips to keep things running at full speed.

  • Split your file inputs into multiple input blocks (if that makes sense for you)
  • Do some testing to see what your JVM heap size needs to be, and how far you can push your worker (-w) count.
  • Use Logstash 2.2 or later, to take advantage of the improved pipeline architecture.
  • Do your Grok right, for your own sake. Here are some Grok rules I follow.
  • Avoid multiline logs. If you need it, use it as a codec on an input ingesting directly from the log source.
  • Avoid the need for state anywhere. Logstash is not designed to be stateful outside of a single thread.

Further reading

No comments :