Kubernetes Operator series 6 — controller-runtime component — Controller

Masato Naka
12 min read · Jul 30, 2023


Introduction

In our previous post, we explored the Reconciler component and discovered its collaboration with the Controller. As we learned, the Controller’s role involves monitoring target resources and triggering the Reconcile function in the Reconciler. The Controller watches the target resources on the Kubernetes API server, and when any changes are detected, it triggers the Reconcile function to take appropriate actions.

Now let’s dive into the Controller component, which is one of the core components in the controller-runtime.

What's a Controller?

The Controller component plays a crucial role in monitoring changes to the target resources on the Kubernetes API server and invoking the Reconcile function in the Reconciler.

Figure: Controller and its related components

The process involves the following steps:

  1. Detecting changes in the target objects via the Kubernetes API.
  2. Calling the Reconcile function in the Reconciler when necessary.
  3. Within the Reconcile function, the Reconciler retrieves the latest status of the target object (possibly from the cache) and updates it if required.
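
To make these three steps concrete, here is a minimal sketch in plain Go. It does not use controller-runtime at all: Request, Reconciler, store, and runController are simplified stand-ins invented for this illustration, and a channel plays the role of the change notifications coming from the Kubernetes API.

```go
package main

import "fmt"

// Request identifies the object to reconcile (a stand-in for reconcile.Request).
type Request struct{ Namespace, Name string }

// Reconciler is a stand-in for reconcile.Reconciler.
type Reconciler interface {
	Reconcile(req Request) error
}

// store plays the role of the cache / API server: desired vs. observed replicas.
type store struct {
	desired  map[Request]int
	observed map[Request]int
}

// Reconcile reads the latest state for req and updates it if it drifted (step 3).
func (s *store) Reconcile(req Request) error {
	if s.observed[req] != s.desired[req] {
		s.observed[req] = s.desired[req]
	}
	return nil
}

// runController receives change notifications (step 1) and calls Reconcile
// for each of them (step 2). It returns the number of successful calls.
func runController(changes <-chan Request, r Reconciler) int {
	calls := 0
	for req := range changes {
		if err := r.Reconcile(req); err != nil {
			continue
		}
		calls++
	}
	return calls
}

func main() {
	key := Request{Namespace: "default", Name: "web"}
	s := &store{desired: map[Request]int{key: 3}, observed: map[Request]int{}}

	changes := make(chan Request, 1)
	changes <- key
	close(changes)

	fmt.Println(runController(changes, s), s.observed[key])
}
```

The real Controller puts a rate-limited queue between detection and reconciliation, which the rest of this post walks through.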

To initialize a Controller, both the target resources and the corresponding Reconciler must be provided. This initialization process is handled by the Builder component in controller-runtime.

If you need a refresher on the Builder component’s role, you can refer back to the post where it was introduced.

The Builder component initializes the Controller with the designated target resources and Reconciler, and then registers the Controller with the given Manager.

Finally, when the Manager’s Start function is triggered, it starts the registered Controllers.

This is an overview of the role of the Controller component and the process of how a Controller is initialized and started.

In the following sections, we will delve into more details about the Controller component.

Interface

Let’s take a look at the Controller interface:

// Controller implements a Kubernetes API.  A Controller manages a work queue fed reconcile.Requests
// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item.
// Work typically is reads and writes Kubernetes objects to make the system state match the state specified
// in the object Spec.
type Controller interface {
	// Reconciler is called to reconcile an object by Namespace/Name
	reconcile.Reconciler

	// Watch takes events provided by a Source and uses the EventHandler to
	// enqueue reconcile.Requests in response to the events.
	//
	// Watch may be provided one or more Predicates to filter events before
	// they are given to the EventHandler. Events will be passed to the
	// EventHandler if all provided Predicates evaluate to true.
	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error

	// Start starts the controller. Start blocks until the context is closed or a
	// controller has an error starting.
	Start(ctx context.Context) error

	// GetLogger returns this controller logger prefilled with basic information.
	GetLogger() logr.Logger
}

Figure: Controller interface highlighted

As you can see, the Controller interface embeds the reconcile.Reconciler interface. As we studied in the previous post, a Reconciler must have a Reconcile function.

The Controller interface has two important functions, Watch and Start, which we’ll look at in detail later in this post.

From this interface, you can guess the core characteristics of the Controller component:

  1. A Reconcile function must be provided to a Controller.
  2. Watch gets events from a Source and handles them with Predicates for filtering and an EventHandler, usually for putting them into a Queue.
  3. Start triggers the Controller to start subscribing to the Queue, getting each new item and calling Reconcile with the item in the form of a reconcile.Request.
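
The first point follows from interface embedding in Go. The sketch below uses simplified stand-in types (Request, Reconciler, Controller, toyController, and echoReconciler are all invented for this illustration, with reduced signatures) to show that a value satisfying the Controller interface can also be used wherever a Reconciler is expected, and that it can simply forward the call to the Reconciler it was given.

```go
package main

import "fmt"

// Request and Reconciler are simplified stand-ins for the reconcile package types.
type Request struct{ Namespace, Name string }

type Reconciler interface {
	Reconcile(req Request) (string, error)
}

// Controller embeds Reconciler, so every Controller is also a Reconciler.
type Controller interface {
	Reconciler
	Start() error
}

// toyController is hypothetical; it forwards Reconcile to the Reconciler it holds.
type toyController struct {
	Do Reconciler
}

func (c *toyController) Reconcile(req Request) (string, error) {
	return c.Do.Reconcile(req)
}

func (c *toyController) Start() error { return nil }

// echoReconciler just reports which object it was asked to reconcile.
type echoReconciler struct{}

func (echoReconciler) Reconcile(req Request) (string, error) {
	return "reconciled " + req.Namespace + "/" + req.Name, nil
}

func main() {
	var c Controller = &toyController{Do: echoReconciler{}}
	var r Reconciler = c // allowed because Controller embeds Reconciler

	msg, _ := r.Reconcile(Request{Namespace: "default", Name: "web"})
	fmt.Println(msg)
}
```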

We’ll explore the details of a specific Controller implementation later in this post.

How is a Controller initialized?

Before diving into the details of an implementation of the Controller interface, let’s check the two functions for creating a Controller, which are provided by the controller package:

  1. New(name string, mgr manager.Manager, options Options) (Controller, error): returns a new Controller registered with the Manager. If you initialize a Controller with New, you can start the Controller by calling the Manager’s Start function.
  2. NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error): returns a new Controller without adding it to the Manager. If you initialize a Controller with NewUnmanaged, you’re responsible for starting the Controller yourself.
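
The difference between the two can be sketched with a toy manager. Everything below (manager, controller, newManaged, newUnmanaged) is a hypothetical simplification written for this post, not the controller-runtime API. In particular, the real Start functions take a context and block, which is left out here.

```go
package main

import "fmt"

// Runnable is a stand-in for anything a manager can start.
type Runnable interface{ Start() error }

type controller struct {
	name    string
	started bool
}

func (c *controller) Start() error {
	c.started = true
	return nil
}

// manager is a hypothetical, heavily simplified Manager.
type manager struct{ runnables []Runnable }

func (m *manager) Add(r Runnable) { m.runnables = append(m.runnables, r) }

// Start starts every registered runnable.
func (m *manager) Start() error {
	for _, r := range m.runnables {
		if err := r.Start(); err != nil {
			return err
		}
	}
	return nil
}

// newUnmanaged only builds the controller; the caller must start it.
func newUnmanaged(name string) *controller { return &controller{name: name} }

// newManaged builds the controller and registers it with the manager.
func newManaged(name string, m *manager) *controller {
	c := newUnmanaged(name)
	m.Add(c)
	return c
}

func main() {
	m := &manager{}
	managed := newManaged("managed", m)
	unmanaged := newUnmanaged("unmanaged")

	_ = m.Start()
	fmt.Println(managed.started, unmanaged.started) // true false

	_ = unmanaged.Start() // starting it is the caller's responsibility
	fmt.Println(unmanaged.started) // true
}
```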

With either of the above functions, the implementation of the Controller interface in the internal package is initialized:

&controller.Controller{
	Do: options.Reconciler,
	MakeQueue: func() workqueue.RateLimitingInterface {
		return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
	},
	MaxConcurrentReconciles: options.MaxConcurrentReconciles,
	CacheSyncTimeout:        options.CacheSyncTimeout,
	SetFields:               mgr.SetFields,
	Name:                    name,
	LogConstructor:          options.LogConstructor,
	RecoverPanic:            options.RecoverPanic,
}

Note that this can be a little confusing: the struct in the internal package is also named Controller, just like the interface.

Let’s explore the implementation of Controller in the next section.

Implementation

The definition of Controller is as follows:

type Controller struct {
	// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
	Name string

	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
	MaxConcurrentReconciles int

	// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
	// ensures that the state of the system matches the state specified in the object.
	// Defaults to the DefaultReconcileFunc.
	Do reconcile.Reconciler

	// MakeQueue constructs the queue for this controller once the controller is ready to start.
	// This exists because the standard Kubernetes workqueues start themselves immediately, which
	// leads to goroutine leaks if something calls controller.New repeatedly.
	MakeQueue func() workqueue.RateLimitingInterface

	// Queue is an listeningQueue that listens for events from Informers and adds object keys to
	// the Queue for processing
	Queue workqueue.RateLimitingInterface

	// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
	// Deprecated: the caller should handle injected fields itself.
	SetFields func(i interface{}) error

	// mu is used to synchronize Controller setup
	mu sync.Mutex

	// Started is true if the Controller has been Started
	Started bool

	// ctx is the context that was passed to Start() and used when starting watches.
	//
	// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
	// while we usually always strive to follow best practices, we consider this a legacy case and it should
	// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
	ctx context.Context

	// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
	// Defaults to 2 minutes if not set.
	CacheSyncTimeout time.Duration

	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
	startWatches []watchDescription

	// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
	// or for example when a watch is started.
	// Note: LogConstructor has to be able to handle nil requests as we are also using it
	// outside the context of a reconciliation.
	LogConstructor func(request *reconcile.Request) logr.Logger

	// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
	RecoverPanic bool
}

Figure: Controller and its important fields

There are several fields in the struct, and we can’t cover all of them. We’ll just pick up a few important ones. It’d be helpful to start with the core fields:

  1. Do (reconcile.Reconciler): The Reconciler passed to the Controller is stored in this field. The Controller has its own Reconcile function, which calls Do.Reconcile (the Reconcile function of the given Reconciler).
  2. MakeQueue (func() workqueue.RateLimitingInterface): A function that creates the workqueue.
  3. Queue (workqueue.RateLimitingInterface): A queue that carries change events for the target resources.
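
Why keep a MakeQueue function in addition to the Queue itself? As the comment in the struct says, the queue should only be constructed once the controller is ready to start. The following sketch, with a hypothetical queue type and a counter invented purely for this illustration, shows the effect of deferring construction: creating controllers costs nothing until Start is called.

```go
package main

import "fmt"

// queue is a stand-in for a workqueue.
type queue struct{ items []string }

// constructed counts how many queues have been created so far.
var constructed int

func newQueue() *queue {
	constructed++
	return &queue{}
}

// controller mirrors only the two fields discussed above.
type controller struct {
	MakeQueue func() *queue
	Queue     *queue
}

// newController stores the constructor but does not call it.
func newController() *controller {
	return &controller{MakeQueue: newQueue}
}

// Start is the point where the queue is actually created.
func (c *controller) Start() { c.Queue = c.MakeQueue() }

func main() {
	a := newController()
	_ = newController() // never started, so it never owns a queue

	fmt.Println(constructed) // 0

	a.Start()
	fmt.Println(constructed, a.Queue != nil) // 1 true
}
```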

Now let’s see the entire flow of how a Controller is initialized and started step by step. Let’s also remember how Manager, Builder, and Reconciler work together.

  1. Prepare Manager and Reconciler.
  2. Bind the Reconciler and the Manager with the Builder by calling NewControllerManagedBy(mgr) and Complete(r), in which Builder.build calls blder.doController to create a controller with controller.New() (this is what we’ve just studied).
  3. The created Controller is registered to the Manager with mgr.Add(controller) and set to blder.ctrl in the Builder.
  4. Builder.build also calls blder.doWatch, which initializes a Kind (a source of events for changes of the target resources) for each For, Owns, and Watches input, prepares an EventHandler such as EnqueueRequestForObject (a handler that enqueues the events received from the Source into the Queue as reconcile.Request items), and lastly calls the Controller’s Watch function. We’ll study what’s done inside the Watch function in the next section.
  5. Manager.Start calls controller.Start, which creates a Queue, starts the source of each target resource (startWatches), and starts subscribing to the Queue to get changed items and trigger Reconciler.Reconcile.

Watch func

The Watch function covers the first half of the Controller’s work: it monitors and detects changes to the target resources, and enqueues the events into the Queue.

The Watch function is called in blder.doWatch in the Builder for each For, Owns, and Watches input.

// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Inject Cache into arguments
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	// Controller hasn't started yet, store the watches locally and return.
	//
	// These watches are going to be held on the controller struct until the manager or user calls Start(...).
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}

	c.LogConstructor(nil).Info("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

First, the Watch function calls SetFields, which injects the shared cache (originally held by the Cluster in the Manager) into the Source, EventHandler, and Predicates. The details of those components will be studied in another post. If the Controller has not started yet, the watch is only recorded in startWatches and Watch returns.

There are several types of Source, but for now, you can think of it as a component that retrieves changes to the target resource via the Kubernetes API.

EventHandler can be considered as a component that converts the received event from the Source component into a Request. This Request is enqueued to the Queue and used as input for Reconciler.Reconcile.

Finally, if the Controller has already started, src.Start(c.ctx, evthdler, c.Queue, prct...) starts monitoring the Kubernetes API and puts changes into the Queue.
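
The "store now, start later" behavior of Watch can be sketched as follows. The source and controller types here are toy stand-ins written for this post. The mutex, SetFields injection, handlers, and predicates of the real implementation are deliberately left out so that only the Started check remains visible.

```go
package main

import "fmt"

// source is a stand-in for source.Source; Start records that it was started.
type source struct {
	name    string
	started bool
}

func (s *source) Start() error {
	s.started = true
	return nil
}

type controller struct {
	started      bool
	startWatches []*source
}

// Watch starts the source right away only if the controller is already running.
// Otherwise the source is remembered and started later by Start.
func (c *controller) Watch(src *source) error {
	if !c.started {
		c.startWatches = append(c.startWatches, src)
		return nil
	}
	return src.Start()
}

// Start starts every stored watch, then marks the controller as started.
func (c *controller) Start() error {
	for _, src := range c.startWatches {
		if err := src.Start(); err != nil {
			return err
		}
	}
	c.startWatches = nil
	c.started = true
	return nil
}

func main() {
	c := &controller{}
	early, late := &source{name: "early"}, &source{name: "late"}

	_ = c.Watch(early)
	fmt.Println(early.started) // false: only stored so far

	_ = c.Start()
	_ = c.Watch(late)
	fmt.Println(early.started, late.started) // true true
}
```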

Start func

The Start function covers the second half of the Controller’s work: it subscribes to the Queue and calls Reconcile for each queue item.

// Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error {
	...

	c.Queue = c.MakeQueue()
	...

	// Run a worker thread that just dequeues items, processes them, and marks them done.
	// It enforces that the reconcileHandler is never invoked concurrently with the same object.
	for c.processNextWorkItem(ctx) {
	}

	c.Started = true
	...
	return nil
}

(Please check the full source code if you’re interested.)

In the Start function, the Queue is created and processNextWorkItem is called repeatedly in a for loop until the Queue is shut down. In the full source, this loop runs in worker goroutines, up to MaxConcurrentReconciles of them.

processNextWorkItem gets an item from the Queue and calls reconcileHandler(ctx, obj), which finally calls c.Reconcile(ctx, req).

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	obj, shutdown := c.Queue.Get()
	if shutdown {
		// Stop working
		return false
	}

	// We call Done here so the workqueue knows we have finished
	// processing this item. We also must remember to call Forget if we
	// do not want this work item being re-queued. For example, we do
	// not call Forget if a transient error occurs, instead the item is
	// put back on the workqueue and attempted again after a back-off
	// period.
	defer c.Queue.Done(obj)

	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

	c.reconcileHandler(ctx, obj)
	return true
}
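
To see how a loop like this behaves with several workers, here is a small self-contained sketch. The queue is a toy built on a buffered channel (an assumption made for brevity, not how the client-go workqueue is implemented), and runWorkers and Reconciled are hypothetical helper names. Unlike the real workqueue, this toy does not prevent the same key from being processed by two workers at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// queue is a toy stand-in for a workqueue, backed by a buffered channel.
type queue struct{ ch chan string }

// Get blocks for the next item. The second result is true once the queue
// has been closed and fully drained, which corresponds to "shutdown".
func (q *queue) Get() (string, bool) {
	item, ok := <-q.ch
	return item, !ok
}

type controller struct {
	reconciled              int64 // number of processed items, updated atomically
	Queue                   *queue
	MaxConcurrentReconciles int
}

// processNextWorkItem handles one item and reports whether to keep going.
func (c *controller) processNextWorkItem() bool {
	item, shutdown := c.Queue.Get()
	if shutdown {
		return false
	}
	_ = item // a real controller would hand the item to its reconcile handler here
	atomic.AddInt64(&c.reconciled, 1)
	return true
}

// runWorkers launches MaxConcurrentReconciles workers and waits until the
// queue is shut down and all workers have returned.
func (c *controller) runWorkers() {
	var wg sync.WaitGroup
	wg.Add(c.MaxConcurrentReconciles)
	for i := 0; i < c.MaxConcurrentReconciles; i++ {
		go func() {
			defer wg.Done()
			for c.processNextWorkItem() {
			}
		}()
	}
	wg.Wait()
}

// Reconciled returns how many items have been processed.
func (c *controller) Reconciled() int64 { return atomic.LoadInt64(&c.reconciled) }

func main() {
	q := &queue{ch: make(chan string, 3)}
	for _, k := range []string{"default/a", "default/b", "default/c"} {
		q.ch <- k
	}
	close(q.ch) // plays the role of shutting the queue down

	c := &controller{Queue: q, MaxConcurrentReconciles: 2}
	c.runWorkers()
	fmt.Println(c.Reconciled()) // 3
}
```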

Dependent components

As mentioned in the previous sections, the Controller component relies on several dependent components to function effectively. Let’s summarize them below:

Source:

The Source interface serves as a way to obtain events from various sources such as create, update, and delete operations on Kubernetes objects, Webhook callbacks, etc. These events are then processed by the EventHandler.

type Source interface {
	// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
	// to enqueue reconcile.Requests.
	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

The Source interface provides the Start method, which is used internally by the Controller to register an EventHandler with the Informer and enqueue reconcile.Requests. This enables the Controller to detect changes in the target resources and trigger the corresponding Reconcile function.

There are several implementations of the Source interface, including Kind, kindWithCache, Informer, Channel, etc. For example, Kind is used in the blder.doWatch function:

typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
	return err
}
src := &source.Kind{Type: typeForSrc}

Each implementation of Source caters to specific use cases and allows the Controller to efficiently monitor and capture events from the designated sources. In most cases, the Source is used to capture changes in the target Kubernetes resources.

We’ll explore the details of these Source implementations in another post.

Queue (workqueue.RateLimitingInterface)

The Queue, also known as workqueue, is an interface that rate limits items being added to the queue. It plays a crucial role in managing the flow of change events detected by the Source and enqueued by the EventHandler.

Please note that workqueue.RateLimitingInterface is not part of controller-runtime. It comes from the workqueue package in client-go (k8s.io/client-go/util/workqueue):

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}

The Queue is responsible for maintaining a steady and controlled rate of adding items to the queue, ensuring the Controller can efficiently handle and process them. The RateLimitingInterface provides methods such as AddRateLimited, Forget, and NumRequeues to manage the items in the queue effectively.

In the context of the Controller, the Queue stores the change events detected by the Source and enqueued by the EventHandler. These events are represented as reconcile.Request items, which are consumed by the processNextWorkItem function in the Controller.
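
To build some intuition for what "rate limiting" means per item, here is a toy per-item exponential backoff. It is only a sketch under our own assumptions: the backoff type, its When method, and the chosen delays are invented for this post and are not the client-go implementation. It does mirror the roles of NumRequeues and Forget described in the interface comments above.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a toy per-item rate limiter: the delay starts at base,
// doubles on every failure of the same item, and is capped at max.
type backoff struct {
	base, max time.Duration
	failures  map[string]int
}

func newBackoff(base, max time.Duration) *backoff {
	return &backoff{base: base, max: max, failures: map[string]int{}}
}

// When records one more failure for the item and returns how long the
// item should wait before it is added to the queue again.
func (b *backoff) When(item string) time.Duration {
	n := b.failures[item]
	b.failures[item] = n + 1

	d := b.base
	for i := 0; i < n; i++ {
		d *= 2
		if d >= b.max {
			return b.max
		}
	}
	return d
}

// NumRequeues reports how many times the item has been rate limited so far.
func (b *backoff) NumRequeues(item string) int { return b.failures[item] }

// Forget clears the failure history, so the next delay starts at base again.
func (b *backoff) Forget(item string) { delete(b.failures, item) }

func main() {
	b := newBackoff(5*time.Millisecond, 40*time.Millisecond)

	// Five consecutive failures: delays of 5ms, 10ms, 20ms, 40ms, 40ms.
	for i := 0; i < 5; i++ {
		fmt.Println(b.When("default/web"))
	}
	fmt.Println(b.NumRequeues("default/web")) // 5

	// After a successful reconcile the item is forgotten and starts over.
	b.Forget("default/web")
	fmt.Println(b.When("default/web")) // 5ms
}
```

This is why a Reconciler that keeps failing on one object is retried less and less often, while other objects are unaffected.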

In future posts, we will delve into more details about the Queue and its significance in the functioning of the Kubernetes Operator.

EventHandler

The EventHandler is an interface responsible for handling various types of events, such as Create, Update, Delete, or Generic events, and enqueuing them into the workqueue. Each event’s specific logic is defined in the corresponding implementation.

type EventHandler interface {
	// Create is called in response to an create event - e.g. Pod Creation.
	Create(event.CreateEvent, workqueue.RateLimitingInterface)

	// Update is called in response to an update event - e.g. Pod Updated.
	Update(event.UpdateEvent, workqueue.RateLimitingInterface)

	// Delete is called in response to a delete event - e.g. Pod Deleted.
	Delete(event.DeleteEvent, workqueue.RateLimitingInterface)

	// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
	// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
	Generic(event.GenericEvent, workqueue.RateLimitingInterface)
}

The implementation of each method defines how the EventHandler should handle the specific event and enqueue it into the workqueue. For example, EnqueueRequestForObject is one such implementation that enqueues the received events into the Queue in the format of reconcile.Request, allowing the Controller to retrieve items from the Queue and pass them to the Reconciler for processing.
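
The idea behind a handler like EnqueueRequestForObject can be sketched in a few lines. The types below (Object, Request, CreateEvent, UpdateEvent, queue, enqueueForObject) are simplified stand-ins written for this post, and only Create and Update are shown. The key point is that the handler keeps just the namespace and name of the object. The toy queue also ignores a key that is already pending, so several events for the same object collapse into one Request.

```go
package main

import "fmt"

// Object, Request and the event types are simplified stand-ins.
type Object struct {
	Namespace, Name string
	Replicas        int
}

type Request struct{ Namespace, Name string }

type CreateEvent struct{ Object Object }

type UpdateEvent struct{ ObjectOld, ObjectNew Object }

// queue is a toy set-like queue: adding a key that is already pending is a no-op.
type queue struct {
	pending map[Request]bool
	order   []Request
}

func (q *queue) Add(r Request) {
	if q.pending == nil {
		q.pending = map[Request]bool{}
	}
	if q.pending[r] {
		return
	}
	q.pending[r] = true
	q.order = append(q.order, r)
}

// enqueueForObject is a hypothetical handler: it drops the event details
// and enqueues only the identity of the object the event is about.
type enqueueForObject struct{}

func (enqueueForObject) Create(e CreateEvent, q *queue) {
	q.Add(Request{Namespace: e.Object.Namespace, Name: e.Object.Name})
}

func (enqueueForObject) Update(e UpdateEvent, q *queue) {
	q.Add(Request{Namespace: e.ObjectNew.Namespace, Name: e.ObjectNew.Name})
}

func main() {
	q := &queue{}
	h := enqueueForObject{}

	v1 := Object{Namespace: "default", Name: "web", Replicas: 1}
	v2 := Object{Namespace: "default", Name: "web", Replicas: 3}

	h.Create(CreateEvent{Object: v1}, q)
	h.Update(UpdateEvent{ObjectOld: v1, ObjectNew: v2}, q)

	fmt.Println(q.order) // [{default web}]
}
```

Because the Request carries no event details, the Reconcile function has to read the latest state of the object itself, as described at the beginning of this post.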

Predicate

The Predicate is an interface used to filter events before they are enqueued for processing by EventHandlers.

// Predicate filters events before enqueuing the keys.
type Predicate interface {
	// Create returns true if the Create event should be processed
	Create(event.CreateEvent) bool

	// Delete returns true if the Delete event should be processed
	Delete(event.DeleteEvent) bool

	// Update returns true if the Update event should be processed
	Update(event.UpdateEvent) bool

	// Generic returns true if the Generic event should be processed
	Generic(event.GenericEvent) bool
}

The Predicate interface provides four methods, each corresponding to a different type of event: Create, Delete, Update, and Generic. These methods return a boolean value indicating whether the corresponding event should be processed or filtered out.

Predicates are useful when you want to selectively handle or ignore certain events based on specific criteria. For example, you can use Predicates to filter out events that are not relevant to your application’s logic or to optimize the resource usage by skipping unnecessary event processing.

If there are no specific filter requirements for the events, you can leave the Predicate empty, and all events will be processed by the EventHandlers.
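
As a rough illustration of the filtering rule (an event reaches the EventHandler only if all Predicates return true, and with no Predicates every event passes), consider the sketch below. The Predicate interface is reduced to its Update method, and UpdateEvent, generationChanged, inNamespace, and shouldHandle are hypothetical names made up for this example.

```go
package main

import "fmt"

// UpdateEvent is a simplified stand-in carrying only what the predicates need.
type UpdateEvent struct {
	OldGeneration, NewGeneration int64
	Namespace                    string
}

// Predicate is reduced to the Update method for brevity.
type Predicate interface {
	Update(UpdateEvent) bool
}

// generationChanged accepts only updates that changed the generation.
type generationChanged struct{}

func (generationChanged) Update(e UpdateEvent) bool {
	return e.OldGeneration != e.NewGeneration
}

// inNamespace accepts only events from a single namespace.
type inNamespace string

func (n inNamespace) Update(e UpdateEvent) bool {
	return e.Namespace == string(n)
}

// shouldHandle reports whether every predicate accepts the event.
// With no predicates at all, every event is accepted.
func shouldHandle(e UpdateEvent, preds ...Predicate) bool {
	for _, p := range preds {
		if !p.Update(e) {
			return false
		}
	}
	return true
}

func main() {
	specChange := UpdateEvent{OldGeneration: 1, NewGeneration: 2, Namespace: "default"}
	statusOnly := UpdateEvent{OldGeneration: 2, NewGeneration: 2, Namespace: "default"}

	preds := []Predicate{generationChanged{}, inNamespace("default")}

	fmt.Println(shouldHandle(specChange, preds...)) // true
	fmt.Println(shouldHandle(statusOnly, preds...)) // false
	fmt.Println(shouldHandle(statusOnly))           // true: no predicates, nothing filtered
}
```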

Summary

In this post, we delved into the Controller component, one of the core components in the controller-runtime. Here are the key points we covered:

  1. The Controller interface embeds the Reconciler interface, indicating the close collaboration between the Controller and the Reconciler.
  2. The Controller is responsible for monitoring changes to the target resources on the Kubernetes API server and calling the Reconcile function in the Reconciler when necessary.
  3. The Watch function in the Controller is used to monitor and detect changes to the target objects via the Kubernetes API. It then enqueues the events into the workqueue for further processing.
  4. The Start function in the Controller is the process that subscribes to the workqueue and triggers the Reconcile function for each item in the queue.
  5. The overall flow of the Controller is Source → EventHandler → Queue → Controller.Reconcile → Controller.Do.Reconcile.

Throughout this series, we will explore the dependent components of the Controller, such as Source, EventHandler, Predicate, and workqueue, in greater detail.

Stay tuned for the upcoming posts, where we’ll delve deeper into the various components and their implementations!

Series Index

  1. Kubernetes Operator series 1 — controller-runtime example controller
  2. Kubernetes Operator series 2 — Overview of controller-runtime
  3. Kubernetes Operator series 3 — controller-runtime component — Manager
  4. Kubernetes Operator series 4 — controller-runtime component — Builder
  5. Kubernetes Operator series 5 — controller-runtime component — Reconciler
  6. Kubernetes Operator series 6 — controller-runtime component — Controller
  7. Kubernetes Operator series 7 — controller-runtime component — Source

Masato Naka

An SRE, mainly working on Kubernetes. CKA (Feb 2021). His interests include cloud-native application development and machine learning.