Configuration

It is possible to fine-tune some aspects of Kopf-based operators, like timeouts, synchronous handler pool sizes, automatic Kubernetes Event creation from object-related log messages, etc.

Startup configuration

Every operator has its settings (even if there is more than one operator in the same process, e.g. due to Embedding). The settings affect how the framework behaves in detail.

The settings can be modified in the startup handlers (see Startup):

import kopf
import logging
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.posting.level = logging.WARNING
    settings.watching.connect_timeout = 1 * 60
    settings.watching.server_timeout = 10 * 60

All the settings have reasonable defaults, so the configuration should be used only for fine-tuning when and if necessary.

For more settings, see kopf.OperatorSettings and settings.

Logging formats and levels

The following log formats are supported on CLI:

  • Full logs (the default) — with timestamps, log levels, and logger names:

    kopf run -v --log-format=full
    
    [2019-11-04 17:49:25,365] kopf.reactor.activit [INFO    ] Initial authentication has been initiated.
    [2019-11-04 17:49:25,650] kopf.objects         [DEBUG   ] [default/kopf-example-1] Resuming is in progress: ...
    
  • Plain logs, with only the message:

    kopf run -v --log-format=plain
    
    Initial authentication has been initiated.
    [default/kopf-example-1] Resuming is in progress: ...
    

    For non-JSON logs, the object prefix can be disabled to make the logs completely flat (as in JSON logs):

    kopf run -v --log-format=plain --no-log-prefix
    
    Initial authentication has been initiated.
    Resuming is in progress: ...
    
  • JSON logs, with only the message:

    kopf run -v --log-format=json
    
    {"message": "Initial authentication has been initiated.", "severity": "info", "timestamp": "2020-12-31T23:59:59.123456"}
    {"message": "Resuming is in progress: ...", "object": {"apiVersion": "kopf.dev/v1", "kind": "KopfExample", "name": "kopf-example-1", "uid": "...", "namespace": "default"}, "severity": "debug", "timestamp": "2020-12-31T23:59:59.123456"}
    

    For JSON logs, the object reference key can be configured to match the log parsers (if used) — instead of the default "object":

    kopf run -v --log-format=json --log-refkey=k8s-obj
    
    {"message": "Initial authentication has been initiated.", "severity": "info", "timestamp": "2020-12-31T23:59:59.123456"}
    {"message": "Resuming is in progress: ...", "k8s-obj": {...}, "severity": "debug", "timestamp": "2020-12-31T23:59:59.123456"}
    

    Note that the object prefixing is disabled for JSON logs by default, as the identifying information is available in the ref-keys. The prefixing can be explicitly re-enabled if needed:

    kopf run -v --log-format=json --log-prefix
    
    {"message": "Initial authentication has been initiated.", "severity": "info", "timestamp": "2020-12-31T23:59:59.123456"}
    {"message": "[default/kopf-example-1] Resuming is in progress: ...", "object": {...}, "severity": "debug", "timestamp": "2020-12-31T23:59:59.123456"}
    

Note

Logging verbosity and formatting are only configured via CLI options, not via settings.logging as all other aspects of configuration. When the startup handlers happen for settings, it is too late: some initial messages could be already logged in the existing formats, or not logged when they should be due to verbosity/quietness levels.

Logging events

settings.posting controls which log messages are posted as Kubernetes events. Use logging constants or integer values to set the level: e.g., logging.WARNING, logging.ERROR, etc. The default is logging.INFO.

import kopf
import logging
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.posting.level = logging.ERROR

The event-posting can be disabled completely (the default is to be enabled):

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.posting.enabled = False

These two settings also affect kopf.event() and related functions: kopf.info(), kopf.warn(), kopf.exception() — even if they are called explicitly in the code.

By default, all log messages made by the handlers on their logger are also posted as Kubernetes events. This can be disabled if it is not desired, e.g. to keep the events list clean, so that only the explicit event-posting calls are posted:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.posting.loggers = False

Synchronous handlers

settings.execution allows setting the number of synchronous workers used by the operator for synchronous handlers, or replace the asyncio executor with another one:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.execution.max_workers = 20

It is possible to replace the whole asyncio executor used for synchronous handlers (see Async/Await).

Note that handlers that started in a previous executor will be continued and finished with their original executor. This includes the startup handler itself. To avoid this, make the on-startup handler asynchronous:

