Kubernetes Operator series 6 — controller-runtime component — Controller
Introduction
In our previous post, we explored the Reconciler component and its collaboration with the Controller. As we learned, the Controller’s role is to monitor target resources and trigger the Reconcile function of the Reconciler. The Reconcile function, in turn, checks the state of the target resources on the Kubernetes API server and takes appropriate actions when changes are needed.
Now let’s dive into the Controller component, which is one of the core components in the controller-runtime.
What is a Controller?
The Controller component plays a crucial role in monitoring changes to the target resources on the Kubernetes API server and invoking the Reconcile function in the Reconciler.
The process involves the following steps:
- Detecting changes in the target objects via the Kubernetes API.
- Calling the Reconcile function in the Reconciler when necessary.
- Within the Reconcile function, the Reconciler retrieves the latest status of the target object (possibly from the cache) and updates it if required.
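To make the three steps above concrete, here is a minimal sketch in plain Go. It does not use controller-runtime: Request, cache, reconcile, and run are hypothetical stand-ins, and the cache is just a map. The point it illustrates is that a request carries only a namespace and name, so Reconcile always acts on the latest state it can read rather than on the event that triggered it.

```go
package main

import "fmt"

// Request identifies an object by namespace and name, loosely modeled on
// reconcile.Request. It carries no object data.
type Request struct {
	Namespace, Name string
}

// cache is a hypothetical stand-in for the shared cache: key -> latest spec.
type cache map[Request]string

// reconcile reads the latest state for the request from the cache and
// returns a description of what it would act on.
func reconcile(c cache, req Request) string {
	spec, ok := c[req]
	if !ok {
		return fmt.Sprintf("%s/%s: not found, nothing to do", req.Namespace, req.Name)
	}
	return fmt.Sprintf("%s/%s: reconciling toward %q", req.Namespace, req.Name, spec)
}

// run reconciles each pending request in order.
func run(c cache, pending []Request) []string {
	var out []string
	for _, req := range pending {
		out = append(out, reconcile(c, req))
	}
	return out
}

func main() {
	c := cache{}
	key := Request{Namespace: "default", Name: "web"}

	// The object changes twice before the request is processed.
	c[key] = "replicas=1"
	pending := []Request{key}
	c[key] = "replicas=3"

	// Reconcile only sees the latest state.
	for _, line := range run(c, pending) {
		fmt.Println(line)
	}
}
```

Because the request holds only a key, the intermediate "replicas=1" state is never seen by the reconcile step in this sketch.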
To initialize a Controller, both the target resources and the corresponding Reconciler must be provided. This initialization process is handled by the Builder component in controller-runtime.
If you need a refresher on the Builder component’s role, you can refer back to the post where it was introduced.
The Builder component initializes the Controller with the designated target resources and Reconciler, and then registers the Controller with the given Manager.
Finally, when the Manager’s Start function is triggered, it starts the registered Controllers.
This is an overview of the role of the Controller component and the process of how a Controller is initialized and started.
In the following sections, we will delve into more details about the Controller component.
Interface
Let’s take a look at the Controller interface:
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item.
// Work typically is reads and writes Kubernetes objects to make the system state match the state specified
// in the object Spec.
type Controller interface {
	// Reconciler is called to reconcile an object by Namespace/Name
	reconcile.Reconciler
	// Watch takes events provided by a Source and uses the EventHandler to
	// enqueue reconcile.Requests in response to the events.
	//
	// Watch may be provided one or more Predicates to filter events before
	// they are given to the EventHandler. Events will be passed to the
	// EventHandler if all provided Predicates evaluate to true.
	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
	// Start starts the controller. Start blocks until the context is closed or a
	// controller has an error starting.
	Start(ctx context.Context) error
	// GetLogger returns this controller logger prefilled with basic information.
	GetLogger() logr.Logger
}
As you can see, the Controller interface embeds the reconcile.Reconciler interface. As we studied in the previous post, a Reconciler must have a Reconcile function.
The Controller interface has two other important functions, Watch and Start, which we’ll look at in detail later in this post.
From this interface, you can guess the core characteristics of the Controller component:
- A Reconcile function must be provided to a Controller.
- Watch gets events from a Source, filters them with Predicates, and hands them to an EventHandler, which usually puts them into a Queue.
- Start triggers the Controller to start subscribing to the queue, getting new items and calling Reconcile with each item in the form of a reconcile.Request.
We’ll explore the details of a specific Controller implementation later in this post.
How is a Controller initialized?
Before diving into the details of an implementation of the Controller interface, let’s check the two functions for creating a Controller, which are provided by the controller package:
- New(name string, mgr manager.Manager, options Options) (Controller, error): returns a new Controller registered with the Manager. If you initialize a Controller with New, you can start the Controller by calling the Manager’s Start function.
- NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error): returns a new Controller without adding it to the Manager. If you initialize a Controller with NewUnmanaged, you’re responsible for starting the Controller.
When you use either of these functions, the implementation of the Controller interface in the internal package is initialized:
&controller.Controller{
	Do: options.Reconciler,
	MakeQueue: func() workqueue.RateLimitingInterface {
		return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
	},
	MaxConcurrentReconciles: options.MaxConcurrentReconciles,
	CacheSyncTimeout:        options.CacheSyncTimeout,
	SetFields:               mgr.SetFields,
	Name:                    name,
	LogConstructor:          options.LogConstructor,
	RecoverPanic:            options.RecoverPanic,
}
Note that it’s a little confusing that the name of the struct is also Controller.
Let’s explore the implementation of Controller in the next section.
Implementation
The definition of Controller is as follows:
type Controller struct {
	// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
	Name string
	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
	MaxConcurrentReconciles int
	// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
	// ensures that the state of the system matches the state specified in the object.
	// Defaults to the DefaultReconcileFunc.
	Do reconcile.Reconciler
	// MakeQueue constructs the queue for this controller once the controller is ready to start.
	// This exists because the standard Kubernetes workqueues start themselves immediately, which
	// leads to goroutine leaks if something calls controller.New repeatedly.
	MakeQueue func() workqueue.RateLimitingInterface
	// Queue is an listeningQueue that listens for events from Informers and adds object keys to
	// the Queue for processing
	Queue workqueue.RateLimitingInterface
	// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
	// Deprecated: the caller should handle injected fields itself.
	SetFields func(i interface{}) error
	// mu is used to synchronize Controller setup
	mu sync.Mutex
	// Started is true if the Controller has been Started
	Started bool
	// ctx is the context that was passed to Start() and used when starting watches.
	//
	// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
	// while we usually always strive to follow best practices, we consider this a legacy case and it should
	// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
	ctx context.Context
	// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
	// Defaults to 2 minutes if not set.
	CacheSyncTimeout time.Duration
	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
	startWatches []watchDescription
	// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
	// or for example when a watch is started.
	// Note: LogConstructor has to be able to handle nil requests as we are also using it
	// outside the context of a reconciliation.
	LogConstructor func(request *reconcile.Request) logr.Logger
	// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
	RecoverPanic bool
}
There are several fields in the struct, and we can’t cover all of them, so we’ll pick a few important ones. It’s helpful to start with the core fields:
- Do (reconcile.Reconciler): The Reconciler passed to the Controller is stored in this field. Controller has a Reconcile function, which calls Do.Reconcile (the Reconcile function of the given Reconciler).
- MakeQueue (func() workqueue.RateLimitingInterface): A function that creates a workqueue.
- Queue (workqueue.RateLimitingInterface): A queue that carries change events for the target resources.
Now let’s see the entire flow of how a Controller is initialized and started, step by step. Let’s also remember how Manager, Builder, and Reconciler work together.
- Prepare a Manager and a Reconciler.
- Bind the Reconciler and the Manager with the Builder by calling NewControllerManagedBy(mgr) and Complete(r), in which Builder.build calls blder.doController to create a Controller with controller.New (this is what we’ve just studied!).
- The created Controller is registered with the Manager via mgr.Add(controller) and stored in blder.ctrl in the Builder.
- Builder.build also calls blder.doWatch, which, for the For, Owns, and Watches inputs, initializes a Kind (a Source of events for changes to the target resources) and an EventHandler such as EnqueueRequestForObject (a handler that enqueues the events received from the Source into the Queue as reconcile.Request items), and lastly calls the Controller’s Watch function. We’ll study what happens inside the Watch function in the next section.
- Manager.Start calls controller.Start, which creates a Queue, starts the Source of each target resource (startWatches), and starts subscribing to the Queue to get changed items and trigger Reconciler.Reconcile.
Watch func
The Watch function is the first half of the Controller’s work: it monitors and detects changes to the target resources, and enqueues the events into the Queue.
The Watch function is called in blder.doWatch in the Builder for For, Owns, and Watches.
// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Inject Cache into arguments
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}
	// Controller hasn't started yet, store the watches locally and return.
	//
	// These watches are going to be held on the controller struct until the manager or user calls Start(...).
	if !c.Started {
		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
		return nil
	}
	c.LogConstructor(nil).Info("Starting EventSource", "source", src)
	return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
First, the Watch function calls SetFields, which injects the shared cache (originally held by the Cluster in the Manager) into the Source, EventHandler, and Predicates. The details of those components will be studied in another post.
If the Controller has not been started yet, Watch only records the Source, EventHandler, and Predicates in startWatches and returns; they are started later by Start.
There are several types of Source, but for now, you can think of it as a component that retrieves changes to the target resource via the Kubernetes API.
An EventHandler can be considered a component that converts an event received from the Source into a Request. This Request is enqueued to the Queue and used as input for Reconciler.Reconcile.
Finally, if the Controller is already running, src.Start(c.ctx, evthdler, c.Queue, prct...) starts monitoring the Kubernetes API and puts changes into the Queue.
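The branch on c.Started in Watch is easy to miss, so here is a small sketch of just that behavior. It is a simplified model, not the controller-runtime code: the source and controller types and their lowercase methods are hypothetical, and the mutex, cache injection, handlers, and predicates are left out.

```go
package main

import "fmt"

// source is a hypothetical, simplified stand-in for source.Source.
type source struct {
	name    string
	started bool
}

func (s *source) start() { s.started = true }

// controller keeps only the fields needed to show the two Watch paths.
type controller struct {
	started      bool
	startWatches []*source
}

// watch mirrors the branch in Watch: before start, remember the source;
// after start, start the source right away.
func (c *controller) watch(s *source) {
	if !c.started {
		c.startWatches = append(c.startWatches, s)
		return
	}
	s.start()
}

// start launches every remembered source, then marks the controller started.
func (c *controller) start() {
	for _, s := range c.startWatches {
		s.start()
	}
	c.startWatches = nil
	c.started = true
}

func main() {
	c := &controller{}

	early := &source{name: "early"}
	c.watch(early)
	fmt.Println(early.name, "running before start:", early.started)

	c.start()
	fmt.Println(early.name, "running after start:", early.started)

	late := &source{name: "late"}
	c.watch(late)
	fmt.Println(late.name, "running right after watch:", late.started)
}
```

In the usual Builder flow, every Watch call happens before Manager.Start, so the watches take the first path and are started together by Start.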
Start func
The Start function is the second half of the Controller’s work: it subscribes to the Queue and calls Reconcile for each queue item.
// Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error {
	...
	c.Queue = c.MakeQueue()
	...
	// Run a worker thread that just dequeues items, processes them, and marks them done.
	// It enforces that the reconcileHandler is never invoked concurrently with the same object.
	for c.processNextWorkItem(ctx) {
	}
	c.Started = true
	...
	return nil
}
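(Please check the full source code if you’re interested.)
In the Start function, the Queue is created and processNextWorkItem is called continuously in a for loop. In the full source, this loop runs inside worker goroutines, up to MaxConcurrentReconciles of them, and Started is set once the workers have been launched.
processNextWorkItem gets an item from the Queue and calls reconcileHandler(ctx, obj), which finally calls c.Reconcile(ctx, req).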
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	obj, shutdown := c.Queue.Get()
	if shutdown {
		// Stop working
		return false
	}
	// We call Done here so the workqueue knows we have finished
	// processing this item. We also must remember to call Forget if we
	// do not want this work item being re-queued. For example, we do
	// not call Forget if a transient error occurs, instead the item is
	// put back on the workqueue and attempted again after a back-off
	// period.
	defer c.Queue.Done(obj)
	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
	c.reconcileHandler(ctx, obj)
	return true
}
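The worker loop and the delegation from Controller.Reconcile to Do.Reconcile can be sketched with the standard library alone. This is a toy model under stated assumptions: a buffered channel stands in for the workqueue (closing it plays the role of shutting the queue down), the request, reconciler, recorder, and controller types are hypothetical, and errors are ignored instead of being requeued. Unlike the real workqueue, a plain channel does not prevent the same key from being processed by two workers at once.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a simplified stand-in for reconcile.Request.
type request struct{ namespace, name string }

// reconciler is a simplified stand-in for reconcile.Reconciler.
type reconciler interface {
	Reconcile(req request) error
}

// recorder is a hypothetical Reconciler that records the keys it was given.
type recorder struct {
	mu   sync.Mutex
	seen []string
}

func (r *recorder) Reconcile(req request) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.seen = append(r.seen, req.namespace+"/"+req.name)
	return nil
}

// controller holds the user's Reconciler in Do and a channel as a toy queue.
type controller struct {
	Do    reconciler
	queue chan request
}

// Reconcile delegates to the wrapped Reconciler.
func (c *controller) Reconcile(req request) error { return c.Do.Reconcile(req) }

// processNextWorkItem handles one item. It returns false once the queue has
// been closed and drained, which tells the worker to stop.
func (c *controller) processNextWorkItem() bool {
	req, ok := <-c.queue
	if !ok {
		return false
	}
	_ = c.Reconcile(req) // this sketch ignores errors instead of requeueing
	return true
}

// start launches the workers and waits until all of them have exited.
func (c *controller) start(workers int) {
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for c.processNextWorkItem() {
			}
		}()
	}
	wg.Wait()
}

func main() {
	rec := &recorder{}
	c := &controller{Do: rec, queue: make(chan request, 3)}
	for _, name := range []string{"a", "b", "c"} {
		c.queue <- request{namespace: "default", name: name}
	}
	close(c.queue) // stands in for shutting the queue down
	c.start(2)
	fmt.Println("reconciled", len(rec.seen), "requests")
}
```

The number of workers plays the role of MaxConcurrentReconciles here; the order in which the three keys are recorded may differ between runs.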
Dependent components
As mentioned in the previous sections, the Controller component relies on several dependent components to function effectively. Let’s summarize them below:
Source
The Source interface serves as a way to obtain events from various sources such as create, update, and delete operations on Kubernetes objects, Webhook callbacks, etc. These events are then processed by the EventHandler.
type Source interface {
	// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
	// to enqueue reconcile.Requests.
	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
The Source interface provides the Start method, which is used internally by the Controller to register an EventHandler with the Informer and enqueue reconcile.Requests. This enables the Controller to detect changes in the target resources and trigger the corresponding Reconcile function.
There are several implementations of the Source interface, including Kind, kindWithCache, Informer, Channel, etc. For example, Kind is used in blder.doWatch:
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
	return err
}
src := &source.Kind{Type: typeForSrc}
Each implementation of Source caters to specific use cases and allows the Controller to efficiently monitor and capture events from the designated sources. In most cases, the Source is used to capture changes in the target Kubernetes resources.
We’ll explore the details of these Source implementations in another post.
Queue (workqueue.RateLimitingInterface)
The Queue, also known as the workqueue, is an interface that rate limits items being added to the queue. It plays a crucial role in managing the flow of change events detected by the Source and enqueued by the EventHandler.
Please note that workqueue.RateLimitingInterface is not part of the controller-runtime package but is part of the client-go package:
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface
	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})
	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})
	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}
The Queue is responsible for maintaining a steady and controlled rate of adding items to the queue, ensuring the Controller can efficiently handle and process them. The RateLimitingInterface provides methods such as AddRateLimited, Forget, and NumRequeues to manage the items in the queue effectively.
In the context of the Controller, the Queue stores the change events detected by the Source and enqueued by the EventHandler. These events are represented as reconcile.Request items, which are consumed by the processNextWorkItem function in the Controller.
In future posts, we will delve into more details about the Queue and its significance in the functioning of the Kubernetes Operator.
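To give a feel for what Forget and NumRequeues track, here is a minimal sketch of per-item exponential backoff, one common way a rate limiter decides how long a failed item waits before it is added again. It is an illustration only, not the client-go implementation: the backoff type, its method names, and the 5ms/40ms values are all assumptions chosen for the example.

```go
package main

import (
	"fmt"
	"time"
)

// backoff tracks how often each item has failed and derives a retry delay.
type backoff struct {
	baseDelay, maxDelay time.Duration
	failures            map[string]int
}

func newBackoff(baseDelay, maxDelay time.Duration) *backoff {
	return &backoff{baseDelay: baseDelay, maxDelay: maxDelay, failures: map[string]int{}}
}

// when records one more failure for item and returns how long to wait before
// retrying: the base delay doubled once per earlier failure, capped at maxDelay.
func (b *backoff) when(item string) time.Duration {
	n := b.failures[item]
	b.failures[item] = n + 1

	d := b.baseDelay
	for i := 0; i < n && d < b.maxDelay; i++ {
		d *= 2
	}
	if d > b.maxDelay {
		d = b.maxDelay
	}
	return d
}

// numRequeues reports how many failures are currently tracked for item.
func (b *backoff) numRequeues(item string) int { return b.failures[item] }

// forget stops tracking item, so its next failure starts from the base delay.
func (b *backoff) forget(item string) { delete(b.failures, item) }

func main() {
	b := newBackoff(5*time.Millisecond, 40*time.Millisecond)
	for i := 0; i < 5; i++ {
		fmt.Println("retry delay:", b.when("default/web"))
	}
	fmt.Println("requeues so far:", b.numRequeues("default/web"))

	// After a successful reconcile the item is forgotten and starts over.
	b.forget("default/web")
	fmt.Println("delay after forget:", b.when("default/web"))
}
```

This is why the comment in processNextWorkItem stresses calling Forget only when the item should stop being retried: as long as an item is tracked, each failure makes it wait longer.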
EventHandler
The EventHandler is an interface responsible for handling various types of events, such as Create, Update, Delete, or Generic events, and enqueuing them into the workqueue. Each event’s specific logic is defined in the corresponding implementation.
type EventHandler interface {
	// Create is called in response to an create event - e.g. Pod Creation.
	Create(event.CreateEvent, workqueue.RateLimitingInterface)
	// Update is called in response to an update event - e.g. Pod Updated.
	Update(event.UpdateEvent, workqueue.RateLimitingInterface)
	// Delete is called in response to a delete event - e.g. Pod Deleted.
	Delete(event.DeleteEvent, workqueue.RateLimitingInterface)
	// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
	// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
	Generic(event.GenericEvent, workqueue.RateLimitingInterface)
}
The implementation of each method defines how the EventHandler should handle the specific event and enqueue it into the workqueue. For example, EnqueueRequestForObject is one such implementation that enqueues the received events into the Queue in the format of reconcile.Request, allowing the Controller to take items from the Queue and pass them to the Reconciler for processing.
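As a rough picture of what a handler like EnqueueRequestForObject does, the sketch below maps every kind of event to the same namespace/name key and adds it to a toy queue. Everything in it is a simplified assumption written for this post: object, request, queue, and enqueueForObject are hypothetical types, and the queue's "skip keys that are already waiting" behavior is a small model of workqueue deduplication rather than the real implementation.

```go
package main

import "fmt"

// request is a simplified stand-in for reconcile.Request.
type request struct{ namespace, name string }

// object is a hypothetical, minimal stand-in for a Kubernetes object.
type object struct{ namespace, name string }

// queue is a toy queue that skips a request if the same key is already waiting.
type queue struct {
	items   []request
	waiting map[request]bool
}

func newQueue() *queue { return &queue{waiting: map[request]bool{}} }

func (q *queue) add(r request) {
	if q.waiting[r] {
		return
	}
	q.waiting[r] = true
	q.items = append(q.items, r)
}

// keyOf reduces an object to the key that identifies it.
func keyOf(obj object) request {
	return request{namespace: obj.namespace, name: obj.name}
}

// enqueueForObject turns any event about an object into a request for that
// object. The event type itself is not carried into the queue.
type enqueueForObject struct{}

func (enqueueForObject) Create(obj object, q *queue)       { q.add(keyOf(obj)) }
func (enqueueForObject) Update(_, newObj object, q *queue) { q.add(keyOf(newObj)) }
func (enqueueForObject) Delete(obj object, q *queue)       { q.add(keyOf(obj)) }

func main() {
	q := newQueue()
	h := enqueueForObject{}

	web := object{namespace: "default", name: "web"}
	h.Create(web, q)
	h.Update(web, web, q) // same key is already waiting, so nothing is added
	h.Delete(object{namespace: "default", name: "db"}, q)

	fmt.Println("queued requests:", len(q.items))
	for _, r := range q.items {
		fmt.Println(r.namespace + "/" + r.name)
	}
}
```

Since the request does not say whether the object was created, updated, or deleted, the Reconcile function has to work that out from the current state of the object.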
Predicate
The Predicate is an interface used to filter events before they are enqueued for processing by EventHandlers.
// Predicate filters events before enqueuing the keys.
type Predicate interface {
	// Create returns true if the Create event should be processed
	Create(event.CreateEvent) bool
	// Delete returns true if the Delete event should be processed
	Delete(event.DeleteEvent) bool
	// Update returns true if the Update event should be processed
	Update(event.UpdateEvent) bool
	// Generic returns true if the Generic event should be processed
	Generic(event.GenericEvent) bool
}
The Predicate interface provides four methods, each corresponding to a different type of event: Create, Delete, Update, and Generic. These methods return a boolean value indicating whether the corresponding event should be processed or filtered out.
Predicates are useful when you want to selectively handle or ignore certain events based on specific criteria. For example, you can use Predicates to filter out events that are not relevant to your application’s logic or to optimize resource usage by skipping unnecessary event processing.
If there are no specific filter requirements for the events, you can leave the Predicate empty, and all events will be processed by the EventHandlers.
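Here is a small sketch of how predicate filtering composes, covering only update events to keep it short. The types and functions are hypothetical stand-ins for this post: updateEvent models just a generation number and labels, generationChanged is written in the spirit of filtering out updates that do not change an object's generation, and allow applies the rule from the Watch documentation that an event is passed on only if all predicates evaluate to true.

```go
package main

import "fmt"

// updateEvent is a simplified stand-in for event.UpdateEvent. Only the
// generation numbers and the new object's labels are modeled.
type updateEvent struct {
	oldGeneration, newGeneration int64
	labels                       map[string]string
}

// predicate decides whether an update event should be processed.
type predicate func(e updateEvent) bool

// generationChanged accepts only updates where the generation moved.
func generationChanged(e updateEvent) bool {
	return e.oldGeneration != e.newGeneration
}

// hasLabel returns a predicate that accepts events whose object has the label.
func hasLabel(key string) predicate {
	return func(e updateEvent) bool {
		_, ok := e.labels[key]
		return ok
	}
}

// allow reports whether every predicate accepts the event. With no
// predicates, every event is allowed.
func allow(e updateEvent, preds ...predicate) bool {
	for _, p := range preds {
		if !p(e) {
			return false
		}
	}
	return true
}

func main() {
	statusOnly := updateEvent{oldGeneration: 2, newGeneration: 2, labels: map[string]string{"app": "web"}}
	specChange := updateEvent{oldGeneration: 2, newGeneration: 3, labels: map[string]string{"app": "web"}}

	fmt.Println("status-only update, no predicates:", allow(statusOnly))
	fmt.Println("status-only update, generationChanged:", allow(statusOnly, generationChanged))
	fmt.Println("spec change, generationChanged + hasLabel(app):", allow(specChange, generationChanged, hasLabel("app")))
	fmt.Println("spec change, generationChanged + hasLabel(tier):", allow(specChange, generationChanged, hasLabel("tier")))
}
```

A filter like this is what keeps a controller from reconciling on every update when only some kinds of change matter to it.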
Summary
In this post, we delved into the Controller component, one of the core components in the controller-runtime. Here are the key points we covered:
- The Controller interface embeds the Reconciler interface, indicating the close collaboration between the Controller and the Reconciler.
- The Controller is responsible for monitoring changes to the target resources on the Kubernetes API server and calling the Reconcile function in the Reconciler when necessary.
- The Watch function in the Controller is used to monitor and detect changes to the target objects via the Kubernetes API. It then enqueues the events into the workqueue for further processing.
- The Start function in the Controller is the process that subscribes to the workqueue and triggers the Reconcile function for each item in the queue.
- The overall flow of the Controller involves Source → EventHandler → Queue → Controller.Reconcile → Controller.Do.Reconcile.
Throughout this series, we will explore the dependent components of the Controller, such as Source, EventHandler, Predicate, and workqueue, in greater detail.
Stay tuned for the upcoming posts, where we’ll delve deeper into the various components and their implementations!
Series Index
- Kubernetes Operator series 1 — controller-runtime example controller
- Kubernetes Operator series 2 — Overview of controller-runtime
- Kubernetes Operator series 3 — controller-runtime component — Manager
- Kubernetes Operator series 4 — controller-runtime component — Builder
- Kubernetes Operator series 5 — controller-runtime component — Reconciler
- Kubernetes Operator series 6 — controller-runtime component — Controller
- Kubernetes Operator series 7 — controller-runtime component — Source