
Export Kubernetes Pod Logs to Upstash Kafka via FluentBit

Enes Ozcan, Senior Software Engineer at Zapata Computing (Guest Author)

FluentBit is a lightweight log processor and exporter. With tens of built-in input and output connectors, it is one of the most popular options when developers need to collect service logs, process them, and export them to a backend or a persistent store.

A Kafka output plugin ships with the built-in FluentBit connectors. That is, FluentBit can read logs from an arbitrary source and export them to a Kafka topic. Upstash, on the other hand, provides a serverless, fully managed Kafka cluster with a pay-per-message model, which frees developers from the burden of managing, scaling, and maintaining clusters. Its price scales to zero when there are no messages - a truly serverless offering. Last but not least, the first 10,000 messages per day are free of charge.

In this article, we will export the logs of particular Kubernetes pods to Upstash Kafka via FluentBit, and then consume, filter and stream these logs to clients through a Go HTTP server.

Before starting, make sure you have an Upstash account (sign up for free if you haven't already), access to a Kubernetes cluster (minikube or docker-desktop is fine), and helm installed.

Setup

Create Upstash Kafka cluster and new topic

Log in to the Upstash console, navigate to the Kafka tab, and create a new Kafka cluster and a topic called logs - it only takes seconds!

create-cluster

Deploy FluentBit to Kubernetes

Let’s use official charts to deploy FluentBit.

$ helm repo add fluent https://fluent.github.io/helm-charts

Before starting the installation, we need to change a few default values of the chart, namely:

  • config.inputs: configuration for log inputs, that is, which logs we want to export.
  • config.filters: configuration for log processing before exporting them to output plugins.
  • config.outputs: configuration for log sinks.

Create a new file for each of these and fill them with the content below:

input.conf:

[INPUT]
    Name tail
    Path /var/log/containers/*_upstashed_*.log
    multiline.parser docker, cri
    Tag kube.*
    Mem_Buf_Limit 5MB
    Skip_Long_Lines On

Here we tell FluentBit to use the tail plugin, which watches for new lines appended to the files matching Path. Notice the _upstashed_ value in the Path key: container log files are named <pod-name>_<namespace>_<container-name>-<container-id>.log, and in this blog post we want FluentBit to export logs only from pods that reside in the upstashed namespace.
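
For example, for the random-logger pod we will create later in the upstashed namespace, the matching file on the node would look something like the following (the pod suffix and container ID here are made up for illustration):

/var/log/containers/random-logger-7d9c5b6f4d-x2kqp_upstashed_random-logger-3f6b1c9a27d54e8f.log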

filter.conf:

[FILTER]
    Name kubernetes
    Match kube.*
    Merge_Log On
    Keep_Log Off
    K8S-Logging.Parser On
    K8S-Logging.Exclude On

[FILTER]
    Name nest
    Match *
    Operation lift
    Nested_under kubernetes
    Add_prefix k8s_

Here we use the built-in kubernetes filter of FluentBit, which creates a structured output containing the log line, timestamp, pod info, container info, etc. Here is an example output generated by the kubernetes filter:

{
  "@timestamp": 1670672614.142579,
  "log": "<log-line>",
  "stream": "stdout",
  "time": "2022-12-10T11:43:34.1425787Z",
  "kubernetes": {
    "pod_name": "<pod-name>",
    "namespace_name": "<namespace>",
    "pod_id": "<id>",
    "labels": {
      "app": "<foo>",
      "pod-template-hash": "<bar>"
    },
    "host": "docker-desktop",
    "container_name": "<baz>",
    "docker_id": "<some-id>",
    "container_hash": "<some-hash>",
    "container_image": "<some-image>"
  }
}

That’s cool. But we want to move the fields under the "kubernetes" key to the outer block, because we want to refer to these values, namely pod_name, when sending logs to Kafka. Since FluentBit’s Kafka output does not support the record accessor feature (which allows accessing inner values of JSON records), we need this workaround. The second filter, nest, lifts the nested JSON for us and adds a "k8s_" prefix to all the fields that were under the "kubernetes" key. So the message becomes:

{
  "@timestamp": 1670672943.712912,
  "log": "<log-line>",
  "stream": "stdout",
  "time": "2022-12-10T11:49:03.7129124Z",
  "k8s_pod_name": "<pod-name>",
  "k8s_namespace_name": "<namespace>",
  "k8s_pod_id": "<id>",
  "k8s_labels": {
    "app": "<foo>",
    "pod-template-hash": "<bar>"
  },
  "k8s_host": "docker-desktop",
  "k8s_container_name": "<baz>",
  "k8s_docker_id": "<some-id>",
  "k8s_container_hash": "<some-hash>",
  "k8s_container_image": "<some-image>"
}

Alright! We now want these logs to be exported to Upstash Kafka. Let’s configure the FluentBit output plugin as the last step:

output.conf:

[OUTPUT]
    Name kafka
    Match kube.*
    Brokers <broker-provided-by-upstash>
    Topics logs
    Message_Key_Field k8s_pod_name
    rdkafka.security.protocol sasl_ssl
    rdkafka.sasl.mechanism SCRAM-SHA-256
    rdkafka.sasl.username <username-provided-by-upstash>
    rdkafka.sasl.password <password-provided-by-upstash>

Remember that we created the logs topic when creating the Upstash Kafka cluster; that is the topic the pod logs are exported to. We also use the k8s_pod_name value as the message key during export.

Now we are ready to deploy FluentBit:

$ helm install fluent-bit -n fluent-bit --create-namespace fluent/fluent-bit \
  --set-file config.inputs=input.conf \
  --set-file config.filters=filter.conf \
  --set-file config.outputs=output.conf

That will deploy the fluent-bit DaemonSet:

$ kubectl get ds -n fluent-bit
 
NAME         DESIRED   CURRENT   READY   UP-TO-DATE   AVAILABLE   NODE SELECTOR   AGE
fluent-bit   1         1         1       1            1           <none>          19h

Check whether the Kafka connection is established:

$ kubectl logs -f ds/fluent-bit -n fluent-bit
 


[2022/12/10 11:49:01] [ info] [output:kafka:kafka.0] brokers='<broker-url>' topics='logs'


Create a new pod whose logs are to be exported

Remember, we configured the FluentBit input to observe logs from the upstashed namespace only. Let’s create a pod under this namespace that produces random logs:

$ kubectl create ns upstashed
$ kubectl create deployment random-logger -n upstashed --image=chentex/random-logger:v1.0.1 -- /entrypoint.sh 7500 7500 1000

This deployment produces random logs and prints them to stdout. The arguments are minLogInterval, maxLogInterval, and numberOfLogs, respectively, so the above deployment will produce a log line every 7.5 seconds, 1000 lines in total:

$ kubectl logs -f deployment/random-logger -n upstashed
 
2022-12-10T15:09:30+0000 ERROR An error is usually an exception that has been caught and not handled.
2022-12-10T15:09:37+0000 INFO This is less important than debug log and is often used to provide context in the current task.
2022-12-10T15:09:45+0000 DEBUG This is a debug log that shows a log that can be ignored.
2022-12-10T15:09:52+0000 WARN A warning that should be ignored is usually at this level and should be actionable.

Now navigate back to the Upstash console to see whether the pod logs arrive:

logs-gif

It’s that easy!

Consume Logs from Upstash Kafka

In the setup above, Upstash Kafka behaves like a buffer for the collected logs. We can then configure a sink where these messages are processed or persisted. Meanwhile, let's say we also want to give some clients access to real-time logs, but not to the entire message. For instance, we do not want clients to see the k8s_container_image field of the Kafka messages.

Create HTTP Server

Using the Go consumer snippet provided in the Upstash console under the Details tab, start an HTTP server that handles requests at the /logs endpoint and streams messages to clients:

package main

import (
    "encoding/json"
    "net/http"
)

func main() {
    dialer := getKafkaDialer()
    http.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
        reader := getKafkaReader(dialer)
        defer reader.Close()

        // Errors are ignored for brevity. Note that this is allowed only
        // in blogpost code snippets! Never ignore errors in Golang!

        w.Header().Set("Transfer-Encoding", "chunked")
        w.Write([]byte("------------ Streaming Logs ------------\n"))

        flusher, _ := w.(http.Flusher)
        flusher.Flush()
        for {
            message, err := reader.ReadMessage(r.Context())
            if err != nil {
                // should have been handled and responded properly
                // for context.Canceled and other errors.
                return
            }
            // include only `log` field from the message value
            resp := struct {
                Log string `json:"log"`
            }{}
            _ = json.Unmarshal(message.Value, &resp)
            w.Write(message.Key)
            w.Write([]byte{':', '\t'})
            w.Write([]byte(resp.Log))
            flusher.Flush()
        }
    })
    http.ListenAndServe(":8080", nil)
}

This is a minimal HTTP server with a single endpoint, /logs. The handler reads from the logs topic in Upstash Kafka, then sends each log line to clients along with the pod name received as the message key.
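
For reference, here is a minimal sketch of what the getKafkaDialer and getKafkaReader helpers might look like, living in the same main package as the server above (with the two import blocks merged). It assumes the segmentio/kafka-go client, the same SASL/SCRAM-SHA-256 credentials configured for the FluentBit output, and a hypothetical consumer group name; replace the placeholders with the values from the Upstash console.

import (
    "crypto/tls"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
)

// getKafkaDialer builds a dialer that authenticates to Upstash Kafka over TLS
// using SASL/SCRAM-SHA-256, mirroring the rdkafka settings in output.conf.
func getKafkaDialer() *kafka.Dialer {
    mechanism, err := scram.Mechanism(scram.SHA256,
        "<username-provided-by-upstash>",
        "<password-provided-by-upstash>")
    if err != nil {
        panic(err)
    }
    return &kafka.Dialer{
        SASLMechanism: mechanism,
        TLS:           &tls.Config{},
        Timeout:       10 * time.Second,
    }
}

// getKafkaReader creates a reader on the logs topic; "log-streamer" is a
// hypothetical consumer group name, replace it with your own.
func getKafkaReader(dialer *kafka.Dialer) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"<broker-provided-by-upstash>"},
        GroupID: "log-streamer",
        Topic:   "logs",
        Dialer:  dialer,
    })
}

Since a consumer group is set, multiple instances of this server would share the partitions of the logs topic instead of each receiving every message; drop the GroupID if you want every instance to stream all logs.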

Let's compare kubectl logs with the log stream coming from Upstash!

final-gif

The command running in the upper terminal tab is

kubectl logs -f deployment/random-logger -n upstashed

while in the lower tab it is

curl localhost:8080/logs

The upper tab retrieves logs from my local Kubernetes cluster, while the lower one receives them from Upstash Kafka running in eu-west. Both are almost synchronized. Upstash rocks, doesn't it?

Conclusion

Logs usually contain hints when we encounter an error or unexpected behavior in a deployment, and hence are the first place developers look during diagnosis. So instead of being deleted and forgotten, logs should be cared for and consumed properly.

In this article, we exported particular Kubernetes pod logs to an Upstash Kafka topic, from where other consumers can read, process, and act on them. Moreover, after applying a simple filter we exposed these real-time logs to HTTP clients - which do not necessarily have access to either the Kubernetes cluster or the Kafka topics.

Now that Kubernetes pod logs are buffered in Upstash Kafka, one can further benefit from this and consume them at other backends, storage drivers, dashboards, observability tools, and the like.