import concurrent.futures
import kopf
from typing import Any

@kopf.on.startup()
async def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.execution.executor = concurrent.futures.ThreadPoolExecutor()

The same executor is used both for regular sync handlers and for sync daemons. If you expect a large number of synchronous daemons (e.g. for large clusters), make sure to pre-scale the executor accordingly (the default in Python is 5x times the CPU cores):

import kopf
from typing import Any

@kopf.on.startup()
async def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.execution.max_workers = 1000

Networking timeouts

Timeouts can be controlled when communicating with Kubernetes API:

settings.networking.request_timeout (seconds) is how long a regular request should take before failing. This applies to all atomic requests — cluster scanning, resource patching, etc. — except the watch-streams. The default is 5 minutes (300 seconds).

settings.networking.connect_timeout (seconds) is how long a TCP handshake can take for regular requests before failing. There is no default (None), meaning that there is no timeout specifically for this; however, the handshake is limited by the overall time of the request.

settings.watching.connect_timeout (seconds) is how long a TCP handshake can take for watch-streams before failing. There is no default (None), which means settings.networking.connect_timeout is used if set. If not set, settings.networking.request_timeout is used.

Note

With the current aiohttp-based implementation, both connection timeouts correspond to sock_connect= timeout, not to connect= timeout, which would also include the time for getting a connection from the pool. Kopf uses unlimited aiohttp pools, so this should not be a problem.

settings.watching.server_timeout (seconds) is how long the session with a watching request will exist before closing it from the server side. This value is passed to the server-side in a query string, and the server decides on how to follow it. The watch-stream is then gracefully closed. The default is to use the server setup (None).

settings.watching.client_timeout (seconds) is how long the session with a watching request will exist before closing it from the client side. This includes establishing the connection and event streaming. The default is forever (None).

It makes no sense to set the client-side timeout shorter than the server-side timeout, but it is left to the developers’ responsibility to decide.

The server-side timeouts are unpredictable; they can be 10 seconds or 10 minutes. Yet, it feels wrong to assume any “good” values in a framework (especially since it works without timeouts defined, and just produces extra logs).

Warning

Some setups that involve any kind of a load balancer (LB), such as the cloud-hosted Kubernetes clusters, had a well-known problem of freezing and going silent for no reason if nothing happens in the cluster for some time. The best guess is that the connection operator<>LB remains alive, while the connection LB<>K8s closes. Kopf-based operators remain unaware of this disruption.

This was fixed in Kopf 1.44.0 by using bookmark events. Prior to 1.44.0, setting either the client or the server timeout solves the problem of recovering from such freezes, but at the cost of regular reconnections in the normal flow of operations. There is no good default value either; you should determine it experimentally based on your operational requirements, cluster size, and activity level, usually in the range of 1-10 minutes.

settings.watching.reconnect_backoff (seconds) is a backoff interval between watching requests — to prevent API flooding in case of errors or disconnects. The default is 0.1 seconds (nearly instant, but not flooding).

settings.watching.inactivity_timeout (seconds) is how long the watch stream is allowed to stay completely silent —delivering no events, not even bookmarks— before it is considered dead and closed for reconnection. Kubernetes sends bookmark events every 60 seconds as a heartbeat and caches the recent events for 75 seconds (here), so the default of 70 seconds allows for reasonable jitter while still detecting streams that are silently stalled at the TCP level, and then streaming the events from Kubernetes while they are still fresh. You can adjust the timeout if needed. Values lower than ≈62 will cause frequent reconnects on low-activity clusters. To effectively disable the inactivity tracking, set it to a huge number, e.g., 999999999 (≈30 years).

Note

The inactivity tracking is NOT supported in Python 3.10. It works only since Python 3.11 and higher. Python 3.10 behaves the old way — ignores the stalled connections and might freeze with no action indefinitely. As a workaround, set settings.watching.client_timeout to 1-10 mins. Python 3.10’s end-of-life is October 2026, so the fix is not planned. However, the mere flow of bookmark events every 60 seconds may keep the connection alive and resolve the original issue of freezing.

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.networking.connect_timeout = 10
    settings.networking.request_timeout = 60
    settings.watching.server_timeout = 10 * 60

Proxy and environment trust

