Kubernetes Operator series 7 — controller-runtime component — Source

Masato Naka
6 min read · Jul 27, 2024


Introduction

Welcome back to our ongoing series on controller-runtime components. In the previous installment, we delved into the Controller component, a cornerstone of this framework.

As we venture further, we’ll be examining its essential companions: Source, EventHandler, Predicate, workqueue, and more. Over the next few episodes, we’ll be taking an in-depth look at the components that the Controller component relies on. In this episode, our spotlight shines on the Source component. Let’s dive in!

What’s Source?

First of all, let’s remember the diagram we studied in the last episode:

You can see the Source component on the right side, sitting between the Controller component and the Kubernetes API server.

The main role of the Source component, as its name indicates, is to provide events from a source, which is usually the Kubernetes API server but can be something else, such as GitHub webhook events.

The Source component inserts these events into the Queue, which the Controller component consumes to execute its reconciliation logic.
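To make this flow concrete, here is a minimal sketch that uses only the Go standard library. It is not controller-runtime code: Event, Queue, ChannelSource, and Reconcile are hypothetical names invented for illustration, and the toy queue skips the deduplication and rate limiting that the real workqueue provides.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for an event coming from any source,
// e.g. a Kubernetes watch or a GitHub webhook.
type Event struct {
	Kind string // "Create", "Update", or "Delete"
	Name string
}

// Queue is a toy FIFO queue (assumption: no deduplication or rate limiting).
type Queue struct {
	items []Event
}

// Add appends an event to the end of the queue.
func (q *Queue) Add(e Event) {
	q.items = append(q.items, e)
}

// Get returns the oldest event, or false if the queue is empty.
func (q *Queue) Get() (Event, bool) {
	if len(q.items) == 0 {
		return Event{}, false
	}
	e := q.items[0]
	q.items = q.items[1:]
	return e, true
}

// ChannelSource is a toy source that forwards events from a channel into the queue.
type ChannelSource struct {
	Events <-chan Event
}

// Start drains the channel into the queue until the channel is closed.
func (s ChannelSource) Start(q *Queue) {
	for e := range s.Events {
		q.Add(e)
	}
}

// Reconcile plays the role of the Controller: it consumes the queue until it is empty.
func Reconcile(q *Queue) []string {
	var handled []string
	for {
		e, ok := q.Get()
		if !ok {
			return handled
		}
		handled = append(handled, e.Kind+"/"+e.Name)
	}
}

func main() {
	ch := make(chan Event, 2)
	ch <- Event{Kind: "Create", Name: "nginx"}
	ch <- Event{Kind: "Update", Name: "nginx"}
	close(ch)

	q := &Queue{}
	ChannelSource{Events: ch}.Start(q)
	fmt.Println(Reconcile(q)) // prints [Create/nginx Update/nginx]
}
```

The point of the sketch is the separation of concerns: the source only knows how to obtain events and where to put them, while the consumer only knows how to read the queue.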

Interface

type Source interface {
	// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
	// to enqueue reconcile.Requests.
	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

The Source interface is very simple. As its comment says, the Start method is meant to be called by the Controller with an EventHandler and a workqueue.

(The interface in the latest version at the time of writing (v0.18.4) is slightly different.)

Implementations

The internal.Kind struct is a simple implementation of Source.

type Kind struct {
	// Type is the type of object to watch. e.g. &v1.Pod{}
	Type client.Object

	// Cache used to watch APIs
	Cache cache.Cache

	// started may contain an error if one was encountered during startup. If its closed and does not
	// contain an error, startup and syncing finished.
	started     chan error
	startCancel func()
}

To implement the Source interface above, Kind also has a Start() method.

The Start method has three arguments other than the context:

  1. handler handler.EventHandler: The logic for how an event should be handled with the given queue. Usually it enqueues an item for each event. (We’ll study it in detail in another post.)
  2. queue workqueue.RateLimitingInterface: The queue that is given to the EventHandler, which is usually consumed by a controller.
  3. prct ...predicate.Predicate: Optional filter conditions that decide whether an event is passed to the handler.
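The relationship between predicates and the handler can be sketched with plain Go functions. This is a simplified model rather than the library's implementation: PodEvent, Predicate, Handler, and Filtered are hypothetical names, the queue is just a slice, and the sketch assumes that an event reaches the handler only when every predicate accepts it.

```go
package main

import "fmt"

// PodEvent is a hypothetical, simplified update event.
type PodEvent struct {
	Name          string
	OldGeneration int64
	NewGeneration int64
}

// Predicate decides whether an event should reach the handler.
type Predicate func(PodEvent) bool

// Handler enqueues something for an event; here the queue is just a slice.
type Handler func(e PodEvent, queue *[]string)

// Filtered wraps a handler so that it runs only when every predicate returns true.
func Filtered(h Handler, preds ...Predicate) Handler {
	return func(e PodEvent, queue *[]string) {
		for _, p := range preds {
			if !p(e) {
				return // one rejecting predicate is enough to drop the event
			}
		}
		h(e, queue)
	}
}

func main() {
	// Accept only events whose generation actually changed.
	generationChanged := func(e PodEvent) bool {
		return e.OldGeneration != e.NewGeneration
	}
	// Enqueue the object name for every accepted event.
	enqueueName := func(e PodEvent, queue *[]string) {
		*queue = append(*queue, e.Name)
	}

	h := Filtered(enqueueName, generationChanged)

	var queue []string
	h(PodEvent{Name: "nginx", OldGeneration: 1, NewGeneration: 2}, &queue) // accepted
	h(PodEvent{Name: "redis", OldGeneration: 3, NewGeneration: 3}, &queue) // filtered out
	fmt.Println(queue) // prints [nginx]
}
```

Keeping the filter separate from the handler means the same enqueue logic can be reused with different conditions.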

Inside the Start method, the following steps happen:

  1. Look up the Informer from the Cache for the specified type: i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
  2. Add the EventHandler to the obtained informer together with the queue: _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
  3. Sync the cache: ks.Cache.WaitForCacheSync(ctx)

An Informer, which we’ll study in detail in another post, is a kind of subscriber to the Kubernetes API that continuously receives create/update/delete events for the target Kubernetes type.
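The three steps inside Start can be modeled with a small standard-library sketch. Everything here is a toy stand-in under stated assumptions: the Informer, Cache, and Kind types below are hypothetical simplifications that only borrow the names of their controller-runtime counterparts, types are identified by a plain string, and the queue is a slice.

```go
package main

import (
	"errors"
	"fmt"
)

// Informer is a toy stand-in: it stores handlers and delivers events to them.
type Informer struct {
	handlers []func(event string)
}

// AddEventHandler registers a callback that is invoked for every event.
func (i *Informer) AddEventHandler(h func(event string)) {
	i.handlers = append(i.handlers, h)
}

// Emit simulates the API server delivering an event to the informer.
func (i *Informer) Emit(event string) {
	for _, h := range i.handlers {
		h(event)
	}
}

// Cache is a toy stand-in that holds one informer per resource type.
type Cache struct {
	informers map[string]*Informer
	synced    bool
}

// GetInformer returns the informer for the given type, if one exists.
func (c *Cache) GetInformer(typ string) (*Informer, error) {
	inf, ok := c.informers[typ]
	if !ok {
		return nil, errors.New("no informer for type " + typ)
	}
	return inf, nil
}

// WaitForCacheSync reports whether the toy cache is considered synced.
func (c *Cache) WaitForCacheSync() bool { return c.synced }

// Kind mirrors the shape described above: a type to watch plus a cache.
type Kind struct {
	Type  string
	Cache *Cache
}

// Start follows the three steps: look up the informer, register a handler
// that writes to the queue, then wait for the cache to sync.
func (k Kind) Start(queue *[]string) error {
	inf, err := k.Cache.GetInformer(k.Type) // step 1
	if err != nil {
		return err
	}
	inf.AddEventHandler(func(event string) { // step 2
		*queue = append(*queue, k.Type+":"+event)
	})
	if !k.Cache.WaitForCacheSync() { // step 3
		return errors.New("cache did not sync")
	}
	return nil
}

func main() {
	pods := &Informer{}
	cache := &Cache{informers: map[string]*Informer{"Pod": pods}, synced: true}

	var queue []string
	if err := (Kind{Type: "Pod", Cache: cache}).Start(&queue); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	pods.Emit("Create nginx") // events arriving after Start reach the queue
	fmt.Println(queue)        // prints [Pod:Create nginx]
}
```

Note that Start itself never touches an event. It only wires the handler to the informer, so events keep flowing into the queue after Start returns.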

Example

Let’s try using Kind as an implementation of Source.

You can initialize it with the following code:

source.Kind(cache, &v1.Pod{})

source.Kind is a kind of constructor for the internal.Kind struct; it returns an instance with the given object and cache:

// Kind creates a KindSource with the given cache provider.
func Kind(cache cache.Cache, object client.Object) SyncingSource {
	return &internal.Kind{Type: object, Cache: cache}
}

We create a cache with the following code:

cache, err := cache.New(cfg, cache.Options{})

Next, we initialize a workqueue with the following line:

queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")

Here we define a simple EventHandler, which only writes an info log and enqueues a WorkQueueItem containing the event type and the object name.

eventHandler := handler.Funcs{
	CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
		log.Info("CreateFunc is called", "object", e.Object.GetName())
		q.Add(WorkQueueItem{Event: "Create", Name: e.Object.GetName()})
	},
	UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
		log.Info("UpdateFunc is called", "objectNew", e.ObjectNew.GetName(), "objectOld", e.ObjectOld.GetName())
		q.Add(WorkQueueItem{Event: "Update", Name: e.ObjectNew.GetName()})
	},
	DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
		log.Info("DeleteFunc is called", "object", e.Object.GetName())
		q.Add(WorkQueueItem{Event: "Delete", Name: e.Object.GetName()})
	},
}

Lastly, we start kind with kind.Start(ctx, eventHandler, queue).

The entire code is as follows:

package main

import (
	"context"
	"flag"
	"os"

	mysqlv1alpha1 "github.com/nakamasato/mysql-operator/api/v1alpha1"
	"go.uber.org/zap/zapcore"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
	log    = logf.Log.WithName("source-examples")
	scheme = runtime.NewScheme()
)

func init() {
	utilruntime.Must(mysqlv1alpha1.AddToScheme(scheme))
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
}

func main() {
	// Prepare log
	opts := zap.Options{
		Development: true,
		TimeEncoder: zapcore.ISO8601TimeEncoder,
	}
	opts.BindFlags(flag.CommandLine)
	flag.Parse()
	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
	log.Info("source start")

	// Get a kubeconfig
	cfg, err := config.GetConfig()
	if err != nil {
		log.Error(err, "failed to get kubeconfig")
		os.Exit(1)
	}

	// Create a Cache
	cache, err := cache.New(cfg, cache.Options{})
	if err != nil {
		log.Error(err, "failed to create cache")
		os.Exit(1)
	}
	log.Info("cache is created")
	ctx := context.Background()

	// Start Cache in the background (cache.Start blocks until ctx is cancelled)
	go func() {
		if err := cache.Start(ctx); err != nil {
			log.Error(err, "failed to start cache")
			os.Exit(1)
		}
	}()
	log.Info("cache is started")

	kind := source.Kind(cache, &v1.Pod{})

	// Prepare queue and eventHandler
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")

	// Each handler logs the event and enqueues an item via the queue it receives (q)
	eventHandler := handler.Funcs{
		CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
			log.Info("CreateFunc is called", "object", e.Object.GetName())
			q.Add(WorkQueueItem{Event: "Create", Name: e.Object.GetName()})
		},
		UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
			log.Info("UpdateFunc is called", "objectNew", e.ObjectNew.GetName(), "objectOld", e.ObjectOld.GetName())
			q.Add(WorkQueueItem{Event: "Update", Name: e.ObjectNew.GetName()})
		},
		DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
			log.Info("DeleteFunc is called", "object", e.Object.GetName())
			q.Add(WorkQueueItem{Event: "Delete", Name: e.Object.GetName()})
		},
	}

	// Start Source
	if err := kind.Start(ctx, eventHandler, queue); err != nil { // Get informer and set eventHandler
		log.Error(err, "failed to start source")
		os.Exit(1)
	}

	// Wait for cache
	if err := kind.WaitForSync(ctx); err != nil {
		log.Error(err, "failed to wait for cache sync")
		os.Exit(1)
	}
	log.Info("kind is ready")

	// Consume items from the queue until it is shut down
	for {
		item, shutdown := queue.Get()
		if shutdown {
			break
		}
		log.Info("got item", "item", item)
		// Mark the item as processed; the workqueue holds back a re-added
		// identical item until Done is called for it.
		queue.Done(item)
	}
}

// WorkQueueItem is the item enqueued for each event.
type WorkQueueItem struct {
	Event string
	Name  string
}

Let’s start running the code:

go run main.go

You’ll see some logs for source, cache, and kind at the beginning, followed by several “CreateFunc is called” entries.

2024-07-27T11:06:10.216+0900 INFO source-examples source start
2024-07-27T11:06:10.218+0900 INFO source-examples cache is created
2024-07-27T11:06:10.218+0900 INFO source-examples cache is started
2024-07-27T11:06:10.332+0900 INFO source-examples kind is ready
2024-07-27T11:06:10.333+0900 INFO source-examples CreateFunc is called {"object": "kube-controller-manager-kind-control-plane"}
2024-07-27T11:06:10.333+0900 INFO source-examples CreateFunc is called {"object": "kube-proxy-574mz"}
2024-07-27T11:06:10.333+0900 INFO source-examples CreateFunc is called {"object": "kube-scheduler-kind-control-plane"}
2024-07-27T11:06:10.333+0900 INFO source-examples got item {"item": {"Event":"Create","Name":"kube-controller-manager-kind-control-plane"}}
...

These initial Create events are emitted for the Pods that already exist, as the cache syncs. After a while, the logs stop. Now let’s create an nginx Pod with the following command:

kubectl run nginx --image=nginx
pod/nginx created

You can see the logs for Create and Update events:

2024-07-27T10:57:09.211+0900 INFO source-examples CreateFunc is called {"object": "nginx"}
2024-07-27T10:57:09.211+0900 INFO source-examples got item {"item": {"Event":"Create","Name":"nginx"}}
2024-07-27T10:57:09.215+0900 INFO source-examples UpdateFunc is called {"objectNew": "nginx", "objectOld": "nginx"}
2024-07-27T10:57:09.215+0900 INFO source-examples got item {"item": {"Event":"Update","Name":"nginx"}}
2024-07-27T10:57:09.231+0900 INFO source-examples UpdateFunc is called {"objectNew": "nginx", "objectOld": "nginx"}

With this program, we successfully receive the Pod events and call the simple EventHandler, which enqueues items corresponding to the events.

In short, the Start method of the Source binds the given event handler, workqueue, and predicates to the cache (which receives the Kubernetes events) and starts syncing the cache.

Summary

In this post, we studied the Source component in controller-runtime with a simple example using its Kind implementation.

Here is a summary of the example:

  1. Kind requires a cache and an object to initialize.
  2. Kind.Start is called with an EventHandler, a workqueue, and Predicates.
  3. Kind.Start gets the target informer (which receives the Create/Update/Delete events of the target object from the Kubernetes API server), adds the given EventHandler and workqueue to it, and finally starts syncing the cache.

By implementing the Source interface, we can control which events a Controller processes, which makes it possible to develop flexible Kubernetes controllers that are triggered by resources outside the Kubernetes cluster.

Series Index

  1. Kubernetes Operator series 1 — controller-runtime example controller
  2. Kubernetes Operator series 2 — Overview of controller-runtime
  3. Kubernetes Operator series 3 — controller-runtime component — Manager
  4. Kubernetes Operator series 4 — controller-runtime component — Builder
  5. Kubernetes Operator series 5 — controller-runtime component — Reconciler
  6. Kubernetes Operator series 6 — controller-runtime component — Controller
  7. Kubernetes Operator series 7 — controller-runtime component — Source

Masato Naka

An SRE engineer, mainly working on Kubernetes. CKA (Feb 2021). His interests include cloud-native application development and machine learning.