A stash of sugar cane ready to be processed into sugar and Arrack

This is part of a series of short articles on setting up an ELK deployment on K8s.

  1. ElasticSearch on K8s: 01 — Basic Design
  2. ElasticSearch on K8s: 02 — Log Collection with Filebeat
  3. ElasticSearch on K8s: 03 - Log Enrichment with Logstash
  4. ElasticSearch on K8s: 04 - Log Storage and Search with ElasticSearch
  5. ElasticSearch on K8s: 05 - Visualization and Production Readying
  6. ElasticSearch Index Management
  7. Authentication and Authorization for ElasticSearch: 01 - A Blueprint for Multi-tenant SSO
  8. Authentication and Authorization for ElasticSearch: 02 - Basic SSO with Role Assignment
  9. Authentication and Authorization for ElasticSearch: 03 - Multi-Tenancy with KeyCloak and Kibana

Log Enrichment

Now that the logs are being collected from the required sources, it’s time to start making some sense out of them. This part of the process is called enrichment, and Logstash is the tool of choice in our stack.

In brief, Logstash is a customizable pipeline that every incoming event passes through. A pipeline reads events from an input, runs them through a set of filters, and sends the results to an output.
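As a minimal illustration of that structure (just a sketch, unrelated to our actual deployment), the following pipeline reads events from standard input, applies no filters, and prints them back out:

input {
  stdin { }
}

filter {
  # no processing; events pass through unchanged
}

output {
  stdout { codec => rubydebug }
}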

Input

A pipeline can have different types of input plugins defined. These could be file, s3, or http, among others. beats is the input type we are interested in for this use case; it allows our Filebeat agents to push collected logs to Logstash. Defining a beats input opens a port on the host that Logstash is running on, so that Filebeat agents can connect to that port and send over their logs.

input {
  beats {
    port => 5044
  }
}

If you refer to the previous post of this series on Filebeat, you can see that the Filebeat configuration defines a Logstash URL as its output, and that the port used in that URL is 5044. This is the port opened by the Logstash input.beats section configured above.
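As a reminder, the relevant part of the Filebeat configuration looks roughly like the following (assuming the Logstash K8s Service is named logstash and sits in the same Namespace as the Filebeat agents; refer to the previous post for the exact configuration used):

output.logstash:
  hosts: ["logstash:5044"]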

Processing

Log events coming through these inputs are then fed into a pipeline of filters. These filters can modify, delete, clone, or hold back (throttle) a log event based on the parameters provided in the filter configuration. The most important of these is the grok filter, which splits the incoming log message into meaningful fields that can be made sense of later.
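Before getting to grok, here is a minimal sketch of two of the simpler filter types; the healthz condition and the cluster field are made up purely for illustration. drop discards matching events entirely, while mutate modifies fields on the events that remain.

filter {
  # discard noisy health-check requests entirely (illustrative condition)
  if [message] =~ /healthz/ {
    drop { }
  }
  # attach a static field to every remaining event
  mutate {
    add_field => { "cluster" => "dev" }
  }
}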

To see grok in action, consider the following log line produced by an Nginx Pod.

172.17.0.1 - - [23/Oct/2019:09:01:52 +0000] "GET / HTTP/1.1" 200 612 "-" "curl/7.64.0" "-"

The whole entry has a few meaningful pieces of information huddled together, laid out according to the following log_format directive from the Nginx configuration.

log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                  '$status $body_bytes_sent "$http_referer" '
                  '"$http_user_agent" "$http_x_forwarded_for"';

access_log  /var/log/nginx/access.log  main;

This is the default format specified in the Nginx configuration shipped with the nginx Docker image tagged 1.7.9. We can see that the fields in a single log line are:

  1. remote address - 172.17.0.1
  2. ident - "-" (a literal hyphen in the format, the traditional identd placeholder)
  3. remote user - "-" (unknown)
  4. [local time] - [23/Oct/2019:09:01:52 +0000]
  5. "request" - "GET / HTTP/1.1"
  6. response status code - 200
  7. response body size in bytes - 612 (the default Nginx welcome page)
  8. "http referer" - "-" (not present, since this was a direct curl hit)
  9. "user agent" - "curl/7.64.0"
  10. "x-forwarded-for", the original sender if proxied - "-"

Given the understanding of the log format, we can instruct Logstash to extract the fields out of each Nginx log line.

This is done with the grok filter. grok is given a pattern to match, and it tries its best to parse each incoming line against that pattern. The result is a map of values, where the keys are the field names in the pattern provided to grok, and the values are the matching pieces of the log line.

%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)

The above pattern is a direct copy-paste from the grok patterns shipped with Logstash by default. What is interesting is that each pattern above is itself defined in terms of a simpler set of patterns, and the full combined access log format (including the referer and user agent fields) can be referenced with a single pattern called COMBINEDAPACHELOG.
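Applied to the example log line above, the pattern would yield roughly the following fields (shown here as a simplified sketch; the actual event also carries the metadata added by Filebeat and Logstash):

{
  "clientip": "172.17.0.1",
  "ident": "-",
  "auth": "-",
  "timestamp": "23/Oct/2019:09:01:52 +0000",
  "verb": "GET",
  "request": "/",
  "httpversion": "1.1",
  "response": "200",
  "bytes": "612"
}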

Since all sorts of logs generated by different Pods could be fed through Filebeat, we can instruct Logstash to apply the grok filter only to log events originating from Nginx Pods. This is possible because Filebeat has already enriched each event with K8s metadata before publishing it. We can conditionally apply the grok pattern only if kubernetes.labels.app is equal to nginx (given that the Nginx Pod has a label app with the value nginx).

filter {
  if "nginx" in [kubernetes][labels][app] {
    grok {
      match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)" }
      overwrite => ["message"]
      add_tag => [ "groked" ]
    }
  }
}

Output

After the enrichment of the events is done, they can be pushed off to storage, which is ElasticSearch in our stack. ElasticSearch is also reached through a K8s Service, which can be addressed by name alone, or by its FQDN (e.g. elasticsearch.<namespace>.svc.cluster.local) if Logstash and ElasticSearch are not colocated in the same Namespace.

The configuration for the output is specified in the output section of the Logstash configuration. Once again, there are different output plugins, but the one we are interested in is elasticsearch.

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{+YYYY.MM.dd}-%{[kubernetes][namespace]}"
  }
}

In the above output.elasticsearch configuration, we are making use of Logstash's interpolation syntax to send logs from different sources to different indices (we will discuss what indices are in the next post of this series). Using the index option, we are instructing Logstash to push logs to indices named after the date and the K8s Namespace (which is forwarded by the Filebeat metadata processor). For example, a log event collected from the default Namespace on 23 Oct 2019 would end up in an index named logstash-2019.10.23-default. This is especially useful in a K8s setup, where we could implement a kind of tenancy-based data segregation by sending logs from different K8s Namespaces to different ElasticSearch indices.

In terms of scheduling the Logstash processes, a standard Deployment could be used to run Logstash Pods on the K8s cluster. However, a Deployment will not be suitable for the production-level tuning that will have to be done on Logstash later on.
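For reference, a minimal sketch of scheduling Logstash with a plain Deployment could look like the following. The image tag and the logstash-pipeline ConfigMap name are assumptions for illustration, and a Service exposing port 5044 would also be needed so that Filebeat agents can reach the Pods.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
        - name: logstash
          image: docker.elastic.co/logstash/logstash:7.4.0
          ports:
            - containerPort: 5044   # the beats input port
          volumeMounts:
            - name: pipeline
              mountPath: /usr/share/logstash/pipeline
      volumes:
        - name: pipeline
          configMap:
            name: logstash-pipeline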

You might have noticed that the diagram above uses a StatefulSet with persistence instead of a Deployment. Why this is so has to do with the fault-tolerance requirements production deployments may have. This is discussed in detail in the last section of this article, where the basic setup is modified to withstand production-level loads.