In-memory indexing
Indexers automatically maintain in-memory overviews of resources (indices), grouped by keys that are usually calculated based on these resources.
The indices can be used for cross-resource awareness. For example, when a resource of kind X is changed, its handler can get all the information about all resources of kind Y without talking to the Kubernetes API. Under the hood, the centralised watch streams (one per resource kind) are more efficient at gathering information than individual listing requests.
Index declaration
Indices are declared with a @kopf.index decorator on an indexing function
(all standard filters are supported — see Filtering):
import kopf
from typing import Any
@kopf.index('pods')
async def my_idx(**_: Any) -> Any:
    ...
The name of the function or its id= option is the index’s name.
The indices are then available to all resource- and operator-level handlers as direct kwargs named the same as the index (type hints are optional):
import kopf
from typing import Any
# ... continued from previous examples:
@kopf.timer('KopfExample', interval=5)
def tick(my_idx: kopf.Index, **_: Any) -> None:
    ...
@kopf.on.probe()
def metric(my_idx: kopf.Index, **_: Any) -> Any:
    ...
When a resource is created or starts matching the filters, it is processed by all relevant indexing functions, and the result is put into the indices.
When a previously indexed resource is deleted or stops matching the filters, all associated values are removed (as are all empty collections after that — to keep the indices clean).
See also
Health-checks for probing handlers in the example above.
Index structure
An index is always a read-only mapping of type kopf.Index
with arbitrary keys pointing to collections of type kopf.Store,
which in turn contain arbitrary values generated by the indexing functions.
The index is initially empty. Collections are never empty:
they are removed when their last item is removed.
For example, if several individual resources return the following results from the same indexing function, then the index gets the following structure (shown in the comment below the code):
return {'key1': 'valueA'} # 1st
return {'key1': 'valueB'} # 2nd
return {'key2': 'valueC'} # 3rd
# {'key1': ['valueA', 'valueB'],
# 'key2': ['valueC']}
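The merging rule can be modeled in plain Python. The following is a hypothetical simplification: `merge_results` is not a Kopf API, and tuples stand in for kopf.Store. It wraps the outer mapping in `types.MappingProxyType` to imitate the read-only nature of kopf.Index, and it reproduces the structure from the comment above.

```python
from collections.abc import Hashable, Mapping
from types import MappingProxyType
from typing import Any


def merge_results(results: list[dict[Hashable, Any]]) -> Mapping[Hashable, tuple[Any, ...]]:
    """Merge per-resource {key: value} results into a read-only key -> values view."""
    merged: dict[Hashable, list[Any]] = {}
    for result in results:
        for key, value in result.items():
            # Each key collects the values of all resources that produced it.
            merged.setdefault(key, []).append(value)
    # Tuples and a mapping proxy stand in for the read-only kopf.Store/kopf.Index.
    return MappingProxyType({key: tuple(values) for key, values in merged.items()})


index = merge_results([{'key1': 'valueA'}, {'key1': 'valueB'}, {'key2': 'valueC'}])
print(dict(index))  # {'key1': ('valueA', 'valueB'), 'key2': ('valueC',)}
```

Writing to `index` raises `TypeError`, which is roughly how handlers experience the real index: they read it, and only the framework changes it.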
Indices are not nested. A 2nd-level mapping in the result is stored as a regular value:
return {'key1': 'valueA'} # 1st
return {'key1': 'valueB'} # 2nd
return {'key2': {'key3': 'valueC'}} # 3rd
# {'key1': ['valueA', 'valueB'],
# 'key2': [{'key3': 'valueC'}]}
Index content
When an indexing function returns a dict (strictly dict — not a generic
mapping, not even a subclass of dict, such as kopf.Memo),
it is merged into the index under the key taken from the result:
import kopf
from typing import Any
@kopf.index('pods')
async def string_keys(namespace: str | None, name: str, **_: Any) -> Any:
    return {namespace: name}
# {'namespace1': ['pod1a', 'pod1b', ...],
# 'namespace2': ['pod2a', 'pod2b', ...],
# ...}
Multi-value keys are possible using tuples or other hashable types:
import kopf
from typing import Any
@kopf.index('pods')
async def tuple_keys(namespace: str | None, name: str, **_: Any) -> Any:
    return {(namespace, name): 'hello'}
# {('namespace1', 'pod1a'): ['hello'],
# ('namespace1', 'pod1b'): ['hello'],
# ('namespace2', 'pod2a'): ['hello'],
# ('namespace2', 'pod2b'): ['hello'],
# ...}
Multiple keys can be returned at once for a single resource, and they are all merged into their respective places in the index:
import kopf
from typing import Any
@kopf.index('pods')
async def by_label(labels: kopf.Labels, name: str, **_: Any) -> Any:
    return {(label, value): name for label, value in labels.items()}
# {('label1', 'value1a'): ['pod1', 'pod2', ...],
# ('label1', 'value1b'): ['pod3', 'pod4', ...],
# ('label2', 'value2a'): ['pod5', 'pod6', ...],
# ('label2', 'value2b'): ['pod1', 'pod3', ...],
# ...}
@kopf.timer('kex', interval=5)
def tick(by_label: kopf.Index, **_: Any) -> None:
    print(list(by_label.get(('label2', 'value2b'), [])))
    # ['pod1', 'pod3']
    for podname in by_label.get(('label2', 'value2b'), []):
        print(f"==> {podname}")
    # ==> pod1
    # ==> pod3
Note the multiple occurrences of some pods because they have two or more labels. However, they never repeat within the same label — a label can have only one value.
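You can reproduce this effect offline with plain dictionaries, which may help when unit-testing an indexing function. In the following sketch, the `pods` sample data is made up and a dict of lists stands in for kopf.Index. Only the dict comprehension comes from `by_label` above.

```python
from collections.abc import Hashable

# Hypothetical sample pods: pod name -> labels.
pods = {
    'pod1': {'label1': 'value1a', 'label2': 'value2b'},
    'pod2': {'label1': 'value1a'},
    'pod3': {'label1': 'value1b', 'label2': 'value2b'},
}

index: dict[Hashable, list[str]] = {}
for name, labels in pods.items():
    # The same result the by_label indexing function would return for this pod.
    result = {(label, value): name for label, value in labels.items()}
    for key, value in result.items():
        index.setdefault(key, []).append(value)

print(index[('label2', 'value2b')])  # ['pod1', 'pod3']
```

`pod1` appears under two keys (one per label), but never twice under the same key.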
Recipes
Unindexed collections
When an indexing function returns a non-dict value — i.e. strings, numbers,
tuples, lists, sets, memos, or any other object except dict — the key
is assumed to be None and a flat index with only one key is constructed.
The resources are not indexed by key, but rather collected under the same key
(which is still considered indexing):
import kopf
from typing import Any
@kopf.index('pods')
async def pod_names(name: str, **_: Any) -> Any:
    return name
# {None: ['pod1', 'pod2', ...]}
Other types and complex objects returned from the indexing function are stored as-is (i.e. with no special treatment):
import kopf
from typing import Any
@kopf.index('pods')
async def container_names(spec: kopf.Spec, **_: Any) -> Any:
    return {container['name'] for container in spec.get('containers', [])}
# {None: [{'main1', 'sidecar2'}, {'main2'}, ...]}
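The rules for interpreting a return value are summarized in the sketch below. `to_pairs` is a hypothetical helper, not part of Kopf. It covers the exact-dict rule from "Index content", the `None` key for all other values, and the "nothing to index" meaning of `None` from "Conditional indexing". `OrderedDict` stands in for a dict subclass such as kopf.Memo.

```python
from collections import OrderedDict
from collections.abc import Hashable
from typing import Any


def to_pairs(result: Any) -> list[tuple[Hashable, Any]]:
    """Translate an indexing function's return value into (key, value) pairs."""
    if result is None:
        # None means "nothing to index"; existing values are left untouched.
        return []
    if type(result) is dict:
        # Only an exact dict is split into keys and values.
        return list(result.items())
    # Everything else, including dict subclasses, is one value under the None key.
    return [(None, result)]


print(to_pairs({'ns1': 'pod1'}))        # [('ns1', 'pod1')]
print(to_pairs(('ns1', 'pod1')))        # [(None, ('ns1', 'pod1'))]
print(to_pairs(None))                   # []
print(len(to_pairs(OrderedDict(a=1))))  # 1
```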
Enumerating resources
If the goal is not to store any payload but only to list existing resources, index the resources’ identities (usually their namespaces and names).
One way is to collect their identities in a flat collection — useful when you mostly need to iterate over all of them without key lookups:
import kopf
from typing import Any
@kopf.index('pods')
async def pods_list(namespace: str | None, name: str, **_: Any) -> Any:
    return namespace, name
# {None: [('namespace1', 'pod1a'),
# ('namespace1', 'pod1b'),
# ('namespace2', 'pod2a'),
# ('namespace2', 'pod2b'),
# ...]}
@kopf.timer('kopfexamples', interval=5)
def tick_list(pods_list: kopf.Index, **_: Any) -> None:
    for ns, name in pods_list.get(None, []):
        print(f"{ns}::{name}")
Another way is to index them by key — when index lookups happen more often than full iterations:
import kopf
from typing import Any
@kopf.index('pods')
async def pods_dict(namespace: str | None, name: str, **_: Any) -> Any:
    return {(namespace, name): None}
# {('namespace1', 'pod1a'): [None],
# ('namespace1', 'pod1b'): [None],
# ('namespace2', 'pod2a'): [None],
# ('namespace2', 'pod2b'): [None],
# ...}
@kopf.timer('kopfexamples', interval=5)
def tick_dict(pods_dict: kopf.Index, spec: kopf.Spec, namespace: str | None, **_: Any) -> None:
    monitored_namespace = spec.get('monitoredNamespace', namespace)
    for ns, name in pods_dict:
        if ns == monitored_namespace:
            print(f"in {ns}: {name}")
Mirroring resources
To store the entire resource or its essential parts, return them explicitly:
import kopf
from typing import Any
@kopf.index('deployments')
async def whole_deployments(name: str, namespace: str | None, body: kopf.Body, **_: Any) -> Any:
    return {(namespace, name): body}
@kopf.timer('kopfexamples', interval=5)
def tick(whole_deployments: kopf.Index, **_: Any) -> None:
    deployment, *_ = whole_deployments[('kube-system', 'coredns')]
    actual = deployment.status.get('replicas')
    desired = deployment.spec.get('replicas')
    print(f"{deployment.meta.name}: {actual}/{desired}")
Note
Be mindful of memory consumption on large clusters and/or overly verbose objects. Pay particular attention to memory consumption from “managed fields” (see kubernetes/kubernetes#90066).
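One way to reduce memory usage is to store only the essential parts instead of the whole body. In the following sketch, `essential_parts` is a hypothetical helper that works on any mapping shaped like a Kubernetes object. It keeps an explicit allow-list of fields, so `metadata.managedFields` is dropped along with everything else that is not listed. An indexing function could return `{(namespace, name): essential_parts(body)}`. Handlers would then read entries by item access, such as `entry['spec'].get('replicas')`, instead of the `.spec` attribute of kopf.Body.

```python
from collections.abc import Mapping
from typing import Any


def essential_parts(body: Mapping[str, Any]) -> dict[str, Any]:
    """Copy only the fields an operator needs; everything else is dropped."""
    metadata = body.get('metadata', {})
    return {
        'metadata': {
            'name': metadata.get('name'),
            'namespace': metadata.get('namespace'),
        },
        # Shallow copies: nested objects are still shared with the original body.
        'spec': dict(body.get('spec', {})),
        'status': dict(body.get('status', {})),
    }


body = {
    'metadata': {'name': 'coredns', 'namespace': 'kube-system',
                 'managedFields': [{'manager': 'kubectl'}]},
    'spec': {'replicas': 2},
    'status': {'replicas': 2},
}
print(essential_parts(body)['metadata'])  # {'name': 'coredns', 'namespace': 'kube-system'}
```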
Indices of indices
Iterating over all keys of an index can be slow (especially when there are many keys — e.g. with thousands of pods). In such cases, an index of an index can be built: one primary index contains the real values to be used, while a secondary index contains only the keys of the primary index (in full or in part).
By looking up a single key in the secondary index, the operator can directly obtain or reconstruct all the necessary keys in the primary index instead of iterating over the primary index with filtering.
For example, suppose you want to get all container names of all pods in a namespace. In that case, the primary index indexes containers by pods’ namespace and name, while the secondary index indexes pod names by namespace only:
import kopf
from typing import Any
@kopf.index('pods')
async def primary(namespace: str | None, name: str, spec: kopf.Spec, **_: Any) -> Any:
    container_names = {container['name'] for container in spec['containers']}
    return {(namespace, name): container_names}
# {('namespace1', 'pod1a'): [{'main'}],
# ('namespace1', 'pod1b'): [{'main', 'sidecar'}],
# ('namespace2', 'pod2a'): [{'main'}],
# ('namespace2', 'pod2b'): [{'the-only-one'}],
# ...}
@kopf.index('pods')
async def secondary(namespace: str | None, name: str, **_: Any) -> Any:
    return {namespace: name}
# {'namespace1': ['pod1a', 'pod1b'],
# 'namespace2': ['pod2a', 'pod2b'],
# ...}
@kopf.timer('kopfexamples', interval=5)
def tick(primary: kopf.Index, secondary: kopf.Index, spec: kopf.Spec, **_: Any) -> None:
    namespace_containers: set[str] = set()
    monitored_namespace = spec.get('monitoredNamespace', 'default')
    for pod_name in secondary.get(monitored_namespace, []):
        reconstructed_key = (monitored_namespace, pod_name)
        pod_containers, *_ = primary[reconstructed_key]
        namespace_containers |= pod_containers
    print(f"containers in {monitored_namespace}: {namespace_containers}")
# containers in namespace1: {'main', 'sidecar'}
# containers in namespace2: {'main', 'the-only-one'}
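You can check the equivalence of the two access paths without a cluster. In the following sketch, plain dicts of lists stand in for the two indices and the data is made up. The function names `via_scan` and `via_secondary` are hypothetical. The full scan visits every key of the primary index. The secondary lookup touches only the keys of one namespace, yet both return the same result.

```python
# Hypothetical primary index: (namespace, pod) -> container-name sets.
primary: dict[tuple[str, str], list[set[str]]] = {
    ('namespace1', 'pod1a'): [{'main'}],
    ('namespace1', 'pod1b'): [{'main', 'sidecar'}],
    ('namespace2', 'pod2a'): [{'main'}],
}
# Hypothetical secondary index: namespace -> pod names (parts of the primary keys).
secondary: dict[str, list[str]] = {
    'namespace1': ['pod1a', 'pod1b'],
    'namespace2': ['pod2a'],
}


def via_scan(namespace: str) -> set[str]:
    # Visits every key of the primary index: cost grows with the total key count.
    found: set[str] = set()
    for (ns, _name), stores in primary.items():
        if ns == namespace:
            for containers in stores:
                found |= containers
    return found


def via_secondary(namespace: str) -> set[str]:
    # One lookup in the secondary index, then direct lookups in the primary one.
    found: set[str] = set()
    for pod_name in secondary.get(namespace, []):
        for containers in primary[(namespace, pod_name)]:
            found |= containers
    return found


print(sorted(via_secondary('namespace1')))  # ['main', 'sidecar']
```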
However, such complex structures and performance requirements are rare. For simplicity and performance, the framework does not provide nested indices as a built-in feature. This recipe builds them from other official features instead.
Conditional indexing
Besides the usual filters (see Filtering), resources can be excluded
from indexing by returning None (Python’s default return value for functions with no result).
If the indexing function returns None or does not return anything,
its result is ignored and nothing is indexed. The existing values in the index
are preserved as-is (this is also the case when unexpected errors
occur in the indexing function with the errors mode set to IGNORED):
import kopf
from typing import Any
@kopf.index('pods')
async def empty_index(**_: Any) -> Any:
    pass
# {}
However, if the indexing function returns a dict with None as values,
those values are indexed normally (they are not ignored). None values
can serve as placeholders when only the keys matter. Without at least one value,
a key would not stay in the index, because keys and collections with no values left in them are removed:
import kopf
from typing import Any
@kopf.index('pods')
async def index_of_nones(**_: Any) -> Any:
    return {'key': None}
# {'key': [None, None, ...]}
Errors in indexing
Indexing functions are expected to be fast and non-blocking, as they can delay the operator startup and resource processing. For this reason, when errors occur in indexing handlers, the handlers are never retried.
Arbitrary exceptions with errors=IGNORED (the default) cause the framework
to ignore the error and keep the existing indexed values (which are now stale).
The assumption is that new values will appear soon and that the old values
are good enough in the meantime, which is usually the case. This is the same
as returning None, except that the exception’s stack trace is also logged:
import kopf
from typing import Any
@kopf.index('pods', errors=kopf.ErrorsMode.IGNORED) # the default
async def fn1(**_: Any) -> Any:
    raise Exception("Keep the stale values, if any.")
kopf.PermanentError and arbitrary exceptions with errors=PERMANENT
remove any existing indexed values and the resource’s keys from the index,
and exclude the failed resource from future indexing by this index
(so that the indexing function is not even invoked for that resource):
import kopf
from typing import Any
@kopf.index('pods', errors=kopf.ErrorsMode.PERMANENT)
async def fn1(**_: Any) -> Any:
    raise Exception("Excluded forever.")
@kopf.index('pods')
async def fn2(**_: Any) -> Any:
    raise kopf.PermanentError("Excluded forever.")
kopf.TemporaryError and arbitrary exceptions with errors=TEMPORARY
remove any existing indexed values and the resource’s keys from the index,
and exclude the failed resource from indexing for a specified duration
(via the error’s delay option; set to 0 or None for no delay).
The resource is expected to be reindexed in the future,
but current problems are preventing that from happening:
import kopf
from typing import Any
@kopf.index('pods', errors=kopf.ErrorsMode.TEMPORARY)
async def fn1(**_: Any) -> Any:
    raise Exception("Excluded for 60s.")
@kopf.index('pods')
async def fn2(**_: Any) -> Any:
    raise kopf.TemporaryError("Excluded for 30s.", delay=30)
In the “temporary” mode, the decorator’s error-handling options are used:
backoff= is the default delay before the resource can be re-indexed
(the default is 60 seconds; use 0 explicitly for no delay);
retries= and timeout= set the retry limit and the overall
duration from the first failure until the resource is marked
as permanently excluded from indexing (unless it succeeds at some point).
The handler kwargs retry, started, and runtime
report the retry attempts since the first indexing failure.
Successful indexing resets all counters and timeouts; the retry state
is not persisted (to save memory).
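The bookkeeping described above can be modeled as a small state machine. `ExclusionState` below is hypothetical and does not mirror Kopf's internals. It models only arbitrary exceptions in the temporary mode, where backoff= applies, and it omits per-error delays. One more assumption, noted in the code, is that permanent exclusion happens exactly when the failure count reaches `retries`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExclusionState:
    """Hypothetical per-resource bookkeeping for the "temporary" errors mode."""
    backoff: float = 60.0            # delay before the resource can be re-indexed
    retries: Optional[int] = None    # failure limit before permanent exclusion
    timeout: Optional[float] = None  # seconds since the first failure before the same
    started: Optional[float] = None  # time of the first failure in the current streak
    retry: int = 0                   # failures since the first failure
    excluded_until: float = 0.0
    permanent: bool = False

    def fail(self, now: float) -> None:
        if self.started is None:
            self.started = now
        self.retry += 1
        # Assumption: the limit is hit when the failure count reaches `retries`.
        out_of_retries = self.retries is not None and self.retry >= self.retries
        out_of_time = self.timeout is not None and now - self.started >= self.timeout
        if out_of_retries or out_of_time:
            self.permanent = True
        else:
            self.excluded_until = now + self.backoff

    def succeed(self) -> None:
        # Successful indexing resets all counters and timeouts.
        self.started = None
        self.retry = 0
        self.excluded_until = 0.0

    def may_index(self, now: float) -> bool:
        return not self.permanent and now >= self.excluded_until


state = ExclusionState(backoff=10, retries=3)
state.fail(now=0)
print(state.may_index(5), state.may_index(10))  # False True
```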
As with regular handlers (Error handling), Kopf’s error classes (expected errors) only log a short message, while arbitrary exceptions (unexpected errors) also print their stack traces.
This matches the semantics of regular handlers, but with in-memory specifics.
Warning
There is no ideal out-of-the-box default mode for error handling: any kind of error in the indexing functions means the index becomes inconsistent with the actual state of the cluster and its resources. Entries for matching resources are either “lost” (permanent or temporary errors) or contain possibly outdated/stale values (ignored errors) — all of these cases represent misinformation about the actual state of the cluster.
The default mode is chosen to minimize index changes and reindexing in case of frequent errors — by making no changes to the index. Additionally, the stale values may still be relevant and useful to some extent.
For the other two cases, operator developers must explicitly accept the
risks by setting errors= if the operator can afford to lose the keys.
Kwargs safety
Indices injected into kwargs overwrite any framework kwargs, both existing and those to be added in the future. This guarantees that new framework versions will not break an operator if new kwargs are added with the same name as existing indices.
The trade-off is that handlers cannot use the new features until their indices are renamed to something else. Since the new features are new, existing operator code does not use them, so this is backwards compatible.
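The precedence rule amounts to merging the indices last, so they win any name clash. The following is a minimal sketch of that rule only. `build_kwargs` and the sample values are hypothetical, and Kopf's actual kwargs assembly is not shown.

```python
from typing import Any


def build_kwargs(framework: dict[str, Any], indices: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical merge: indices are applied last, so they override framework kwargs."""
    return {**framework, **indices}


framework_kwargs = {'name': 'pod1', 'retry': 0}  # made-up framework kwargs
indices = {'retry': {'some-key': ['a value']}}   # an index unluckily named "retry"
kwargs = build_kwargs(framework_kwargs, indices)
print(kwargs['retry'])  # {'some-key': ['a value']}
```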
To reduce the probability of name collisions, keep these conventions in mind when naming indices (they are entirely optional and provided for convenience only):
System kwargs are usually one word; name your indices with two or more words.
System kwargs are usually singular (not always); name your indices in the plural.
System kwargs are usually nouns; using abbreviations or prefixes/suffixes (e.g. cnames, rpods) reduces the probability of collisions.
Performance
Indexing can be a CPU- and RAM-intensive operation. The data structures behind indices are designed to be as efficient as possible:
Index lookups are O(1), as in Python’s dict.
Store updates and deletions are O(1): a dict is used internally.
Overall updates and deletions are O(k), where “k” is the number of keys per object (not the total number of keys). Since k is fixed in most cases, this is effectively O(1).
Neither the number of values stored in the index nor the total number of keys affects performance (in theory).
Some performance may be lost to additional method calls on the user-facing
mappings and collections that hide the internal dict structures.
This is assumed to be negligible compared to the overall code overhead.
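The following data layout is consistent with the complexity claims above. `SketchIndex` is hypothetical and is not Kopf's actual implementation. Each key maps to a dict of resource id to value, and a reverse map remembers each resource's keys. Replacing or removing a resource therefore touches only its own k keys, and a key disappears together with its last value.

```python
from collections.abc import Hashable
from typing import Any


class SketchIndex:
    """Hypothetical layout: key -> {resource_id: value}, plus resource_id -> keys."""

    def __init__(self) -> None:
        self._data: dict[Hashable, dict[str, Any]] = {}
        self._keys_of: dict[str, list[Hashable]] = {}

    def replace(self, resource_id: str, result: dict[Hashable, Any]) -> None:
        # O(k): drop this resource's old keys, then add its new ones.
        self.remove(resource_id)
        for key, value in result.items():
            self._data.setdefault(key, {})[resource_id] = value
        self._keys_of[resource_id] = list(result)

    def remove(self, resource_id: str) -> None:
        for key in self._keys_of.pop(resource_id, []):
            store = self._data[key]
            del store[resource_id]
            if not store:
                # Collections are never empty: drop the key with its last value.
                del self._data[key]

    def get(self, key: Hashable) -> list[Any]:
        return list(self._data.get(key, {}).values())

    def keys(self) -> list[Hashable]:
        return list(self._data)


index = SketchIndex()
index.replace('uid1', {'key1': 'valueA'})
index.replace('uid2', {'key1': 'valueB'})
index.replace('uid3', {'key2': 'valueC'})
print(index.get('key1'))  # ['valueA', 'valueB']
index.remove('uid3')
print(index.keys())       # ['key1']
```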
Guarantees
If an index is declared, there is no need to pre-check for its existence — the index exists immediately, even if it contains no resources.
The indices are guaranteed to be fully populated before any other resource-related handlers are invoked in the operator. As such, even creation handlers or raw event handlers are guaranteed to have a complete indexed overview of the cluster, not just a partial snapshot from the moment they were triggered.
There is no such guarantee for operator-level handlers, such as startup/cleanup, authentication, health probing, or the indexing functions themselves: the indices are available in kwargs but may be empty or only partially populated during the operator’s startup and index pre-population stage. This can affect cleanup, login, and probe handlers if they are invoked at that stage.
However, the indices are safe to pass to threads or tasks for later processing if those threads or tasks are started from the startup handlers mentioned above.
Limitations
All in-memory values are lost when the operator restarts; there is no persistence.
In particular, the indices are fully recalculated on operator restart during
the initial listing of resources (equivalent to @kopf.on.event).
On large clusters with thousands of resources, the initial index population can take time, so operator processing will be delayed regardless of whether the handlers use the indices or not (the framework cannot know this ahead of time).
See also
In-memory containers — other in-memory structures with similar limitations.
See also
Indexers and indices are conceptually similar to client-go’s indexers — with all the underlying components implemented inside the framework (“batteries included”).
Precautions for huge clusters
Warning
On very large clusters with many resources, it is possible to hit a deadlock with indexing if the worker limit is lower than the number of resources being indexed.
All operator activities for every individual object freeze until the operator has fully indexed the cluster. This means that at startup, there will be as many workers (asyncio tasks) as there are resources matching the indexing criteria (by resource selectors and filters).
To resolve this deadlock, use one of these two solutions:
Keep settings.queueing.worker_limit=None (the default). Workers will still shut down after some time of inactivity (≈5s) and release all system resources.
Ensure that settings.queueing.worker_limit is larger than the number of resources being indexed, plus some headroom.
Also, minimize the number of resources indexed with more precise filters.
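Both options are set in a startup handler. The following is a minimal sketch. The handler name `configure` is hypothetical, and the commented-out limit is an illustrative number that you would size for your own cluster.

```python
import kopf
from typing import Any


@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    # Option 1 (the default): no limit on the number of workers.
    settings.queueing.worker_limit = None
    # Option 2: a fixed limit above the number of indexed resources (illustrative value).
    # settings.queueing.worker_limit = 20_000
```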
Warning
Similarly, on very large clusters with many resources, there will be heavy CPU load if synchronous handlers are used for indexing.
Synchronous handlers are executed in thread pools. If too many resources rush to be indexed at operator startup, the pool can be overwhelmed. Threads are reused and indexing handlers will pass through the pool quickly, since indexing handlers are expected to be fast and purely computational. But this will not help against the initial surge.
Thread pools do not scale down automatically, so the peak thread count will remain throughout the lifetime of the operator.
To resolve the thread explosion problem, use one of these two solutions:
Declare the handlers as async def (see Async/Await). This eliminates threading from the indexing stage entirely.
Set settings.execution.max_workers to a reasonable number of allowed threads. It can be much lower than the number of resources, but this will slow down the initial indexing proportionally. There is no limit by default (max_workers=None).
Also, minimize the number of resources indexed with more precise filters.
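The following sketch shows both options, although either one alone is enough. The handler names and the limit of 20 threads are illustrative assumptions, not recommendations.

```python
import kopf
from typing import Any


@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
    # Cap the thread pool used for synchronous handlers (illustrative value).
    settings.execution.max_workers = 20


@kopf.index('pods')
async def pod_names(name: str, **_: Any) -> Any:
    # An async indexing function runs in the event loop, not in the thread pool.
    return name
```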
See also
See Configuration for details on these settings.