settings.networking.trust_env (boolean) controls whether the HTTP client session respects the proxy-related environment variables (HTTP_PROXY, HTTPS_PROXY, NO_PROXY) and the ~/.netrc file for credentials. The default is False.

When set to True, all built-in login handlers propagate this flag to kopf.ConnectionInfo, which in turn passes it to the HTTP client session. The session will then use the environment variables and ~/.netrc to configure proxies and credentials automatically. This is useful in environments where the operator must route traffic through a corporate proxy or where the credentials are managed externally.

For more details on authentication and custom login handlers, see Authentication.

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.networking.trust_env = True

Consistency

Generally, Kopf processes the resource events and updates streamed from the Kubernetes API as soon as possible, with no delays or skipping. However, high-level change-detection handlers (creation/resume/update/deletion) require a consistent state of the resource. _Consistency_ means that all patches applied by Kopf itself have arrived back via the watch-stream. If Kopf did not patch the resource recently, it is consistent by definition.

The _inconsistent_ states can happen in relatively rare circumstances on slow networks (with high latency between operator and api-servers) or under high load (high number of resources or changes), especially when an unrelated application or another operator patches the resources on their own.

Handling the _inconsistent_ states could cause double-processing (i.e. double handler execution) and some other undesired side effects. To prevent handling inconsistent states, all state-dependent handlers wait until _consistency_ is reached via one of the following two ways:

  • The expected resource version from the PATCH API operation arrives via the watch-stream of the resource within the specified time window.

  • The expected resource version from the PATCH API operation does not arrive via the watch-stream within the specified time window, in which case Kopf assumes consistency after the time window ends, and processing continues as if the version had arrived, possibly causing the mentioned side effects.

The time window is measured relative to the time of the latest PATCH call. The timeout should be long enough to assume that if the expected resource version did not arrive within the specified time, it will never arrive.

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.consistency_timeout = 10

The default value (5 seconds) aims for the safest scenario out of the box.

The value of 0 will effectively disable the consistency tracking and declare all resource states as consistent — even if they are not. Use this with care — e.g., with self-made persistence storages instead of Kopf’s annotations (see Handling progress and Change detection).

The consistency timeout does not affect low-level handlers with no persistence, such as @kopf.on.event, @kopf.index, @kopf.daemon, @kopf.timer — these handlers run for each and every watch-event with no delay (if they match the filters, of course).

Finalizers

A resource is blocked from deletion if the framework believes it is safer to do so, e.g. if non-optional deletion handlers are present or if daemons/timers are running at the moment.

For this, a finalizer is added to the object. It is removed when the framework believes it is safe to release the object for actual deletion.

The name of the finalizer can be configured:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.finalizer = 'my-operator.example.com/kopf-finalizer'

The default is the one that was hard-coded before: kopf.zalando.org/KopfFinalizerMarker.

Handling progress

To keep the handling state across multiple handling cycles, and to be resilient to errors and tolerable to restarts and downtimes, the operator keeps its state in a configured state storage. See more in Continuity.

To store the state only in the annotations with a preferred prefix:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.progress_storage = kopf.AnnotationsProgressStorage(prefix='my-op.example.com')

To store the state only in the status or any other field:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.progress_storage = kopf.StatusProgressStorage(field='status.my-operator')

To store in multiple places (all are written to in sync, but the first found state will be used when reading, i.e. the first storage has precedence):

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.progress_storage = kopf.MultiProgressStorage([
        kopf.AnnotationsProgressStorage(prefix='my-op.example.com'),
        kopf.StatusProgressStorage(field='status.my-operator'),
    ])

The default storage is in annotations, plus read-only from the status stanza, with annotations taking precedence over the status. This was a transitional solution from the original status-only storage (Mar’20–Mar’26; v0.27–1.44). The annotations are kopf.zalando.org/{id} (read-write), the status fields are status.kopf.progress.{id} (read-only). It is an equivalent of:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.progress_storage = kopf.SmartProgressStorage()

It is also possible to implement custom state storage instead of storing the state directly in the resource’s fields — e.g., in external databases. For this, inherit from kopf.ProgressStorage and implement its abstract methods (fetch(), store(), purge(), optionally flush()).

Note

The legacy behavior is an equivalent of kopf.StatusProgressStorage(field='status.kopf.progress').

