Timers

Timers execute a handler on a regular schedule for as long as the object exists, regardless of whether the object has changed. This is unlike the regular handlers, which are event-driven and are triggered only when something changes.

Intervals

The interval defines how often to trigger the handler (in seconds):

import kopf
from typing import Any

@kopf.timer('kopfexamples', interval=1.0)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    pass

Sharpness

By default, timers wait for the specified interval between the end of one call and the start of the next; the time taken by the handler itself is not counted. Timers can instead follow a sharp schedule, i.e. be invoked exactly every N seconds, no matter how long the handler takes to execute:

import kopf
import time
from typing import Any

@kopf.timer('kopfexamples', interval=1.0, sharp=True)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    time.sleep(0.3)

In this example, the handler takes 0.3 seconds to execute. In the sharp mode, the actual pause between invocations will be 0.7 seconds: whatever is left of the declared interval of 1.0 seconds minus the execution time.
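The arithmetic can be checked with a small standalone sketch. It is an illustration only, not Kopf's implementation: the helper pause_after_run is a hypothetical name, and it covers only the case described here, where the handler finishes within the interval.

```python
def pause_after_run(interval: float, elapsed: float, sharp: bool) -> float:
    """Return how long to wait after a handler run before the next run.

    Hypothetical helper for illustration; it is not part of Kopf.
    """
    if elapsed >= interval:
        # Overruns are not described in this section, so the sketch does not guess.
        raise ValueError("this sketch only covers handlers faster than the interval")
    # Sharp: only the remainder of the interval is waited.
    # Regular: the full interval is waited, regardless of the handler's duration.
    return interval - elapsed if sharp else interval


sharp_pause = pause_after_run(interval=1.0, elapsed=0.3, sharp=True)
regular_pause = pause_after_run(interval=1.0, elapsed=0.3, sharp=False)

# Start-to-start period = handler duration + pause after it.
sharp_period = 0.3 + sharp_pause
regular_period = 0.3 + regular_pause
```

With these numbers, the sharp timer starts roughly every 1.0 seconds, while the regular timer starts roughly every 1.3 seconds.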

Idling

Timers can be defined to idle if the resource changes too often, and only be invoked when it is stable for some time:

import kopf
from typing import Any

@kopf.timer('kopfexamples', idle=10)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    print(f"FIELD={spec['field']}")

The creation of a resource is considered a change, so idling also delays the very first invocation by the idle time.

The default is to have no idle time, just the intervals.

It is possible to have a timer with both idling and interval. In that case, the timer will be invoked only if there were no changes in the resource for the specified duration (idle time), and every N seconds after that (interval) as long as the object does not change. Once changed, the timer will stop and wait for the new idling time:

import kopf
from typing import Any

@kopf.timer('kopfexamples', idle=10, interval=1)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    print(f"FIELD={spec['field']}")
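The interplay of idling and intervals can be traced with a simplified standalone model. This is a sketch under stated assumptions, not Kopf's scheduler: fire_times is a hypothetical helper, handler duration is ignored, and a change that lands exactly on a scheduled moment is assumed to cancel that invocation.

```python
def fire_times(changes: list[float], idle: float, interval: float, until: float) -> list[float]:
    """Simulate when an idle+interval timer would fire (hypothetical helper).

    `changes` holds the timestamps of resource changes (creation included),
    sorted in ascending order. Handler duration is ignored.
    """
    fired: list[float] = []
    for index, changed_at in enumerate(changes):
        # The next change (if any) cuts the current series of invocations short.
        next_change = changes[index + 1] if index + 1 < len(changes) else float("inf")
        moment = changed_at + idle  # first invocation: once the idle time has passed
        while moment < next_change and moment <= until:
            fired.append(moment)
            moment += interval  # later invocations: every `interval` seconds
    return fired


# Created at t=0, modified at t=25; idle=10, interval=5; observed until t=45.
timeline = fire_times([0, 25], idle=10, interval=5, until=45)
print(timeline)  # [10, 15, 20, 35, 40, 45]
```

The change at t=25 stops the first series after t=20, and the timer resumes only at t=35, once the resource has been stable for the idle time again.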

Postponing

Normally, timers are invoked immediately once the resource becomes visible to the operator (unless idling is declared).

It is possible to postpone the invocations:

import kopf
from typing import Any

@kopf.timer('kopfexamples', interval=1, initial_delay=5)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    print(f"FIELD={spec['field']}")

This is similar to idling, except that it is applied only once per resource/operator lifecycle, at the very beginning.

The initial_delay can also be a callable, which accepts the same arguments as the handler itself, and returns the delay in seconds:

import kopf
import random
from typing import Any

def get_delay(body: kopf.Body, **_: Any) -> int:
    return random.randint(
        body.get('spec', {}).get('minDelay', 0),
        body.get('spec', {}).get('maxDelay', 60),
    )

@kopf.timer('kopfexamples', interval=1, initial_delay=get_delay)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    ...

This is primarily intended for load balancing during operator restarts (e.g. by using a random delay). If you need more complex or periodic random timing, consider using a daemon with custom sleeps instead of a timer.

Combined timing

It is possible to combine all of the timing options to achieve the desired effect. For example, to give an operator 1 minute to warm up, and then ping the resources every 10 seconds if they have been unmodified for 10 minutes:

import kopf
from typing import Any

@kopf.timer('kopfexamples',
            initial_delay=60, interval=10, idle=600)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    pass

Errors in timers

The timers follow the standard error handling protocol: TemporaryError and arbitrary exceptions are treated according to the errors, timeout, retries, backoff options of the handler. The kwargs retry, started, runtime are provided too.

The default behavior is to retry an arbitrary error (similar to the regular resource handlers).

When an error happens, its delay overrides the timer’s schedule or life cycle:

  • For arbitrary exceptions, the timer’s backoff=... option is used.

  • For kopf.TemporaryError, the error’s delay=... option is used.

  • For kopf.PermanentError, the timer stops forever and is not retried.

The timer’s own interval is only used if the function exits successfully.

For example, if the handler fails 3 times with a back-off time set to 5 seconds and the interval set to 10 seconds, it will take 25 seconds (3*5+10) from the first execution to the end of the retrying cycle:

import kopf
from typing import Any

@kopf.timer('kopfexamples',
            errors=kopf.ErrorsMode.TEMPORARY, interval=10, backoff=5)
def monitor_kex_by_time(name: str, retry: int, **_: Any) -> None:
    if retry < 3:
        raise Exception()

It will be executed in this order:

  • A new cycle begins:

      • 1st execution attempt fails (retry == 0).
      • Waits for 5 seconds (backoff).
      • 2nd execution attempt fails (retry == 1).
      • Waits for 5 seconds (backoff).
      • 3rd execution attempt fails (retry == 2).
      • Waits for 5 seconds (backoff).
      • 4th execution attempt succeeds (retry == 3).
      • Waits for 10 seconds (interval).

  • A new cycle begins:

      • 5th execution attempt fails (retry == 0).
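The 25-second figure can be reproduced with a standalone simulation of one cycle. This is a sketch for checking the arithmetic only: simulate_cycle is a hypothetical helper, not part of Kopf, and it assumes the handler itself takes no time.

```python
def simulate_cycle(
    failures: int, backoff: float, interval: float
) -> tuple[list[tuple[float, int, bool]], float]:
    """Return (attempts, next_cycle_start) for a single timer cycle.

    Each attempt is a tuple of (start_time, retry, succeeded).
    Hypothetical helper; handler duration is ignored.
    """
    attempts: list[tuple[float, int, bool]] = []
    clock = 0.0
    retry = 0
    while True:
        succeeded = retry >= failures  # mirrors `if retry < 3: raise Exception()`
        attempts.append((clock, retry, succeeded))
        if succeeded:
            # Only a successful exit uses the timer's own interval.
            return attempts, clock + interval
        clock += backoff  # a failed attempt is followed by the back-off delay
        retry += 1


attempts, next_cycle = simulate_cycle(failures=3, backoff=5, interval=10)
for start, retry, ok in attempts:
    print(f"t={start:>4}: retry={retry} {'succeeds' if ok else 'fails'}")
print(f"next cycle starts at t={next_cycle}")
```

The attempts start at 0, 5, 10, and 15 seconds, and the next cycle starts at 25 seconds, i.e. 3*5+10.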

The timer never overlaps with itself. However, multiple timers with different interval settings and execution schedules can overlap with each other and with event-driven handlers.

Results delivery

The timers follow the standard results delivery protocol: the returned values are put on the object’s status under the handler’s id as a key.

import kopf
import random
from typing import Any

@kopf.timer('kopfexamples', interval=10)
def ping_kex(spec: kopf.Spec, **_: Any) -> int:
    return random.randint(0, 100)

Note

Whenever a resulting value is serialized and put on the resource’s status, it modifies the resource, which, in turn, resets the idle timer. Be careful when combining idling with returned results.

Patching

Timers can modify the resource via the patch keyword argument, including both the merge-patch dictionary and the transformation functions (see Patching for details).

import kopf
import random
from typing import Any

# Transformation functions and JSON-patches are especially useful for lists.
def set_conditions(body: kopf.RawBody) -> None:
    conditions = body.setdefault('status', {}).setdefault('conditions', [])
    conditions[:] = [cond for cond in conditions if cond.get('type') != 'Whatever']
    conditions.append({'type': 'Whatever', 'status': 'True', 'reason': 'SomeReason', 'message': 'Some message'})

@kopf.timer('kopfexamples', interval=60)
async def update_status(patch: kopf.Patch, **_: Any) -> None:
    # This goes to the merge-patch.
    patch.status['replicas'] = random.randint(1, 10)

    # This goes to the JSON-patch.
    patch.fns.append(set_conditions)

The patch is applied after the handler exits on each timer iteration. This includes when the handler raises kopf.TemporaryError for retrying: all changes accumulated in the patch during that attempt are sent to the Kubernetes API before the next retry begins. After the patch is applied, it is cleared for the next iteration.

If a transformation function’s JSON Patch hits a resourceVersion mismatch (HTTP 422), the transformation functions are carried forward and retried on the next iteration, not in the background. The handler can detect this by checking bool(patch) at the start: if it is true before the handler has made any changes, there are pending transformation functions from a previous iteration.

Filtering

Timers also support the existing filters (see Filtering):

import kopf
from typing import Any

@kopf.timer('kopfexamples', interval=10,
            annotations={'some-annotation': 'some-value'},
            labels={'some-label': 'some-value'},
            when=lambda name, **_: 'some' in name)
def ping_kex(spec: kopf.Spec, **_: Any) -> None:
    pass

System resources

Warning

Timers are implemented the same way as asynchronous daemons (see Daemons): via asyncio tasks for every resource & handler.

Although OS threads are not involved until the synchronous functions are invoked (through the asyncio executors), this can lead to significant OS resource usage on large clusters with thousands of resources.

Make sure that your daemons and timers are restricted with appropriate filters (e.g., by labels or annotations).