Daemons
Daemons are a special type of handler for background logic that accompanies Kubernetes resources during their life cycle.
Unlike event-driven short-running handlers declared with @kopf.on,
daemons are started for every individual object when it is created
(or when an operator is started/restarted while the object exists),
and are capable of running indefinitely.
The object’s daemons are stopped when the object is deleted or the whole operator is exiting/restarting.
Spawning
To have a daemon accompanying a resource of some kind, decorate a function
with @kopf.daemon and make it run for a long time or forever:
import asyncio
import kopf
import time
from typing import Any

@kopf.daemon('kopfexamples')
async def monitor_kex_async(**_: Any) -> None:
    while True:
        ...  # check something
        await asyncio.sleep(10)

@kopf.daemon('kopfexamples')
def monitor_kex_sync(stopped: kopf.DaemonStopped, **_: Any) -> None:
    while not stopped:
        ...  # check something
        time.sleep(10)
Synchronous functions are executed in threads, while asynchronous functions are executed directly in the asyncio event loop of the operator, the same as with regular handlers. See Async/Await.
The same executor is used both for regular sync handlers and for sync daemons. If you expect a large number of synchronous daemons (e.g. for large clusters), make sure to pre-scale the executor accordingly. See Configuration (Synchronous handlers).
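The effect of an undersized executor can be reproduced with the standard library alone. The sketch below is not Kopf code: it only assumes that every sync daemon occupies one worker thread for as long as it runs. It uses a plain concurrent.futures.ThreadPoolExecutor and a hypothetical helper, count_started(), to show that daemons beyond the pool size do not start while the earlier ones are still running.

```python
import concurrent.futures
import threading
import time


def count_started(max_workers: int, daemons: int) -> int:
    """Return how many long-running functions got a worker thread."""
    stop = threading.Event()
    started: list[int] = []
    lock = threading.Lock()

    def daemon(index: int) -> None:
        with lock:
            started.append(index)
        stop.wait()  # holds the worker thread until asked to stop

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        for index in range(daemons):
            pool.submit(daemon, index)
        time.sleep(0.2)  # let the pool start as many daemons as it can
        with lock:
            running = len(started)
        stop.set()  # release the workers so that the pool can shut down
    return running


if __name__ == "__main__":
    print(count_started(max_workers=2, daemons=5))  # only 2 daemons run
    print(count_started(max_workers=5, daemons=5))  # all 5 daemons run
```

In an operator, the same starvation would also affect regular sync handlers, since they share the executor with the sync daemons.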
Termination
The daemons are terminated when either their resource is marked for deletion, or the operator itself is exiting or pausing (see Peering).
In both cases, Kopf requests all daemons to terminate gracefully by setting
the stopped flag (available as the stopped kwarg). Synchronous daemons MUST,
and asynchronous daemons SHOULD, check the value of this flag as often as possible:
import kopf
import time
from typing import Any

@kopf.daemon('kopfexamples')
def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> None:
    while not stopped:
        time.sleep(1.0)
    print("We are done. Bye.")
The asynchronous daemons can skip these checks if they define the cancellation
timeout. In that case, they can expect an asyncio.CancelledError
raised at any point of their code (specifically, at any await clause):
import asyncio
import kopf
from typing import Any

@kopf.daemon('kopfexamples', cancellation_timeout=1.0)
async def monitor_kex(**_: Any) -> None:
    try:
        while True:
            await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("We are done. Bye.")
With no cancellation timeout set, cancellation is not performed at all, as it is unclear how long the coroutine should be awaited. However, it is cancelled when the operator exits and stops all “hung” left-over tasks (not specifically daemons).
Timeouts
The termination sequence parameters can be controlled when declaring a daemon:
import asyncio
import kopf
from typing import Any

@kopf.daemon('kopfexamples',
             cancellation_backoff=1.0, cancellation_timeout=3.0)
async def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> None:
    while not stopped:
        await asyncio.sleep(1)
There are three stages of how the daemon is terminated:
1. Graceful termination:
   - stopped is set immediately (unconditionally).
   - cancellation_backoff is awaited (if set).
2. Forced termination (only if cancellation_timeout is set):
   - asyncio.CancelledError is raised (for async daemons only).
   - cancellation_timeout is awaited (if set).
3a. Giving up and abandoning (only if cancellation_timeout is set):
   - A ResourceWarning is issued for potential OS resource leaks.
   - The finalizer is removed, and the object is released for potential deletion.
3b. Forever polling (only if cancellation_timeout is not set):
   - The daemon awaiting continues forever, logging from time to time.
   - The finalizer is not removed and the object remains blocked from deletion.
The cancellation_timeout is measured from the moment the daemon
is cancelled (i.e., when forced termination begins, after the cancellation
backoff is over), not from the moment the termination itself begins.
The total termination time is therefore at most cancellation_backoff + cancellation_timeout.
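The termination stages can be sketched with plain asyncio. This is an illustration under stated assumptions, not Kopf's implementation: the terminate() function and the polite, deaf, and slow daemons are hypothetical names, an asyncio.Event stands in for the stopped flag, and the finalizer handling and the ResourceWarning are left out. The sketch only reports the stage at which each daemon ended.

```python
import asyncio
from typing import Optional


async def polite(stopped: asyncio.Event) -> None:
    while not stopped.is_set():  # checks the flag often
        await asyncio.sleep(0.01)


async def deaf(stopped: asyncio.Event) -> None:
    while True:  # never checks the flag, but remains cancellable
        await asyncio.sleep(0.01)


async def slow(stopped: asyncio.Event) -> None:
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        await asyncio.sleep(0.3)  # a cleanup that outlasts the timeout


async def terminate(
    task: "asyncio.Task[None]",
    stopped: asyncio.Event,
    backoff: Optional[float],
    timeout: Optional[float],
) -> str:
    """Return the stage at which the termination sequence ended."""
    stopped.set()  # stage 1: graceful termination
    if backoff is not None:
        await asyncio.wait({task}, timeout=backoff)
    if task.done():
        return "graceful"
    if timeout is None:
        return "polling"  # stage 3b: a real guard would keep waiting
    task.cancel()  # stage 2: forced termination
    await asyncio.wait({task}, timeout=timeout)
    return "forced" if task.done() else "abandoned"  # else: stage 3a


async def scenario(daemon, backoff: Optional[float], timeout: Optional[float]) -> str:
    stopped = asyncio.Event()
    task = asyncio.create_task(daemon(stopped))
    await asyncio.sleep(0.05)  # let the daemon start running
    stage = await terminate(task, stopped, backoff, timeout)
    task.cancel()  # demo cleanup only: do not leave tasks behind
    await asyncio.gather(task, return_exceptions=True)
    return stage


async def main() -> list[str]:
    return [
        await scenario(polite, backoff=0.5, timeout=None),
        await scenario(deaf, backoff=0.1, timeout=0.5),
        await scenario(slow, backoff=0.1, timeout=0.1),
        await scenario(deaf, backoff=0.1, timeout=None),
    ]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The four scenarios end as graceful, forced, abandoned, and polling respectively. Only the daemon that checks the flag exits within the backoff; the others depend on whether a cancellation timeout is set.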
Warning
When the operator is terminating, it has its own timeout of 5 seconds for all “hung” tasks. This includes the daemons after they are requested to finish gracefully and all timeouts are reached.
If the daemon termination takes longer than this for any reason, the daemon will be cancelled (by the operator, not by the daemon guard) regardless of the graceful timeout of the daemon. If this does not help, the operator will be waiting for all hung tasks until SIGKILL’ed.
Warning
If the operator is running in a cluster, there can be timeouts set for a pod
(terminationGracePeriodSeconds, the default is 30 seconds).
If the daemon termination takes longer than this timeout, the daemons will not finish in full at the operator exit, as the pod will be SIGKILL’ed.
Kopf itself does not set any implicit timeouts for the daemons.
Either design the daemons to exit as fast as possible, or configure
terminationGracePeriodSeconds and cancellation timeouts accordingly.
Safe sleep
For synchronous daemons, it is recommended to use stopped.wait()
instead of time.sleep(): the wait will end when either the time is reached
(as with the sleep), or immediately when the stopped flag is set:
import kopf
from typing import Any

@kopf.daemon('kopfexamples')
def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> None:
    while not stopped:
        stopped.wait(10)
For asynchronous handlers, regular asyncio.sleep() should be sufficient,
as it is cancellable via asyncio.CancelledError. If a cancellation
is neither configured nor desired, stopped.wait() can be used too
(with await):
import kopf
from typing import Any

@kopf.daemon('kopfexamples')
async def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> None:
    while not stopped:
        await stopped.wait(10)
This way, the daemon will exit as soon as possible when the stopped flag
is set, not when the next sleep is over. Therefore, the sleeps can be of any
duration while the daemon remains terminable (and thus leaks no OS resources).
Note
Synchronous and asynchronous daemons get different types of stop-checker:
with synchronous and asynchronous interfaces respectively.
Therefore, they should be used accordingly: without or with await.
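The difference between an interruptible wait and a plain sleep can be measured without Kopf. In this minimal sketch, a threading.Event is used as a stand-in for the synchronous stop-checker (an assumption made for illustration only), and stop_latency() is a hypothetical helper that reports how long a daemon-like thread needs to exit once the flag is set.

```python
import threading
import time


def stop_latency(sleep_seconds: float) -> float:
    """Return how long a sleeping daemon-like thread takes to stop."""
    stopped = threading.Event()

    def daemon() -> None:
        while not stopped.is_set():
            stopped.wait(sleep_seconds)  # wakes up early once the flag is set

    thread = threading.Thread(target=daemon)
    thread.start()
    time.sleep(0.1)  # the daemon is now in the middle of its long wait
    requested = time.monotonic()
    stopped.set()
    thread.join()
    return time.monotonic() - requested


if __name__ == "__main__":
    # The thread exits almost immediately despite the 10-second wait.
    print(f"stopped in {stop_latency(10):.3f}s")
```

With time.sleep(10) in place of the wait, the same thread would need up to 10 seconds to notice the flag.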
Postponing
Normally, daemons are spawned immediately once a resource becomes visible to the operator: i.e. on resource creation or operator startup.
It is possible to postpone the daemon spawning:
import asyncio
import kopf
from typing import Any

@kopf.daemon('kopfexamples', initial_delay=30)
async def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> None:
    while True:
        await asyncio.sleep(1.0)
The start of the daemon will be delayed by 30 seconds after the resource creation (or operator startup). For example, this can be used to give some time for regular event-driven handlers to finish without producing too much activity.
The initial_delay can also be a callable, which accepts the same arguments
as the handler itself, and returns the delay in seconds:
import kopf
import random
from typing import Any

def get_delay(body: kopf.Body, **_: Any) -> int:
    return random.randint(
        body.get('spec', {}).get('minDelay', 0),
        body.get('spec', {}).get('maxDelay', 60),
    )

@kopf.daemon('kopfexamples', initial_delay=get_delay)
async def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> None:
    ...
This is primarily intended for load balancing during operator restarts (e.g. by using a random delay). If you need more complex or periodic random timing, consider using a daemon with custom sleeps instead of a timer.
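Since the delay bounds come from user-provided resource fields, a delay callable may need to tolerate inconsistent values: random.randint() raises ValueError when the lower bound exceeds the upper one. The following is a minimal sketch of a more defensive variant; get_delay_safely is a hypothetical name, and it accepts any mapping so that it can be tried without a cluster.

```python
import random
from typing import Any, Mapping


def get_delay_safely(body: Mapping[str, Any], **_: Any) -> int:
    """Pick a random delay within the bounds declared in the resource spec."""
    spec = body.get('spec') or {}
    low = max(0, int(spec.get('minDelay', 0)))      # never negative
    high = max(low, int(spec.get('maxDelay', 60)))  # never below the minimum
    return random.randint(low, high)


if __name__ == "__main__":
    print(get_delay_safely({}))  # somewhere between 0 and 60
    print(get_delay_safely({'spec': {'minDelay': 30, 'maxDelay': 10}}))  # 30
```

It would be passed to the decorator the same way as get_delay above, i.e. as the initial_delay value.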
Restarting
It is generally expected that daemons are designed to run forever. However, a daemon can exit prematurely, i.e. before the resource is deleted or the operator terminates.
In that case, the daemon will not be restarted again during the lifecycle of this resource in this operator process (however, it will be spawned again if the operator restarts). This way, it becomes a long-running equivalent of on-creation/on-resuming handlers.
To simulate restarting, raise kopf.TemporaryError with a delay set.
import asyncio
import kopf
from typing import Any

@kopf.daemon('kopfexamples')
async def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> None:
    await asyncio.sleep(10.0)
    raise kopf.TemporaryError("Need to restart.", delay=10)
Same as with regular error handling, a delay of None means instant restart.
See also: Excluding handlers forever to prevent daemons from spawning across operator restarts.
Deletion prevention
Normally, a finalizer is put on the resource if there are daemons running for it — to prevent its actual deletion until all the daemons are terminated.
Only after the daemons are terminated, the finalizer is removed to release the object for actual deletion.
However, it is possible to have daemons that disobey the exiting signals and continue running after the timeouts. In that case, the finalizer is removed anyway, and the orphaned daemons are left to themselves.
Resource fields access
The resource’s current state is accessible at any time through regular kwargs
(see Arguments): body, spec, meta,
status, uid, name, namespace, etc.
The values are “live views” of the current state of the object as it is being modified during its lifecycle (not frozen as in the event-driven handlers):
import kopf
import random
import time
from typing import Any

@kopf.daemon('kopfexamples')
def monitor_kex(
    stopped: kopf.DaemonStopped,
    logger: kopf.Logger,
    body: kopf.Body,
    spec: kopf.Spec,
    **_: Any,
) -> None:
    while not stopped:
        logger.info(f"FIELD={spec['field']}")
        time.sleep(1)

@kopf.timer('kopfexamples', interval=2.5)
def modify_kex_sometimes(patch: kopf.Patch, **_: Any) -> None:
    patch.spec['field'] = random.randint(0, 100)
Always access the fields through the provided kwargs, and do not store them in local variables. Internally, Kopf substitutes the whole object’s body on every external change. A field value stored in a local variable keeps the value it had at that moment in time and is not updated as the object changes.
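The difference between a live view and a remembered value can be shown with a toy model. The Holder and SpecView classes below are hypothetical and are not Kopf's actual classes; they only mimic the described behaviour, where the body is replaced as a whole and the view looks it up on every access.

```python
from collections.abc import Iterator, Mapping
from typing import Any


class Holder:
    """Holds the latest known body; replaced as a whole on every change."""

    def __init__(self, body: dict[str, Any]) -> None:
        self.body = body


class SpecView(Mapping[str, Any]):
    """A read-only view that looks up the current body on every access."""

    def __init__(self, holder: Holder) -> None:
        self._holder = holder

    def __getitem__(self, key: str) -> Any:
        return self._holder.body['spec'][key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._holder.body['spec'])

    def __len__(self) -> int:
        return len(self._holder.body['spec'])


holder = Holder({'spec': {'field': 1}})
spec = SpecView(holder)
remembered = spec['field']            # a copy of the value at this moment
holder.body = {'spec': {'field': 2}}  # an external change replaces the body

print(remembered)     # 1: the local variable is stale
print(spec['field'])  # 2: the view reflects the new state
```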
Error handling
The error handling is the same as for all other handlers: see Error handling:
import kopf
from typing import Any

@kopf.daemon('kopfexamples',
             errors=kopf.ErrorsMode.TEMPORARY, backoff=1, retries=10)
def monitor_kex(retry: int, **_: Any) -> None:
    if retry < 3:
        raise kopf.TemporaryError("I'll be back!", delay=1)
    elif retry < 5:
        raise EnvironmentError("Something happened!")
    else:
        raise kopf.PermanentError("Bye-bye!")
If a permanent error is raised, the daemon will never be restarted again, the same as when the daemon exits on its own (though this could be reconsidered in the future).
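The sequence of attempts in the example above can be traced without a cluster. The sketch below uses stand-in exception classes instead of the real kopf ones and a hypothetical trace() function. It assumes that, in the TEMPORARY errors mode, arbitrary exceptions are retried after the backoff delay, as described on the Error handling page. Nothing actually sleeps; the delays are only recorded.

```python
from typing import Optional


class TemporaryError(Exception):
    """Stand-in for kopf.TemporaryError."""

    def __init__(self, message: str, delay: Optional[float] = None) -> None:
        super().__init__(message)
        self.delay = delay


class PermanentError(Exception):
    """Stand-in for kopf.PermanentError."""


def monitor_kex(retry: int) -> None:
    if retry < 3:
        raise TemporaryError("I'll be back!", delay=1)
    elif retry < 5:
        raise EnvironmentError("Something happened!")
    else:
        raise PermanentError("Bye-bye!")


def trace(backoff: float, retries: int) -> list[str]:
    """Record what happens on each attempt, up to the retries limit."""
    log: list[str] = []
    for retry in range(retries):
        try:
            monitor_kex(retry)
        except TemporaryError as e:
            log.append(f"#{retry}: temporary, retry in {e.delay}s")
        except PermanentError:
            log.append(f"#{retry}: permanent, stop forever")
            break
        except Exception:
            log.append(f"#{retry}: arbitrary, retry in {backoff}s")
        else:
            log.append(f"#{retry}: finished")
            break
    return log


if __name__ == "__main__":
    print("\n".join(trace(backoff=1, retries=10)))
```

Under these assumptions, attempts 0 to 2 are retried as temporary errors, attempts 3 and 4 are retried because of the errors mode, and attempt 5 stops the daemon for good.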
Results delivery
As with any other handlers, the daemons can return arbitrary JSON-serializable values to be put on the resource’s status:
import asyncio
import kopf
from typing import Any

@kopf.daemon('kopfexamples')
async def monitor_kex(stopped: kopf.DaemonStopped, **_: Any) -> dict[str, bool]:
    await asyncio.sleep(10.0)
    return {'finished': True}
Patching
Daemons can modify the resource via the patch keyword argument,
including both the merge-patch dictionary and the transformation functions
(see Patching for details).
import asyncio
import kopf
import random
from typing import Any

# Transformation functions and JSON-patches are useful specifically for the lists.
def set_conditions(body: kopf.RawBody) -> None:
    conditions = body.setdefault('status', {}).setdefault('conditions', [])
    conditions[:] = [cond for cond in conditions if cond.get('type') != 'Whatever']
    conditions.append({
        'type': 'Whatever',
        'status': 'True',
        'reason': 'SomeReason',
        'message': 'Some message',
    })

@kopf.daemon('kopfexamples')
async def update_status(stopped: kopf.DaemonStopped, patch: kopf.Patch, **_: Any) -> None:

    # This goes to the merge-patch.
    patch.status['replicas'] = random.randint(1, 10)

    # This goes to the JSON-patch.
    patch.fns.append(set_conditions)

    # Exit the daemon so that it restarts again (otherwise exits forever).
    raise kopf.TemporaryError("retry a bit later", delay=5)
The patch is applied after the handler exits on each iteration of the run loop.
This includes when the handler raises kopf.TemporaryError for retrying:
all changes accumulated in the patch during that attempt are sent to
the Kubernetes API before the next retry begins.
After the patch is applied, it is cleared for the next iteration.
If a transformation function’s JSON Patch hits a resourceVersion mismatch
(HTTP 422), the transformation functions are carried forward and retried
on the next iteration — not in the background. The handler can detect this
by checking bool(patch) at the start: if it is true before the handler
has made any changes, there are pending transformation functions from
a previous iteration.
Filtering
It is also possible to use the existing Filtering to only spawn daemons for specific resources:
import kopf
import time
from typing import Any

@kopf.daemon('kopfexamples',
             annotations={'some-annotation': 'some-value'},
             labels={'some-label': 'some-value'},
             when=lambda name, **_: 'some' in name)
def monitor_selected_kexes(stopped: kopf.DaemonStopped, **_: Any) -> None:
    while not stopped:
        time.sleep(1)
Other (non-matching) resources of that kind will be ignored.
The daemons will be executed only while the filtering criteria are met.
Both the resource’s state and the criteria can be highly dynamic (e.g.
due to when= callable filters or labels/annotations value callbacks).
Once the resource stops matching the criteria (because either the resource
or the criteria have changed, e.g. with when= callbacks),
the daemon is stopped. Once it matches the criteria again, it is re-spawned.
The checking is done only when the resource changes (any watch-event arrives). The criteria themselves are not re-evaluated if nothing changes.
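The event-driven nature of this check can be illustrated with a simplified model. The replay() function below is hypothetical and is not how Kopf is implemented: it only assumes that the criteria are evaluated once per watch-event and that the daemon is spawned or stopped depending on the result. Gradual termination is not modelled.

```python
from typing import Any, Callable, Mapping

Resource = Mapping[str, Any]


def replay(events: list[Resource], matches: Callable[[Resource], bool]) -> list[str]:
    """Decide what to do with the daemon on every watch-event, in order."""
    running = False
    actions: list[str] = []
    for resource in events:
        wanted = matches(resource)  # evaluated only when an event arrives
        if wanted and not running:
            actions.append("spawn")
            running = True
        elif not wanted and running:
            actions.append("stop")
            running = False
        else:
            actions.append("keep")
    return actions


def has_label(resource: Resource) -> bool:
    return resource['labels'].get('some-label') == 'some-value'


labelled = {'labels': {'some-label': 'some-value'}}
unlabelled = {'labels': {}}

if __name__ == "__main__":
    events = [unlabelled, labelled, labelled, unlabelled, labelled]
    print(replay(events, has_label))
```

Between the events nothing is evaluated at all, so a criterion that depends on external state (e.g. the current time) takes effect only with the next change of the resource.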
Warning
A daemon that is terminating is considered as still running, therefore it will not be re-spawned until it fully terminates. It will be re-spawned the next time a watch-event arrives after the daemon has truly exited.
System resources
Warning
A separate OS thread or asyncio task is started for each resource and each handler.
Having hundreds or thousands of OS threads or asyncio tasks can consume system resources significantly. Make sure you only have daemons and timers with appropriate filters (e.g., by labels, annotations, or similar).
For the same reason, prefer to use async handlers (with properly designed async/await code), since asyncio tasks are somewhat cheaper than threads. See Async/Await for details.