API Reference Guide

For information on constants used w/ SNAPpy scripts, see the SNAP Reference.

  1. Copyright 2019 Synapse Wireless, Inc.

SNAPstack: the SNAP2 Asyncio API

class Snap(address: bytes = _Nothing.NOTHING, config: SnapConfig = _Nothing.NOTHING, node: Node = _Nothing.NOTHING, rsvp_stack: List[Rsvp] = _Nothing.NOTHING, data_handler: Callable = None, sniffer_handler: Callable = None, unexpected_rpc_handler: Callable = None, interfaces: Dict[str, Any] = _Nothing.NOTHING, tcp_server: TcpServer = None, event_dispatch_table: Dict[type, Callable] = _Nothing.NOTHING, next_gensym_suffix: int = 0, next_hook_id: int = 0, interface_debug_handler: Optional[Callable] = None, borrowed_interfaces: Set[str] = _Nothing.NOTHING, tx_id_num=_Nothing.NOTHING, tx_in_flight: Dict[int, Callable] = _Nothing.NOTHING, poll_task=None)

Method generated by attrs for class Snap.

add_rpc_handler(rpc_name: bytes, handler: Callable, **kwargs) Rsvp

Add a handler for incoming RPCs. :param rpc_name: The name of RPC :param handler: The function to call on receipt of the named RPC :return: Reservation for the handler (used to remove it)

async call_dmcast_rpc(targets: Sequence[bytes], func: bytes, args: Tuple[Union[None, bool, int, bytes, CFunc, Sequence[bytes]], ...], group: Optional[int] = None, ttl: Optional[int] = None, delay: Optional[int] = None, timeout_ms: Optional[int] = None, tries: Optional[int] = None, callback_name: Optional[bytes] = None, node_limit: int = 0, chunk_delay_ms: int = 0, skip_initial_slot: bool = True, outgoing_link_id: Optional[str] = None, gensym: Optional[bytes] = None) Dict[bytes, Optional[DmcastRpcPacket]]

Send a callback directed multicast RPC with automatic callback, timeout, and retry handling.

Group, TTL, and delay use defaults unless otherwise specified.

Timeout and tries use defaults unless otherwise specified.

If a callback name is not specified, a gensym name will be used w/ the dmCallout builtin.

async call_ucast_rpc(target: bytes, func: bytes, args: Tuple[Union[None, bool, int, bytes, CFunc, Sequence[bytes]], ...], timeout_ms: Optional[int] = None, tries: Optional[int] = None, callback_name: Optional[bytes] = None) Optional[RpcPacket]

Send a callback RPC with automatic callback, timeout, and retry handling.

Timeout and tries use defaults unless otherwise specified.

If a callback name is not specified, a gensym name will be used w/ the callback builtin.

async check_call_ucast_rpc(*args, **kwargs) RpcPacket

Convenience wrapper that raises an exception if no response is received.

async check_serial_bridge_address(device: str) bytes

Convenience wrapper that throws an exception if the bridge cannot be found.

close_serial(device: str)

Close a serial interface

close_tcp(host='127.0.0.1', port=48625)

Close a TCP interface

async get_serial_bridge_address(device: str) Optional[bytes]

Gets the MAC address of the bridge node connected to the provided (already opened) serial device :param device: A currently opened serial device :return: The address of the bridge node, or None if no response is received from the bridge node

open_serial(device: str, serial_type: SerialType, baudrate: int = 38400, multicast_forward_group_mask: int = 1)

Open a serial interface

open_tcp(host: str = '127.0.0.1', port: int = 48625, user: str = 'public', password: str = 'public', multicast_forward_group_mask: int = 1, tcp_keepalives: bool = False) Future

Open a TCP (client) interface

poll()

Do things we need to do periodically.

remove_rpc_handler(reservation: Rsvp) bool

Remove a handler for incoming RPCs. :param reservation: The reservation returned when the handler was added :return: True if the reservation exists and the associated handler was removed

async send_dmcast_rpc(targets: Sequence[bytes], func: bytes, args: Tuple[Union[None, bool, int, bytes, CFunc, Sequence[bytes]], ...], group: Optional[int] = None, ttl: Optional[int] = None, delay: Optional[int] = None, shots: int = 1, shot_delay_ms: Optional[int] = None, outgoing_link_id: Optional[str] = None) bool

Send a dmcast RPC.

Group, TTL, and Delay use defaults unless otherwise specified.

If shots > 1, will send multiple times w/ shot_delay_ms between shots.

async send_mcast_rpc(func: bytes, args: Tuple[Union[None, bool, int, bytes, CFunc, Sequence[bytes]], ...], group: Optional[int] = None, ttl: Optional[int] = None, shots: int = 1, shot_delay_ms: Optional[int] = None) bool

Send a multicast RPC.

Group and TTL use defaults unless otherwise specified.

If shots > 1, will send multiple times w/ shot_delay_ms between shots.

async send_ucast_rpc(target: bytes, func: bytes, args: Tuple[Union[None, bool, int, bytes, CFunc, Sequence[bytes]], ...]) bool

Send a unicast RPC.

set_crypto(encryption_key: Optional[bytes] = None) None

Enable/disable encryption.

Parameters

encryption_key – 16-byte string to enable, None to disable

set_handler_targets(reservation: Rsvp, targets: Sequence[bytes], node_limit: int = 0)

Sets the list of targets and the node limit for the given handler :param reservation: The reservation returned when the handler was added :param targets: List of allowed targets :return: True if the handler_id exists and was updated

set_ttl(ttl: int) None

Sets up a new TTL

start()

Creates an (internal) asyncio Task that runs our poll loop.

stop()

Stops the (internal) poll task.

class SnapConfig(default_mcast_group: int = 1, default_mcast_ttl: int = 5, default_dmcast_delay: int = 50, default_call_tries: int = 3, default_call_timeout_ms: int = 2000, default_call_dmcast_node_limit: int = 4, default_shot_delay_ms: int = 300)

Method generated by attrs for class SnapConfig.

class CodecConfig(check_packet_length: bool = True, crypto=_Nothing.NOTHING, append_rpc_crc: bool = True, validate_rpc_crc: bool = False, validate_rpc_crc_if_present: bool = True, allocate_packet_crc: bool = True)

Method generated by attrs for class CodecConfig.