==================
In-memory indexing
==================

Indexers automatically maintain in-memory overviews of resources (indices),
grouped by keys that are usually calculated based on these resources.

The indices can be used for cross-resource awareness:
e.g., when a resource of kind X is changed, it can get all the information
about all resources of kind Y without talking to the Kubernetes API.
Under the hood, the centralised watch streams --- one per resource kind ---
are more efficient at gathering information than individual listing requests.


Index declaration
=================

Indices are declared with a ``@kopf.index`` decorator on an indexing function
(all standard filters are supported --- see :doc:`filters`):

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def my_idx(**_: Any) -> Any:
        ...

The name of the function or its ``id=`` option is the index's name.

The indices are then available to all resource- and operator-level handlers
as direct kwargs named the same as the index (type hints are optional):

.. code-block:: python

    import kopf
    from typing import Any

    # ... continued from previous examples:
    @kopf.timer('KopfExample', interval=5)
    def tick(my_idx: kopf.Index, **_: Any) -> None:
        ...

    @kopf.on.probe()
    def metric(my_idx: kopf.Index, **_: Any) -> Any:
        ...

When a resource is created or starts matching the filters, it is processed
by all relevant indexing functions, and the result is put into the indices.

When a previously indexed resource is deleted or stops matching the filters,
all associated values are removed (as are all empty collections
after that --- to keep the indices clean).

.. seealso::

    :doc:`/probing` for probing handlers in the example above.


Index structure
===============

An index is always a read-only *mapping* of type :class:`kopf.Index`
with arbitrary keys pointing to *collections* of type :class:`kopf.Store`,
which in turn contain arbitrary values generated by the indexing functions.
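
The lifecycle and structure described above can be modelled in plain Python.
This is only a toy sketch under stated assumptions, not Kopf's implementation:
the ``ToyIndex`` class and its ``put()``, ``discard()``, and ``snapshot()``
methods are hypothetical names invented for this illustration. It shows values
grouped into collections by key, and an emptied collection disappearing
together with its key once the last resource behind it is removed:

.. code-block:: python

    from collections.abc import Hashable
    from typing import Any


    class ToyIndex:
        """A toy model of an index: keys pointing to collections of values."""

        def __init__(self) -> None:
            # key -> {resource identity -> value}; dicts keep insertion order.
            self._data: dict[Hashable, dict[Hashable, Any]] = {}

        def put(self, resource: Hashable, key: Hashable, value: Any) -> None:
            # Store (or overwrite) the value produced for this resource.
            self._data.setdefault(key, {})[resource] = value

        def discard(self, resource: Hashable) -> None:
            # Drop all values of the resource, then drop emptied collections.
            # This scans all keys for brevity; it is not tuned for speed.
            for key in list(self._data):
                self._data[key].pop(resource, None)
                if not self._data[key]:
                    del self._data[key]

        def snapshot(self) -> dict[Hashable, list[Any]]:
            # A plain view of the structure: {key: [value, value, ...]}.
            return {key: list(vals.values()) for key, vals in self._data.items()}


    index = ToyIndex()
    index.put(('namespace1', 'pod1a'), 'namespace1', 'pod1a')
    index.put(('namespace1', 'pod1b'), 'namespace1', 'pod1b')
    index.put(('namespace2', 'pod2a'), 'namespace2', 'pod2a')
    before = index.snapshot()
    # {'namespace1': ['pod1a', 'pod1b'], 'namespace2': ['pod2a']}

    index.discard(('namespace2', 'pod2a'))  # as if the pod were deleted
    after = index.snapshot()
    # {'namespace1': ['pod1a', 'pod1b']}

In a real operator, none of this bookkeeping is written by hand: the framework
fills and cleans the indices, and handlers only read them.
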
The index is initially empty. Collections are never empty:
they are removed when their last item is removed.

For example, if several individual resources return the following results
from the same indexing function, then the index gets the following structure
(shown in the comment below the code):

.. code-block:: python

    return {'key1': 'valueA'}  # 1st
    return {'key1': 'valueB'}  # 2nd
    return {'key2': 'valueC'}  # 3rd
    # {'key1': ['valueA', 'valueB'],
    #  'key2': ['valueC']}

Indices are not nested. A 2nd-level mapping in the result
is stored as a regular value:

.. code-block:: python

    return {'key1': 'valueA'}  # 1st
    return {'key1': 'valueB'}  # 2nd
    return {'key2': {'key3': 'valueC'}}  # 3rd
    # {'key1': ['valueA', 'valueB'],
    #  'key2': [{'key3': 'valueC'}]}


Index content
=============

When an indexing function returns a ``dict`` (strictly ``dict`` --- not a generic
mapping, not even a subclass of ``dict``, such as :class:`kopf.Memo`),
it is merged into the index under the keys taken from the result:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def string_keys(namespace: str | None, name: str, **_: Any) -> Any:
        return {namespace: name}

    # {'namespace1': ['pod1a', 'pod1b', ...],
    #  'namespace2': ['pod2a', 'pod2b', ...],
    #  ...}

Multi-value keys are possible using tuples or other hashable types:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def tuple_keys(namespace: str | None, name: str, **_: Any) -> Any:
        return {(namespace, name): 'hello'}

    # {('namespace1', 'pod1a'): ['hello'],
    #  ('namespace1', 'pod1b'): ['hello'],
    #  ('namespace2', 'pod2a'): ['hello'],
    #  ('namespace2', 'pod2b'): ['hello'],
    #  ...}

Multiple keys can be returned at once for a single resource,
and they are all merged into their respective places in the index:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def by_label(labels: kopf.Labels, name: str, **_: Any) -> Any:
        return {(label, value): name for label, value in labels.items()}

    # {('label1', 'value1a'): ['pod1', 'pod2', ...],
    #  ('label1', 'value1b'): ['pod3', 'pod4', ...],
    #  ('label2', 'value2a'): ['pod5', 'pod6', ...],
    #  ('label2', 'value2b'): ['pod1', 'pod3', ...],
    #  ...}

    @kopf.timer('kex', interval=5)
    def tick(by_label: kopf.Index, **_: Any) -> None:
        print(list(by_label.get(('label2', 'value2b'), [])))
        # ['pod1', 'pod3']
        for podname in by_label.get(('label2', 'value2b'), []):
            print(f"==> {podname}")
        # ==> pod1
        # ==> pod3

Note the multiple occurrences of some pods because they have two or more labels.
However, they never repeat within the same label --- a label can have only one value.


Recipes
=======

Unindexed collections
---------------------

When an indexing function returns a non-``dict`` value --- i.e. strings, numbers,
tuples, lists, sets, memos, or any other object except ``dict`` --- the key
is assumed to be ``None`` and a flat index with only one key is constructed.
The resources are not indexed by key, but rather collected under the same key
(which is still considered indexing):

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def pod_names(name: str, **_: Any) -> Any:
        return name

    # {None: ['pod1', 'pod2', ...]}

Other types and complex objects returned from the indexing function are stored
as-is (i.e. with no special treatment):

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def container_names(spec: kopf.Spec, **_: Any) -> Any:
        return {container['name'] for container in spec.get('containers', [])}

    # {None: [{'main1', 'sidecar2'}, {'main2'}, ...]}


Enumerating resources
---------------------

If the goal is not to store any payload but only to list existing resources,
index the resources' identities (usually their namespaces and names).
One way is to collect their identities in a flat collection --- useful
when you mostly need to iterate over all of them without key lookups:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def pods_list(namespace: str | None, name: str, **_: Any) -> Any:
        return namespace, name

    # {None: [('namespace1', 'pod1a'),
    #         ('namespace1', 'pod1b'),
    #         ('namespace2', 'pod2a'),
    #         ('namespace2', 'pod2b'),
    #         ...]}

    @kopf.timer('kopfexamples', interval=5)
    def tick_list(pods_list: kopf.Index, **_: Any) -> None:
        for ns, name in pods_list.get(None, []):
            print(f"{ns}::{name}")

Another way is to index them by key --- when index lookups happen
more often than full iterations:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def pods_dict(namespace: str | None, name: str, **_: Any) -> Any:
        return {(namespace, name): None}

    # {('namespace1', 'pod1a'): [None],
    #  ('namespace1', 'pod1b'): [None],
    #  ('namespace2', 'pod2a'): [None],
    #  ('namespace2', 'pod2b'): [None],
    #  ...}

    @kopf.timer('kopfexamples', interval=5)
    def tick_dict(pods_dict: kopf.Index, spec: kopf.Spec, namespace: str | None, **_: Any) -> None:
        monitored_namespace = spec.get('monitoredNamespace', namespace)
        for ns, name in pods_dict:
            if ns == monitored_namespace:
                print(f"in {ns}: {name}")


Mirroring resources
-------------------

To store the entire resource or its essential parts, return them explicitly:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('deployments')
    async def whole_deployments(name: str, namespace: str | None, body: kopf.Body, **_: Any) -> Any:
        return {(namespace, name): body}

    @kopf.timer('kopfexamples', interval=5)
    def tick(whole_deployments: kopf.Index, **_: Any) -> None:
        deployment, *_ = whole_deployments[('kube-system', 'coredns')]
        actual = deployment.status.get('replicas')
        desired = deployment.spec.get('replicas')
        print(f"{deployment.meta.name}: {actual}/{desired}")

.. note::

    Be mindful of memory consumption on large clusters and/or overly verbose objects.
    Pay particular attention to memory consumption from "managed fields"
    (see `kubernetes/kubernetes#90066`__).

    __ https://github.com/kubernetes/kubernetes/issues/90066


Indices of indices
------------------

Iterating over all keys of an index can be slow (especially when there are
many keys --- e.g. with thousands of pods). In such cases, an index of an index
can be built: one primary index contains the real values to be used,
while a secondary index contains only the keys of the primary index
(in full or in part).

By looking up a single key in the secondary index, the operator can directly
obtain or reconstruct all the necessary keys in the primary index
instead of iterating over the primary index with filtering.

For example, suppose you want to get all container names of all pods
in a namespace. In that case, the primary index indexes containers
by pods' namespace and name, while the secondary index indexes pod names
by namespace only:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def primary(namespace: str | None, name: str, spec: kopf.Spec, **_: Any) -> Any:
        container_names = {container['name'] for container in spec['containers']}
        return {(namespace, name): container_names}

    # {('namespace1', 'pod1a'): [{'main'}],
    #  ('namespace1', 'pod1b'): [{'main', 'sidecar'}],
    #  ('namespace2', 'pod2a'): [{'main'}],
    #  ('namespace2', 'pod2b'): [{'the-only-one'}],
    #  ...}

    @kopf.index('pods')
    async def secondary(namespace: str | None, name: str, **_: Any) -> Any:
        return {namespace: name}

    # {'namespace1': ['pod1a', 'pod1b'],
    #  'namespace2': ['pod2a', 'pod2b'],
    #  ...}

    @kopf.timer('kopfexamples', interval=5)
    def tick(primary: kopf.Index, secondary: kopf.Index, spec: kopf.Spec, **_: Any) -> None:
        namespace_containers: set[str] = set()
        monitored_namespace = spec.get('monitoredNamespace', 'default')
        for pod_name in secondary.get(monitored_namespace, []):
            reconstructed_key = (monitored_namespace, pod_name)
            pod_containers, *_ = primary[reconstructed_key]
            namespace_containers |= pod_containers
        print(f"containers in {monitored_namespace}: {namespace_containers}")

    # containers in namespace1: {'main', 'sidecar'}
    # containers in namespace2: {'main', 'the-only-one'}

However, such complex structures and performance requirements are rare.
For simplicity and performance, nested indices are not directly provided
by the framework as a built-in feature, only as this tip based on
other official features.


Conditional indexing
====================

Besides the usual filters (see :doc:`/filters`), resources can be skipped
from indexing by returning ``None`` (Python's default return value
for functions with no result).

If the indexing function returns ``None`` or does not return anything,
its result is ignored and nothing is indexed. The existing values in the index
are preserved as-is (this is also the case when unexpected errors occur
in the indexing function with the errors mode set to ``IGNORED``):

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def empty_index(**_: Any) -> Any:
        pass

    # {}

However, if the indexing function returns a dict with ``None`` as values,
those values are indexed normally (they are not ignored). ``None`` values
can be used as placeholders when only the keys are sufficient; otherwise,
indices and collections that have no values left in them are removed
from the index:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods')
    async def index_of_nones(**_: Any) -> Any:
        return {'key': None}

    # {'key': [None, None, ...]}


Errors in indexing
==================

Indexing functions are expected to be fast and non-blocking,
as they can delay the operator startup and resource processing.
For this reason, when errors occur in indexing handlers,
the handlers are never retried.

Arbitrary exceptions with ``errors=IGNORED`` (the default) cause the framework
to ignore the error and keep the existing indexed values (which are now stale).
This means that new values are expected to appear soon, but the old values
are good enough in the meantime (which is usually highly likely).
This is the same as returning ``None``, except that the exception's
stack trace is also logged:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods', errors=kopf.ErrorsMode.IGNORED)  # the default
    async def fn1(**_: Any) -> Any:
        raise Exception("Keep the stale values, if any.")

:class:`kopf.PermanentError` and arbitrary exceptions with ``errors=PERMANENT``
remove any existing indexed values and the resource's keys from the index,
and exclude the failed resource from future indexing by this index
(so that the indexing function is not even invoked for that resource):

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods', errors=kopf.ErrorsMode.PERMANENT)
    async def fn1(**_: Any) -> Any:
        raise Exception("Excluded forever.")

    @kopf.index('pods')
    async def fn2(**_: Any) -> Any:
        raise kopf.PermanentError("Excluded forever.")

:class:`kopf.TemporaryError` and arbitrary exceptions with ``errors=TEMPORARY``
remove any existing indexed values and the resource's keys from the index,
and exclude the failed resource from indexing for a specified duration
(via the error's ``delay`` option; set to ``0`` or ``None`` for no delay).
The resource is expected to be reindexed in the future,
but current problems are preventing that from happening:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.index('pods', errors=kopf.ErrorsMode.TEMPORARY)
    async def fn1(**_: Any) -> Any:
        raise Exception("Excluded for 60s.")

    @kopf.index('pods')
    async def fn2(**_: Any) -> Any:
        raise kopf.TemporaryError("Excluded for 30s.", delay=30)

In the "temporary" mode, the decorator's error-handling options are used:
``backoff=`` is the default delay before the resource can be re-indexed
(the default is 60 seconds; use ``0`` explicitly for no delay);
``retries=`` and ``timeout=`` set the retry limit and the overall duration
from the first failure until the resource is marked as permanently excluded
from indexing (unless it succeeds at some point).

The handler kwargs :kwarg:`retry`, :kwarg:`started`, and :kwarg:`runtime`
report the retry attempts since the first indexing failure.
Successful indexing resets all counters and timeouts;
the retry state is not persisted (to save memory).

As with regular handlers (:doc:`errors`), Kopf's error classes (expected errors)
only log a short message, while arbitrary exceptions (unexpected errors)
also print their stack traces.

This matches the semantics of regular handlers, but with in-memory specifics.

.. warning::

    **There is no ideal out-of-the-box default mode for error handling:**
    any kind of error in the indexing functions means the index becomes
    inconsistent with the actual state of the cluster and its resources.
    Entries for matching resources are either "lost" (permanent or temporary
    errors) or contain possibly outdated/stale values (ignored errors) ---
    all of these cases represent misinformation about the actual state
    of the cluster.

    The default mode is chosen to minimize index changes and reindexing
    in case of frequent errors --- by making no changes to the index.
    Additionally, the stale values may still be relevant and useful
    to some extent.

    For the other two cases, operator developers must explicitly accept
    the risks by setting ``errors=`` if the operator can afford to lose the keys.


Kwargs safety
=============

Indices injected into kwargs overwrite any framework kwargs,
both existing and those to be added in the future. This guarantees
that new framework versions will not break an operator if new kwargs
are added with the same name as existing indices.

The trade-off is that handlers cannot use the new features until their indices
are renamed to something else. Since the new features are new, existing
operator code does not use them, so this is backwards compatible.

To reduce the probability of name collisions, keep these conventions in mind
when naming indices (they are entirely optional and provided
for convenience only):

* System kwargs are usually one word; name your indices with two or more words.
* System kwargs are usually singular (not always); name your indices in the plural.
* System kwargs are usually nouns; using abbreviations or prefixes/suffixes
  (e.g. ``cnames``, ``rpods``) reduces the probability of collisions.


Performance
===========

Indexing can be a CPU- and RAM-intensive operation.
The data structures behind indices are designed to be as efficient as possible:

* Index lookups are O(1) --- as in Python's ``dict``.
* Store updates and deletions are O(1) --- a ``dict`` is used internally.
* Overall updates and deletions are O(k), where "k" is the number of keys
  per object (not the total number of keys), which is fixed in most cases,
  so it is O(1).

Neither the number of values stored in the index nor the total number of keys
affects performance (in theory).

Some performance may be lost to additional method calls on the user-facing
mappings and collections that hide the internal ``dict`` structures.
This is assumed to be negligible compared to the overall code overhead.


Guarantees
==========

If an index is declared, there is no need to pre-check for its existence ---
the index exists immediately, even if it contains no resources.

The indices are guaranteed to be fully populated before any other
resource-related handlers are invoked in the operator.
As such, even creation handlers or raw event handlers are guaranteed
to have a complete indexed overview of the cluster,
not just a partial snapshot from the moment they were triggered.

There is no such guarantee for operator-level handlers, such as startup/cleanup,
authentication, health probing, or the indexing functions themselves:
the indices are available in kwargs but may be empty or only partially
populated during the operator's startup and index pre-population stage.
This can affect cleanup, login, and probe handlers if they are invoked
at that stage.

However, the indices are safe to pass to threads or tasks for later processing
if those threads or tasks are started from the startup handlers mentioned above.


Limitations
===========

All in-memory values are lost when the operator restarts; there is no persistence.
In particular, the indices are fully recalculated on operator restart during
the initial listing of resources (equivalent to ``@kopf.on.event``).
On large clusters with thousands of resources, the initial index population
can take time, so operator processing will be delayed regardless of whether
the handlers use the indices or not (the framework cannot know this
ahead of time).

.. seealso::

    :doc:`/memos` --- other in-memory structures with similar limitations.

.. seealso::

    Indexers and indices are conceptually similar to `client-go's indexers`__
    --- with all the underlying components implemented inside the framework
    ("batteries included").

    __ https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md


Precautions for huge clusters
=============================

.. warning::

    On very large clusters with many resources, it is possible to hit
    a deadlock with indexing if the worker limit is lower than the number
    of resources being indexed.

    All operator activities for every individual object freeze until
    the operator has fully indexed the cluster. This means that at startup,
    there will be as many workers (asyncio tasks) as there are resources
    matching the indexing criteria (by resource selectors and filters).

    To resolve this deadlock, use one of these two solutions:

    - Keep ``settings.queueing.worker_limit=None`` (default).
      Workers will still shut down after some time of inactivity (≈5s)
      and release all system resources.
    - Ensure that ``settings.queueing.worker_limit`` is larger than
      the number of resources being indexed, plus some headroom.

    Also, minimize the number of resources indexed with more precise filters.

.. warning::

    Similarly, on very large clusters with many resources, there will be
    heavy CPU load if synchronous handlers are used for indexing.

    Synchronous handlers are executed in thread pools. If too many resources
    rush to be indexed at operator startup, the pool can be overwhelmed.

    Threads are reused and indexing handlers will pass through the pool quickly,
    since indexing handlers are expected to be fast and purely computational.
    But this will not help against the initial surge.

    Thread pools do not scale down automatically, so the peak thread count
    will remain throughout the lifetime of the operator.

    To resolve the thread explosion problem, use one of these two solutions:

    - Declare the handlers as ``async def``, see :doc:`async`.
      This eliminates threading from the indexing stage entirely.
    - Set ``settings.execution.max_workers`` to a reasonable number
      of allowed threads. It can be much lower than the number of resources,
      but this will slow down the initial indexing proportionally.
      There is no limit by default (``max_workers=None``).

    Also, minimize the number of resources indexed with more precise filters.

.. seealso::

    See :doc:`configuration` for details on these settings.
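
As a minimal sketch, the two settings discussed in the warnings above could be
set in a startup handler. The setting names are taken from the text above;
the handler name ``configure`` and the value ``20`` are arbitrary choices
made only for this illustration and should be tuned for the actual cluster:

.. code-block:: python

    import kopf
    from typing import Any

    @kopf.on.startup()
    def configure(settings: kopf.OperatorSettings, **_: Any) -> None:
        # Keep the number of workers unlimited (the default), so that every
        # resource can get its own worker during the initial indexing.
        settings.queueing.worker_limit = None

        # Cap the thread pool used by synchronous handlers. The value 20 is
        # an assumption for this sketch: lower values mean fewer threads
        # but a slower initial indexing.
        settings.execution.max_workers = 20

The thread cap only matters if some indexing functions are synchronous;
with ``async def`` indexing functions, no threads are involved at that stage.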