Making fluentd, journald, Kubernetes, and Splunk Happy Together

The Requirements

Our requirements are simple.  We run microservices in Docker, using Kubernetes as our deployment platform, and we want all of our logs in Splunk.  That means taking the logs from our microservice containers, from Kubernetes itself, and from the host OS, and shipping them all to Splunk.

How Hard Can That Be?

We followed the 12-Factor App’s recommendations to have all logs for our services go to stdout (and sometimes to stderr).  We used the journald logging driver in Docker, and installed Splunk forwarders on each host machine.  Yay, all our logs are going to Splunk!  Done.  What, you mean you actually want to search your logs too?

I suppose you could say that was an implicit requirement.  The trouble we had was that our applications were spitting out logs in JSON format, and when the tooling shipped those up to Splunk, it was escaping out all that nice JSON formatting.  So we were ending up with log messages like:

{
 "__REALTIME_TIMESTAMP": "1495808072502730",
 "__MONOTONIC_TIMESTAMP": "1607839175886",
 "PRIORITY": "6",
 "_PID": "1365",
 "_COMM": "dockerd",
 "_SYSTEMD_UNIT": "docker.service",
 "_HOSTNAME": "kubehost001.example.com",
 "CONTAINER_ID": "9b67ebfb41e5",
 "CONTAINER_ID_FULL": "9b67ebfb41e5b5580593f2a6f685ed5a740a2aa0cd13c40f8f19ebfbfe809017",
 "CONTAINER_NAME": "k8s_awesome-logging-service.41c5113d_awesome-logging-service-2802682077-dnlfg_devEnvironment_5fcb32a0-3be0-11e7-81c6-005056ab1c1d_04634e28",
 "CONTAINER_TAG": "",
 "ENVIRONMENT_NAME": "dev",
 "APP_NAME": "awesome-logging-service",
 "_TRANSPORT": "journal",
 "MESSAGE": "{\"name\":\"awesome-logging-service\",\"hostname\":\"awesome-logging-service-2802682077-dnlfg\",\"pid\":15,\"req_id\":\"f95a3f61-ebb0-46d6-b5d5-37f1383258fc\",\"level\":30,\"msg\":\"Received request { url: '/awesome-logging-service/health',\\n  method: 'GET',\\n  headers: \\n   { host: 'someIp:8080',\\n     'user-agent': 'Go-http-client/1.1',\\n     'accept-encoding': 'gzip',\\n     connection: 'close' },\\n  body: undefined }\",\"time\":\"2017-05-26T14:14:32.501Z\",\"v\":0}",
 "_SOURCE_REALTIME_TIMESTAMP": "1495808072502003"
}

We get some important information, such as the environment and app name, and the container ID if we need it.  However, we usually want to search on things in the MESSAGE field, such as req_id="f95a3f61-ebb0-46d6-b5d5-37f1383258fc".  See all those backslash “\” characters in the MESSAGE?  Those don’t play nicely with Splunk, which can’t pull out the key/value pairs when the log is escaped like that.

It turns out that something along the line, probably journald, was wrapping our log message in its own JSON payload.  Because it doesn’t expect messages to be in JSON format, it does the proper thing and escapes them, so that the log message doesn’t mess up the JSON payload.  On top of that, we still had some logs that were not in JSON format at all, such as the default logs coming out of tools like Tomcat.
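To make the escaping problem concrete, here is a small Python sketch (not part of our tooling) that rebuilds a trimmed-down version of the record above. The field values are copied from the example; everything else is just for illustration. It shows that the MESSAGE value is a string that happens to contain JSON, so it needs a second decode before req_id becomes a real field.

```python
import json

# The application's own log line: a JSON object.
app_log = {
    "name": "awesome-logging-service",
    "req_id": "f95a3f61-ebb0-46d6-b5d5-37f1383258fc",
    "level": 30,
}

# The wrapper stores that line as a plain string in MESSAGE, so serializing
# the outer record escapes every quote inside it.
wrapped_line = json.dumps({
    "CONTAINER_ID": "9b67ebfb41e5",
    "APP_NAME": "awesome-logging-service",
    "MESSAGE": json.dumps(app_log),
})

outer = json.loads(wrapped_line)
# After one decode, MESSAGE is still a string, not an object with fields.
message_is_string = isinstance(outer["MESSAGE"], str)

# Only a second decode turns it back into searchable key/value pairs.
inner = json.loads(outer["MESSAGE"])
print(inner["req_id"])
```

Anything that indexes the outer record as-is sees one opaque string where we wanted individual fields.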

Iteration 2

To make searching these logs easier, let’s just ship the service’s logs directly to Splunk, without going to stdout or journald or anything.  Splunk happens to have an HTTP Event Collector (HEC), and libraries for the languages we use to ship events directly to the HEC.  So we plugged that in to get our applications to ship logs directly to Splunk.  This made things a bit better:

{
 "message": {
  "name": "awesome-logging-service",
  "pid": 15,
  "audit": true,
  "remoteAddress": "::ffff:10.2.132.0",
  "remotePort": 46418,
  "req_id": "365848f5-9a7f-416a-8a08-f99b9043cb90",
  "req": {
   "query": {},
   "method": "POST",
   "url": "/awesome-logging-service/do",
   "headers": {
    "accept": "text/plain, */*",
    "content-type": "application/json",
    "txnguid": "ea8a2c5b-d981-4bb0-8f35-d9a872c24a5b",
    "txntimestamp": "1495752521990",
    "content-length": "20",
    "host": "kubehost002.example.com:31374",
    "connection": "Keep-Alive",
    "user-agent": "Apache-HttpClient/4.3.6 (java 1.5)",
    "accept-encoding": "gzip,deflate"
   },
   "httpVersion": "1.1",
   "trailers": {},
   "version": "*",
   "timers": {
    "handler-0": 240,
    "handler-1": 22,
    "parseQueryString": 55,
    "readBody": 2910,
    "parseBody": 729,
    "bunyan": 127,
    "handler-2": 94,
    "handler-3": 75,
    "handler-4": 277769
   }
  },
  "res": {
   "statusCode": 201,
   "headers": {
    "api-version": "1.0.0",
    "content-type": "text/plain",
    "content-length": 53
   },
   "trailer": false
  },
  "latency": 284,
  "_audit": true,
  "msg": "handled: 201",
  "v": 0
 },
 "severity": "info"
}

A bit verbose, but that’s fine.  At least now when Splunk gets the message, it is searchable.  Splunk understands the JSON formatting, and you can build up complex spath expressions to pull out what you need.  But wait, what happened to my ENVIRONMENT_NAME and APP_NAME fields?  Well, they are gone now, and there is no good way to bring them back.  We also still have the trouble with pieces of our tooling that write only to stdout, such as Tomcat, app initializations, and other third-party tools.  So while our service logs themselves are better, we have now made it harder to search across multiple streams: you always have to think about what type of log event you’re looking for, and somehow remember what format those logs are in, before you can search for them.  Obviously not ideal.
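For context, shipping straight to the HEC boils down to an authenticated HTTP POST of a JSON envelope. The sketch below is not the library we used; it only builds the request with Python's standard library and never sends it. The hostname, token, and the build_hec_request helper are placeholders I made up for illustration.

```python
import json
import urllib.request


def build_hec_request(base_url, token, event, index=None, sourcetype=None):
    """Build (but do not send) a POST for Splunk's HTTP Event Collector."""
    payload = {"event": event}
    if index:
        payload["index"] = index
    if sourcetype:
        payload["sourcetype"] = sourcetype
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/services/collector/event",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            # HEC authenticates with "Splunk <token>" in this header.
            "Authorization": "Splunk " + token,
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder host and token, for illustration only.
request = build_hec_request(
    "https://splunk.example.com:8088",
    "00000000-0000-0000-0000-000000000000",
    {"message": {"name": "awesome-logging-service", "msg": "handled: 201"},
     "severity": "info"},
    index="dev",
)
```

Notice that nothing in this path knows about the container or the host, which is why the journald fields disappeared.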

Iteration 3

Introduce fluentd.  Fluentd is basically a small utility that can ingest and reformat log messages from various sources, and can spit them out to any number of outputs.  So, just as an example, it can ingest logs from journald, inspect and transform those messages, and ship them up to Splunk.

So we deployed fluentd as a DaemonSet in our Kubernetes cluster, and pointed it at the journald logs.  With various filters we can determine which of those logs are from our Kubernetes containers (our services) and which logs are either from the host OS or from Kubernetes itself, and we can act on each of those types of logs differently.  We haven’t quite fleshed out all the routing and transformations we need to get the logs in a good place, but this is by far the best solution we have worked on.  The events now arrive looking like this:

{
 "name": "awesome-logging-service",
 "index": "dev",
 "sourcetype": "application",
 "source": "awesome-logging-service",
 "host": "awesome-logging-service-2521530320-hzzsh",
 "hostname": "awesome-logging-service-2521530320-hzzsh",
 "pid": 15,
 "level": 20,
 "msg": "Received request { url: '/awesome-logging-service/health',\n  method: 'GET',\n  headers: \n   { host: 'someIp:8080',\n     'user-agent': 'Go-http-client/1.1',\n     'accept-encoding': 'gzip',\n     connection: 'close' },\n  body: undefined }",
 "time": "2017-05-24T16:56:21.393Z",
 "v": 0,
 "PRIORITY": "6",
 "_HOSTNAME": "kubehost001.example.com",
 "CONTAINER_TAG": "",
 "_TRANSPORT": "journal",
 "_PID": "12809",
 "_COMM": "docker",
 "_SYSTEMD_UNIT": "docker.service",
 "CONTAINER_ID": "3cf404b8f08d",
 "CONTAINER_ID_FULL": "3cf404b8f08daa6ffe4752cd53e3531d2d0a2072a8119774b5f5c65bd50efc1a",
 "CONTAINER_NAME": "k8s_awesome-logging-service_awesome-logging-service-2521530320-hzzsh_devEnvironment_57843361-3f10-11e7-a70e-00155d019453_13",
 "_SOURCE_REALTIME_TIMESTAMP": "1495644981402154",
 "docker": {
  "container_id": "3cf404b8f08daa6ffe4752cd53e3531d2d0a2072a8119774b5f5c65bd50efc1a"
 },
 "kubernetes": {
  "container_name": "awesome-logging-service",
  "namespace_name": "dev",
  "pod_name": "awesome-logging-service-2521530320-hzzsh",
  "pod_id": "57843361-3f10-11e7-a70e-00155d019453",
  "labels": {
   "app": "awesome-logging-service",
   "pod-template-hash": "2521530320"
  },
  "host": "kubehost001.example.com",
  "master_url": "https://10.0.0.1:443/api"
 }
}
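Conceptually, the flattened event above comes from parsing the MESSAGE string as JSON and merging the result with the journald fields. Here is a rough Python sketch of that idea. It is not fluentd code, flatten_record is a made-up name, and dropping MESSAGE after a successful parse is my assumption based on the sample event.

```python
import json


def flatten_record(record, key="MESSAGE"):
    """Parse record[key] as JSON and merge its fields into the record.

    Messages that are not JSON objects (Tomcat output, for example)
    are returned unchanged.
    """
    merged = dict(record)
    try:
        parsed = json.loads(record.get(key, ""))
    except (TypeError, ValueError):
        return merged
    if not isinstance(parsed, dict):
        return merged
    merged.pop(key)
    merged.update(parsed)
    return merged


journald_record = {
    "PRIORITY": "6",
    "_HOSTNAME": "kubehost001.example.com",
    "MESSAGE": '{"name": "awesome-logging-service", "level": 20, "v": 0}',
}
flat = flatten_record(journald_record)
```

The application's fields and the journald fields now sit side by side in one record, so one search can use both.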

Iteration 4

After many small tweaks to the fluentd configs, and what seems like a hundred other things, we landed on something similar to what is below:

DaemonSet

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: example-system
  labels:
    k8s-app: fluentd
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd
  labels:
    k8s-app: fluentd
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd
  labels:
    k8s-app: fluentd
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: example-system
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: example-system
  labels:
    app: fluentd
    version: ${IMAGE_VERSION}
    gitsha: "${GITSHA}"
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
        version: "1.6"
    spec:
      serviceAccountName: fluentd
      containers:
      - name: fluentd
        image: docker-repo.example.com:5000/fluentd:1.6
        env:
          - name: KUBE_NODE
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
          - name: RUBY_GC_HEAP_INIT_SLOTS
            value: "500000"
          - name: RUBY_GC_HEAP_GROWTH_FACTOR
            value: "1"
          - name: RUBY_GC_HEAP_GROWTH_MAX_SLOTS
            value: "10000"
          - name: FLUENT_UID
            value: "0"
        resources:
          limits:
            memory: 275Mi
            cpu: 500m
          requests:
            cpu: 100m
            memory: 275Mi
        volumeMounts:
        - name: config
          mountPath: /fluentd/etc
        - name: varlog
          mountPath: /var/log
        - name: journal
          mountPath: /run/log/journal
        - name: ephemeralmount
          mountPath: /mnt/sda1/var/lib/docker/containers
          readOnly: true
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: dockersock
          mountPath: /var/run/docker.sock
          readOnly: true
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: ephemeralmount
        hostPath:
          path: /mnt/sda1/var/lib/docker/containers
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: dockersock
        hostPath:
          path: /var/run/docker.sock
      - name: journal
        hostPath:
          path: /run/log/journal
      - name: config
        configMap:
          name: fluentd
      imagePullSecrets:
        - name: example-docker-repo

This contains a lot of the recommended setup for running fluentd as a DaemonSet. Beyond that, it pulls a custom fluentd image that we build, and maps in volumes from the host system. Those volumes let us read the system journal logs and let the Kubernetes plugin for fluentd gather metadata about containers. We also mount in a ConfigMap to configure fluentd.

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd
  namespace: example-system
data:
  fluent.conf: |
    @include kubernetes.conf
    <match *.kubernetes.journal.container.fluentd>
       @type splunk-http-eventcollector
       server "{{ splunk_host }}:{{ splunk_port }}"
       token "{{ splunk_token }}"
       protocol "{{ splunk_protocol }}"
       index infrastructure
       sourcetype ${tag}
       source fluentd
       host "#{ENV['KUBE_NODE']}"
       all_items true

       batch_size_limit 1000000
       buffer_type file
       buffer_chunk_limit 700k
       buffer_path /var/log/fluent/buffer-infrastructure-fluentd
       flush_interval 5s
    </match>
    <match audit.kubernetes.journal.container.**>
       @type splunk-http-eventcollector
       server "{{ splunk_host }}:{{ splunk_port }}"
       token "{{ splunk_token }}"
       protocol "{{ splunk_protocol }}"
       index user-gestures
       sourcetype ${tag}
       source fluentd
       host "#{ENV['KUBE_NODE']}"
       all_items true

       batch_size_limit 1000000
       buffer_type file
       buffer_chunk_limit 700k
       buffer_path /var/log/fluent/buffer-user-gestures
       flush_interval 5s
    </match>
    <match app.kubernetes.journal.container.**>
       @type splunk-http-eventcollector
       server "{{ splunk_host }}:{{ splunk_port }}"
       token "{{ splunk_token }}"
       protocol "{{ splunk_protocol }}"
       index app-data
       sourcetype ${tag}
       source fluentd
       host "#{ENV['KUBE_NODE']}"
       all_items true

       batch_size_limit 1000000
       buffer_type file
       buffer_chunk_limit 700k
       buffer_path /var/log/fluent/buffer-app-data
       flush_interval 5s
    </match>
    <match **>
       @type splunk-http-eventcollector
       server "{{ splunk_host }}:{{ splunk_port }}"
       token "{{ splunk_token }}"
       protocol "{{ splunk_protocol }}"
       index infrastructure
       sourcetype ${tag}
       source fluentd
       host "#{ENV['KUBE_NODE']}"
       all_items true

       batch_size_limit 1000000
       buffer_type file
       buffer_chunk_limit 700k
       buffer_path /var/log/fluent/buffer-infrastructure
       flush_interval 5s
    </match>
  kubernetes.conf: |
    <match fluent.**>
      @type null
    </match>

    <source>
      @type systemd
      path /var/log/journal
      read_from_head false
      <storage>
        @type local
        persistent true
        path /var/log/fluent/journal.pos
      </storage>
      tag journal
    </source>
    <source>
      @type systemd
      path /run/log/journal
      read_from_head false
      <storage>
        @type local
        persistent true
        path /var/log/fluent/runjournal.pos
      </storage>
      tag journal
    </source>

    <match journal>
      @type rewrite_tag_filter
      <rule>
        key CONTAINER_NAME
        pattern ^k8s_([\w-]*?)[\._]
        tag kubernetes.journal.container.$1
      </rule>
      <rule>
        key SYSLOG_IDENTIFIER
        pattern ^(.*)
        tag journal.$1
      </rule>
    </match>

    <source>
      @type tail
      format /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<msg>.*)$/
      time_format %Y-%m-%d %H:%M:%S
      path /var/log/salt/minion
      pos_file /var/log/fluent/fluentd-salt.pos
      tag salt
    </source>

    <source>
      @type tail
      format syslog
      path /var/log/startupscript.log
      pos_file /var/log/fluent/fluentd-startupscript.log.pos
      tag startupscript
    </source>

    <source>
      @type tail
      format /^time="(?<time>[^"]*)" level=(?<severity>[^ ]*) msg="(?<msg>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
      path /var/log/docker.log
      pos_file /var/log/fluent/fluentd-docker.log.pos
      tag docker
    </source>

    <source>
      @type tail
      format none
      path /var/log/etcd.log
      pos_file /var/log/fluent/fluentd-etcd.log.pos
      tag etcd
    </source>

    <filter kubernetes.journal.container.**>
      @type kubernetes_metadata
      annotation_match /^example\..*$/
    </filter>
    <filter kubernetes.journal.container.**>
      @type parser
      key_name MESSAGE
      reserve_data true

      <parse>
        @type multi_format
        @include parsers.conf
      </parse>
    </filter>
    <filter kubernetes.journal.container.**>
      @type record_transformer
      enable_ruby true
      remove_keys hostname,name,msg
      <record>
        message ${record["msg"] ? record["msg"] : record["message"] ? record["message"] : record["MESSAGE"]}
        level ${record["level"] ? record["level"] == 10 ? "TRACE" : record["level"] == 20 ? "DEBUG" : record["level"] == 30 ? "INFO" : record["level"] == 40 ? "WARN" : record["level"] == 50 ? "ERROR" : record["level"] == 60 ? "FATAL" : record["level"] : (record["PRIORITY"] ? record["PRIORITY"] == "0" ? "EMERG" : record["PRIORITY"] == "1" ? "ALERT" : record["PRIORITY"] == "2" ? "CRIT" : record["PRIORITY"] == "3" ? "ERROR" : record["PRIORITY"] == "4" ? "WARN" : record["PRIORITY"] == "5" ? "NOTICE" : record["PRIORITY"] == "6" ? "INFO" : record["PRIORITY"] == "7" ? "DEBUG" : record["PRIORITY"] : record["severity"])}
      </record>
    </filter>

    <match kubernetes.journal.container.**>
      @type rewrite_tag_filter
      @include audit.conf
    </match>

    <match unprocessed.audit.**>
      @type rewrite_tag_filter
      #In here, we want to mark kube-probe health checks
      remove_tag_prefix unprocessed
      <rule>
        key userAgent
        pattern ^(?!kube-probe).*$
        tag ${tag}
      </rule>
      <rule>
        key $.req.headers.user-agent
        pattern ^(?!kube-probe).*$
        tag ${tag}
      </rule>
      <rule>
        #keep anything whose status is not exactly 200
        key responseStatus
        pattern ^(?!200$).+$
        tag ${tag}
      </rule>
      <rule>
        key $.res.statusCode
        pattern ^(?!200$).+$
        tag ${tag}
      </rule>
      <rule>
        #things that make it here we throw away
        key message
        pattern .+
        tag ignore.${tag}
      </rule>
    </match>

    <match ignore.audit.**>
      @type null
    </match>

In the ConfigMap, we define a number of config files for fluentd.  The first, fluent.conf, is basically just a Splunk routing file that defines which Splunk index each log type is loaded into: infrastructure logs go to the “infrastructure” index, audit logs to “user-gestures”, and application logs to “app-data”.
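The order of the match blocks matters, because fluentd hands an event to the first block whose pattern matches its tag. The Python sketch below is a simplified model of that routing, not fluentd's implementation. It only understands “*” (exactly one tag part) and “**” (zero or more tag parts), and the tag_matches and index_for names are mine.

```python
# (pattern, Splunk index) pairs in the same order as the match blocks
# in fluent.conf.
ROUTES = [
    ("*.kubernetes.journal.container.fluentd", "infrastructure"),
    ("audit.kubernetes.journal.container.**", "user-gestures"),
    ("app.kubernetes.journal.container.**", "app-data"),
    ("**", "infrastructure"),
]


def tag_matches(pattern_parts, tag_parts):
    """Match a dotted tag against a pattern made of literals, '*' and '**'."""
    if not pattern_parts:
        return not tag_parts
    head, rest = pattern_parts[0], pattern_parts[1:]
    if head == "**":
        # '**' may swallow zero or more tag parts.
        return any(tag_matches(rest, tag_parts[i:])
                   for i in range(len(tag_parts) + 1))
    if not tag_parts:
        return False
    if head == "*" or head == tag_parts[0]:
        return tag_matches(rest, tag_parts[1:])
    return False


def index_for(tag):
    """Return the index of the first route whose pattern matches the tag."""
    for pattern, index in ROUTES:
        if tag_matches(pattern.split("."), tag.split(".")):
            return index
    return None
```

With this model, fluentd's own container logs land in infrastructure even when they carry an app prefix, because the first block wins, and anything that matches none of the specific patterns falls through to the catch-all.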

The second config file defined in that ConfigMap, kubernetes.conf, holds the routes that gather and filter what we care about. We piece out the different sources, and match patterns to see whether the log event is from one of our containers or from a piece of the infrastructure. If it is from one of our containers (the tag starts with kubernetes.journal.container), we do more processing on the event to pull out some Kubernetes annotations, parse the logs, and determine whether they are HTTP audit logs or generic application logs. We have pulled the parsing and audit patterns out into separate files to make them easier to read and maintain.
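The container check hinges on the CONTAINER_NAME pattern in the first rewrite_tag_filter block. Here is that same regular expression exercised in Python against the two container names that appear earlier in this post. The retag function is a made-up stand-in for the two rules, and returning None for "no rule matched" is my simplification.

```python
import re

# Same pattern as the CONTAINER_NAME rule in kubernetes.conf.
CONTAINER_RE = re.compile(r"^k8s_([\w-]*?)[\._]")


def retag(record):
    """Mimic the two rewrite rules: container logs first, then syslog identifier."""
    name = record.get("CONTAINER_NAME")
    if name:
        match = CONTAINER_RE.search(name)
        if match:
            return "kubernetes.journal.container." + match.group(1)
    identifier = record.get("SYSLOG_IDENTIFIER")
    if identifier:
        return "journal." + identifier
    return None  # no rule matched


old_style = retag({"CONTAINER_NAME": "k8s_awesome-logging-service.41c5113d_awesome-logging-service-2802682077-dnlfg_devEnvironment_5fcb32a0-3be0-11e7-81c6-005056ab1c1d_04634e28"})
new_style = retag({"CONTAINER_NAME": "k8s_awesome-logging-service_awesome-logging-service-2521530320-hzzsh_devEnvironment_57843361-3f10-11e7-a70e-00155d019453_13"})
host_log = retag({"SYSLOG_IDENTIFIER": "sshd"})
```

The lazy quantifier stops at the first “.” or “_” after the k8s_ prefix, which is why both container naming styles yield the same tag.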

We also process the audit logs a bit more, to try and remove all the successful health checks. These are logs that we don’t need in Splunk; they do not add any value and just add a whole lot of clutter.
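The intent of that health-check filtering can be written down as a small predicate. This is a Python sketch of the intended behavior, not a port of the fluentd rules. The field names (userAgent, req.headers.user-agent, responseStatus, res.statusCode) come from the config above, while keep_audit_event is a hypothetical name.

```python
def keep_audit_event(event):
    """Return True unless the event looks like a successful health check."""
    user_agents = [
        event.get("userAgent"),
        event.get("req", {}).get("headers", {}).get("user-agent"),
    ]
    # Anything with a user agent that is not kube-probe is kept.
    if any(ua and not ua.startswith("kube-probe") for ua in user_agents):
        return True
    statuses = [
        event.get("responseStatus"),
        event.get("res", {}).get("statusCode"),
    ]
    # Probe traffic is kept only when the status is something other than 200.
    if any(s is not None and str(s) != "200" for s in statuses):
        return True
    return False


probe_ok = {"userAgent": "kube-probe/1.6", "responseStatus": 200}
probe_failed = {"userAgent": "kube-probe/1.6", "responseStatus": 503}
real_request = {
    "req": {"headers": {"user-agent": "Apache-HttpClient/4.3.6 (java 1.5)"}},
    "res": {"statusCode": 201},
}
```

A failing probe is exactly the kind of event we still want to see, so only the combination of kube-probe and 200 is dropped.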

Dockerfile

FROM ruby:2.3 as builder
WORKDIR /example-fluent
COPY fluent-plugin-example.gemspec Rakefile Gemfile /example-fluent/
COPY src ./src
COPY test ./test
COPY k8s_configmap ./k8s_configmap
RUN bundle install && bundle exec rake test && gem build fluent-plugin-example.gemspec

FROM fluentd:v1.6.2-debian-1.0
LABEL maintainer="example.com"
LABEL owner="Example"

USER root

COPY --from=builder /example-fluent/fluent-plugin-example-*.gem .

RUN buildDeps="sudo make gcc g++ libc-dev ruby-dev" \
 && apt-get update \
 && apt-get install -y --no-install-recommends $buildDeps \
 && apt-get install -y --no-install-recommends iproute \
 && sudo gem install \
#        fluent-plugin-splunk-http-eventcollector:0.3.0 \ included in our own plugin, until this thing updates
        fluent-plugin-systemd:1.0.2 \
        fluent-plugin-record-reformer:0.9.1 \
        fluent-plugin-kubernetes_metadata_filter:2.1.6 \
        fluent-plugin-rewrite-tag-filter:2.2.0 \
        fluent-plugin-multi-format-parser:1.0.0 \
        fluent-plugin-example \
 && sudo gem sources --clear-all \
 && SUDO_FORCE_REMOVE=yes \
    apt-get purge -y --auto-remove \
                  -o APT::AutoRemove::RecommendsImportant=false \
                  $buildDeps \
 && rm -rf /var/lib/apt/lists/* \
           /home/fluent/.gem/ruby/2.3.0/cache/*.gem

CMD [ "fluentd", "-c", "/fluentd/etc/fluent.conf", "-p", "/fluentd/plugins", "--no-supervisor" ]

Our own fluentd image is based on the official fluentd image, and we basically just bundle in all the plugins we need. We also build our own fluentd plugin. It carries a fix to the splunk-http-eventcollector plugin (which is why that plugin is commented out in the gem list), and it includes some event translation that is hard to do in the fluentd configs, to help with log event consistency.
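To illustrate why some translation is easier in code than in config, compare the long nested ternary that sets level in kubernetes.conf with the same logic as a lookup. The sketch below is Python and is not the code from our plugin. The mapping values are copied from that ternary, and normalize_level is a made-up name.

```python
# Numeric application levels, as used in the "level" field of our JSON logs.
NUMERIC_LEVELS = {
    10: "TRACE", 20: "DEBUG", 30: "INFO",
    40: "WARN", 50: "ERROR", 60: "FATAL",
}

# journald PRIORITY values arrive as strings.
JOURNAL_PRIORITIES = {
    "0": "EMERG", "1": "ALERT", "2": "CRIT", "3": "ERROR",
    "4": "WARN", "5": "NOTICE", "6": "INFO", "7": "DEBUG",
}


def normalize_level(record):
    """Prefer the app's level, then journald's PRIORITY, then severity."""
    level = record.get("level")
    if level is not None:
        # Unknown values pass through unchanged, as in the config.
        return NUMERIC_LEVELS.get(level, level)
    priority = record.get("PRIORITY")
    if priority is not None:
        return JOURNAL_PRIORITIES.get(priority, priority)
    return record.get("severity")
```

For example, a record with level 30 and PRIORITY "6" comes out as INFO from the application's own level, while a host log that only has PRIORITY "3" comes out as ERROR.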

Hopefully This Helps

This was my most viewed post from my Blogger site, so it seemed like a great starting point for pulling posts over to this site. Hopefully this rambling excursion and the example files provided here can help somebody if they are facing similar difficulties.

If we were to start over from scratch, I’m sure there are much better ways to do this, including using the fluentd logging driver for Docker directly. That would eliminate many of the hacks we had to add in here.
