    User authenticated and access controlled clusters with the Kafka operator

    by Avi Zimmerman

    Published on 10/16/2019
    Last updated on 02/03/2025

    Our hunch that there was a hunger for an operator that makes it easy to provision and operate Apache Kafka clusters on Kubernetes without being based on Kubernetes StatefulSets proved correct: shortly after we released the first version of our open-source Koperator, a community started to build around it. We have received lots of valuable feedback that helps shape the future of Koperator, as well as feature contributions from the community. In the following guest post, Avi Zimmerman talks about the Kafka topic, ACL, and user management features he contributed to Koperator.
    This is a guest post by Avi Zimmerman.
    Kafka is rapidly becoming an integral part of event-driven architectures and data streaming pipelines. But just like in every other service that holds valuable data, it's necessary to keep what's important safe from prying eyes and unauthorized access. In Kafka, this means setting up a Public Key Infrastructure (PKI) or configuring SASL, which can be difficult to automate securely. With Koperator, the solution to this problem is managed for you using CRDs. The Kafka cluster, topics, users, and their ACLs are all handled by the operator based on the YAML definitions you provide. The operator also uses Cruise Control and Prometheus to monitor your cluster's health and perform graceful healing and scaling operations when needed.
    [Figure: the operator creating a Kafka topic from a KafkaTopic CR]

    Why should I be using SSL and ACLs if I'm only exposing Kafka internally?

    A common throughline in the design of data pipelines that meet security requirements is that all data should be encrypted both at rest and in transit. When you configure Kafka to use SSL, data is encrypted between your clients and the Kafka cluster. Additionally, you can restrict access to topics to clients holding specific certificates. This gives you an extra layer of security in the event of, for example, an unauthorized user gaining access to a pod inside your cluster. If such a user is able to run network sniffing tools, they'll be able to listen to unencrypted traffic inside your cluster. And if Kafka isn't using ACLs, they can start reading from and writing to your topics and, if they know the address of your cluster, even change broker configurations. While there's no such thing as perfect security, it's easy to take a few reasonable steps to avoid catastrophes like this. And with Koperator doing all the heavy lifting, why shouldn't you?
    [Figure: the operator managing a KafkaUser created by CRD]
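    For a sense of what the operator automates here, the topicGrants you'll see on the KafkaUser CRs later in this post correspond roughly to ACL rules you would otherwise manage by hand with Kafka's stock tooling. A schematic sketch only (the ZooKeeper address and the principal are illustrative; the operator derives the real ones for you):
    # Roughly what a manual 'read' grant would look like with kafka-acls.sh
    kafka-acls.sh --authorizer-properties \
        zookeeper.connect=example-zookeepercluster-client.zookeeper:2181 \
      --add --allow-principal "User:CN=test-kafka-consumer" \
      --operation Read --topic test-topic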

    Complete Example

    This example will guide you through setting up a Kubernetes cluster on your machine, installing Koperator and its dependencies, deploying a KafkaCluster CR, and then deploying an application with KafkaTopic and KafkaUser CRs. For this example we are going to use kind to run a cluster on our development machine. kind lets you run a full Kubernetes cluster inside Docker, so you'll need Docker installed on your machine if you don't have it already. First, we'll install kind and create a cluster, then set our KUBECONFIG to point our Kubernetes commands at the locally running cluster.
    # Install kind on macOS/Linux (refer to their documentation for windows)
    curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/v0.5.1/kind-$(uname)-amd64
    chmod +x ./kind
    sudo mv ./kind /usr/local/bin/kind
    
    # Create a local cluster with one control-plane and three workers
    cat << EOF | kind create cluster --config -
    kind: Cluster
    apiVersion: kind.sigs.k8s.io/v1alpha3
    nodes:
    - role: control-plane
    - role: worker
    - role: worker
    - role: worker
    EOF
    
    ## The above may take a minute or two depending on your internet connection
    
    # Export the kind KUBECONFIG for kubectl, helm, etc.
    export KUBECONFIG="$(kind get kubeconfig-path)"
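    Before adding anything else, a quick optional sanity check confirms the cluster is up. All four nodes from the config above should eventually report Ready:
    # Optional: verify the kind nodes registered with the API server
    kubectl get nodes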
    Now that we have a cluster up and running on our machine, we need to add a few more things to it before we can install Koperator. First, we're going to use helm to install cert-manager and the zookeeper-operator. We believe in the principle of separation of concerns, so Koperator does not install or manage either ZooKeeper or cert-manager. This allows different, non-dependent services to take care of the things they are best at, without any unnecessary overhead.
    # Setup Helm in the Kind cluster
    cat << EOF | kubectl apply -f -
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: tiller
      namespace: kube-system
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: tiller
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: cluster-admin
    subjects:
    - kind: ServiceAccount
      name: tiller
      namespace: kube-system
    EOF
    
    # Install tiller into the cluster and add the jetstack and banzaicloud repos
    helm init --service-account tiller
    helm repo add jetstack https://charts.jetstack.io
    helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/
    helm repo update
    
    # Install the cert-manager and CRDs into the cluster
    kubectl create ns cert-manager
    kubectl label namespace cert-manager certmanager.k8s.io/disable-validation=true
    kubectl apply -f https://raw.githubusercontent.com/jetstack/cert-manager/release-0.10/deploy/manifests/00-crds.yaml
    helm install --name cert-manager --namespace cert-manager --version v0.10.1 --set webhook.enabled=false jetstack/cert-manager
    
    # Install the zookeeper-operator and setup a zookeeper cluster
    helm install --name zookeeper-operator --namespace zookeeper banzaicloud-stable/zookeeper-operator
    cat << EOF | kubectl apply -f -
    apiVersion: zookeeper.pravega.io/v1beta1
    kind: ZookeeperCluster
    metadata:
      name: example-zookeepercluster
      namespace: zookeeper
    spec:
      replicas: 3
    EOF
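    Before moving on, you can optionally watch the ZooKeeper pods until all three replicas are Running:
    # Optional: watch the ZooKeeper ensemble come up (Ctrl-C to stop)
    kubectl get pods -n zookeeper -w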
    We are now ready to set up a Kafka cluster running on Kubernetes. All the examples below can be found in the Koperator repository, so we'll first clone that with git.
    # Clone the repository
    git clone https://github.com/banzaicloud/koperator
    cd koperator
    
    # Install the operator - we will disable prometheus server for this example
    helm install --name kafka-operator --namespace kafka banzaicloud-stable/kafka-operator
    
    # Create the example Kafka cluster with internal SSL and ACLs enabled
    kubectl apply -n kafka -f config/samples/kafkacluster_with_ssl_groups.yaml
    The last command above will return immediately, but it may take 2-3 minutes for all the brokers and Cruise Control to start up. You can use kubectl to see when the brokers are all ready. The output should look something like this:
    $> kubectl get pod -n kafka
    
    NAME                                       READY   STATUS    RESTARTS   AGE
    kafka-cruisecontrol-549976ffd8-6swvz       1/1     Running   0          1m48s
    kafka-operator-operator-5cf97c9959-llpnn   2/2     Running   0          3m22s
    kafkab6dx6                                 1/1     Running   0          2m6s
    kafkafhdsv                                 1/1     Running   0          2m7s
    kafkapnd5w                                 1/1     Running   0          2m21s
    The sample CR configures a three-broker Kafka cluster with SSL, a managed PKI for user certificates, and a cluster-internal address of kafka-headless.kafka.svc.cluster.local:29092 reachable from anywhere in the cluster. We can now deploy Kafka topics, users, and a pod reading from and writing to the topic with a simple manifest:
    # kafka-operator/hack/kafka-test-pod/manifest.yaml
    #
    # A KafkaTopic called 'test-topic' with a single partition replicated across all three brokers
    #  -- you can also specify arbitrary topic configurations with `spec.config` --
    apiVersion: banzaicloud.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
      name: test-topic
    spec:
      clusterRef:
        name: kafka
        namespace: kafka
      name: test-topic
      partitions: 1
      replicationFactor: 3
    ---
    # A KafkaUser with permission to read from 'test-topic'
    apiVersion: banzaicloud.banzaicloud.io/v1alpha1
    kind: KafkaUser
    metadata:
      name: test-kafka-consumer
    spec:
      clusterRef:
        name: kafka
        namespace: kafka
      secretName: test-kafka-consumer
      topicGrants:
        - topicName: test-topic
          accessType: read
    ---
    # A KafkaUser with permission to write to 'test-topic'
    apiVersion: banzaicloud.banzaicloud.io/v1alpha1
    kind: KafkaUser
    metadata:
      name: test-kafka-producer
    spec:
      clusterRef:
        name: kafka
        namespace: kafka
      secretName: test-kafka-producer
      topicGrants:
        - topicName: test-topic
          accessType: write
    ---
    # A pod containing a producer and consumer using the above credentials
    apiVersion: v1
    kind: Pod
    metadata:
      name: kafka-test-pod
    spec:
      volumes:
        # The consumer secret
        - name: consumer-credentials
          secret:
            defaultMode: 420
            secretName: test-kafka-consumer
    
        # The producer secret
        - name: producer-credentials
          secret:
            defaultMode: 420
            secretName: test-kafka-producer
    
      containers:
        # Container reading from topic with the consumer credentials
        # The environment variables used below are for our test-application,
        # which is described in greater detail later in this post.
        - name: consumer
          image: banzaicloud/kafka-test:latest
          env:
            - name: KAFKA_MODE
              value: consumer
          volumeMounts:
            - mountPath: /etc/secrets/certs
              name: consumer-credentials
              readOnly: true
    
        # Container writing to topic with the producer credentials
        - name: producer
          image: banzaicloud/kafka-test:latest
          env:
            - name: KAFKA_MODE
              value: producer
          volumeMounts:
            - mountPath: /etc/secrets/certs
              name: producer-credentials
              readOnly: true
    You can apply the manifest above (found in the repository) to your test cluster and it should start reading from and writing to Kafka. Below we use kubetail, a script for tailing the logs of multiple pods/containers at a time, to follow the logs.
    $> kubectl apply -f hack/kafka-test-pod/manifest.yaml
    
    kafkatopic.banzaicloud.banzaicloud.io/test-topic created
    kafkauser.banzaicloud.banzaicloud.io/test-kafka-consumer created
    kafkauser.banzaicloud.banzaicloud.io/test-kafka-producer created
    pod/kafka-test-pod created
    
    # The above pod may restart a couple times at first due to applying
    # all the objects at once like this and not everything being ready.
    # But after a few seconds you should be able to tail the logs and see something like this.
    $> kubetail kafka-test-pod
    Will tail 2 logs...
    kafka-test-pod consumer
    kafka-test-pod producer
    
    [kafka-test-pod producer] 2019/09/19 02:07:18 Sending message to topic
    [kafka-test-pod consumer] 2019/09/19 02:07:18 Consumed message offset 0 - hello world
    [kafka-test-pod producer] 2019/09/19 02:07:23 Sending message to topic
    [kafka-test-pod consumer] 2019/09/19 02:07:23 Consumed message offset 1 - hello world
    [kafka-test-pod producer] 2019/09/19 02:07:28 Sending message to topic
    [kafka-test-pod consumer] 2019/09/19 02:07:28 Consumed message offset 2 - hello world
    [kafka-test-pod producer] 2019/09/19 02:07:33 Sending message to topic
    [kafka-test-pod consumer] 2019/09/19 02:07:33 Consumed message offset 3 - hello world
    ^C
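    Behind the scenes, each KafkaUser CR above produced a Kubernetes secret holding that user's client certificate. If you're curious, you can inspect one of them (an optional check; this assumes the secrets land in the default namespace, where the manifest above was applied):
    # The secret's keys (ca.crt, tls.crt, tls.key) match the paths
    # the test application reads in the next section
    $> kubectl describe secret test-kafka-consumer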

    Show me some code!

    The code running in the pod above is a simple Go application that, depending on the value of the environment variable KAFKA_MODE, will either read from or write to our test topic. The code for this example can be found in the kafka-operator repository at hack/kafka-test-pod/main.go and at the end of this blog post. First, if you look at the manifest above, you'll see that we reference a secret storing each user's credentials, and mount it inside the pod at /etc/secrets/certs. We start by defining a function that creates a TLS configuration from those credentials.
    package main
    
    import (
      "crypto/tls"
      "crypto/x509"
      "io/ioutil"
      "log"
    )
    
    // The paths to our credentials
    const certFile = "/etc/secrets/certs/tls.crt"
    const keyFile = "/etc/secrets/certs/tls.key"
    const caFile = "/etc/secrets/certs/ca.crt"
    
    // ...
    
    func getTLSConfig() *tls.Config {
    	// Create a TLS configuration from our secret for connecting to Kafka
    	clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
    	if err != nil {
    		log.Fatal(err)
    	}
    	caCert, err := ioutil.ReadFile(caFile)
    	if err != nil {
    		log.Fatal(err)
    	}
    	caPool := x509.NewCertPool()
    	caPool.AppendCertsFromPEM(caCert)
    	return &tls.Config{
    		Certificates: []tls.Certificate{clientCert},
    		RootCAs:      caPool,
    	}
    }
    In this example, we're using Shopify's sarama library to interact with Kafka. We can include the TLS configuration created by the function above in our options to the sarama client. For example, to create a new consumer we can do the following:
    package main
    
    import (
      // ...
      "log"

      "github.com/Shopify/sarama"
    )
    
    var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}
    
    func main() {
      config := sarama.NewConfig()
      config.Net.TLS.Enable = true
      config.Net.TLS.Config = getTLSConfig()
    
      // Get a consumer
      consumer, err := sarama.NewConsumer(brokerAddrs, config)
      if err != nil {
        log.Fatal(err)
      }
      defer consumer.Close()
    }
    The full application creates a sarama configuration using the methods above and then, depending on the value of the environment variable, produces to or consumes from the topic. Below is the full example with inline comments.
    package main
    
    import (
    	"crypto/tls"
    	"crypto/x509"
    	"io/ioutil"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    
    	"github.com/Shopify/sarama"
    )
    
    // Constants for our topic and credential locations
    const kafkaTopic = "test-topic"
    const certFile = "/etc/secrets/certs/tls.crt"
    const keyFile = "/etc/secrets/certs/tls.key"
    const caFile = "/etc/secrets/certs/ca.crt"
    
    // The address we are going to use for kafka broker discovery
    var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}
    
    func main() {
    
    	// Create an SSL configuration for connecting to Kafka
    	config := sarama.NewConfig()
    	config.Net.TLS.Enable = true
    	config.Net.TLS.Config = getTLSConfig()
    
    	// Trap SIGINT and SIGTERM to trigger a shutdown.
    	signals := make(chan os.Signal, 1)
    	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
    
    	// Consume or produce depending on the value of the environment variable
    	if os.Getenv("KAFKA_MODE") == "consumer" {
    		consume(config, signals)
    	} else if os.Getenv("KAFKA_MODE") == "producer" {
    		produce(config, signals)
    	} else {
    		log.Fatal("Invalid test mode:", os.Getenv("KAFKA_MODE"))
    	}
    
    }
    
    func getTLSConfig() *tls.Config {
    	// Create a TLS configuration from our secret for connecting to Kafka
    	clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
    	if err != nil {
    		log.Fatal(err)
    	}
    	caCert, err := ioutil.ReadFile(caFile)
    	if err != nil {
    		log.Fatal(err)
    	}
    	caPool := x509.NewCertPool()
    	caPool.AppendCertsFromPEM(caCert)
    	return &tls.Config{
    		Certificates: []tls.Certificate{clientCert},
    		RootCAs:      caPool,
    	}
    }
    
    func consume(config *sarama.Config, signals chan os.Signal) {
    	// Get a consumer
    	consumer, err := sarama.NewConsumer(brokerAddrs, config)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer consumer.Close()
    
    	// Setup a channel for messages from our topic
    	partitionConsumer, err := consumer.ConsumePartition(kafkaTopic, 0, sarama.OffsetNewest)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer partitionConsumer.Close()
    
    	consumed := 0
    ConsumerLoop:
    	// Consume the topic
    	for {
    		select {
    		case msg := <-partitionConsumer.Messages():
    			log.Printf("Consumed message offset %d - %s\n", msg.Offset, string(msg.Value))
    			consumed++
    		case <-signals:
    			log.Println("Shutting down")
    			break ConsumerLoop
    		}
    	}
    }
    
    func produce(config *sarama.Config, signals chan os.Signal) {
    
    	// Get a broker connection
    	broker := sarama.NewBroker(brokerAddrs[0])
    	if err := broker.Open(config); err != nil {
    		log.Fatal(err)
    	}
    	defer broker.Close()
    
    	// Create a time ticker to trigger a message every 5 seconds
    	ticker := time.NewTicker(time.Duration(5) * time.Second)
    
    ProducerLoop:
    	// Send a message every time the ticker is triggered
    	for {
    		select {
    		case <-ticker.C:
    			log.Println("Sending message to topic")
    			msg := &sarama.ProduceRequest{}
    			msg.AddMessage(kafkaTopic, 0, &sarama.Message{
    				Value: []byte("hello world"),
    			})
    			if _, err := broker.Produce(msg); err != nil {
    				log.Fatal(err)
    			}
    		case <-signals:
    			log.Println("Shutting down")
    			break ProducerLoop
    		}
    	}
    
    }
    And, voilà, you've set up a Kafka cluster with access control and user authentication. Hopefully this exercise has been sufficiently enlightening, but if you have questions, or if you're interested in contributing, check us out on GitHub.
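    When you're done experimenting, you can tear down the whole local environment by deleting the kind cluster:
    # Delete the kind cluster created at the beginning of this example
    kind delete cluster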

    About Banzai Cloud Pipeline

    Banzai Cloud’s Pipeline provides a platform for enterprises to develop, deploy, and scale container-based applications. It leverages best-of-breed cloud components, such as Kubernetes, to create a highly productive, yet flexible environment for developers and operations teams alike. Strong security measures — multiple authentication backends, fine-grained authorization, dynamic secret management, automated secure communications between components using TLS, vulnerability scans, static code analysis, CI/CD, and so on — are default features of the Pipeline platform.