katsdptelstate package

Submodules

katsdptelstate.backend module

class katsdptelstate.backend.KeyUpdateBase

Bases: object

Indicates that the caller of the monitor should check again.

This base class contains no information about what changed in the database. Sub-classes contain more specific information.

class katsdptelstate.backend.KeyUpdate(key: bytes, value: bytes)

Bases: abc.ABC, katsdptelstate.backend.KeyUpdateBase

Update notification for a specific key.

This is a base class for type-specific notification classes and should not be instantiated directly.

abstract property key_type

class katsdptelstate.backend.MutableKeyUpdate(key: bytes, value: bytes, timestamp: float)

Bases: katsdptelstate.backend.KeyUpdate

Update notification for a mutable key.

property key_type

class katsdptelstate.backend.ImmutableKeyUpdate(key: bytes, value: bytes)

Bases: katsdptelstate.backend.KeyUpdate

Update notification for an immutable key.

property key_type

class katsdptelstate.backend.IndexedKeyUpdate(key: bytes, sub_key: bytes, value: bytes)

Bases: katsdptelstate.backend.KeyUpdate

Update notification for an indexed key.

property key_type

class katsdptelstate.backend.Backend

Bases: abc.ABC

Low-level interface for telescope state backends.

The backend interface does not deal with namespaces or encodings, which are handled by the frontend TelescopeState class. A backend must be able to store the same types as TelescopeState, but keys and values will be bytes rather than arbitrary Python objects.

abstract load_from_file(file: Union[bytes, str, os.PathLike, BinaryIO]) → int

Implements TelescopeState.load_from_file().

abstract keys(filter: bytes) → List[bytes]

Return all keys matching filter.

The filter is a redis pattern. Backends might only support b'*' as a filter.

abstract delete(key: bytes) → None

Delete a key (no-op if it does not exist)

abstract clear() → None

Remove all keys

abstract key_type(key: bytes) → Optional[katsdptelstate.utils.KeyType]

Get type of key, or None if it does not exist.

abstract set_immutable(key: bytes, value: bytes) → Optional[bytes]

Set the value of an immutable key.

If the key already exists (and is immutable), returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not immutable.
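The write-once return convention described above can be illustrated with a plain dictionary. This is only a sketch: the ToyStore class is hypothetical and is not part of katsdptelstate, and it does not model the ImmutableKeyError case for keys of another type.

```python
from typing import Dict, Optional


class ToyStore:
    """Hypothetical dict-backed stand-in for a backend (illustration only)."""

    def __init__(self) -> None:
        self._immutable: Dict[bytes, bytes] = {}

    def set_immutable(self, key: bytes, value: bytes) -> Optional[bytes]:
        # Return the existing value untouched if the key is already set,
        # otherwise store the new value and return None.
        old = self._immutable.get(key)
        if old is not None:
            return old
        self._immutable[key] = value
        return None


store = ToyStore()
first = store.set_immutable(b'n_ants', b'64')    # key is newly created
second = store.set_immutable(b'n_ants', b'16')   # existing value is kept and returned
```

A caller can compare the returned value with the one it tried to write to detect a conflicting earlier write.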

abstract get(key: bytes) → Union[Tuple[None, None], Tuple[bytes, None], Tuple[bytes, float], Tuple[Dict[bytes, bytes], None]]

Get the value and timestamp of a key.

The return value depends on the key type:

immutable

The value.

mutable

The most recent value.

indexed

A dictionary of all values (with undefined iteration order).

absent

None

The timestamp will be None for types other than mutable.

abstract add_mutable(key: bytes, value: bytes, timestamp: float) → None

Set a (value, timestamp) pair in a mutable key.

The timestamp will be a non-negative float value.

Raises

ImmutableKeyError – If the key exists and is not mutable

abstract set_indexed(key: bytes, sub_key: bytes, value: bytes) → Optional[bytes]

Add a value to an indexed immutable key.

If the sub-key already exists, returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not indexed.

abstract get_indexed(key: bytes, sub_key: bytes) → Optional[bytes]

Get the value of an indexed immutable key.

Returns None if the key exists but the sub-key does not exist.

Raises
  • KeyError – If the key does not exist.

  • ImmutableKeyError – If the key exists and is not indexed.

abstract get_range(key: bytes, start_time: float, end_time: float, include_previous: bool, include_end: bool) → Optional[List[Tuple[bytes, float]]]

Obtain a range of values from a mutable key.

If the key does not exist, returns None.

Parameters
  • key (bytes) – Key to search

  • start_time (float) – Start of the range (inclusive).

  • end_time (float) – End of the range. It is guaranteed to be non-negative.

  • include_previous (bool) – If true, also return the last entry prior to start_time.

  • include_end (bool) – If true, treat end_time as inclusive, otherwise exclusive.

Raises

ImmutableKeyError – If the key exists and is not mutable
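The interaction of include_previous and include_end may be easier to see in code. The following is a minimal sketch over a sorted list of (timestamp, value) pairs; the function name select_range and the list-based storage are assumptions made for illustration, not the backend implementation.

```python
import bisect
from typing import List, Tuple


def select_range(samples: List[Tuple[float, bytes]], start_time: float,
                 end_time: float, include_previous: bool,
                 include_end: bool) -> List[Tuple[bytes, float]]:
    """Select entries from `samples`, which must be sorted by timestamp."""
    times = [t for t, _ in samples]
    lo = bisect.bisect_left(times, start_time)       # first entry with t >= start_time
    if include_end:
        hi = bisect.bisect_right(times, end_time)    # keep entries with t == end_time
    else:
        hi = bisect.bisect_left(times, end_time)     # drop entries with t == end_time
    if include_previous and lo > 0:
        lo -= 1                                      # also take the last entry before start_time
    # Results are (value, timestamp) pairs, matching the documented return type
    return [(value, t) for t, value in samples[lo:hi]]


samples = [(1.0, b'a'), (2.0, b'b'), (3.0, b'c'), (4.0, b'd')]
half_open = select_range(samples, 2.0, 4.0, False, False)
closed_with_previous = select_range(samples, 2.0, 4.0, True, True)
```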

abstract dump(key: bytes) → Optional[bytes]

Return a key in the same format as the Redis DUMP command, or None if not present.

monitor_keys(keys: Iterable[bytes]) → Generator[Optional[katsdptelstate.backend.KeyUpdateBase], Optional[float], None]

Report changes to keys in keys.

Returns a generator. The first yield from the generator is a no-op. After that, the caller sends a timeout and gets back an update event (of type KeyUpdateBase or a subclass). If there is no event within the timeout, the generator yields None instead.

It is acceptable (but undesirable) for this function to miss the occasional update e.g. due to a network connection outage. The caller takes care to use a low timeout and retry rather than blocking for long periods.

The generator runs until it is closed.
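The send/yield handshake described above can be sketched with a toy generator fed from a queue. The toy_monitor function and the use of plain strings as events are hypothetical; a real backend would yield KeyUpdateBase instances.

```python
import queue
from typing import Generator, Optional


def toy_monitor(events: "queue.Queue[str]") -> Generator[Optional[str], Optional[float], None]:
    """Yield events from a queue, or None when the timeout expires."""
    timeout = yield None                  # first yield is a no-op that primes the generator
    while True:
        try:
            update = events.get(timeout=timeout)
        except queue.Empty:
            update = None                 # nothing arrived within the timeout
        timeout = yield update            # hand back the update, receive the next timeout


events: "queue.Queue[str]" = queue.Queue()
gen = toy_monitor(events)
next(gen)                                 # consume the initial no-op yield
events.put('key changed')
got = gen.send(0.01)                      # an event is already waiting
missed = gen.send(0.01)                   # queue is empty, so the timeout expires
gen.close()                               # the generator runs until it is closed
```

Using a short timeout on each send() keeps the caller responsive even if an update is missed.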

katsdptelstate.encoding module

Handles details of encoding and decoding strings stored in telescope state.

katsdptelstate.encoding.PICKLE_PROTOCOL = 2
katsdptelstate.encoding.ENCODING_PICKLE = b'\x80'
katsdptelstate.encoding.ENCODING_MSGPACK = b'\xff'

Header byte indicating a msgpack-encoded value

katsdptelstate.encoding.ENCODING_DEFAULT = b'\xff'

Default encoding for encode_value()

katsdptelstate.encoding.ALLOWED_ENCODINGS = frozenset({b'\x80', b'\xff'})

All encodings that can be used with encode_value()

katsdptelstate.encoding.MSGPACK_EXT_TUPLE = 1
katsdptelstate.encoding.MSGPACK_EXT_COMPLEX128 = 2
katsdptelstate.encoding.MSGPACK_EXT_NDARRAY = 3
katsdptelstate.encoding.MSGPACK_EXT_NUMPY_SCALAR = 4
katsdptelstate.encoding.PICKLE_WARNING = 'The telescope state contains pickled values. This is a security risk, but you have enabled it with set_allow_pickle.'
katsdptelstate.encoding.PICKLE_ERROR = 'The telescope state contains pickled values. This is a security risk, so is disabled by default. If you trust the source of the data, you can allow the pickles to be loaded by setting KATSDPTELSTATE_ALLOW_PICKLE=1 in the environment. This is needed for MeerKAT data up to March 2019.'
katsdptelstate.encoding.set_allow_pickle(allow: bool, warn: bool = False) → None

Control whether pickles are allowed.

This overrides the defaults which are determined from the environment.

Parameters
  • allow (bool) – If true, allow pickles to be loaded.

  • warn (bool) – If true, warn the user the next time a pickle is loaded (after which it becomes false). This has no effect if allow is false.

katsdptelstate.encoding.encode_value(value: Any, encoding: bytes = b'\xff') → bytes

Encode a value to a byte array for storage in redis.

Parameters
  • value – Value to encode

  • encoding – Encoding method to use, one of the values in ALLOWED_ENCODINGS

Raises
  • ValueError – If encoding is not a recognised encoding

  • EncodeError – If the value was not encodable with the chosen encoding.

katsdptelstate.encoding.decode_value(value: bytes, allow_pickle: Optional[bool] = None) → Any

Decode a value encoded with encode_value().

The encoded value is self-describing, so it is not necessary to specify which encoding was used.

Parameters
  • value (bytes) – Encoded value to decode

  • allow_pickle (bool, optional) – If false, ENCODING_PICKLE is disabled. This is useful for security as pickle decoding can execute arbitrary code. If the default of None is used, it is controlled by the KATSDPTELSTATE_ALLOW_PICKLE environment variable. If that is not set, the default is false. The default may also be overridden with set_allow_pickle().

Raises
  • DecodeError – If allow_pickle is false and value is a pickle

  • DecodeError – If there was an error in the encoding of value
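Because the encoded value is self-describing, a decoder can dispatch on the first byte. The sketch below only classifies a value by its header and applies the pickle policy. The detect_encoding helper is hypothetical, and it raises ValueError where the real function is documented to raise DecodeError (which is itself a ValueError subclass).

```python
import pickle

ENCODING_PICKLE = b'\x80'     # header values as listed for this module
ENCODING_MSGPACK = b'\xff'


def detect_encoding(value: bytes, allow_pickle: bool = False) -> str:
    """Hypothetical helper: classify an encoded value by its first byte."""
    header = value[:1]
    if header == ENCODING_MSGPACK:
        return 'msgpack'
    if header == ENCODING_PICKLE:
        if not allow_pickle:
            raise ValueError('pickled value found but pickles are not allowed')
        return 'pickle'
    raise ValueError('unrecognised encoding header {!r}'.format(header))


# Pickles written with protocol 2 start with the byte b'\x80'
pickled = pickle.dumps({'a': 1}, protocol=2)
```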

katsdptelstate.encoding.equal_encoded_values(a: bytes, b: bytes) → bool

Test whether two encoded values represent the same/equivalent objects.

This is not a complete implementation. Mostly, it just checks that the encoded representation is the same, but to ease the transition to Python 3, it will also compare the values if both arguments decode to either bytes or str (and allows bytes and strings to be equal assuming UTF-8 encoding). However, it will not do this recursively in structured data.

katsdptelstate.encoding.ensure_binary(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to six.binary_type.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> encoded to bytes

  • bytes -> bytes
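On Python 3 the coercion amounts to the following sketch. The name ensure_binary_sketch is hypothetical; the default arguments mirror the signature shown above.

```python
def ensure_binary_sketch(s, encoding='utf-8', errors='surrogateescape') -> bytes:
    """Python 3 only: encode str, pass bytes through, reject anything else."""
    if isinstance(s, bytes):
        return s
    if isinstance(s, str):
        return s.encode(encoding, errors)
    raise TypeError('not expecting type {}'.format(type(s)))


encoded = ensure_binary_sketch('caf\u00e9')   # text is encoded as UTF-8
unchanged = ensure_binary_sketch(b'raw')      # bytes are returned as-is
```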

katsdptelstate.endpoint module

class katsdptelstate.endpoint.Endpoint(host: Any, port: Any)

Bases: object

A TCP or UDP endpoint consisting of a host and a port.

Typically the host should be a string (whether a hostname or IP address) and the port should be an integer, but users are free to use other conventions.

multicast_subscribe(sock: socket.socket) → bool

If the address is an IPv4 multicast address, subscribe to the group on sock.

Return True if the host is a multicast address.

katsdptelstate.endpoint.endpoint_parser(default_port: Any) → Callable[[str], katsdptelstate.endpoint.Endpoint]

Return a factory function that parses a string.

The string is either hostname, or hostname:port, where port is an integer. IPv6 addresses are written in square brackets (similar to RFC 2732) to disambiguate the embedded colons.
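The accepted string forms can be illustrated with a small stand-alone parser. This is a sketch under assumptions: parse_endpoint is a hypothetical function returning a (host, port) tuple rather than an Endpoint, and whether the real parser strips the brackets from an IPv6 host is not stated here.

```python
from typing import Tuple


def parse_endpoint(text: str, default_port: int) -> Tuple[str, int]:
    """Hypothetical parser for 'host', 'host:port' and '[ipv6]:port' strings."""
    if text.startswith('['):
        # Bracketed IPv6 literal: the closing bracket ends the host part.
        host, sep, rest = text[1:].partition(']')
        if not sep:
            raise ValueError('missing closing bracket in {!r}'.format(text))
        if rest == '':
            return host, default_port
        if not rest.startswith(':'):
            raise ValueError('unexpected text after bracket in {!r}'.format(text))
        return host, int(rest[1:])
    host, sep, port = text.partition(':')
    return host, (int(port) if sep else default_port)
```

For example, parse_endpoint('[::1]:1234', 6379) separates the host '::1' from the port even though the host itself contains colons.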

katsdptelstate.endpoint.endpoint_list_parser(default_port: Any, single_port: bool = False) → Callable[[str], List[katsdptelstate.endpoint.Endpoint]]

Return a factory function that parses a string.

The string comprises a comma-separated list, each element of which is of the form taken by endpoint_parser(). Optionally, the hostname may be followed by +count, where count is an integer specifying a number of sequential IP addresses (in addition to the explicitly named one). This variation is only valid with IPv4 addresses.

If single_port is true, then it will reject any list that contains more than one distinct port number, as well as an empty list. This allows the user to determine a unique port for the list.
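The +count notation can be sketched with the standard ipaddress module. The expand_hosts helper is hypothetical and handles only the host part of one list element; port handling and the single_port check are left out.

```python
import ipaddress
from typing import List


def expand_hosts(host: str) -> List[str]:
    """Hypothetical helper: expand 'a.b.c.d+count' into count + 1 sequential IPv4 addresses."""
    base, sep, count = host.partition('+')
    if not sep:
        return [base]
    start = ipaddress.IPv4Address(base)
    # count is the number of addresses in addition to the explicitly named one
    return [str(start + i) for i in range(int(count) + 1)]


group = expand_hosts('239.1.2.3+2')
```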

katsdptelstate.endpoint.endpoints_to_str(endpoints: Iterable[katsdptelstate.endpoint.Endpoint]) → str

Convert a list of endpoints into a compact string that generates the same list.

This is the inverse of katsdptelstate.endpoint.endpoint_list_parser().

katsdptelstate.errors module

exception katsdptelstate.errors.TelstateError

Bases: RuntimeError

Base class for errors from this package.

exception katsdptelstate.errors.ConnectionError

Bases: katsdptelstate.errors.TelstateError

The initial connection to the Redis server failed.

exception katsdptelstate.errors.RdbParseError(filename: Union[bytes, str, os.PathLike, None] = None)

Bases: katsdptelstate.errors.TelstateError

Error parsing RDB file.

exception katsdptelstate.errors.InvalidKeyError

Bases: katsdptelstate.errors.TelstateError

A key collides with a class attribute.

This is kept only for backwards compatibility. It is no longer considered an error.

exception katsdptelstate.errors.InvalidTimestampError

Bases: katsdptelstate.errors.TelstateError

Negative or non-finite timestamp

exception katsdptelstate.errors.ImmutableKeyError

Bases: katsdptelstate.errors.TelstateError

An attempt was made to modify an immutable key

exception katsdptelstate.errors.TimeoutError

Bases: katsdptelstate.errors.TelstateError

A wait for a key timed out

exception katsdptelstate.errors.CancelledError

Bases: katsdptelstate.errors.TelstateError

A wait for a key was cancelled

exception katsdptelstate.errors.DecodeError

Bases: ValueError, katsdptelstate.errors.TelstateError

An encoded value found in telstate could not be decoded

exception katsdptelstate.errors.EncodeError

Bases: ValueError, katsdptelstate.errors.TelstateError

A value could not be encoded

katsdptelstate.memory module

class katsdptelstate.memory.BackendCallback

Bases: rdbtools.parser.RdbCallback

A callback adapter that stores keys in backend as RDB file is parsed.

katsdptelstate.memory.logger = <Logger katsdptelstate.memory (WARNING)>
class katsdptelstate.memory.MemoryCallback(data: Dict[bytes, Union[bytes, Dict[bytes, bytes], List[bytes]]])

Bases: katsdptelstate.rdb_reader.BackendCallback

RDB callback that stores keys in MemoryBackend.

set(key: bytes, value: bytes, expiry: Optional[datetime.datetime], info: dict) → None

Callback to handle a key with a string value and an optional expiry

  • key – the redis key.

  • value – a string or a number.

  • expiry – a datetime object, and can be None.

  • info – a dictionary containing additional information about this object.

start_sorted_set(key: bytes, length: int, expiry: Optional[datetime.datetime], info: dict) → None

Callback to handle the start of a sorted set

  • key – the redis key for this sorted set.

  • length – the number of elements in this sorted set.

  • expiry – a datetime object. None means the object does not expire.

  • info – a dictionary containing additional information about this object.

After start_sorted_set, the method zadd will be called with key exactly length times. Also, zadd will be called in a sorted order, so as to preserve the ordering of this sorted set. After that, the end_sorted_set method will be called to indicate the end of this sorted set

Note: This callback handles sorted sets that are stored as ziplists or skiplists.

zadd(key: bytes, score: float, member: bytes) → None

Callback to insert a new value into this sorted set

  • key – the redis key for this sorted set.

  • score – the score for this value.

  • member – the element being inserted.

end_sorted_set(key: bytes) → None

Called when there are no more elements in this sorted set

key is the redis key for this sorted set

start_hash(key: bytes, length: int, expiry: Optional[datetime.datetime], info: dict) → None

Callback to handle the start of a hash

  • key – the redis key.

  • length – the number of elements in this hash.

  • expiry – a datetime object. None means the object does not expire.

  • info – a dictionary containing additional information about this object.

After start_hash, the method hset will be called with this key exactly length times. After that, the end_hash method will be called.

hset(key: bytes, field: bytes, value: bytes) → None

Callback to insert a field=value pair in an existing hash

  • key – the redis key for this hash.

  • field – a string.

  • value – the value to store for this field.

class katsdptelstate.memory.MemoryBackend

Bases: katsdptelstate.backend.Backend

Telescope state backend that keeps data in memory.

It is optimised for read-only use, loading data from a .rdb file. Write operations are supported only to facilitate testing, but are not intended for production use. For that, use a RedisBackend with an in-memory Redis emulation. The monitor_keys() method is not implemented.

Mutable keys are stored as sorted lists, and encode timestamps in-place using the same packing as RedisBackend.
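One way to keep timestamped values in a sorted list is to prefix each value with a packed timestamp whose byte order matches numeric order. The sketch below assumes an 8-byte big-endian double prefix; that packing format and the helper names are assumptions for illustration and are not confirmed by this documentation.

```python
import bisect
import struct
from typing import List, Tuple


def pack_sample(timestamp: float, value: bytes) -> bytes:
    # Assumed packing: 8-byte big-endian double followed by the value.
    # For non-negative floats the packed bytes sort in timestamp order.
    return struct.pack('>d', timestamp) + value


def unpack_sample(packed: bytes) -> Tuple[bytes, float]:
    (timestamp,) = struct.unpack('>d', packed[:8])
    return packed[8:], timestamp


samples: List[bytes] = []
for ts, val in [(3.0, b'c'), (1.5, b'a'), (2.0, b'b')]:
    bisect.insort(samples, pack_sample(ts, val))   # keep the list sorted on insert

latest = unpack_sample(samples[-1])                # most recent (value, timestamp)
```

Because the list stays sorted, the most recent value is always the last element and range queries reduce to binary searches.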

load_from_file(file: Union[bytes, str, os.PathLike, BinaryIO]) → int

Implements TelescopeState.load_from_file().

keys(filter: bytes) → List[bytes]

Return all keys matching filter.

The filter is a redis pattern. Backends might only support b'*' as a filter.

delete(key: bytes) → None

Delete a key (no-op if it does not exist)

clear() → None

Remove all keys

key_type(key: bytes) → Optional[katsdptelstate.utils.KeyType]

Get type of key, or None if it does not exist.

set_immutable(key: bytes, value: bytes) → Optional[bytes]

Set the value of an immutable key.

If the key already exists (and is immutable), returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not immutable.

get(key: bytes) → Union[Tuple[None, None], Tuple[bytes, None], Tuple[bytes, float], Tuple[Dict[bytes, bytes], None]]

Get the value and timestamp of a key.

The return value depends on the key type:

immutable

The value.

mutable

The most recent value.

indexed

A dictionary of all values (with undefined iteration order).

absent

None

The timestamp will be None for types other than mutable.

add_mutable(key: bytes, value: bytes, timestamp: float) → None

Set a (value, timestamp) pair in a mutable key.

The timestamp will be a non-negative float value.

Raises

ImmutableKeyError – If the key exists and is not mutable

set_indexed(key: bytes, sub_key: bytes, value: bytes) → Optional[bytes]

Add a value to an indexed immutable key.

If the sub-key already exists, returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not indexed.

get_indexed(key: bytes, sub_key: bytes) → Optional[bytes]

Get the value of an indexed immutable key.

Returns None if the key exists but the sub-key does not exist.

Raises
  • KeyError – If the key does not exist.

  • ImmutableKeyError – If the key exists and is not indexed.

get_range(key: bytes, start_time: float, end_time: float, include_previous: bool, include_end: bool) → Optional[List[Tuple[bytes, float]]]

Obtain a range of values from a mutable key.

If the key does not exist, returns None.

Parameters
  • key (bytes) – Key to search

  • start_time (float) – Start of the range (inclusive).

  • end_time (float) – End of the range. It is guaranteed to be non-negative.

  • include_previous (bool) – If true, also return the last entry prior to start_time.

  • include_end (bool) – If true, treat end_time as inclusive, otherwise exclusive.

Raises

ImmutableKeyError – If the key exists and is not mutable

dump(key: bytes) → Optional[bytes]

Return a key in the same format as the Redis DUMP command, or None if not present.

katsdptelstate.rdb_reader module

katsdptelstate.rdb_reader.logger = <Logger katsdptelstate.rdb_reader (WARNING)>
class katsdptelstate.rdb_reader.BackendCallback

Bases: rdbtools.parser.RdbCallback

A callback adapter that stores keys in backend as RDB file is parsed.

katsdptelstate.rdb_reader.load_from_file(callback: katsdptelstate.rdb_reader.BackendCallback, file: Union[bytes, str, os.PathLike, BinaryIO]) → int

Load keys from the specified RDB-compatible dump file into backend.

Parameters
  • callback (katsdptelstate.rdb_reader.BackendCallback) – Backend-specific callback that stores keys as RDB file is parsed

  • file (str or file-like object) – Filename of .rdb file to import, or object representing contents of RDB

Returns

Number of keys loaded into backend

Return type

int

Raises

RdbParseError – If file does not represent a valid RDB file

katsdptelstate.rdb_utility module

Implements encoding to the RDB file format.

Some documentation is included here, but for a more complete reference, see https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format

katsdptelstate.rdb_utility.DUMP_POSTFIX = b'\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00'
katsdptelstate.rdb_utility.encode_len(length: int) → bytes

Encode the specified length as 1, 2 or 5 bytes using the RDB-specific length encoding.

  • For values less than 64 (i.e. the two MSBs are zero), encode directly in a single byte.

  • For values less than 16384, use two bytes: the leading MSBs are 01, followed by 14 bits encoding the value.

  • For values less than (2^32 - 1), use 5 bytes: the leading MSBs are 10, and the length is encoded only in the lowest 32 bits.
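The three cases can be sketched with the struct module. The function name encode_len_sketch is hypothetical, and the big-endian byte order of the multi-byte forms is an assumption based on the RDB file format rather than something stated above.

```python
import struct


def encode_len_sketch(length: int) -> bytes:
    """Sketch of RDB length encoding (assumes big-endian multi-byte forms)."""
    if length < 0 or length > 2**32 - 1:
        raise ValueError('length does not fit in 32 bits')
    if length < 64:
        return struct.pack('B', length)               # 00xxxxxx
    if length < 16384:
        return struct.pack('>H', 0x4000 | length)     # 01xxxxxx xxxxxxxx
    return b'\x80' + struct.pack('>I', length)        # 10000000 + 32-bit length


short = encode_len_sketch(10)
medium = encode_len_sketch(64)
long_form = encode_len_sketch(16384)
```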

katsdptelstate.rdb_utility.encode_prev_length(length: int) → bytes

Special helper for zset previous entry lengths.

If length < 253 then use 1 byte directly, otherwise set first byte to 254 and add 4 trailing bytes as an unsigned integer.

katsdptelstate.rdb_utility.dump_string(data: bytes) → bytes

Encode a binary string as per redis DUMP command

katsdptelstate.rdb_utility.encode_ziplist(entries: Iterable[Union[bytes, int]]) → bytes

Create an RDB ziplist, including the envelope.

The entries can either be byte strings or integers, and any iterable can be used.

The ziplist string envelope itself is an RDB-encoded string with the following form: (format descriptions are provided in the description for each component)

(bytes in list ‘<i’)(offset to tail ‘<i’) (number of entries ‘<h’)(entry 1)(entry 2)(entry N)(terminator 0xFF)

The entries themselves are encoded as follows:

(previous entry length 1byte or 5bytes) (entry length - up to 5 bytes as per length encoding)(value)

katsdptelstate.rdb_utility.dump_iterable(prefix: bytes, suffix: bytes, entries: Iterable[bytes]) → bytes

Output a sequence of strings, bracketed by a prefix and suffix.

This is an internal function used to implement dump_zset() and dump_hash().

katsdptelstate.rdb_utility.dump_zset(data: Sequence[bytes]) → bytes

Encode a set of values as a redis zset, as per redis DUMP command.

All scores are assumed to be zero and encoded in the most efficient way possible.

Redis uses both LZF and Ziplist format depending on various arcane heuristics. For maximum compatibility we use Ziplist for small lists.

There are two possible ways to encode the zset:

  1. Using a ziplist: (0c)(ziplist) where (ziplist) is string-encoded.

  2. Using a simple list (03)(num entries)(entry)(entry)…

katsdptelstate.rdb_utility.dump_hash(data: Mapping[bytes, bytes]) → bytes

Encode a dictionary as a redis hash, as per redis DUMP command.

The encoding is similar to zsets, but using the value instead of the score.

katsdptelstate.rdb_writer module

katsdptelstate.rdb_writer.RDB_HEADER = b'REDIS0006\xfe\x00'
katsdptelstate.rdb_writer.RDB_TERMINATOR = b'\xff'
katsdptelstate.rdb_writer.RDB_CHECKSUM = b'\x00\x00\x00\x00\x00\x00\x00\x00'
katsdptelstate.rdb_writer.encode_item(key: bytes, dumped_value: bytes) → bytes

Encode key and corresponding DUMPed value to RDB format.

The first byte indicates the encoding used for the value of this key. This is essentially just the Redis type. Subsequent bytes represent the key name, which consists of a length encoding followed by the actual byte representation of the string. Thereafter follows the value, encoded according to its appropriate schema. As a shortcut, the Redis DUMP command is used to generate the encoded value.

Note: Redis provides a mechanism for optional key expiry, which we ignore here.
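The record layout described above can be sketched as follows. This is an illustration under assumptions: encode_item_sketch is a hypothetical function, it only handles keys shorter than 64 bytes (single-byte length), and it assumes the DUMP output ends with a 10-byte trailer of the same length as DUMP_POSTFIX that must be stripped. The constants are copied from the module listings.

```python
RDB_HEADER = b'REDIS0006\xfe\x00'
RDB_TERMINATOR = b'\xff'
RDB_CHECKSUM = b'\x00' * 8
DUMP_POSTFIX = b'\x06\x00' + b'\x00' * 8      # 10 bytes, as listed in rdb_utility


def encode_item_sketch(key: bytes, dumped_value: bytes) -> bytes:
    """Turn a DUMP payload into an RDB record (keys shorter than 64 bytes only)."""
    if len(key) >= 64:
        raise ValueError('this sketch only handles the 1-byte length encoding')
    type_byte = dumped_value[:1]              # Redis type of the value
    payload = dumped_value[1:-10]             # assumed: strip the 10-byte DUMP trailer
    return type_byte + bytes([len(key)]) + key + payload


# A string value b'hi' in DUMP form: type 0, length-prefixed string, trailer
dumped = b'\x00' + b'\x02hi' + DUMP_POSTFIX
record = encode_item_sketch(b'k', dumped)
rdb_bytes = RDB_HEADER + record + RDB_TERMINATOR + RDB_CHECKSUM
```

The final line shows how a header, the records, a terminator and a checksum placeholder would frame a complete file.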

class katsdptelstate.rdb_writer.RDBWriter(filename: Union[bytes, str, os.PathLike])

Bases: object

RDB file resource that stores keys from one (or more) Redis DBs.

Upon initialisation this opens the RDB file and writes the header. It is necessary to call close() to write the trailer of the file and to close it properly (or use this object as a context manager).

Parameters

filename (path-like) – Destination filename. Will be opened in ‘wb’ mode.

keys_written

Number of keys written to the file.

Type

int

keys_failed

Number of keys that failed to be written.

Type

int

close() → None

Close off RDB file and delete it if it contains no keys.

save(client: Union[katsdptelstate.telescope_state.TelescopeState, katsdptelstate.backend.Backend, redis.client.Redis], keys: Optional[Iterable[Union[str, bytes]]] = None) → None

Save a specified subset of keys from a Redis DB to the RDB dump file.

This is a very limited RDB dump utility. It takes a Redis database either from a high-level TelescopeState object, its low-level backend or a compatible redis client. Missing or broken keys are skipped and logged without raising an exception.

Parameters
  • client (TelescopeState or Backend or Redis-like) – A telstate, backend, or Redis-compatible client instance supporting keys() and dump()

  • keys (iterable of str or bytes, optional) – The keys to extract from Redis and include in the dump. Keys that don’t exist will not raise an Exception, only a log message. None (default) includes all keys.

katsdptelstate.rdb_writer.logger = <Logger katsdptelstate.rdb_writer (WARNING)>
katsdptelstate.rdb_writer.ensure_binary(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to six.binary_type.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> encoded to bytes

  • bytes -> bytes

katsdptelstate.redis module

class katsdptelstate.redis.BackendCallback

Bases: rdbtools.parser.RdbCallback

A callback adapter that stores keys in backend as RDB file is parsed.

class katsdptelstate.redis.RedisCallback(client: redis.client.Redis)

Bases: katsdptelstate.rdb_reader.BackendCallback

RDB callback that stores keys in redis.Redis-like client.

set(key: bytes, value: bytes, expiry: Optional[datetime.datetime], info: dict) → None

Callback to handle a key with a string value and an optional expiry

  • key – the redis key.

  • value – a string or a number.

  • expiry – a datetime object, and can be None.

  • info – a dictionary containing additional information about this object.

start_sorted_set(key: bytes, length: int, expiry: Optional[datetime.datetime], info: dict) → None

Callback to handle the start of a sorted set

  • key – the redis key for this sorted set.

  • length – the number of elements in this sorted set.

  • expiry – a datetime object. None means the object does not expire.

  • info – a dictionary containing additional information about this object.

After start_sorted_set, the method zadd will be called with key exactly length times. Also, zadd will be called in a sorted order, so as to preserve the ordering of this sorted set. After that, the end_sorted_set method will be called to indicate the end of this sorted set

Note: This callback handles sorted sets that are stored as ziplists or skiplists.

zadd(key: bytes, score: float, member: bytes) → None

Callback to insert a new value into this sorted set

  • key – the redis key for this sorted set.

  • score – the score for this value.

  • member – the element being inserted.

end_sorted_set(key)

Called when there are no more elements in this sorted set

key is the redis key for this sorted set

start_hash(key: bytes, length: int, expiry: Optional[datetime.datetime], info: dict) → None

Callback to handle the start of a hash

  • key – the redis key.

  • length – the number of elements in this hash.

  • expiry – a datetime object. None means the object does not expire.

  • info – a dictionary containing additional information about this object.

After start_hash, the method hset will be called with this key exactly length times. After that, the end_hash method will be called.

hset(key: bytes, field: bytes, value: bytes) → None

Callback to insert a field=value pair in an existing hash

  • key – the redis key for this hash.

  • field – a string.

  • value – the value to store for this field.

end_hash(key: bytes) → None

Called when there are no more elements in the hash

key is the redis key for the hash

class katsdptelstate.redis.RedisBackend(client: redis.client.Redis)

Bases: katsdptelstate.backend.Backend

Backend for TelescopeState using redis for storage.

load_from_file(file: Union[bytes, str, os.PathLike, BinaryIO]) → int

Implements TelescopeState.load_from_file().

keys(filter: bytes) → List[bytes]

Return all keys matching filter.

The filter is a redis pattern. Backends might only support b'*' as a filter.

delete(key: bytes) → None

Delete a key (no-op if it does not exist)

clear() → None

Remove all keys

key_type(key: bytes) → Optional[katsdptelstate.utils.KeyType]

Get type of key, or None if it does not exist.

set_immutable(key: bytes, value: bytes) → Optional[bytes]

Set the value of an immutable key.

If the key already exists (and is immutable), returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not immutable.

get(key: bytes) → Union[Tuple[None, None], Tuple[bytes, None], Tuple[bytes, float], Tuple[Dict[bytes, bytes], None]]

Get the value and timestamp of a key.

The return value depends on the key type:

immutable

The value.

mutable

The most recent value.

indexed

A dictionary of all values (with undefined iteration order).

absent

None

The timestamp will be None for types other than mutable.

add_mutable(key: bytes, value: bytes, timestamp: float) → None

Set a (value, timestamp) pair in a mutable key.

The timestamp will be a non-negative float value.

Raises

ImmutableKeyError – If the key exists and is not mutable

set_indexed(key: bytes, sub_key: bytes, value: bytes) → Optional[bytes]

Add a value to an indexed immutable key.

If the sub-key already exists, returns the existing value and does not update it. Otherwise, returns None.

Raises

ImmutableKeyError – If the key exists and is not indexed.

get_indexed(key: bytes, sub_key: bytes) → Optional[bytes]

Get the value of an indexed immutable key.

Returns None if the key exists but the sub-key does not exist.

Raises
  • KeyError – If the key does not exist.

  • ImmutableKeyError – If the key exists and is not indexed.

get_range(key: bytes, start_time: float, end_time: float, include_previous: bool, include_end: bool) → Optional[List[Tuple[bytes, float]]]

Obtain a range of values from a mutable key.

If the key does not exist, returns None.

Parameters
  • key (bytes) – Key to search

  • start_time (float) – Start of the range (inclusive).

  • end_time (float) – End of the range. It is guaranteed to be non-negative.

  • include_previous (bool) – If true, also return the last entry prior to start_time.

  • include_end (bool) – If true, treat end_time as inclusive, otherwise exclusive.

Raises

ImmutableKeyError – If the key exists and is not mutable

monitor_keys(keys: Iterable[bytes]) → Generator[Optional[katsdptelstate.backend.KeyUpdateBase], Optional[float], None]

Report changes to keys in keys.

Returns a generator. The first yield from the generator is a no-op. After that, the caller sends a timeout and gets back an update event (of type KeyUpdateBase or a subclass). If there is no event within the timeout, the generator yields None instead.

It is acceptable (but undesirable) for this function to miss the occasional update e.g. due to a network connection outage. The caller takes care to use a low timeout and retry rather than blocking for long periods.

The generator runs until it is closed.

dump(key: bytes) → Optional[bytes]

Return a key in the same format as the Redis DUMP command, or None if not present.

katsdptelstate.redis.ensure_str(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to str.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> str

  • bytes -> decoded to str

katsdptelstate.telescope_state module

katsdptelstate.telescope_state.logger = <Logger katsdptelstate.telescope_state (WARNING)>
class katsdptelstate.telescope_state.TelescopeState(endpoint: Union[str, katsdptelstate.endpoint.Endpoint, katsdptelstate.backend.Backend] = '', db: int = 0, prefixes: Tuple[Union[bytes, str], ...] = (b'',), base: Optional[TelescopeState] = None)

Bases: katsdptelstate.telescope_state_base.TelescopeStateBase

Interface to attributes and sensors stored in a database.

Refer to the README for a description of the types of keys supported.

A Redis database used with this class must only be used with this class, as it assumes that all keys were encoded by this package. It should however be robust to malicious data, failing gracefully rather than executing arbitrary code or consuming unreasonable amounts of time or memory.

Each instance of this class has an associated list of prefixes. Lookups try each prefix in turn until a match is found. Writes use the first prefix in the list. Conventionally, keys are arranged into a hierarchy, separated by underscores. A view() convenience method helps with constructing prefix lists by automatically adding the trailing underscore to prefixes.

Care should be used when attributes share a suffix. They may shadow each other for some views, causing the attribute to appear to have changed value. This class does not prevent it, because there is no way to know which namespaces may be shared in a view, and because doing it in a race-free way would be prohibitively expensive.
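The prefix search and the shadowing effect can be sketched with a plain dictionary. The lookup function and the example keys are hypothetical; they only mimic the search order described above.

```python
from typing import Dict, Tuple


def lookup(store: Dict[bytes, bytes], prefixes: Tuple[bytes, ...], key: bytes) -> bytes:
    """Hypothetical lookup: try each prefix in order and return the first match."""
    for prefix in prefixes:
        full_key = prefix + key
        if full_key in store:
            return store[full_key]
    raise KeyError(key)


store = {b'cbf_n_chans': b'4096', b'n_chans': b'1024', b'observer': b'alice'}
prefixes = (b'cbf_', b'')          # a 'cbf' view layered over the root namespace

shadowed = lookup(store, prefixes, b'n_chans')      # found under the first prefix
fallback = lookup(store, prefixes, b'observer')     # only exists in the root namespace
```

Here n_chans appears to have a different value depending on the view, which is the shadowing hazard the paragraph above warns about.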

Parameters
  • endpoint (str, Endpoint or Backend) –

    It can be

    • an endpoint: specifies the address of the Redis server

    • a URL (i.e., contains ://): passed to redis.Redis.from_url()

    • an empty string: a MemoryBackend is created

    • any other string: passed to Endpoint to create an endpoint

    • a Backend: used directly.

  • db (int) – Database number within the Redis server.

  • prefixes (tuple of str/bytes) – Prefixes that will be tried in turn for key lookup. While this can be specified directly for advanced cases, it is normally generated by view(). Writes are made using the first prefix in the list.

  • base (TelescopeState) – Existing telescope state instance, from which the backend will be taken. This allows new views to be created by specifying prefixes, without creating new backends.

Raises
  • ConnectionError – If the initial connection to the (real) Redis server fails

  • ValueError – If a base is specified and either endpoint or db is non-default

  • ValueError – If endpoint is a Backend and db is non-default
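The prefix rules described for this class (lookups try each prefix in order, writes use the first prefix) can be illustrated with a plain dictionary. This is a minimal sketch and not the package's implementation; find_key, write_key and the example key names are hypothetical.

```python
from typing import Dict, Optional, Tuple


def find_key(store: Dict[str, object], prefixes: Tuple[str, ...], key: str) -> Optional[str]:
    """Return the first fully-qualified key found, trying prefixes in order."""
    for prefix in prefixes:
        full = prefix + key
        if full in store:
            return full
    return None


def write_key(prefixes: Tuple[str, ...], key: str) -> str:
    """Writes always go to the first prefix in the list."""
    return prefixes[0] + key


store = {'gain': 1.0, 'cam_gain': 2.0}
prefixes = ('cam_', '')

found = find_key(store, prefixes, 'gain')    # 'cam_gain' shadows the root 'gain'
root_only = find_key(store, ('',), 'gain')   # a root-only view sees 'gain'
target = write_key(prefixes, 'mode')         # new keys land under 'cam_'
```

The same short name resolves to different fully-qualified keys depending on the prefix list, which is the shadowing effect the class description warns about.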

load_from_file(file: Union[bytes, str, os.PathLike, BinaryIO]) → int

Load keys from a Redis-compatible RDB snapshot file.

Redis keys are extracted sequentially from the RDB file and inserted directly into the backend without any checks and ignoring the view. Inserting keys that already exist in telstate therefore leads to undefined behaviour. The standard approach is to call this method on an empty telstate.

If there is an error reading or parsing the RDB file (indicating either a broken file or a non-RDB file), an RdbParseError is raised. Errors raised while opening the file (like OSError) and errors raised by the backend itself (like redis errors) can also occur.

Parameters

file (str or file object) – Filename or file object representing RDB file

Returns

keys_loaded – Number of keys loaded from RDB file into telstate

Return type

int

Raises
  • ImportError – If the rdbtools package is not installed

  • RdbParseError – If the file could not be parsed (truncated / malformed / not RDB)

is_immutable(key: Union[bytes, str]) → bool

Check whether the specified key is immutable.

Note that indexed keys are not considered immutable for this purpose. If the key does not exist, False is returned.

Deprecated since version 0.10: is_immutable() is deprecated and may be removed in a future release. Use key_type() instead.

key_type(key: Union[bytes, str]) → Optional[katsdptelstate.utils.KeyType]

Get the type of a key.

If the key does not exist, returns None.

keys(filter: Union[bytes, str] = '*') → List[str]

Return a list of keys currently in the model.

This function ignores the prefix list and returns all keys with fully-qualified names.

Parameters

filter (str or bytes, optional) – Wildcard string passed to Redis to restrict keys

Returns

keys – The key names, in sorted order

Return type

list of str

delete(key: Union[bytes, str]) → None

Remove a key, and all values, from the model.

The key is deleted from every namespace in the prefix list.

Note

This function should be used rarely, ideally only in tests, as it violates the immutability of keys added with immutable=True.

clear() → None

Remove all keys in all namespaces.

Note

This function should be used rarely, ideally only in tests, as it violates the immutability of keys added with immutable=True.

add(key: Union[bytes, str], value: Any, ts: Optional[float] = None, immutable: bool = False, encoding: bytes = b'\xff') → None

Add a new key / value pair to the model.

If immutable is true, then either the key must not previously have been set, or it must have been previously set immutable with exactly the same value (see equal_encoded_values()). Thus, immutable keys only ever have one value for the lifetime of the telescope state. They also have no associated timestamp.

Parameters
  • key (str or bytes) – Key name, which must not collide with a class attribute

  • value (object) – Arbitrary value (must be encodable with encoding)

  • ts (float, optional) – Timestamp associated with the update, ignored for immutables. If not specified, defaults to time.time().

  • immutable (bool, optional) – See description above.

  • encoding (bytes) – See encode_value()

Raises
  • ImmutableKeyError – if an attempt is made to change the value of an immutable

  • ImmutableKeyError – if the key already exists and is not an immutable

  • redis.ResponseError – if there is some other error from the Redis server
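The immutability rule can be sketched with an in-memory toy store. This is an illustration under stated assumptions, not the package's code: ToyStore and ImmutableKeyErrorSketch are hypothetical names, values are compared directly rather than in encoded form, and rejecting a mutable write to an existing immutable key is an assumption.

```python
import time


class ImmutableKeyErrorSketch(Exception):
    """Stand-in for the package's ImmutableKeyError (hypothetical name)."""


class ToyStore:
    def __init__(self):
        self._immutable = {}   # key -> single value, no timestamp
        self._mutable = {}     # key -> list of (value, timestamp)

    def add(self, key, value, ts=None, immutable=False):
        if immutable:
            if key in self._mutable:
                raise ImmutableKeyErrorSketch(f'{key} exists and is not immutable')
            if key in self._immutable and self._immutable[key] != value:
                raise ImmutableKeyErrorSketch(f'{key} already has a different value')
            self._immutable[key] = value
        else:
            # Assumption: a mutable write to an immutable key is also rejected.
            if key in self._immutable:
                raise ImmutableKeyErrorSketch(f'{key} is immutable')
            stamp = time.time() if ts is None else ts
            self._mutable.setdefault(key, []).append((value, stamp))


store = ToyStore()
store.add('n_chans', 4096, immutable=True)
store.add('n_chans', 4096, immutable=True)   # same value again: accepted
try:
    store.add('n_chans', 1024, immutable=True)
    changed = True
except ImmutableKeyErrorSketch:
    changed = False                           # different value: rejected
```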

set_indexed(key: Union[bytes, str], sub_key: Any, value: Any, encoding: bytes = b'\xff') → None

Set a sub-key of an indexed key.

Parameters
  • key (str or bytes) – Main key

  • sub_key (object) – Sub-key within key to associate with the value. It must be both hashable and serialisable.

  • value (object) – Arbitrary value (must be encodable with encoding)

  • encoding (bytes) – Encoding used for value (see encode_value()). Note that it does not affect the encoding of sub_key.

Raises
  • ImmutableKeyError – if the sub-key already exists with a different value

  • ImmutableKeyError – if the key already exists and is not indexed

  • redis.ResponseError – if there is some other error from the Redis server

get_indexed(key: Union[bytes, str], sub_key: Any, default: Any = None, return_encoded: bool = False) → Any

Retrieve an indexed value set with set_indexed().

Parameters
  • key (str or bytes) – Main key

  • sub_key (object) – Sub-key within key, which must be hashable and serialisable

  • default (object) – Value to return if the sub-key is not found

  • return_encoded (bool, optional) – If False (default), return values are decoded from internal storage first; if True, they are returned in encoded form.
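Indexed keys behave like a dictionary per key in which each sub-key may only ever take one value. A minimal sketch of the set_indexed() / get_indexed() pairing follows. It assumes direct value comparison instead of comparing encoded values; ToyIndexed and SubKeyErrorSketch are hypothetical names.

```python
class SubKeyErrorSketch(Exception):
    """Stand-in for the package's ImmutableKeyError (hypothetical name)."""


class ToyIndexed:
    def __init__(self):
        self._data = {}   # key -> {sub_key: value}

    def set_indexed(self, key, sub_key, value):
        entries = self._data.setdefault(key, {})
        # A sub-key may be set again only with the same value.
        if sub_key in entries and entries[sub_key] != value:
            raise SubKeyErrorSketch(f'{key}[{sub_key!r}] already set')
        entries[sub_key] = value

    def get_indexed(self, key, sub_key, default=None):
        return self._data.get(key, {}).get(sub_key, default)


toy = ToyIndexed()
toy.set_indexed('calib', ('m000', 3), 1.25)   # a tuple is hashable, so it can be a sub-key
hit = toy.get_indexed('calib', ('m000', 3))
miss = toy.get_indexed('calib', ('m001', 3), default='n/a')
```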

wait_key(key: Union[bytes, str], condition: Optional[Callable[[Any, Optional[float]], bool]] = None, timeout: Optional[float] = None, cancel_future: Any = None) → None

Wait for a key to exist, possibly with some condition.

Parameters
  • key (str or bytes) – Key name to monitor

  • condition (callable, signature bool = condition(value, ts), optional) – If not specified, wait until the key exists. Otherwise, the callable should have the signature bool = condition(value, ts) where value is the latest value of the key, ts is its associated timestamp (or None if immutable), and the return value indicates whether the condition is satisfied.

  • timeout (float, optional) – If specified and the condition is not met within the time limit, an exception is thrown.

  • cancel_future (future, optional) – If not None, a future object (e.g. concurrent.futures.Future or asyncio.Future). If cancel_future.done() is true before the timeout, raises CancelledError. In the current implementation, it is only polled once a second, rather than waited for.

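Raises
  • TimeoutError – if a timeout was specified and was exceeded

  • CancelledError – if a cancellation future was specified and done

wait_indexed(key: Union[bytes, str], sub_key: Any, condition: Optional[Callable[Any, bool]] = None, timeout: Optional[float] = None, cancel_future: Any = None) → None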

Wait for a sub-key of an indexed key to exist, possibly with some condition.

Parameters
  • key (str or bytes) – Key name to monitor

  • sub_key (object) – Sub-key to monitor within key.

  • condition (callable, signature bool = condition(value), optional) – If not specified, wait until the sub-key exists. Otherwise, the callable should have the signature bool = condition(value) where value is the value associated with the sub-key, and the return value indicates whether the condition is satisfied.

  • timeout (float, optional) – If specified and the condition is not met within the time limit, an exception is thrown.

  • cancel_future (future, optional) – If not None, a future object (e.g. concurrent.futures.Future or asyncio.Future). If cancel_future.done() is true before the timeout, raises CancelledError. In the current implementation, it is only polled once a second, rather than waited for.

Raises
  • TimeoutError – if a timeout was specified and was exceeded

  • CancelledError – if a cancellation future was specified and done

  • ImmutableKeyError – if the key exists (or is created while waiting) but is not indexed
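The interaction between condition, timeout and cancel_future can be sketched as a simple polling loop. This is only an illustration of the documented contract, not how the package monitors keys. The read callable, the poll interval and the use of the built-in TimeoutError and concurrent.futures.CancelledError as stand-ins are all assumptions.

```python
import time
from concurrent.futures import CancelledError, Future


def wait_sketch(read, condition=None, timeout=None, cancel_future=None, poll=0.01):
    """Block until read() yields a value satisfying condition.

    read() is a hypothetical callable returning the current value,
    or None while the (sub-)key does not exist.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        value = read()
        if value is not None and (condition is None or condition(value)):
            return
        if cancel_future is not None and cancel_future.done():
            raise CancelledError()
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError()
        time.sleep(poll)


state = {'scan_index': 5}
# Returns immediately: the key exists and the condition already holds.
wait_sketch(lambda: state.get('scan_index'), condition=lambda v: v > 3, timeout=0.1)
```

Passing an already-cancelled Future as cancel_future makes the sketch raise CancelledError on the first pass in which the condition is not met.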

get(key: Union[bytes, str], default: Any = None, return_encoded: bool = False) → Any

Get a single value from the model.

Parameters
  • key (str or bytes) – Key to retrieve

  • default (object, optional) – Object to return if key not found

  • return_encoded (bool, optional) – If False (default), return values are decoded from internal storage first; if True, they are returned in encoded form.

Returns

value – The value of the key, or default if the key is not found. For a non-immutable key, the most recent value is returned.

Return type

object

get_range(key: Union[bytes, str], st: Optional[float] = None, et: Optional[float] = None, include_previous: Optional[bool] = None, include_end: bool = False, return_encoded: bool = False) → List[Tuple[Any, float]]

Get the range of values specified by the key and timespec from the model.

Parameters
  • key (str or bytes) – Database key to extract

  • st (float, optional) – Start time, default returns the most recent value prior to et

  • et (float, optional) – End time, defaults to the end of time

  • include_previous (bool, optional) – If True, the method also returns the last value prior to the start time (if any). This defaults to False if st is specified and True if st is unspecified.

  • include_end (bool, optional) – If False (default), returns values in [st, et), otherwise [st, et].

  • return_encoded (bool, optional) – If False (default), return values are decoded from internal storage first; if True, they are returned in encoded form.

Returns

list of (value, time) records in specified time range

Return type

list

Raises
  • KeyError – if key does not exist (with any prefix)

  • ImmutableKeyError – if key refers to an existing key which is not mutable

Notes

By default, timestamps exactly equal to the start time are included, while those equal to the end time are excluded.

Usage examples:

get_range('key')

returns most recent record

get_range('key', st=0)

returns list of all records in the telescope state database

get_range('key', st=0, et=t1)

returns list of all records before time t1

get_range('key', st=t0, et=t1)

returns list of all records in the range [t0, t1)

get_range('key', st=t0)

returns list of all records after time t0

get_range('key', et=t1)

returns the most recent record prior to time t1
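The range semantics listed above can be reproduced on a sorted in-memory list with bisect. The sketch below is an illustration only, not the package's implementation. It assumes that an unspecified st is treated as equal to et, so that only the previous record is returned; get_range_sketch is a hypothetical name.

```python
import bisect
import math
from typing import Any, List, Optional, Tuple


def get_range_sketch(records: List[Tuple[Any, float]],
                     st: Optional[float] = None,
                     et: Optional[float] = None,
                     include_previous: Optional[bool] = None,
                     include_end: bool = False) -> List[Tuple[Any, float]]:
    """records is a list of (value, timestamp) sorted by timestamp."""
    if include_previous is None:
        include_previous = st is None
    if et is None:
        et = math.inf
    if st is None:
        st = et   # assumption: empty main range, only the previous record remains
    times = [ts for _, ts in records]
    lo = bisect.bisect_left(times, st)          # first record with ts >= st
    if include_end:
        hi = bisect.bisect_right(times, et)     # records with ts <= et
    else:
        hi = bisect.bisect_left(times, et)      # records with ts < et
    hi = max(hi, lo)
    if include_previous and lo > 0:
        lo -= 1                                 # also take the last record before st
    return records[lo:hi]


records = [('a', 1.0), ('b', 2.0), ('c', 3.0)]
latest = get_range_sketch(records)                   # most recent record
everything = get_range_sketch(records, st=0)         # all records
window = get_range_sketch(records, st=2.0, et=3.0)   # half-open range [2.0, 3.0)
before = get_range_sketch(records, et=2.5)           # most recent record prior to 2.5
```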

katsdptelstate.telescope_state.ensure_binary(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to six.binary_type.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> encoded to bytes

  • bytes -> bytes

katsdptelstate.telescope_state.ensure_str(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to str.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> str

  • bytes -> decoded to str

katsdptelstate.telescope_state_base module

katsdptelstate.telescope_state_base.logger = <Logger katsdptelstate.telescope_state_base (WARNING)>
katsdptelstate.telescope_state_base.check_immutable_change(key: str, old_enc: bytes, new_enc: bytes, new: Any) → None

Check if an immutable is being changed to the same value.

If not, raise ImmutableKeyError, otherwise just log a message. This is intended to be called by subclasses.

Parameters
  • key (str) – Human-readable version of the key

  • old_enc (bytes) – Previous value, encoded

  • new_enc (bytes) – New value, encoded

  • new – New value, prior to encoding

class katsdptelstate.telescope_state_base.TelescopeStateBase(backend: Optional[_B] = None, prefixes: Tuple[Union[bytes, str], ...] = (b'',), base: Optional[_T] = None)

Bases: typing.Generic

Base class for synchronous and asynchronous telescope state classes.

It contains parts common to katsdptelstate.TelescopeState and katsdptelstate.aio.TelescopeState, mainly relating to setting up namespaces.

SEPARATOR = '_'
SEPARATOR_BYTES = b'_'
property prefixes

The active key prefixes as a tuple of strings.

property backend
classmethod join(*names) → str

Join string components of key with supported separator.

view(name: Union[bytes, str], add_separator: bool = True, exclusive: bool = False) → _T

Create a view with an extra name in the list of namespaces.

Returns a new view with name added as the first prefix, or as the only prefix if exclusive is true. If name is non-empty and does not end with the separator, the separator is appended to it (unless add_separator is false).
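The way view() derives the new prefix list can be sketched as a pure function on tuples of strings. This is a minimal illustration of the rule described above, not the package's code. It handles str only, and view_prefixes is a hypothetical name.

```python
from typing import Tuple

SEPARATOR = '_'


def view_prefixes(prefixes: Tuple[str, ...], name: str,
                  add_separator: bool = True, exclusive: bool = False) -> Tuple[str, ...]:
    """Return the prefix tuple a new view would use."""
    if name and add_separator and not name.endswith(SEPARATOR):
        name += SEPARATOR
    if exclusive:
        return (name,)
    return (name,) + tuple(prefixes)


root = ('',)
cam = view_prefixes(root, 'cam')                     # ('cam_', '')
nested = view_prefixes(cam, 'cam_l0')                # ('cam_l0_', 'cam_', '')
only = view_prefixes(cam, 'sdp', exclusive=True)     # ('sdp_',)
raw = view_prefixes(root, 'x', add_separator=False)  # ('x', '')
```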

root() → _T

Create a view containing only the root namespace.

katsdptelstate.telescope_state_base.ensure_binary(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to six.binary_type.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> encoded to bytes

  • bytes -> bytes

katsdptelstate.telescope_state_base.ensure_str(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to str.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> str

  • bytes -> decoded to str

katsdptelstate.utils module

class katsdptelstate.utils.KeyType

Bases: enum.Enum

An enumeration of the key types supported by telescope state.

IMMUTABLE = 1
MUTABLE = 2
INDEXED = 3
katsdptelstate.utils.ensure_str(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to str.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> str

  • bytes -> decoded to str

katsdptelstate.utils.ensure_binary(s, encoding='utf-8', *, errors='surrogateescape')

Coerce s to six.binary_type.

For Python 2:
  • unicode -> encoded to str

  • str -> str

For Python 3:
  • str -> encoded to bytes

  • bytes -> bytes

katsdptelstate.utils.display_str(s) → str

Return the most human-readable yet accurate version of s.

katsdptelstate.utils.pack_query_timestamp(time: float, is_end: bool, include_end: bool = False) → bytes

Create a query value for a ZRANGEBYLEX query.

When packing the time for the start of a range, set is_end and include_end to False. When packing the time for the end of a range, set is_end to True, and include_end indicates whether the endpoint is inclusive. The latter is implemented by incrementing the time by the smallest possible amount and then treating it as exclusive.
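The inclusive-end trick described above (step the time up by one representable value, then treat the bound as exclusive) can be sketched with struct. This is an illustration under assumptions, not the package's code: it assumes finite, non-negative times, uses the Redis lexicographic range markers [ (inclusive) and ( (exclusive), and omits any special handling of infinite bounds. The function names are hypothetical.

```python
import struct


def next_double_up(t: float) -> float:
    """Smallest double strictly greater than t (assumes 0 <= t < inf)."""
    (bits,) = struct.unpack('>Q', struct.pack('>d', t))
    # For non-negative doubles, adding 1 to the bit pattern gives the next value up.
    return struct.unpack('>d', struct.pack('>Q', bits + 1))[0]


def pack_query_sketch(time: float, is_end: bool, include_end: bool = False) -> bytes:
    if is_end and include_end:
        time = next_double_up(time)     # widen the bound, then keep it exclusive
    marker = b'(' if is_end else b'['   # end bounds exclusive, start bounds inclusive
    return marker + struct.pack('>d', time)


start = pack_query_sketch(1.0, is_end=False)
end_exclusive = pack_query_sketch(1.0, is_end=True)
end_inclusive = pack_query_sketch(1.0, is_end=True, include_end=True)
```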

katsdptelstate.utils.pack_timestamp(timestamp: float) → bytes

Encode a timestamp to bytes that sort correctly.

katsdptelstate.utils.split_timestamp(packed: bytes) → Tuple[bytes, float]

Split out the value and timestamp from a packed item.

The item contains 8 bytes with the timestamp in big-endian IEEE-754 double precision, followed by the value.
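The packed layout described above can be reproduced with the struct module. The sketch below is a stand-alone illustration, not the package's code. It assumes timestamps are non-negative, which is what makes the big-endian byte strings sort in the same order as the numbers; the function names are hypothetical.

```python
import struct


def pack_timestamp_sketch(timestamp: float) -> bytes:
    # Assumption: negative timestamps are rejected, since their byte order
    # would not match their numeric order.
    if timestamp < 0:
        raise ValueError('timestamp must be non-negative')
    return struct.pack('>d', timestamp)   # 8 bytes, big-endian IEEE-754 double


def split_timestamp_sketch(packed: bytes):
    (timestamp,) = struct.unpack('>d', packed[:8])
    return packed[8:], timestamp          # (value, timestamp)


times = [0.0, 0.5, 1.0, 1234567890.25, 1e12]
packed = [pack_timestamp_sketch(t) for t in times]

item = pack_timestamp_sketch(1234567890.25) + b'payload'
value, ts = split_timestamp_sketch(item)
```

Because the packed timestamps compare bytewise in the same order as the numbers they encode, items stored with this prefix can be range-queried lexicographically.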

Module contents