Starting with Kubernetes 1.16, both custom and built-in resources have strict structural schemas with the pruning of unknown fields (more information is in Future of CRDs: Structural Schemas).

Long story short, unknown fields are silently pruned by Kubernetes API. As a result, Kopf’s status storage will not be able to store anything in the resource, as it will be instantly lost. (See #321.)

To quickly fix this for custom resources, modify their definitions with x-kubernetes-preserve-unknown-fields: true. For example:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
spec:
  scope: ...
  group: ...
  names: ...
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          x-kubernetes-preserve-unknown-fields: true

See a more verbose example in examples/crd.yaml.

For built-in resources, such as pods, namespaces, etc, the schemas cannot be modified, so a full switch to annotations storage is advised.

The new default “smart” storage is designed to ensure a smooth upgrade of Kopf-based operators to the new state location without any special upgrade actions or conversions.

Change detection

For change-detecting handlers, Kopf keeps the last handled configuration — i.e. the last state that has been successfully handled. New changes are compared against the last handled configuration, and a diff list is formed.

The last-handled configuration is also used to detect if there were any essential changes at all — i.e. not just the system or status fields.

The last-handled configuration storage can be configured with settings.persistence.diffbase_storage. The default is an equivalent of:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.diffbase_storage = kopf.AnnotationsDiffBaseStorage(
        prefix='kopf.zalando.org',
        key='last-handled-configuration',
    )

The stored content is a JSON-serialised essence of the object (i.e., only the important fields, with system fields and status stanza removed).

It is generally not a good idea to override this storage unless multiple Kopf-based operators must handle the same resources and must not collide with each other. In that case, they must use different names.

Storage transition

Warning

Changing a storage method for an existing operator with existing resources is dangerous: the operator will consider all those resources as not yet handled (due to the absence of a diff-base key), or will lose their progress state (if some handlers are retried or slow). The operator will start handling each of them again, which can lead to duplicated children or other side effects.

To ensure a smooth transition, use a composite multi-storage with the new storage as the first child and the old storage as the second child (both are used for writing; the first found value is used for reading).

For example, to eventually switch from Kopf’s annotations to a status field for diff-base storage, apply this configuration:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.persistence.diffbase_storage = kopf.MultiDiffBaseStorage([
        kopf.StatusDiffBaseStorage(field='status.diff-base'),
        kopf.AnnotationsDiffBaseStorage(prefix='kopf.zalando.org', key='last-handled-configuration'),
    ])

Run the operator for some time. Let all resources change, or force this, e.g. by arbitrarily labelling them so that a new diff-base is generated:

kubectl label kex -l somelabel=somevalue  ping=pong

Then, switch to the new storage alone, without the transitional setup.

Cluster discovery

settings.scanning.disabled controls the cluster discovery at runtime.

If enabled (the default), the operator will try to observe namespaces and custom resources, and will gracefully start/stop the watch streams for them (as well as peering activities, if applicable). This requires RBAC permissions to list/watch V1 namespaces and CRDs.

If disabled, or if enabled but the permission is not granted, then only the specific namespaces will be served, with namespace patterns ignored; and only the resources detected at startup will be served, with added CRDs or CRD versions being ignored and deleted CRDs causing failures.

The default mode is sufficient for most cases, unless the strict (non-dynamic) mode is intended, for example to suppress the warnings in the logs.

If you have very restrictive cluster permissions, disable the cluster discovery:

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.scanning.disabled = True

Retrying of API errors

In some cases, the Kubernetes API servers might not be ready on startup, or occasionally at runtime; the network might have issues too. In most cases, these issues are temporary and resolve themselves within seconds.

The framework retries TCP/SSL networking errors and HTTP 5xx errors (“the server is wrong”) — i.e. everything presumed to be temporary; other errors presumed to be permanent, including HTTP 4xx errors (“the client is wrong”), escalate immediately without retrying.

The setting settings.networking.error_backoffs controls how many times and with what backoff interval (in seconds) the retries are performed.

It is a sequence of back-offs between attempts (in seconds):

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.networking.error_backoffs = [10, 20, 30]

Note that the number of attempts is one more than the number of backoff intervals (because the backoffs happen between the attempts).

A single integer or float value means a single backoff, i.e. 2 attempts total: (1.0) is equivalent to (1.0,) or [1.0] for convenience.

To have a uniform back-off delay D with N+1 attempts, set to [D] * N.

To disable retrying (at your own risk), set it to [] or ().

The default value covers roughly a minute of attempts before giving up.

Once the retries are over (if disabled, immediately on error), the API errors escalate and are then handled according to Throttling of unexpected errors.

This value can be an arbitrary collection or iterable object (even infinite): only iter() is called on every new retrying cycle, no other protocols are required; however, make sure that it is re-iterable for multiple uses:

import kopf
import random
from collections.abc import Iterator
from typing import Any

class InfiniteBackoffsWithJitter:
    def __iter__(self) -> Iterator[int]:
        while True:
            yield 10 + random.randint(-5, +5)

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.networking.error_backoffs = InfiniteBackoffsWithJitter()

Retrying an API error blocks the task or the object’s worker in which the API error occurs. However, other objects and tasks run normally in parallel (unless they encounter the same error in the same cluster).

Each consecutive error leads to the next, typically larger backoff. Each success resets the backoff intervals, and they restart from the beginning on the next error.

Note

The format is the same as for settings.queueing.error_delays. The only difference is that if the API operation does not succeed by the end of the sequence, the error of the last attempt escalates instead of blocking and retrying indefinitely with the last delay in the sequence.

See also

These back-offs cover only the server-side and networking errors. For errors in handlers, see Error handling. For errors in the framework, see Throttling of unexpected errors.

Throttling of “too many requests”

When the API server responds with HTTP 429 “Too Many Requests”, Kopf will retry as for usual errors. However, it will obey the server-suggested interval if it is longer than what Kopf would use otherwise from its own settings.networking.error_backoffs.

settings.networking.enforce_retry_after (boolean) tells Kopf what to do when its own backoff interval is longer than the server-requested interval. If True, the server-provided interval will be used. If False (default), Kopf’s longer backoff interval will be used. Either way, the backoff interval will never be shorter than what the server requested.

Throttling of unexpected errors

To prevent an uncontrollable flood of activity in case of errors that prevent resources from being marked as handled, which could lead to Kubernetes API flooding, it is possible to throttle activity on a per-resource basis:

import kopf
from typing import Any

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    settings.queueing.error_delays = [10, 20, 30]

In that case, all unhandled errors in the framework or in the Kubernetes API will be backed off by 10s after the 1st error, then by 20s after the 2nd, and then by 30s after the 3rd, 4th, 5th errors and so on. On the first success, the backoff intervals will be reset and reused on the next error.

The default is a sequence of Fibonacci numbers from 1 second to 10 minutes.

The backoffs are not persisted, so they are lost on operator restarts.

These backoffs do not cover errors in the handlers — the handlers have their own per-handler backoff intervals. These backoffs are for Kopf’s own errors.

To disable throttling (at your own risk), set it to [] or (). This means: no throttling delays set, no throttling sleeps performed.

If needed, this value can be an arbitrary collection/iterator/object: only iter() is called on every new throttling cycle, no other protocols are required; but make sure that it is re-iterable for multiple uses.

Log levels & filters

If the logs of any component are too verbose or contain sensitive data, this can be controlled with the usual Python logging machinery.

For example, to disable the access logs of the probing server:

import logging
from typing import Any

@kopf.on.startup()
async def configure(**_: Any) -> None:
    logging.getLogger('aiohttp.access').propagate = False

To selectively filter only some log messages but not the others:

import kopf
import logging
from typing import Any

class ExcludeProbesFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        return 'GET /healthz ' not in record.getMessage()

@kopf.on.startup()
async def configure_access_logs(**_: Any) -> None:
    logging.getLogger('aiohttp.access').addFilter(ExcludeProbesFilter())

For more information on the logging configuration, see: logging.

In particular, you can use the special logger kopf.objects to filter object-related log messages coming from the logger and from Kopf’s internals, which are then posted as Kubernetes events (v1/events):

import kopf
import logging
from typing import Any

class ExcludeKopfInternals(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        return '/kopf/' not in record.pathname

@kopf.on.startup()
async def configure_kopf_logs(**_: Any) -> None:
    logging.getLogger('kopf.objects').addFilter(ExcludeKopfInternals())

Warning

The path names and module names of internal modules, as well as the extra fields of logging.LogRecord added by Kopf, can change without notice. Do not rely on their stability. They are not a public interface of Kopf.