katsdptelstate.aio package

Submodules

katsdptelstate.aio.backend module

class katsdptelstate.aio.backend.Backend

Bases: abc.ABC

Low-level interface for telescope state backends.

The backend interface does not deal with namespaces or encodings, which are handled by the frontend TelescopeState class. A backend must be able to store the same types as TelescopeState, but keys and values will be bytes rather than arbitrary Python objects.

abstract async exists(key: bytes) → bool

Return whether key is in the backend.

abstract async keys(filter: bytes) → List[bytes]

Return all keys matching filter.

The filter is a Redis glob-style pattern. Backends might only support b'*' as a filter.
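As a rough illustration of how a backend might apply such a filter, the sketch below uses Python's fnmatch module as a stand-in for Redis pattern matching. This is an assumption: fnmatch syntax is close to Redis glob patterns but not identical (negated character classes, for example, are written differently), and filter_keys is a hypothetical helper rather than part of katsdptelstate.

```python
import fnmatch
from typing import Iterable, List


def filter_keys(keys: Iterable[bytes], pattern: bytes = b'*') -> List[bytes]:
    """Return the keys matching a glob-style pattern (hypothetical helper).

    fnmatch only approximates Redis pattern matching.
    """
    return sorted(k for k in keys if fnmatch.fnmatchcase(k, pattern))


stored = [b'cam_gain', b'cam_mode', b'sdp_config']
everything = filter_keys(stored)           # b'*' matches every key
cam_only = filter_keys(stored, b'cam_*')   # only keys starting with b'cam_'
```

A backend that supports only b'*' could skip the matching step entirely and return every key.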

abstract async delete(key: bytes) → None

Delete a key (no-op if it does not exist).

abstract async clear() → None

Remove all keys.

abstract async key_type(key: bytes) → Optional[katsdptelstate.utils.KeyType]

Get type of key, or None if it does not exist.

abstract async set_immutable(key: bytes, value: bytes) → Optional[bytes]

Set the value of an immutable key.

If the key already exists (and is immutable), returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not immutable.
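The set-once behaviour described above can be modelled with a plain dictionary. The sketch below is a toy, not the library's implementation: ToyBackend and its attributes are hypothetical, and ImmutableKeyError is redefined locally so the example runs without katsdptelstate installed.

```python
import asyncio
from typing import Dict, Optional, Set


class ImmutableKeyError(Exception):
    """Local stand-in for the library's exception of the same name."""


class ToyBackend:
    """Toy model tracking immutable keys and keys of any other type."""

    def __init__(self) -> None:
        self._immutable: Dict[bytes, bytes] = {}
        self._other: Set[bytes] = set()  # mutable or indexed keys

    async def set_immutable(self, key: bytes, value: bytes) -> Optional[bytes]:
        if key in self._other:
            raise ImmutableKeyError('key exists and is not immutable')
        existing = self._immutable.get(key)
        if existing is None:
            self._immutable[key] = value  # first write wins
        return existing                   # None means the value was stored


async def demo():
    backend = ToyBackend()
    first = await backend.set_immutable(b'k', b'1')
    second = await backend.set_immutable(b'k', b'2')  # not stored
    return first, second


first, second = asyncio.run(demo())
```

Under this model a caller can compare the returned value with the one it tried to store to detect a conflicting write.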

abstract async get(key: bytes) → Union[Tuple[None, None], Tuple[bytes, None], Tuple[bytes, float], Tuple[Dict[bytes, bytes], None]]

Get the value and timestamp of a key.

The return value depends on the key type:

immutable

The value.

mutable

The most recent value.

indexed

A dictionary of all values (with undefined iteration order).

absent

None

The timestamp will be None for types other than mutable.
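A caller can tell the four cases apart from the shape of the returned pair alone. The helper below is a hypothetical sketch (classify and GetResult are not library names) that relies only on the rules stated above.

```python
from typing import Dict, Tuple, Union

GetResult = Union[
    Tuple[None, None],
    Tuple[bytes, None],
    Tuple[bytes, float],
    Tuple[Dict[bytes, bytes], None],
]


def classify(result: GetResult) -> str:
    """Name the key type implied by a (value, timestamp) pair."""
    value, timestamp = result
    if value is None:
        return 'absent'
    if isinstance(value, dict):
        return 'indexed'
    if timestamp is not None:
        return 'mutable'     # only mutable keys carry a timestamp
    return 'immutable'
```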

abstract async add_mutable(key: bytes, value: bytes, timestamp: float) → None

Set a (value, timestamp) pair in a mutable key.

The timestamp will be a non-negative float value.

Raises

ImmutableKeyError – If the key exists and is not mutable.

abstract async set_indexed(key: bytes, sub_key: bytes, value: bytes) → Optional[bytes]

Set a value in an indexed immutable key.

If the sub-key already exists, returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not indexed.

abstract async get_indexed(key: bytes, sub_key: bytes) → Optional[bytes]

Get the value of an indexed immutable key.

Returns None if the key exists but the sub-key does not exist.

Raises
  • KeyError – If the key does not exist.

  • ImmutableKeyError – If the key exists and is not indexed.

abstract async get_range(key: bytes, start_time: float, end_time: float, include_previous: bool, include_end: bool) → Optional[List[Tuple[bytes, float]]]

Obtain a range of values from a mutable key.

If the key does not exist, returns None.

Parameters
  • key (bytes) – Key to search

  • start_time (float) – Start of the range (inclusive).

  • end_time (float) – End of the range. It is guaranteed to be non-negative.

  • include_previous (bool) – If true, also return the last entry prior to start_time.

  • include_end (bool) – If true, treat end_time as inclusive, otherwise exclusive.

Raises

ImmutableKeyError – If the key exists and is not mutable.
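The interaction of the two flags can be sketched with a binary search over records sorted by timestamp. This is a toy model under stated assumptions, not the library's implementation: toy_get_range is a hypothetical function operating on an in-memory list of (value, timestamp) pairs.

```python
import bisect
from typing import List, Tuple

Record = Tuple[bytes, float]


def toy_get_range(records: List[Record], start_time: float, end_time: float,
                  include_previous: bool, include_end: bool) -> List[Record]:
    """Select records by timestamp; records must be sorted by timestamp."""
    times = [ts for _, ts in records]
    # First record with timestamp >= start_time (start is inclusive).
    lo = bisect.bisect_left(times, start_time)
    if include_end:
        hi = bisect.bisect_right(times, end_time)  # keep timestamp == end_time
    else:
        hi = bisect.bisect_left(times, end_time)   # drop timestamp == end_time
    if include_previous and lo > 0:
        lo -= 1  # also return the last record before start_time
    return records[lo:hi]


records = [(b'a', 1.0), (b'b', 2.0), (b'c', 3.0)]
half_open = toy_get_range(records, 2.0, 3.0, False, False)
closed = toy_get_range(records, 2.0, 3.0, False, True)
with_previous = toy_get_range(records, 2.5, 3.0, True, False)
```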

abstract async dump(key: bytes) → Optional[bytes]

Return a key in the same format as the Redis DUMP command, or None if not present.

abstract monitor_keys(keys: Iterable[bytes]) → AsyncGenerator[katsdptelstate.backend.KeyUpdateBase, None]

Report changes to keys in keys.

Returns an asynchronous iterator that yields an infinite stream of update notifications. When no longer needed it should be closed.
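Because the stream never ends on its own, the consumer decides when to stop and then closes the generator. The sketch below shows that pattern with a hypothetical toy_monitor generator fed from an asyncio.Queue; it does not use katsdptelstate and yields plain bytes rather than KeyUpdateBase objects.

```python
import asyncio
from typing import AsyncGenerator, List


async def toy_monitor(queue: asyncio.Queue) -> AsyncGenerator[bytes, None]:
    """Hypothetical infinite stream: yield each item placed on the queue."""
    while True:
        yield await queue.get()


async def demo() -> List[bytes]:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put(b'first')
    await queue.put(b'second')
    updates = toy_monitor(queue)
    seen: List[bytes] = []
    try:
        async for update in updates:
            seen.append(update)
            if len(seen) == 2:
                break  # the generator would otherwise wait forever
    finally:
        await updates.aclose()  # close the generator once it is not needed
    return seen


seen = asyncio.run(demo())
```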

abstract close() → None

Start shutting down the connection to the backing storage.

abstract async wait_closed() → None

Wait until the shutdown started by close() has completed.
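The split between a synchronous close() and an awaitable wait_closed() resembles the shutdown pattern used by asyncio streams and servers. The sketch below illustrates the calling order with a hypothetical ToyConnection class; how a real backend performs its cleanup is not specified here.

```python
import asyncio
from typing import Optional, Tuple


class ToyConnection:
    """Hypothetical resource with a two-phase shutdown."""

    def __init__(self) -> None:
        self.closed = False
        self._shutdown_task: Optional[asyncio.Task] = None

    async def _shutdown(self) -> None:
        await asyncio.sleep(0)  # stand-in for real asynchronous cleanup
        self.closed = True

    def close(self) -> None:
        # Synchronous: only schedules the cleanup, so it must be called
        # while an event loop is running.
        if self._shutdown_task is None:
            loop = asyncio.get_running_loop()
            self._shutdown_task = loop.create_task(self._shutdown())

    async def wait_closed(self) -> None:
        # Completes once the cleanup scheduled by close() has finished.
        if self._shutdown_task is not None:
            await self._shutdown_task


async def demo() -> Tuple[bool, bool]:
    conn = ToyConnection()
    conn.close()
    before = conn.closed   # cleanup is scheduled but has not run yet
    await conn.wait_closed()
    return before, conn.closed


before, after = asyncio.run(demo())
```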

katsdptelstate.aio.memory module

class katsdptelstate.aio.memory.MemoryBackend

Bases: katsdptelstate.aio.backend.Backend

Telescope state backend that keeps data in memory.

See katsdptelstate.memory.MemoryBackend for details. This class is a thin asynchronous wrapper around that version.

to_sync() → katsdptelstate.memory.MemoryBackend

Get a synchronous backend with the same underlying data.

static from_sync(sync: katsdptelstate.memory.MemoryBackend) → katsdptelstate.aio.memory.MemoryBackend

Create an asynchronous backend that shares data with a synchronous one.

async exists(key: bytes) → bool

Return whether key is in the backend.

async keys(filter: bytes) → List[bytes]

Return all keys matching filter.

The filter is a Redis glob-style pattern. Backends might only support b'*' as a filter.

async delete(key: bytes) → None

Delete a key (no-op if it does not exist).

async clear() → None

Remove all keys.

async key_type(key: bytes) → Optional[katsdptelstate.utils.KeyType]

Get type of key, or None if it does not exist.

async set_immutable(key: bytes, value: bytes) → Optional[bytes]

Set the value of an immutable key.

If the key already exists (and is immutable), returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not immutable.

async get(key: bytes) → Union[Tuple[None, None], Tuple[bytes, None], Tuple[bytes, float], Tuple[Dict[bytes, bytes], None]]

Get the value and timestamp of a key.

The return value depends on the key type:

immutable

The value.

mutable

The most recent value.

indexed

A dictionary of all values (with undefined iteration order).

absent

None

The timestamp will be None for types other than mutable.

async add_mutable(key: bytes, value: bytes, timestamp: float) → None

Set a (value, timestamp) pair in a mutable key.

The timestamp will be a non-negative float value.

Raises

ImmutableKeyError – If the key exists and is not mutable.

async set_indexed(key: bytes, sub_key: bytes, value: bytes) → Optional[bytes]

Set a value in an indexed immutable key.

If the sub-key already exists, returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not indexed.

async get_indexed(key: bytes, sub_key: bytes) → Optional[bytes]

Get the value of an indexed immutable key.

Returns None if the key exists but the sub-key does not exist.

Raises
  • KeyError – If the key does not exist.

  • ImmutableKeyError – If the key exists and is not indexed.

async get_range(key: bytes, start_time: float, end_time: float, include_previous: bool, include_end: bool) → Optional[List[Tuple[bytes, float]]]

Obtain a range of values from a mutable key.

If the key does not exist, returns None.

Parameters
  • key (bytes) – Key to search

  • start_time (float) – Start of the range (inclusive).

  • end_time (float) – End of the range. It is guaranteed to be non-negative.

  • include_previous (bool) – If true, also return the last entry prior to start_time.

  • include_end (bool) – If true, treat end_time as inclusive, otherwise exclusive.

Raises

ImmutableKeyError – If the key exists and is not mutable.

async dump(key: bytes) → Optional[bytes]

Return a key in the same format as the Redis DUMP command, or None if not present.

close() → None

Start shutting down the connection to the backing storage.

async wait_closed() → None

Wait until the shutdown started by close() has completed.

monitor_keys(keys: Iterable[bytes]) → AsyncGenerator[katsdptelstate.backend.KeyUpdateBase, None]

Report changes to keys in keys.

Returns an asynchronous iterator that yields an infinite stream of update notifications. When no longer needed it should be closed.

katsdptelstate.aio.redis module

katsdptelstate.aio.telescope_state module

class katsdptelstate.aio.telescope_state.TelescopeState(backend: Optional[katsdptelstate.aio.backend.Backend] = None, prefixes: Tuple[Union[bytes, str], ...] = (b'',), base: Optional[katsdptelstate.aio.telescope_state.TelescopeState] = None)

Bases: katsdptelstate.telescope_state_base.TelescopeStateBase[katsdptelstate.aio.backend.Backend]

Interface to attributes and sensors stored in a database.

Refer to the README for a description of the types of keys supported.

A Redis database used with this class must only be used with this class, as it assumes that all keys were encoded by this package. It should however be robust to malicious data, failing gracefully rather than executing arbitrary code or consuming unreasonable amounts of time or memory.

Each instance of this class has an associated list of prefixes. Lookups try each prefix in turn until a match is found. Writes use the first prefix in the list. Conventionally, keys are arranged into a hierarchy, separated by underscores. A view() convenience method helps with constructing prefix lists by automatically adding the trailing underscore to prefixes.

Care should be used when attributes share a suffix. They may shadow each other for some views, causing the attribute to appear to have changed value. This class does not prevent it, because there is no way to know which namespaces may be shared in a view, and because doing it in a race-free way would be prohibitively expensive.

Parameters
  • backend (Backend) – Backend supplying the storage. If not specified (and base is also not specified), an in-memory backend is created.

  • prefixes (tuple of str/bytes) – Prefixes that will be tried in turn for key lookup. While this can be specified directly for advanced cases, it is normally generated by view(). Writes are made using the first prefix in the list.

  • base (TelescopeState) – Existing telescope state instance, from which the backend will be taken. This allows new views to be created by specifying prefixes, without creating new backends.

Raises

ValueError – If both base and backend are specified
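The prefix rules and the shadowing caveat described above can be demonstrated with a plain dictionary. The functions below are a toy sketch with hypothetical names (lookup, write); they do not reproduce the library's encoding or backend handling.

```python
from typing import Any, Dict, Sequence


def lookup(store: Dict[str, Any], prefixes: Sequence[str], key: str) -> Any:
    """Try each prefix in turn and return the first match."""
    for prefix in prefixes:
        full_key = prefix + key
        if full_key in store:
            return store[full_key]
    raise KeyError(key)


def write(store: Dict[str, Any], prefixes: Sequence[str], key: str, value: Any) -> None:
    """Writes always use the first prefix in the list."""
    store[prefixes[0] + key] = value


store = {'gain': 1.0}            # attribute in the root namespace
view = ('cam_', '')              # try 'cam_' first, then the root
before = lookup(store, view, 'gain')   # found via the '' prefix
write(store, view, 'gain', 2.0)        # stored as 'cam_gain'
after = lookup(store, view, 'gain')    # now shadowed by 'cam_gain'
```

Through this view the attribute appears to have changed value, even though the root entry 'gain' is untouched.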

async set(key: Union[bytes, str], value: Any) → None

async exists(key: Union[bytes, str]) → bool

Check to see if the specified key exists in the database.

async key_type(key: Union[bytes, str]) → Optional[katsdptelstate.utils.KeyType]

Get the type of a key.

If the key does not exist, returns None.

async keys(filter: Union[bytes, str] = '*') → List[str]

Return a list of keys currently in the model.

This function ignores the prefix list and returns all keys with fully-qualified names.

Parameters

filter (str or bytes, optional) – Wildcard string passed to Redis to restrict keys

Returns

keys – The key names, in sorted order

Return type

list of str

async delete(key: Union[bytes, str]) → None

Remove a key, and all values, from the model.

The key is deleted from every namespace in the prefix list.

Note

This function should be used rarely, ideally only in tests, as it violates the immutability of keys added with immutable=True.

async clear() → None

Remove all keys in all namespaces.

Note

This function should be used rarely, ideally only in tests, as it violates the immutability of keys added with immutable=True.

async add(key: Union[bytes, str], value: Any, ts: Optional[float] = None, immutable: bool = False, encoding: bytes = b'\xff') → None

Add a new key / value pair to the model.

If immutable is true, then either the key must not previously have been set, or it must have been previously set immutable with exactly the same value (see equal_encoded_values()). Thus, immutable keys only ever have one value for the lifetime of the telescope state. They also have no associated timestamp.

Parameters
  • key (str or bytes) – Key name, which must not collide with a class attribute

  • value (object) – Arbitrary value (must be encodable with encoding)

  • ts (float, optional) – Timestamp associated with the update, ignored for immutables. If not specified, defaults to time.time().

  • immutable (bool, optional) – See description above.

  • encoding (bytes) – See encode_value()

Raises
  • ImmutableKeyError – if an attempt is made to change the value of an immutable

  • ImmutableKeyError – if the key already exists and is not an immutable

  • redis.ResponseError – if there is some other error from the Redis server

async set_indexed(key: Union[bytes, str], sub_key: Any, value: Any, encoding: bytes = b'\xff') → None

Set a sub-key of an indexed key.

Parameters
  • key (str or bytes) – Main key

  • sub_key (object) – Sub-key within key to associate with the value. It must be both hashable and serialisable.

  • encoding (bytes) – Encoding used for value (see encode_value()). Note that it does not affect the encoding of sub_key.

Raises
  • ImmutableKeyError – if the sub-key already exists with a different value

  • ImmutableKeyError – if the key already exists and is not indexed

  • redis.ResponseError – if there is some other error from the Redis server

async get_indexed(key: Union[bytes, str], sub_key: Any, default: Optional[Any] = None, return_encoded: bool = False) → Any

Retrieve an indexed value set with set_indexed().

Parameters
  • key (str or bytes) – Main key

  • sub_key (object) – Sub-key within key, which must be hashable and serialisable

  • default (object) – Value to return if the sub-key is not found

  • return_encoded (bool, optional) – If False (default), return values are first decoded from internal storage. If True, return values are retained in encoded form.

async wait_key(key: Union[bytes, str], condition: Optional[Callable[[Any, Optional[float]], bool]] = None)

Wait for a key to exist, possibly with some condition.

This can block for an arbitrary amount of time, but can be safely cancelled.

Parameters
  • key (str or bytes) – Key name to monitor

  • condition (callable, signature bool = condition(value, ts), optional) – If not specified, wait until the key exists. Otherwise, the callable should have the signature bool = condition(value, ts) where value is the latest value of the key, ts is its associated timestamp (or None if immutable), and the return value indicates whether the condition is satisfied.
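As an illustration of the condition signature, the sketch below builds a callable that is satisfied only by values newer than a cutoff. The factory name newer_than is hypothetical; the resulting function would be passed as the condition argument.

```python
from typing import Any, Callable, Optional


def newer_than(cutoff: float) -> Callable[[Any, Optional[float]], bool]:
    """Build a condition satisfied by values with a timestamp after cutoff."""
    def condition(value: Any, ts: Optional[float]) -> bool:
        # ts is None for immutable keys, which this condition never accepts.
        return ts is not None and ts > cutoff
    return condition


cond = newer_than(10.0)
```

Since an immutable key never satisfies this particular condition, waiting on one with it would block until cancelled.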

async wait_indexed(key: Union[bytes, str], sub_key: Any, condition: Optional[Callable[[Any], bool]] = None) → None

Wait for a sub-key of an indexed key to exist, possibly with some condition.

This can block for an arbitrary amount of time, but can be safely cancelled.

Parameters
  • key (str or bytes) – Key name to monitor

  • sub_key (object) – Sub-key to monitor within key.

  • condition (callable, signature bool = condition(value), optional) – If not specified, wait until the sub-key exists. Otherwise, the callable should have the signature bool = condition(value) where value is the value associated with the sub-key, and the return value indicates whether the condition is satisfied.

Raises
  • TimeoutError – if a timeout was specified and was exceeded

  • CancelledError – if a cancellation future was specified and done

  • ImmutableKeyError – if the key exists (or is created while waiting) but is not indexed

async get(key: Union[bytes, str], default: Optional[Any] = None, return_encoded: bool = False) → Any

Get a single value from the model.

Parameters
  • key (str or bytes) – Key to retrieve

  • default (object, optional) – Object to return if key not found

  • return_encoded (bool, optional) – If False (default), return values are first decoded from internal storage. If True, return values are retained in encoded form.

Returns

The stored value. For a non-immutable key, this is the most recent value.

Return type

value

async get_range(key: Union[bytes, str], st: Optional[float] = None, et: Optional[float] = None, include_previous: Optional[bool] = None, include_end: bool = False, return_encoded: bool = False) → List[Tuple[Any, float]]

Get the range of values specified by the key and timespec from the model.

Parameters
  • key (str or bytes) – Database key to extract

  • st (float, optional) – Start time. If not specified, only the most recent value prior to et is returned.

  • et (float, optional) – End time, defaults to the end of time

  • include_previous (bool, optional) – If True, the method also returns the last value prior to the start time (if any). This defaults to False if st is specified and True if st is unspecified.

  • include_end (bool, optional) – If False (default), returns values in [st, et), otherwise [st, et].

  • return_encoded (bool, optional) – If False (default), return values are first decoded from internal storage. If True, return values are retained in encoded form.

Returns

list of (value, time) records in specified time range

Return type

list

Raises
  • KeyError – if key does not exist (with any prefix)

  • ImmutableKeyError – if key refers to an existing key which is not mutable

Notes

By default, timestamps exactly equal to the start time are included, while those equal to the end time are excluded.

Usage examples:

get_range('key')

returns the most recent record

get_range('key', st=0)

returns a list of all records stored for the key

get_range('key', st=0, et=t1)

returns a list of all records before time t1

get_range('key', st=t0, et=t1)

returns a list of all records in the range [t0, t1)

get_range('key', st=t0)

returns a list of all records from time t0 onwards

get_range('key', et=t1)

returns the most recent record prior to time t1
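The way the optional arguments combine can be modelled in a few lines. The sketch below is a toy under stated assumptions: toy_get_range is hypothetical, works on an in-memory list sorted by timestamp, and treats an unspecified st as equal to et, which is inferred from the documented results rather than taken from the library source.

```python
import math
from typing import Any, List, Optional, Tuple

Record = Tuple[Any, float]


def toy_get_range(records: List[Record], st: Optional[float] = None,
                  et: Optional[float] = None,
                  include_previous: Optional[bool] = None,
                  include_end: bool = False) -> List[Record]:
    """Model the documented defaults; records must be sorted by timestamp."""
    if include_previous is None:
        include_previous = st is None   # True only when st is unspecified
    if et is None:
        et = math.inf                   # "the end of time"
    if st is None:
        st = et                         # assumption: empty range plus previous
    selected = [r for r in records
                if st <= r[1] and (r[1] <= et if include_end else r[1] < et)]
    if include_previous:
        earlier = [r for r in records if r[1] < st]
        if earlier:
            selected.insert(0, earlier[-1])
    return selected


records = [('a', 1.0), ('b', 2.0), ('c', 3.0)]
latest = toy_get_range(records)
everything = toy_get_range(records, st=0)
before_t1 = toy_get_range(records, st=0, et=2.0)
from_t0 = toy_get_range(records, st=2.0)
prior_to_t1 = toy_get_range(records, et=2.0)
```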

Module contents