
pywry.state

State management interfaces and implementations.


Factory Functions

pywry.state.get_widget_store cached

get_widget_store() -> WidgetStore

Get the configured widget store instance.

Uses Redis backend in deploy mode if configured, otherwise memory.

RETURNS DESCRIPTION
WidgetStore

The widget store instance.

pywry.state.get_event_bus cached

get_event_bus() -> EventBus

Get the configured event bus instance.

Uses Redis Pub/Sub in deploy mode if configured, otherwise memory.

RETURNS DESCRIPTION
EventBus

The event bus instance.

pywry.state.get_connection_router cached

get_connection_router() -> ConnectionRouter

Get the configured connection router instance.

Uses Redis in deploy mode if configured, otherwise memory.

RETURNS DESCRIPTION
ConnectionRouter

The connection router instance.

pywry.state.get_session_store cached

get_session_store() -> SessionStore

Get the configured session store instance.

Uses Redis in deploy mode if configured, otherwise memory.

RETURNS DESCRIPTION
SessionStore

The session store instance.

pywry.state.get_callback_registry

get_callback_registry() -> CallbackRegistry

Get the global callback registry instance.

RETURNS DESCRIPTION
CallbackRegistry

The singleton callback registry.

pywry.state.get_worker_id

get_worker_id() -> str

Get the unique worker ID for this process.

The worker ID is used for connection routing and callback dispatch in multi-worker deployments.

RETURNS DESCRIPTION
str

Unique worker identifier.

pywry.state.get_state_backend

get_state_backend() -> StateBackend

Get the configured state backend.

RETURNS DESCRIPTION
StateBackend

The configured backend (MEMORY or REDIS).

pywry.state.is_deploy_mode

is_deploy_mode() -> bool

Check if running in deploy mode.

Deploy mode enables:

- Redis state backend (if configured)
- Multi-worker support
- External state storage
- Session/RBAC support

RETURNS DESCRIPTION
bool

True if running in deploy mode.

pywry.state.clear_state_caches

clear_state_caches() -> None

Clear all cached state store instances.

Call this to force re-creation of stores (e.g., after config change).
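
The relationship between the cached factories and clear_state_caches can be pictured with a small stand-alone sketch. It assumes the caching behaves like functools.lru_cache, which this page does not confirm; ToyWidgetStore, get_toy_widget_store and clear_toy_caches are hypothetical names, not part of pywry.

```python
from functools import lru_cache


class ToyWidgetStore:
    """Stand-in for a store implementation; not a pywry class."""


@lru_cache(maxsize=1)
def get_toy_widget_store() -> ToyWidgetStore:
    # The first call builds the store; later calls return the same object.
    return ToyWidgetStore()


def clear_toy_caches() -> None:
    # Dropping the cache forces the next factory call to build a new store.
    get_toy_widget_store.cache_clear()


first = get_toy_widget_store()
same = get_toy_widget_store()
clear_toy_caches()
rebuilt = get_toy_widget_store()
```

Under these assumptions, first and same are one object, while rebuilt is a fresh instance created after the cache was cleared.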


Abstract Interfaces

pywry.state.WidgetStore

Bases: ABC

Abstract widget storage interface.

Handles storage and retrieval of widget HTML content and metadata. Implementations must be thread-safe and support async operations.

Functions

register abstractmethod async

register(widget_id: str, html: str, token: str | None = None, owner_worker_id: str | None = None, metadata: dict[str, Any] | None = None) -> None

Register a widget with its HTML content.

PARAMETER DESCRIPTION
widget_id

Unique identifier for the widget.

TYPE: str

html

The HTML content of the widget.

TYPE: str

token

Optional per-widget authentication token.

TYPE: str or None DEFAULT: None

owner_worker_id

ID of the worker that created this widget.

TYPE: str or None DEFAULT: None

metadata

Additional metadata (title, theme, etc.).

TYPE: dict[str, Any] or None DEFAULT: None

get abstractmethod async

get(widget_id: str) -> WidgetData | None

Get complete widget data.

PARAMETER DESCRIPTION
widget_id

The widget ID to retrieve.

TYPE: str

RETURNS DESCRIPTION
WidgetData or None

The widget data if found, None otherwise.

get_html abstractmethod async

get_html(widget_id: str) -> str | None

Get widget HTML content.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
str or None

The HTML content if found, None otherwise.

get_token abstractmethod async

get_token(widget_id: str) -> str | None

Get widget authentication token.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
str or None

The token if set, None otherwise.

exists abstractmethod async

exists(widget_id: str) -> bool

Check if a widget exists.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if the widget exists.

delete abstractmethod async

delete(widget_id: str) -> bool

Delete a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID to delete.

TYPE: str

RETURNS DESCRIPTION
bool

True if the widget was deleted, False if it didn't exist.

list_active abstractmethod async

list_active() -> list[str]

List all active widget IDs.

RETURNS DESCRIPTION
list[str]

List of active widget IDs.

update_html abstractmethod async

update_html(widget_id: str, html: str) -> bool

Update widget HTML content.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

html

The new HTML content.

TYPE: str

RETURNS DESCRIPTION
bool

True if updated, False if widget doesn't exist.

update_token abstractmethod async

update_token(widget_id: str, token: str) -> bool

Update widget authentication token.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

token

The new authentication token.

TYPE: str

RETURNS DESCRIPTION
bool

True if updated, False if widget doesn't exist.

count abstractmethod async

count() -> int

Get the number of active widgets.

RETURNS DESCRIPTION
int

Number of active widgets.
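
The return conventions above (None for a missing widget, a bool from update and delete) may be easier to see in a toy dict-backed store. This is only a sketch covering part of the interface; ToyWidgetStore and ToyWidgetData are hypothetical stand-ins and do not subclass the real WidgetStore.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyWidgetData:
    """Stand-in mirroring some of the documented WidgetData fields."""

    widget_id: str
    html: str
    token: str | None = None
    created_at: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)


class ToyWidgetStore:
    """Dict-backed sketch of a subset of the WidgetStore methods."""

    def __init__(self) -> None:
        self._widgets: dict[str, ToyWidgetData] = {}
        self._lock = asyncio.Lock()  # serializes access between coroutines

    async def register(self, widget_id: str, html: str, token: str | None = None,
                       metadata: dict[str, Any] | None = None) -> None:
        async with self._lock:
            self._widgets[widget_id] = ToyWidgetData(
                widget_id, html, token, time.time(), metadata or {}
            )

    async def get_html(self, widget_id: str) -> str | None:
        async with self._lock:
            widget = self._widgets.get(widget_id)
            return widget.html if widget else None

    async def update_html(self, widget_id: str, html: str) -> bool:
        async with self._lock:
            widget = self._widgets.get(widget_id)
            if widget is None:
                return False  # nothing to update
            widget.html = html
            return True

    async def delete(self, widget_id: str) -> bool:
        async with self._lock:
            return self._widgets.pop(widget_id, None) is not None

    async def count(self) -> int:
        async with self._lock:
            return len(self._widgets)


async def demo() -> dict[str, Any]:
    store = ToyWidgetStore()
    await store.register("w1", "<p>hello</p>", metadata={"title": "Demo"})
    return {
        "html": await store.get_html("w1"),
        "missing": await store.get_html("nope"),
        "updated": await store.update_html("w1", "<p>bye</p>"),
        "update_missing": await store.update_html("nope", "<p>x</p>"),
        "deleted": await store.delete("w1"),
        "deleted_again": await store.delete("w1"),
        "count": await store.count(),
    }


result = asyncio.run(demo())
```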

pywry.state.EventBus

Bases: ABC

Abstract event publishing interface.

Handles cross-worker event delivery for callback dispatch and real-time updates.

Functions

publish abstractmethod async

publish(channel: str, event: EventMessage) -> None

Publish an event to a channel.

PARAMETER DESCRIPTION
channel

The channel name (e.g., "widget:{id}", "worker:{id}").

TYPE: str

event

The event to publish.

TYPE: EventMessage

subscribe abstractmethod async

subscribe(channel: str) -> AsyncIterator[EventMessage]

Subscribe to events on a channel.

PARAMETER DESCRIPTION
channel

The channel name.

TYPE: str

YIELDS DESCRIPTION
EventMessage

Events received on the channel.

unsubscribe abstractmethod async

unsubscribe(channel: str) -> None

Unsubscribe from a channel.

PARAMETER DESCRIPTION
channel

The channel to unsubscribe from.

TYPE: str

pywry.state.ConnectionRouter

Bases: ABC

Abstract connection routing interface.

Tracks which worker owns which WebSocket connection, enabling cross-worker message routing.

Functions

register_connection abstractmethod async

register_connection(widget_id: str, worker_id: str, user_id: str | None = None, session_id: str | None = None) -> None

Register that a widget is connected to a specific worker.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

worker_id

The worker ID that owns this connection.

TYPE: str

user_id

Optional user ID for RBAC.

TYPE: str or None DEFAULT: None

session_id

Optional session ID for tracking.

TYPE: str or None DEFAULT: None

get_connection_info abstractmethod async

get_connection_info(widget_id: str) -> ConnectionInfo | None

Get connection information for a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
ConnectionInfo or None

Connection information if connected, None otherwise.

get_owner abstractmethod async

get_owner(widget_id: str) -> str | None

Get the worker ID that owns this widget's connection.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
str or None

The worker ID if connected, None otherwise.

refresh_heartbeat abstractmethod async

refresh_heartbeat(widget_id: str) -> bool

Refresh the heartbeat timestamp for a connection.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if refreshed, False if connection doesn't exist.

unregister_connection abstractmethod async

unregister_connection(widget_id: str) -> bool

Unregister a connection.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if unregistered, False if didn't exist.

list_worker_connections abstractmethod async

list_worker_connections(worker_id: str) -> list[str]

List all widget IDs connected to a specific worker.

PARAMETER DESCRIPTION
worker_id

The worker ID.

TYPE: str

RETURNS DESCRIPTION
list[str]

List of widget IDs connected to this worker.
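
As a rough illustration of the bookkeeping this interface describes, the sketch below tracks widget-to-worker ownership in a plain dict. ToyConnectionRouter and ToyConnectionInfo are hypothetical stand-ins; the optional user_id and session_id arguments are omitted for brevity.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass


@dataclass
class ToyConnectionInfo:
    """Stand-in mirroring some of the documented ConnectionInfo fields."""

    widget_id: str
    worker_id: str
    connected_at: float = 0.0
    last_heartbeat: float = 0.0


class ToyConnectionRouter:
    """Dict-backed sketch: one connection record per widget ID."""

    def __init__(self) -> None:
        self._connections: dict[str, ToyConnectionInfo] = {}

    async def register_connection(self, widget_id: str, worker_id: str) -> None:
        now = time.time()
        self._connections[widget_id] = ToyConnectionInfo(widget_id, worker_id, now, now)

    async def get_owner(self, widget_id: str) -> str | None:
        info = self._connections.get(widget_id)
        return info.worker_id if info else None

    async def refresh_heartbeat(self, widget_id: str) -> bool:
        info = self._connections.get(widget_id)
        if info is None:
            return False  # no such connection
        info.last_heartbeat = time.time()
        return True

    async def unregister_connection(self, widget_id: str) -> bool:
        return self._connections.pop(widget_id, None) is not None

    async def list_worker_connections(self, worker_id: str) -> list[str]:
        return [
            widget_id
            for widget_id, info in self._connections.items()
            if info.worker_id == worker_id
        ]


async def demo() -> dict[str, object]:
    router = ToyConnectionRouter()
    await router.register_connection("w1", "worker-a")
    await router.register_connection("w2", "worker-b")
    await router.register_connection("w3", "worker-a")
    return {
        "owner": await router.get_owner("w2"),
        "unknown_owner": await router.get_owner("w9"),
        "worker_a": await router.list_worker_connections("worker-a"),
        "heartbeat": await router.refresh_heartbeat("w1"),
        "heartbeat_missing": await router.refresh_heartbeat("w9"),
        "unregistered": await router.unregister_connection("w1"),
        "worker_a_after": await router.list_worker_connections("worker-a"),
    }


routing = asyncio.run(demo())
```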

pywry.state.SessionStore

Bases: ABC

Abstract session storage interface for RBAC support.

Handles user sessions and access control for multi-tenant deployments.

Functions

create_session abstractmethod async

create_session(session_id: str, user_id: str, roles: list[str] | None = None, ttl: int | None = None, metadata: dict[str, Any] | None = None) -> UserSession

Create a new user session.

PARAMETER DESCRIPTION
session_id

Unique session identifier.

TYPE: str

user_id

User identifier.

TYPE: str

roles

User roles for access control.

TYPE: list[str] or None DEFAULT: None

ttl

Time-to-live in seconds (None for no expiry).

TYPE: int or None DEFAULT: None

metadata

Additional session metadata.

TYPE: dict[str, Any] or None DEFAULT: None

RETURNS DESCRIPTION
UserSession

The created session.

get_session abstractmethod async

get_session(session_id: str) -> UserSession | None

Get a session by ID.

PARAMETER DESCRIPTION
session_id

The session ID.

TYPE: str

RETURNS DESCRIPTION
UserSession or None

The session if found and not expired, None otherwise.

validate_session abstractmethod async

validate_session(session_id: str) -> bool

Validate a session is active and not expired.

PARAMETER DESCRIPTION
session_id

The session ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if the session is valid.

delete_session abstractmethod async

delete_session(session_id: str) -> bool

Delete a session.

PARAMETER DESCRIPTION
session_id

The session ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if deleted, False if didn't exist.

refresh_session abstractmethod async

refresh_session(session_id: str, extend_ttl: int | None = None) -> bool

Refresh a session's expiry time.

PARAMETER DESCRIPTION
session_id

The session ID.

TYPE: str

extend_ttl

New TTL in seconds (None to use original TTL).

TYPE: int or None DEFAULT: None

RETURNS DESCRIPTION
bool

True if refreshed, False if session doesn't exist.

list_user_sessions abstractmethod async

list_user_sessions(user_id: str) -> list[UserSession]

List all sessions for a user.

PARAMETER DESCRIPTION
user_id

The user ID.

TYPE: str

RETURNS DESCRIPTION
list[UserSession]

List of active sessions for this user.

check_permission abstractmethod async

check_permission(session_id: str, resource_type: str, resource_id: str, permission: str) -> bool

Check if a session has permission to access a resource.

PARAMETER DESCRIPTION
session_id

The session ID.

TYPE: str

resource_type

The type of resource (e.g., "widget", "user").

TYPE: str

resource_id

The resource identifier.

TYPE: str

permission

The required permission (e.g., "read", "write", "admin").

TYPE: str

RETURNS DESCRIPTION
bool

True if the session has the required permission.
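
The ttl and expiry semantics described for create_session, validate_session and refresh_session can be sketched with plain functions. This is an illustration under assumptions: the explicit now argument and the stored ttl field are hypothetical conveniences (the documented UserSession has no ttl attribute), and real implementations may compute expiry differently.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToySession:
    """Stand-in for a session record; not the pywry UserSession."""

    session_id: str
    user_id: str
    roles: list[str] = field(default_factory=list)
    created_at: float = 0.0
    expires_at: float | None = None  # None means no expiry
    ttl: int | None = None  # assumption: kept so refresh can reuse the original TTL


def create_session(session_id: str, user_id: str, now: float,
                   ttl: int | None = None) -> ToySession:
    expires_at = None if ttl is None else now + ttl
    return ToySession(session_id, user_id, created_at=now, expires_at=expires_at, ttl=ttl)


def is_valid(session: ToySession, now: float) -> bool:
    # A session without an expiry time never expires.
    return session.expires_at is None or now < session.expires_at


def refresh(session: ToySession, now: float, extend_ttl: int | None = None) -> None:
    # None falls back to the TTL the session was created with.
    ttl = session.ttl if extend_ttl is None else extend_ttl
    if ttl is not None:
        session.expires_at = now + ttl


session = create_session("s1", "alice", now=1000.0, ttl=60)
valid_early = is_valid(session, now=1030.0)
valid_late = is_valid(session, now=1061.0)

refresh(session, now=1050.0)  # reuse the original 60-second TTL
expiry_after_refresh = session.expires_at

refresh(session, now=1050.0, extend_ttl=10)  # override with a shorter TTL
expiry_after_override = session.expires_at

forever = create_session("s2", "bob", now=1000.0)  # ttl=None: no expiry
```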


Data Types

pywry.state.WidgetData dataclass

WidgetData(widget_id: str, html: str, token: str | None = None, created_at: float = 0.0, owner_worker_id: str | None = None, metadata: dict[str, Any] = dict())

Widget data stored in the state store.

ATTRIBUTE DESCRIPTION
widget_id

Unique identifier for the widget.

TYPE: str

html

The HTML content of the widget.

TYPE: str

token

Optional per-widget authentication token.

TYPE: str or None

created_at

Unix timestamp when the widget was created.

TYPE: float

owner_worker_id

ID of the worker that created/owns this widget's callbacks.

TYPE: str or None

metadata

Additional metadata (title, theme, etc.).

TYPE: dict[str, Any]

pywry.state.EventMessage dataclass

EventMessage(event_type: str, widget_id: str, data: dict[str, Any], source_worker_id: str, target_worker_id: str | None = None, timestamp: float = 0.0, message_id: str = '')

Event message for cross-worker communication.

ATTRIBUTE DESCRIPTION
event_type

The type of event (e.g., "click", "cellValueChanged").

TYPE: str

widget_id

The target widget ID.

TYPE: str

data

The event payload.

TYPE: dict[str, Any]

source_worker_id

The worker that sent this event.

TYPE: str

target_worker_id

The specific worker to receive this event (None for broadcast).

TYPE: str or None

timestamp

Unix timestamp when the event was created.

TYPE: float

message_id

Unique identifier for this message.

TYPE: str
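
Because an event has to cross process boundaries in deploy mode, it must be serialized at some point. The sketch below round-trips a stand-in dataclass through JSON; the JSON wire format, ToyEventMessage, encode and decode are all assumptions made for illustration, not the library's documented encoding.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class ToyEventMessage:
    """Stand-in with the same fields as the documented EventMessage."""

    event_type: str
    widget_id: str
    data: dict[str, Any]
    source_worker_id: str
    target_worker_id: str | None = None  # None means broadcast
    timestamp: float = 0.0
    message_id: str = ""


def encode(message: ToyEventMessage) -> str:
    # asdict() turns the dataclass into a plain dict that json can serialize.
    return json.dumps(asdict(message))


def decode(payload: str) -> ToyEventMessage:
    return ToyEventMessage(**json.loads(payload))


original = ToyEventMessage(
    event_type="click",
    widget_id="w1",
    data={"x": 10, "y": 20},
    source_worker_id="worker-a",
    timestamp=1700000000.5,
    message_id="msg-1",
)
restored = decode(encode(original))
```

The round trip only works when data holds JSON-compatible values; anything else would need a custom encoder.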

pywry.state.ConnectionInfo dataclass

ConnectionInfo(widget_id: str, worker_id: str, connected_at: float = 0.0, last_heartbeat: float = 0.0, user_id: str | None = None, session_id: str | None = None)

Information about a WebSocket connection.

ATTRIBUTE DESCRIPTION
widget_id

The widget this connection is for.

TYPE: str

worker_id

The worker ID that owns this connection.

TYPE: str

connected_at

Unix timestamp when the connection was established.

TYPE: float

last_heartbeat

Unix timestamp of the last heartbeat/activity.

TYPE: float

user_id

Optional user ID for RBAC.

TYPE: str or None

session_id

Optional session ID for tracking.

TYPE: str or None

pywry.state.UserSession dataclass

UserSession(session_id: str, user_id: str, roles: list[str] = list(), created_at: float = 0.0, expires_at: float | None = None, metadata: dict[str, Any] = dict())

User session information for RBAC support.

ATTRIBUTE DESCRIPTION
session_id

Unique session identifier.

TYPE: str

user_id

User identifier.

TYPE: str

roles

User roles for access control.

TYPE: list[str]

created_at

Unix timestamp when the session was created.

TYPE: float

expires_at

Unix timestamp when the session expires (None for no expiry).

TYPE: float or None

metadata

Additional session metadata.

TYPE: dict[str, Any]

pywry.state.StateBackend

Bases: str, Enum

Available state storage backends.
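
Deriving from both str and Enum means members compare equal to plain strings and can be built from configuration values. The sketch below shows that general Python behavior; ToyBackend is a hypothetical stand-in, and the lowercase values "memory" and "redis" are assumptions, since this page only names the members MEMORY and REDIS.

```python
from enum import Enum


class ToyBackend(str, Enum):
    """Stand-in enum; the member values are assumed, not taken from pywry."""

    MEMORY = "memory"
    REDIS = "redis"


from_config = ToyBackend("redis")          # look a member up by its value
equals_plain_string = ToyBackend.MEMORY == "memory"
is_string = isinstance(ToyBackend.REDIS, str)
```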


Callback Registry

pywry.state.CallbackRegistry

CallbackRegistry()

In-process callback registry.

Callbacks cannot be serialized to Redis, so they remain local to each worker. The registry tracks which callbacks are registered per widget and event type.

Initialize the callback registry.

Functions

register async

register(widget_id: str, event_type: str, callback: CallbackFunc | AsyncCallbackFunc) -> None

Register a callback for a widget event.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type (e.g., "click", "cellValueChanged").

TYPE: str

callback

The callback function to execute.

TYPE: CallbackFunc or AsyncCallbackFunc

get async

get(widget_id: str, event_type: str) -> CallbackRegistration | None

Get a callback registration.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type.

TYPE: str

RETURNS DESCRIPTION
CallbackRegistration or None

The registration if found.

has_widget async

has_widget(widget_id: str) -> bool

Check if any callbacks are registered for a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if callbacks exist for this widget.

has_callback async

has_callback(widget_id: str, event_type: str) -> bool

Check if a specific callback is registered.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type.

TYPE: str

RETURNS DESCRIPTION
bool

True if the callback exists.

invoke async

invoke(widget_id: str, event_type: str, data: dict[str, Any]) -> tuple[bool, Any]

Invoke a callback if it exists locally.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type.

TYPE: str

data

The event data to pass to the callback.

TYPE: dict

RETURNS DESCRIPTION
tuple[bool, Any]

(success, result) - success is True if callback was found and executed.

unregister async

unregister(widget_id: str, event_type: str) -> bool

Unregister a specific callback.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type.

TYPE: str

RETURNS DESCRIPTION
bool

True if the callback was removed.

unregister_widget async

unregister_widget(widget_id: str) -> int

Unregister all callbacks for a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
int

Number of callbacks removed.

list_widget_events async

list_widget_events(widget_id: str) -> list[str]

List all event types registered for a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
list[str]

List of event types.

list_widgets async

list_widgets() -> list[str]

List all widget IDs with registered callbacks.

RETURNS DESCRIPTION
list[str]

List of widget IDs.

get_stats async

get_stats() -> dict[str, Any]

Get registry statistics.

RETURNS DESCRIPTION
dict

Statistics about registered callbacks.
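
The (success, result) convention of invoke, together with support for both sync and async callbacks, can be sketched as follows. ToyCallbackRegistry is a hypothetical stand-in, and the callback signature (a single data dict) is an assumption; how the real registry handles exceptions raised by callbacks is not covered here.

```python
from __future__ import annotations

import asyncio
import inspect
from typing import Any, Callable


class ToyCallbackRegistry:
    """Stand-in registry keyed by (widget_id, event_type)."""

    def __init__(self) -> None:
        self._callbacks: dict[tuple[str, str], Callable[[dict[str, Any]], Any]] = {}

    async def register(self, widget_id: str, event_type: str,
                       callback: Callable[[dict[str, Any]], Any]) -> None:
        self._callbacks[(widget_id, event_type)] = callback

    async def invoke(self, widget_id: str, event_type: str,
                     data: dict[str, Any]) -> tuple[bool, Any]:
        callback = self._callbacks.get((widget_id, event_type))
        if callback is None:
            return False, None  # no local callback for this event
        result = callback(data)
        if inspect.isawaitable(result):
            result = await result  # async callbacks return an awaitable
        return True, result


def on_click(data: dict[str, Any]) -> int:
    return data["x"] * 2


async def on_cell_change(data: dict[str, Any]) -> str:
    await asyncio.sleep(0)
    return f"cell={data['value']}"


async def demo() -> list[tuple[bool, Any]]:
    registry = ToyCallbackRegistry()
    await registry.register("w1", "click", on_click)
    await registry.register("w1", "cellValueChanged", on_cell_change)
    return [
        await registry.invoke("w1", "click", {"x": 21}),
        await registry.invoke("w1", "cellValueChanged", {"value": 7}),
        await registry.invoke("w1", "hover", {}),
    ]


results = asyncio.run(demo())
```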

pywry.state.reset_callback_registry

reset_callback_registry() -> None

Reset the global callback registry (for testing).


Memory Implementations

pywry.state.MemoryWidgetStore

MemoryWidgetStore()

Bases: WidgetStore

In-memory widget store for single-process deployments.

Concurrency-safe implementation that uses asyncio locks to serialize access between coroutines.

Initialize the memory widget store.

Functions

register async

register(widget_id: str, html: str, token: str | None = None, owner_worker_id: str | None = None, metadata: dict[str, Any] | None = None) -> None

Register a widget with its HTML content.

get async

get(widget_id: str) -> WidgetData | None

Get complete widget data.

get_html async

get_html(widget_id: str) -> str | None

Get widget HTML content.

get_token async

get_token(widget_id: str) -> str | None

Get widget authentication token.

exists async

exists(widget_id: str) -> bool

Check if a widget exists.

delete async

delete(widget_id: str) -> bool

Delete a widget.

list_active async

list_active() -> list[str]

List all active widget IDs.

update_html async

update_html(widget_id: str, html: str) -> bool

Update widget HTML content.

update_token async

update_token(widget_id: str, token: str) -> bool

Update widget authentication token.

count async

count() -> int

Get the number of active widgets.

pywry.state.MemoryEventBus

MemoryEventBus()

Bases: EventBus

In-memory event bus for single-process deployments.

Uses asyncio.Queue for inter-task communication.

Initialize the memory event bus.

Functions

publish async

publish(channel: str, event: EventMessage) -> None

Publish an event to a channel.

subscribe async

subscribe(channel: str) -> AsyncIterator[EventMessage]

Subscribe to events on a channel.

unsubscribe async

unsubscribe(channel: str) -> None

Unsubscribe from a channel (clears all subscriptions).
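
A queue-per-subscriber design is one plausible way to build an in-memory bus on asyncio.Queue. The sketch below is not the MemoryEventBus source: ToyEventBus and the _CLOSED sentinel are hypothetical, events are plain dicts rather than EventMessage objects, and subscribe is a regular method here so the queue is registered before the first event is published.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import Any

_CLOSED = object()  # sentinel pushed to subscribers when a channel is closed


class ToyEventBus:
    """Stand-in bus: every subscriber gets its own asyncio.Queue."""

    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue[Any]]] = {}

    async def publish(self, channel: str, event: Any) -> None:
        # Fan the event out to every queue subscribed to this channel.
        for queue in list(self._queues.get(channel, [])):
            await queue.put(event)

    def subscribe(self, channel: str) -> AsyncIterator[Any]:
        queue: asyncio.Queue[Any] = asyncio.Queue()
        self._queues.setdefault(channel, []).append(queue)

        async def _iterate() -> AsyncIterator[Any]:
            while True:
                event = await queue.get()
                if event is _CLOSED:
                    return  # channel was unsubscribed
                yield event

        return _iterate()

    async def unsubscribe(self, channel: str) -> None:
        # Clears all subscriptions on the channel and wakes their iterators.
        for queue in self._queues.pop(channel, []):
            queue.put_nowait(_CLOSED)


async def demo() -> list[str]:
    bus = ToyEventBus()
    stream = bus.subscribe("widget:w1")
    await bus.publish("widget:w1", {"event_type": "click"})
    await bus.publish("widget:w1", {"event_type": "cellValueChanged"})
    await bus.unsubscribe("widget:w1")
    return [event["event_type"] async for event in stream]


received = asyncio.run(demo())
```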

pywry.state.MemoryConnectionRouter

MemoryConnectionRouter()

Bases: ConnectionRouter

In-memory connection router for single-process deployments.

Initialize the memory connection router.

Functions

register_connection async

register_connection(widget_id: str, worker_id: str, user_id: str | None = None, session_id: str | None = None) -> None

Register that a widget is connected to a specific worker.

get_connection_info async

get_connection_info(widget_id: str) -> ConnectionInfo | None

Get connection information for a widget.

get_owner async

get_owner(widget_id: str) -> str | None

Get the worker ID that owns this widget's connection.

refresh_heartbeat async

refresh_heartbeat(widget_id: str) -> bool

Refresh the heartbeat timestamp for a connection.

unregister_connection async

unregister_connection(widget_id: str) -> bool

Unregister a connection.

list_worker_connections async

list_worker_connections(worker_id: str) -> list[str]

List all widget IDs connected to a specific worker.

pywry.state.MemorySessionStore

MemorySessionStore()

Bases: SessionStore

In-memory session store for RBAC support.

For single-process deployments and development.

Initialize the memory session store.

Functions

create_session async

create_session(session_id: str, user_id: str, roles: list[str] | None = None, ttl: int | None = None, metadata: dict[str, Any] | None = None) -> UserSession

Create a new user session.

get_session async

get_session(session_id: str) -> UserSession | None

Get a session by ID.

validate_session async

validate_session(session_id: str) -> bool

Validate a session is active and not expired.

delete_session async

delete_session(session_id: str) -> bool

Delete a session.

refresh_session async

refresh_session(session_id: str, extend_ttl: int | None = None) -> bool

Refresh a session's expiry time.

list_user_sessions async

list_user_sessions(user_id: str) -> list[UserSession]

List all sessions for a user.

check_permission async

check_permission(session_id: str, _resource_type: str, _resource_id: str, permission: str) -> bool

Check if a session has permission to access a resource.

Currently implements simple role-based access control. Resource-specific permissions can be added via metadata.

set_role_permissions

set_role_permissions(role: str, permissions: set[str]) -> None

Configure permissions for a role (sync for setup).
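
The simple role-based check described above can be pictured as a role-to-permissions map that is consulted for each of a session's roles. This is a sketch under assumptions: ToyRoleChecker is hypothetical, it takes the roles directly instead of a session ID, and the real store's default permissions are not documented on this page.

```python
from __future__ import annotations


class ToyRoleChecker:
    """Stand-in for role-based permission checks; not a pywry class."""

    def __init__(self) -> None:
        self._role_permissions: dict[str, set[str]] = {}

    def set_role_permissions(self, role: str, permissions: set[str]) -> None:
        # Store a copy so later changes to the caller's set have no effect.
        self._role_permissions[role] = set(permissions)

    def check(self, roles: list[str], permission: str) -> bool:
        # Access is granted if any of the roles carries the permission.
        return any(
            permission in self._role_permissions.get(role, set())
            for role in roles
        )


checker = ToyRoleChecker()
checker.set_role_permissions("viewer", {"read"})
checker.set_role_permissions("editor", {"read", "write"})

viewer_can_read = checker.check(["viewer"], "read")
viewer_can_write = checker.check(["viewer"], "write")
mixed_can_write = checker.check(["viewer", "editor"], "write")
unknown_role = checker.check(["guest"], "read")
```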


Server State

pywry.state.ServerStateManager

ServerStateManager()

Unified state manager for PyWry server.

Automatically selects the appropriate storage backend based on deploy mode configuration. Provides both sync and async interfaces where appropriate.

ATTRIBUTE DESCRIPTION
deploy_mode

True if running in horizontally scaled deploy mode.

TYPE: bool

worker_id

Unique identifier for this worker process.

TYPE: str

Initialize the state manager.

Attributes

deploy_mode property

deploy_mode: bool

Check if running in deploy mode.

worker_id property

worker_id: str

Get this worker's unique ID.

widgets property

widgets: dict[str, dict[str, Any]]

Get local widgets dict.

Note: In deploy mode, this only returns widgets that were created by this worker and may not reflect the full state. Use async methods for accurate state in deploy mode.

widget_tokens property

widget_tokens: dict[str, str]

Get local widget tokens dict.

connections property

connections: dict[str, Any]

Get local connections dict.

event_queues property

event_queues: dict[str, Queue[Any]]

Get local event queues dict.

Functions

register_widget async

register_widget(widget_id: str, html: str, token: str | None = None, metadata: dict[str, Any] | None = None) -> None

Register a widget with its HTML content.

In deploy mode, this stores in Redis. In local mode, stores in-memory.

PARAMETER DESCRIPTION
widget_id

Unique widget identifier.

TYPE: str

html

The widget's HTML content.

TYPE: str

token

Optional per-widget authentication token.

TYPE: str | None DEFAULT: None

metadata

Additional metadata (title, theme, etc.).

TYPE: dict | None DEFAULT: None

get_widget async

get_widget(widget_id: str) -> WidgetData | None

Get widget data by ID.

PARAMETER DESCRIPTION
widget_id

The widget ID to retrieve.

TYPE: str

RETURNS DESCRIPTION
WidgetData | None

Widget data if found.

get_widget_html async

get_widget_html(widget_id: str) -> str | None

Get widget HTML content.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
str | None

The HTML content if widget exists.

update_widget_html async

update_widget_html(widget_id: str, html: str) -> bool

Update widget HTML content.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

html

New HTML content.

TYPE: str

RETURNS DESCRIPTION
bool

True if update succeeded.

widget_exists async

widget_exists(widget_id: str) -> bool

Check if a widget exists.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if widget exists.

remove_widget async

remove_widget(widget_id: str) -> bool

Remove a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
bool

True if widget was removed.

list_widgets async

list_widgets() -> list[str]

List all widget IDs.

RETURNS DESCRIPTION
list[str]

List of widget IDs.

register_connection async

register_connection(widget_id: str, websocket: WebSocket) -> Queue[Any]

Register a WebSocket connection for a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

websocket

The WebSocket connection.

TYPE: WebSocket

RETURNS DESCRIPTION
Queue

Event queue for this connection.

unregister_connection async

unregister_connection(widget_id: str) -> None

Unregister a WebSocket connection.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

get_connection async

get_connection(widget_id: str) -> WebSocket | None

Get the WebSocket connection for a widget (local only).

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
WebSocket | None

The connection if it exists on this worker.

get_event_queue async

get_event_queue(widget_id: str) -> Queue[Any] | None

Get the event queue for a widget.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

RETURNS DESCRIPTION
Queue | None

The event queue if widget is connected on this worker.

register_callback async

register_callback(widget_id: str, event_type: str, callback: Any) -> None

Register a callback for widget events.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type to handle.

TYPE: str

callback

The callback function.

TYPE: callable

get_callback async

get_callback(widget_id: str, event_type: str) -> Any | None

Get a callback for a widget event.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type.

TYPE: str

RETURNS DESCRIPTION
callable | None

The callback if registered.

invoke_callback async

invoke_callback(widget_id: str, event_type: str, data: dict[str, Any]) -> tuple[bool, Any]

Invoke a callback for a widget event.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type.

TYPE: str

data

Event data.

TYPE: dict

RETURNS DESCRIPTION
tuple[bool, Any]

(success, result) tuple.

broadcast_event async

broadcast_event(widget_id: str, event_type: str, data: dict[str, Any]) -> None

Broadcast an event to a widget.

In deploy mode, uses Redis Pub/Sub to reach the correct worker. In local mode, sends directly to the local event queue.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event_type

The event type.

TYPE: str

data

Event data.

TYPE: dict

send_to_widget async

send_to_widget(widget_id: str, event: dict[str, Any]) -> bool

Send an event to a specific widget's WebSocket.

PARAMETER DESCRIPTION
widget_id

The widget ID.

TYPE: str

event

The event to send.

TYPE: dict

RETURNS DESCRIPTION
bool

True if event was queued for sending.

create_session async

create_session(user_id: str, roles: list[str] | None = None, metadata: dict[str, Any] | None = None) -> str

Create a new user session.

PARAMETER DESCRIPTION
user_id

The user identifier.

TYPE: str

roles

User roles for RBAC.

TYPE: list[str] | None DEFAULT: None

metadata

Additional session metadata.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
str

The session ID.

get_session async

get_session(session_id: str) -> Any | None

Get a session by ID.

PARAMETER DESCRIPTION
session_id

The session ID.

TYPE: str

RETURNS DESCRIPTION
UserSession | None

The session if found.

cleanup async

cleanup() -> None

Clean up all state and connections.

pywry.state.get_server_state

get_server_state() -> ServerStateManager

Get the global server state manager.

RETURNS DESCRIPTION
ServerStateManager

The singleton state manager instance.

pywry.state.reset_server_state

reset_server_state() -> None

Reset the server state manager (for testing).


Sync Helpers

pywry.state.run_async

run_async(coro: Coroutine[Any, Any, T], timeout: float | None = 5.0) -> T

Run an async coroutine from sync code.

Uses the server's event loop if available, otherwise creates a temporary one.

Note: This function cannot be called from within an async context on the server loop, because blocking that loop while waiting for the result would deadlock. Use await directly in async code.

PARAMETER DESCRIPTION
coro

The coroutine to run.

TYPE: Coroutine

timeout

Timeout in seconds. Default is 5.0.

TYPE: float | None DEFAULT: 5.0

RETURNS DESCRIPTION
T

The result of the coroutine.

RAISES DESCRIPTION
TimeoutError

If the operation times out.

RuntimeError

If called from within the server's event loop (would deadlock).

pywry.state.run_async_fire_and_forget

run_async_fire_and_forget(coro: Coroutine[Any, Any, Any]) -> None

Schedule an async coroutine without waiting for result.

PARAMETER DESCRIPTION
coro

The coroutine to run.

TYPE: Coroutine
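
Both sync helpers can be approximated with the standard library's asyncio.run_coroutine_threadsafe. In the sketch below a background thread running its own loop stands in for the server's event loop; toy_run_async and toy_fire_and_forget are hypothetical names, and the real helpers may be implemented differently (for example, by creating a temporary loop when no server loop exists).

```python
from __future__ import annotations

import asyncio
import concurrent.futures
import threading
from collections.abc import Coroutine
from typing import Any, TypeVar

T = TypeVar("T")

# A dedicated loop in a daemon thread plays the role of the server loop.
_loop = asyncio.new_event_loop()
_thread = threading.Thread(target=_loop.run_forever, daemon=True)
_thread.start()


def toy_run_async(coro: Coroutine[Any, Any, T], timeout: float | None = 5.0) -> T:
    if threading.current_thread() is _thread:
        coro.close()  # avoid a "never awaited" warning
        raise RuntimeError("called from the loop thread; blocking here would deadlock")
    future = asyncio.run_coroutine_threadsafe(coro, _loop)
    try:
        return future.result(timeout)  # block the calling thread until done
    except concurrent.futures.TimeoutError:
        future.cancel()
        raise TimeoutError(f"coroutine did not finish within {timeout} seconds")


def toy_fire_and_forget(coro: Coroutine[Any, Any, Any]) -> None:
    # Schedule the coroutine and return immediately without waiting.
    asyncio.run_coroutine_threadsafe(coro, _loop)


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0.01)
    return a + b


async def slow() -> None:
    await asyncio.sleep(1)


async def nested() -> str:
    # Calling the sync helper from inside the loop is refused.
    try:
        toy_run_async(add(1, 1))
    except RuntimeError:
        return "refused"
    return "allowed"


total = toy_run_async(add(2, 3))

try:
    toy_run_async(slow(), timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True

nested_result = toy_run_async(nested())

done = threading.Event()


async def mark_done() -> None:
    done.set()


toy_fire_and_forget(mark_done())
finished = done.wait(timeout=1.0)
```

The thread check is what turns a silent deadlock into an immediate RuntimeError: a coroutine that blocks its own loop while waiting for work scheduled on that same loop can never make progress.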


Additional Types

pywry.state.callbacks.CallbackRegistration dataclass

CallbackRegistration(widget_id: str, event_type: str, callback: CallbackFunc | AsyncCallbackFunc, is_async: bool = False, created_at: float = time(), invoke_count: int = 0, last_invoked: float | None = None)

Tracks a callback registration.

Attributes

widget_id instance-attribute

widget_id: str

event_type instance-attribute

event_type: str

callback instance-attribute

callback: CallbackFunc | AsyncCallbackFunc

is_async class-attribute instance-attribute

is_async: bool = False

created_at class-attribute instance-attribute

created_at: float = field(default_factory=time)

invoke_count class-attribute instance-attribute

invoke_count: int = 0

last_invoked class-attribute instance-attribute

last_invoked: float | None = None



Memory Store Factory

pywry.state.memory.create_memory_stores

Create all in-memory state stores.

RETURNS DESCRIPTION
tuple

(widget_store, event_bus, connection_router, session_store)