pywry.state

State management interfaces and implementations.


Factory Functions

pywry.state.get_widget_store cached

get_widget_store() -> WidgetStore

Get the configured widget store instance.

Uses the Redis backend in deploy mode when one is configured; otherwise falls back to the in-memory store.

Returns:

Type Description
WidgetStore

The widget store instance.

Notes

The returned store is cached per process and re-created only after calling clear_state_caches().

pywry.state.get_event_bus cached

get_event_bus() -> EventBus

Get the configured event bus instance.

Uses Redis Pub/Sub in deploy mode when it is configured; otherwise falls back to the in-memory bus.

Returns:

Type Description
EventBus

The event bus instance.

Notes

The returned bus is cached per process and may be backed by Redis Pub/Sub in deploy mode.

pywry.state.get_connection_router cached

get_connection_router() -> ConnectionRouter

Get the configured connection router instance.

Uses Redis in deploy mode when it is configured; otherwise falls back to the in-memory router.

Returns:

Type Description
ConnectionRouter

The connection router instance.

Notes

The returned router is cached per process and uses the configured backend.

pywry.state.get_session_store cached

get_session_store() -> SessionStore

Get the configured session store instance.

Uses Redis in deploy mode when it is configured; otherwise falls back to the in-memory session store.

Returns:

Type Description
SessionStore

The session store instance.

Notes

Session storage is primarily relevant in deploy-mode RBAC and multi-tenant scenarios, but a memory implementation is always available.

pywry.state.get_callback_registry

get_callback_registry() -> CallbackRegistry

Get the global callback registry instance.

Returns:

Type Description
CallbackRegistry

The singleton callback registry.

Notes

The registry is process-local because Python callback objects cannot be serialized across workers.

pywry.state.get_worker_id

get_worker_id() -> str

Get the unique worker ID for this process.

The worker ID is used for connection routing and callback dispatch in multi-worker deployments.

Returns:

Type Description
str

Unique worker identifier.

pywry.state.get_state_backend

get_state_backend() -> StateBackend

Get the configured state backend.

Returns:

Type Description
StateBackend

The configured backend (MEMORY or REDIS).

Notes

The backend is selected from the PYWRY_DEPLOY__STATE_BACKEND environment variable and defaults to in-memory storage.
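
The selection described in this note can be pictured as reading one environment variable into a string-valued enum. This is a minimal sketch, not PyWry's implementation: `Backend` and `backend_from_env` are hypothetical names, and both the member values ("memory", "redis") and the fallback on unknown values are assumptions.

```python
import os
from enum import Enum
from typing import Mapping, Optional


class Backend(str, Enum):
    """Hypothetical mirror of StateBackend; the member values are assumed."""

    MEMORY = "memory"
    REDIS = "redis"


def backend_from_env(env: Optional[Mapping[str, str]] = None) -> Backend:
    # Read the variable named in the note; default to in-memory storage.
    source = os.environ if env is None else env
    raw = source.get("PYWRY_DEPLOY__STATE_BACKEND", "memory").strip().lower()
    try:
        return Backend(raw)
    except ValueError:
        # Assumption: an unrecognized value falls back to memory.
        return Backend.MEMORY
```

Passing a mapping instead of touching `os.environ` keeps the sketch easy to exercise in tests.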

pywry.state.is_deploy_mode

is_deploy_mode() -> bool

Check if running in deploy mode.

Deploy mode enables:

- Redis state backend (if configured)
- Multi-worker support
- External state storage
- Session/RBAC support

Returns:

Type Description
bool

True if running in deploy mode.

Notes

Deploy mode may be enabled explicitly or inferred from a Redis-backed state configuration.

pywry.state.clear_state_caches

clear_state_caches() -> None

Clear all cached state store instances.

Call this to force re-creation of stores (e.g., after config change).

Notes

This does not delete persisted backend data; it only clears in-process factory caches so subsequent accessor calls build fresh instances.
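
The per-process caching and cache clearing described for the factory functions can be illustrated with `functools.lru_cache`. This is only a sketch of the pattern: `FakeStore`, `get_store`, and `clear_caches` are hypothetical stand-ins, and whether PyWry uses `lru_cache` internally is an assumption.

```python
from functools import lru_cache


class FakeStore:
    """Hypothetical stand-in for a store; not a PyWry class."""


@lru_cache(maxsize=1)
def get_store() -> FakeStore:
    # Built on the first call, then reused for the rest of the process.
    return FakeStore()


def clear_caches() -> None:
    # Drops only the cached instance; data persisted elsewhere is untouched.
    get_store.cache_clear()
```

After `clear_caches()`, the next `get_store()` call builds a fresh instance, which is the behavior you would want after changing configuration.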


Abstract Interfaces

pywry.state.WidgetStore

Bases: ABC

Abstract widget storage interface.

Handles storage and retrieval of widget HTML content and metadata. Implementations must be thread-safe and support async operations.

Notes

The widget store is the canonical source of widget HTML, metadata, and authentication tokens for both single-process and deploy-mode backends.

Functions

register abstractmethod async

register(widget_id: str, html: str, token: str | None = None, owner_worker_id: str | None = None, metadata: dict[str, Any] | None = None) -> None

Register a widget with its HTML content.

Parameters:

Name Type Description Default
widget_id str

Unique identifier for the widget.

required
html str

The HTML content of the widget.

required
token str or None

Optional per-widget authentication token.

None
owner_worker_id str or None

ID of the worker that created this widget.

None
metadata dict[str, Any] or None

Additional metadata (title, theme, etc.).

None

get abstractmethod async

get(widget_id: str) -> WidgetData | None

Get complete widget data.

Parameters:

Name Type Description Default
widget_id str

The widget ID to retrieve.

required

Returns:

Type Description
WidgetData or None

The widget data if found, None otherwise.

get_html abstractmethod async

get_html(widget_id: str) -> str | None

Get widget HTML content.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
str or None

The HTML content if found, None otherwise.

get_token abstractmethod async

get_token(widget_id: str) -> str | None

Get widget authentication token.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
str or None

The token if set, None otherwise.
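
A caller that has fetched the stored token will typically compare it with the token presented by a client. The helper below is a hedged sketch of one way to do that with the standard library; `token_matches` is a hypothetical name, and treating a missing token on either side as a mismatch is an assumption, not documented PyWry behavior.

```python
import hmac
from typing import Optional


def token_matches(stored: Optional[str], presented: Optional[str]) -> bool:
    # Assumption: a missing token on either side counts as a mismatch.
    if stored is None or presented is None:
        return False
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(stored.encode(), presented.encode())
```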

exists abstractmethod async

exists(widget_id: str) -> bool

Check if a widget exists.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
bool

True if the widget exists.

delete abstractmethod async

delete(widget_id: str) -> bool

Delete a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID to delete.

required

Returns:

Type Description
bool

True if the widget was deleted, False if it didn't exist.

list_active abstractmethod async

list_active() -> list[str]

List all active widget IDs.

Returns:

Type Description
list[str]

List of active widget IDs.

update_html abstractmethod async

update_html(widget_id: str, html: str) -> bool

Update widget HTML content.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
html str

The new HTML content.

required

Returns:

Type Description
bool

True if updated, False if widget doesn't exist.

update_token abstractmethod async

update_token(widget_id: str, token: str) -> bool

Update widget authentication token.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
token str

The new authentication token.

required

Returns:

Type Description
bool

True if updated, False if widget doesn't exist.

count abstractmethod async

count() -> int

Get the number of active widgets.

Returns:

Type Description
int

Number of active widgets.

pywry.state.EventBus

Bases: ABC

Abstract event publishing interface.

Handles cross-worker event delivery for callback dispatch and real-time updates.

Notes

EventBus implementations are expected to provide best-effort fan-out for widget and worker channels without assuming in-process execution.

Functions

publish abstractmethod async

publish(channel: str, event: EventMessage) -> None

Publish an event to a channel.

Parameters:

Name Type Description Default
channel str

The channel name (e.g., "widget:{id}", "worker:{id}").

required
event EventMessage

The event to publish.

required

subscribe abstractmethod async

subscribe(channel: str) -> AsyncIterator[EventMessage]

Subscribe to events on a channel.

Parameters:

Name Type Description Default
channel str

The channel name.

required

Yields:

Type Description
EventMessage

Events received on the channel.

unsubscribe abstractmethod async

unsubscribe(channel: str) -> None

Unsubscribe from a channel.

Parameters:

Name Type Description Default
channel str

The channel to unsubscribe from.

required

pywry.state.ConnectionRouter

Bases: ABC

Abstract connection routing interface.

Tracks which worker owns which WebSocket connection, enabling cross-worker message routing.

Notes

Routers are used to decide where outbound callback and state events must be forwarded when multiple workers are serving widgets concurrently.

Functions

register_connection abstractmethod async

register_connection(widget_id: str, worker_id: str, user_id: str | None = None, session_id: str | None = None) -> None

Register that a widget is connected to a specific worker.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
worker_id str

The worker ID that owns this connection.

required
user_id str or None

Optional user ID for RBAC.

None
session_id str or None

Optional session ID for tracking.

None

get_connection_info abstractmethod async

get_connection_info(widget_id: str) -> ConnectionInfo | None

Get connection information for a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
ConnectionInfo or None

Connection information if connected, None otherwise.

get_owner abstractmethod async

get_owner(widget_id: str) -> str | None

Get the worker ID that owns this widget's connection.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
str or None

The worker ID if connected, None otherwise.

refresh_heartbeat abstractmethod async

refresh_heartbeat(widget_id: str) -> bool

Refresh the heartbeat timestamp for a connection.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
bool

True if refreshed, False if connection doesn't exist.

unregister_connection abstractmethod async

unregister_connection(widget_id: str) -> bool

Unregister a connection.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
bool

True if unregistered, False if the connection didn't exist.

list_worker_connections abstractmethod async

list_worker_connections(worker_id: str) -> list[str]

List all widget IDs connected to a specific worker.

Parameters:

Name Type Description Default
worker_id str

The worker ID.

required

Returns:

Type Description
list[str]

List of widget IDs connected to this worker.

pywry.state.SessionStore

Bases: ABC

Abstract session storage interface for RBAC support.

Handles user sessions and access control for multi-tenant deployments.

Notes

Session stores back RBAC checks, session expiry, and multi-session user tracking for deploy-mode hosting.

Functions

create_session abstractmethod async

create_session(session_id: str, user_id: str, roles: list[str] | None = None, ttl: int | None = None, metadata: dict[str, Any] | None = None) -> UserSession

Create a new user session.

Parameters:

Name Type Description Default
session_id str

Unique session identifier.

required
user_id str

User identifier.

required
roles list[str] or None

User roles for access control.

None
ttl int or None

Time-to-live in seconds (None for no expiry).

None
metadata dict[str, Any] or None

Additional session metadata.

None

Returns:

Type Description
UserSession

The created session.

get_session abstractmethod async

get_session(session_id: str) -> UserSession | None

Get a session by ID.

Parameters:

Name Type Description Default
session_id str

The session ID.

required

Returns:

Type Description
UserSession or None

The session if found and not expired, None otherwise.

validate_session abstractmethod async

validate_session(session_id: str) -> bool

Validate that a session is active and not expired.

Parameters:

Name Type Description Default
session_id str

The session ID.

required

Returns:

Type Description
bool

True if the session is valid.

delete_session abstractmethod async

delete_session(session_id: str) -> bool

Delete a session.

Parameters:

Name Type Description Default
session_id str

The session ID.

required

Returns:

Type Description
bool

True if deleted, False if the session didn't exist.

refresh_session abstractmethod async

refresh_session(session_id: str, extend_ttl: int | None = None) -> bool

Refresh a session's expiry time.

Parameters:

Name Type Description Default
session_id str

The session ID.

required
extend_ttl int or None

New TTL in seconds (None to use original TTL).

None

Returns:

Type Description
bool

True if refreshed, False if session doesn't exist.

list_user_sessions abstractmethod async

list_user_sessions(user_id: str) -> list[UserSession]

List all sessions for a user.

Parameters:

Name Type Description Default
user_id str

The user ID.

required

Returns:

Type Description
list[UserSession]

List of active sessions for this user.

check_permission abstractmethod async

check_permission(session_id: str, resource_type: str, resource_id: str, permission: str) -> bool

Check if a session has permission to access a resource.

Parameters:

Name Type Description Default
session_id str

The session ID.

required
resource_type str

The type of resource (e.g., "widget", "user").

required
resource_id str

The resource identifier.

required
permission str

The required permission (e.g., "read", "write", "admin").

required

Returns:

Type Description
bool

True if the session has the required permission.


Data Types

pywry.state.WidgetData dataclass

WidgetData(widget_id: str, html: str, token: str | None = None, created_at: float = 0.0, owner_worker_id: str | None = None, metadata: dict[str, Any] = dict())

Widget data stored in the state store.

Attributes:

Name Type Description
widget_id str

Unique identifier for the widget.

html str

The HTML content of the widget.

token str or None

Optional per-widget authentication token.

created_at float

Unix timestamp when the widget was created.

owner_worker_id str or None

ID of the worker that created/owns this widget's callbacks.

metadata dict[str, Any]

Additional metadata (title, theme, etc.).

pywry.state.EventMessage dataclass

EventMessage(event_type: str, widget_id: str, data: dict[str, Any], source_worker_id: str, target_worker_id: str | None = None, timestamp: float = 0.0, message_id: str = '')

Event message for cross-worker communication.

Attributes:

Name Type Description
event_type str

The type of event (e.g., "click", "cellValueChanged").

widget_id str

The target widget ID.

data dict[str, Any]

The event payload.

source_worker_id str

The worker that sent this event.

target_worker_id str or None

The specific worker to receive this event (None for broadcast).

timestamp float

Unix timestamp when the event was created.

message_id str

Unique identifier for this message.
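
Because an event may cross process boundaries, it has to be reducible to plain data. The sketch below round-trips a dataclass with the same fields through JSON; `Event`, `encode`, and `decode` are hypothetical names, and JSON as the wire format is an assumption rather than something this page states.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class Event:
    """Hypothetical stand-in with the same fields as EventMessage."""

    event_type: str
    widget_id: str
    data: dict[str, Any]
    source_worker_id: str
    target_worker_id: Optional[str] = None
    timestamp: float = 0.0
    message_id: str = ""


def encode(event: Event) -> str:
    # asdict() turns the dataclass into a plain dict that json can serialize.
    return json.dumps(asdict(event))


def decode(payload: str) -> Event:
    # The JSON keys match the field names, so they can be passed as keywords.
    return Event(**json.loads(payload))
```

The round trip only works when `data` itself holds JSON-serializable values.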

pywry.state.ConnectionInfo dataclass

ConnectionInfo(widget_id: str, worker_id: str, connected_at: float = 0.0, last_heartbeat: float = 0.0, user_id: str | None = None, session_id: str | None = None)

Information about a WebSocket connection.

Attributes:

Name Type Description
widget_id str

The widget this connection is for.

worker_id str

The worker ID that owns this connection.

connected_at float

Unix timestamp when the connection was established.

last_heartbeat float

Unix timestamp of the last heartbeat/activity.

user_id str or None

Optional user ID for RBAC.

session_id str or None

Optional session ID for tracking.

pywry.state.UserSession dataclass

UserSession(session_id: str, user_id: str, roles: list[str] = list(), created_at: float = 0.0, expires_at: float | None = None, metadata: dict[str, Any] = dict())

User session information for RBAC support.

Attributes:

Name Type Description
session_id str

Unique session identifier.

user_id str

User identifier.

roles list[str]

User roles for access control.

created_at float

Unix timestamp when the session was created.

expires_at float or None

Unix timestamp when the session expires (None for no expiry).

metadata dict[str, Any]

Additional session metadata.

pywry.state.StateBackend

Bases: str, Enum

Available state storage backends.


Callback Registry

pywry.state.CallbackRegistry

CallbackRegistry()

In-process callback registry.

Callbacks cannot be serialized to Redis, so they remain local to each worker. The registry tracks which callbacks are registered per widget and event type.

Attributes:

Name Type Description
_callbacks dict[str, dict[str, CallbackRegistration]]

Nested mapping of widget IDs to event registrations.

_lock Lock

Synchronizes callback registration and lookup.

Initialize the callback registry.

Functions

register async

register(widget_id: str, event_type: str, callback: CallbackFunc | AsyncCallbackFunc) -> None

Register a callback for a widget event.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type (e.g., "click", "cellValueChanged").

required
callback CallbackFunc | AsyncCallbackFunc

The callback function to execute.

required

Notes

Async callbacks are detected automatically and stored with their execution mode for later invocation.

get async

get(widget_id: str, event_type: str) -> CallbackRegistration | None

Get a callback registration.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type.

required

Returns:

Type Description
CallbackRegistration or None

The registration if found.

has_widget async

has_widget(widget_id: str) -> bool

Check if any callbacks are registered for a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
bool

True if callbacks exist for this widget.

has_callback async

has_callback(widget_id: str, event_type: str) -> bool

Check if a specific callback is registered.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type.

required

Returns:

Type Description
bool

True if the callback exists.

invoke async

invoke(widget_id: str, event_type: str, data: dict[str, Any]) -> tuple[bool, Any]

Invoke a callback if it exists locally.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type.

required
data dict[str, Any]

The event data to pass to the callback.

required

Returns:

Type Description
tuple[bool, Any]

A (success, result) pair; success is True if the callback was found and executed.

Notes

Synchronous callbacks are dispatched through the default executor so the event loop remains responsive.

unregister async

unregister(widget_id: str, event_type: str) -> bool

Unregister a specific callback.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type.

required

Returns:

Type Description
bool

True if the callback was removed.

unregister_widget async

unregister_widget(widget_id: str) -> int

Unregister all callbacks for a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
int

Number of callbacks removed.

list_widget_events async

list_widget_events(widget_id: str) -> list[str]

List all event types registered for a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
list[str]

List of event types.

list_widgets async

list_widgets() -> list[str]

List all widget IDs with registered callbacks.

Returns:

Type Description
list[str]

List of widget IDs.

get_stats async

get_stats() -> dict[str, Any]

Get registry statistics.

Returns:

Type Description
dict[str, Any]

Statistics about registered callbacks.

pywry.state.reset_callback_registry

reset_callback_registry() -> None

Reset the global callback registry.

Notes

Primarily intended for tests that need a clean process-local registry.


Memory Implementations

pywry.state.MemoryWidgetStore

MemoryWidgetStore()

Bases: WidgetStore

In-memory widget store for single-process deployments.

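Concurrent access from tasks on the same event loop is serialized with an asyncio lock. Note that asyncio locks are not thread-safe, so the store should be driven from a single event loop.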

Attributes:

Name Type Description
_widgets dict[str, WidgetData]

In-memory mapping of widget IDs to widget state.

_lock Lock

Synchronizes concurrent access to the in-memory mapping.

Initialize the in-memory widget store.

Functions

register async

register(widget_id: str, html: str, token: str | None = None, owner_worker_id: str | None = None, metadata: dict[str, Any] | None = None) -> None

Register a widget with its HTML content.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required
html str

Initial widget HTML.

required
token str | None

Optional authentication token associated with the widget.

None
owner_worker_id str | None

Worker ID recorded for deploy-mode compatibility.

None
metadata dict[str, Any] | None

Arbitrary widget metadata.

None

get async

get(widget_id: str) -> WidgetData | None

Get complete widget data.

Parameters:

Name Type Description Default
widget_id str

Widget identifier to retrieve.

required

Returns:

Type Description
WidgetData | None

Stored widget data, or None when the widget is unknown.

get_html async

get_html(widget_id: str) -> str | None

Get widget HTML content.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
str | None

Stored HTML, or None when the widget is unknown.

get_token async

get_token(widget_id: str) -> str | None

Get widget authentication token.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
str | None

Stored token, or None when no token is set.

exists async

exists(widget_id: str) -> bool

Check if a widget exists.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
bool

True when the widget exists in memory.

delete async

delete(widget_id: str) -> bool

Delete a widget.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
bool

True when the widget existed and was removed.

list_active async

list_active() -> list[str]

List all active widget IDs.

Returns:

Type Description
list[str]

All widget IDs currently stored in memory.

update_html async

update_html(widget_id: str, html: str) -> bool

Update widget HTML content.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required
html str

Replacement HTML content.

required

Returns:

Type Description
bool

True when the widget existed and was updated.

update_token async

update_token(widget_id: str, token: str) -> bool

Update widget authentication token.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required
token str

Replacement authentication token.

required

Returns:

Type Description
bool

True when the widget existed and was updated.

count async

count() -> int

Get the number of active widgets.

Returns:

Type Description
int

Count of widgets stored in memory.
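
The shape of such a store, a dict guarded by an `asyncio.Lock`, can be sketched in a few lines. `Record` and `SketchWidgetStore` are hypothetical stand-ins that cover only a subset of the interface; this is not the source of MemoryWidgetStore.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Record:
    """Hypothetical stand-in shaped like WidgetData."""

    widget_id: str
    html: str
    token: Optional[str] = None
    created_at: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)


class SketchWidgetStore:
    def __init__(self) -> None:
        self._widgets: dict[str, Record] = {}
        self._lock = asyncio.Lock()  # serializes tasks on one event loop

    async def register(self, widget_id, html, token=None, metadata=None) -> None:
        async with self._lock:
            self._widgets[widget_id] = Record(
                widget_id, html, token, time.time(), dict(metadata or {})
            )

    async def get_html(self, widget_id: str) -> Optional[str]:
        async with self._lock:
            rec = self._widgets.get(widget_id)
            return rec.html if rec is not None else None

    async def update_html(self, widget_id: str, html: str) -> bool:
        async with self._lock:
            rec = self._widgets.get(widget_id)
            if rec is None:
                return False  # unknown widget, nothing updated
            rec.html = html
            return True

    async def delete(self, widget_id: str) -> bool:
        async with self._lock:
            return self._widgets.pop(widget_id, None) is not None

    async def count(self) -> int:
        async with self._lock:
            return len(self._widgets)
```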

pywry.state.MemoryEventBus

MemoryEventBus()

Bases: EventBus

In-memory event bus for single-process deployments.

Uses asyncio.Queue for inter-task communication.

Attributes:

Name Type Description
_channels dict[str, list[Queue[EventMessage]]]

Active subscriber queues keyed by channel name.

_lock Lock

Synchronizes channel registration and removal.

Initialize the in-memory event bus.

Functions

publish async

publish(channel: str, event: EventMessage) -> None

Publish an event to a channel.

Parameters:

Name Type Description Default
channel str

Channel name to publish to.

required
event EventMessage

Event payload to fan out to subscribers.

required

subscribe async

subscribe(channel: str) -> AsyncIterator[EventMessage]

Subscribe to events on a channel.

Parameters:

Name Type Description Default
channel str

Channel name to subscribe to.

required

Yields:

Type Description
EventMessage

Event payloads published to the subscribed channel.

unsubscribe async

unsubscribe(channel: str) -> None

Unsubscribe from a channel.

Parameters:

Name Type Description Default
channel str

Channel name whose subscribers should be removed.

required

Notes

This implementation clears all subscribers for the channel.
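
Queue-based fan-out of this kind can be sketched with one `asyncio.Queue` per subscriber. `SketchBus` is a hypothetical stand-in, not the source of MemoryEventBus; locking is omitted, and subscribers that are already waiting when `unsubscribe` is called are not woken up in this sketch.

```python
import asyncio
from typing import Any, AsyncIterator


class SketchBus:
    def __init__(self) -> None:
        self._channels: dict[str, list[asyncio.Queue]] = {}

    async def publish(self, channel: str, event: Any) -> None:
        # Fan out: each subscriber queue on the channel receives the event.
        for queue in list(self._channels.get(channel, [])):
            await queue.put(event)

    async def subscribe(self, channel: str) -> AsyncIterator[Any]:
        queue: asyncio.Queue = asyncio.Queue()
        self._channels.setdefault(channel, []).append(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the generator is closed: drop this subscriber's queue.
            subscribers = self._channels.get(channel, [])
            if queue in subscribers:
                subscribers.remove(queue)

    async def unsubscribe(self, channel: str) -> None:
        # Matches the note: removes every subscriber queue for the channel.
        self._channels.pop(channel, None)
```

A subscriber only receives events published after its first iteration step, because that is when its queue is registered.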

pywry.state.MemoryConnectionRouter

MemoryConnectionRouter()

Bases: ConnectionRouter

In-memory connection router for single-process deployments.

Attributes:

Name Type Description
_connections dict[str, ConnectionInfo]

Connection metadata keyed by widget ID.

_worker_connections dict[str, set[str]]

Reverse index of widget IDs by owning worker.

_lock Lock

Synchronizes access to connection registries.

Initialize the in-memory connection router.

Functions

register_connection async

register_connection(widget_id: str, worker_id: str, user_id: str | None = None, session_id: str | None = None) -> None

Register that a widget is connected to a specific worker.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required
worker_id str

Worker currently serving the widget connection.

required
user_id str | None

Optional user identifier associated with the connection.

None
session_id str | None

Optional session identifier associated with the connection.

None

get_connection_info async

get_connection_info(widget_id: str) -> ConnectionInfo | None

Get connection information for a widget.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
ConnectionInfo | None

Stored connection metadata, or None when disconnected.

get_owner async

get_owner(widget_id: str) -> str | None

Get the worker ID that owns this widget's connection.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
str | None

Owning worker ID, or None when disconnected.

refresh_heartbeat async

refresh_heartbeat(widget_id: str) -> bool

Refresh the heartbeat timestamp for a connection.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
bool

True when the connection exists and was updated.

unregister_connection async

unregister_connection(widget_id: str) -> bool

Unregister a connection.

Parameters:

Name Type Description Default
widget_id str

Widget identifier.

required

Returns:

Type Description
bool

True when the connection existed and was removed.

list_worker_connections async

list_worker_connections(worker_id: str) -> list[str]

List all widget IDs connected to a specific worker.

Parameters:

Name Type Description Default
worker_id str

Worker identifier.

required

Returns:

Type Description
list[str]

Widget IDs currently assigned to the worker.
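
The two mappings listed under Attributes, a forward map and a reverse index, have to be kept in step. The sketch below shows that bookkeeping; `Conn` and `SketchRouter` are hypothetical stand-ins, the methods are synchronous and unlocked for brevity (the real interface is async), and the choice to let a re-registration move the widget to the new worker is an assumption.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class Conn:
    """Hypothetical stand-in shaped like ConnectionInfo."""

    widget_id: str
    worker_id: str
    connected_at: float = 0.0
    last_heartbeat: float = 0.0


class SketchRouter:
    def __init__(self) -> None:
        self._connections: dict[str, Conn] = {}
        self._worker_connections: dict[str, set[str]] = {}

    def register(self, widget_id: str, worker_id: str) -> None:
        # Assumption: re-registering moves the widget, so clear the old entry.
        self.unregister(widget_id)
        now = time.time()
        self._connections[widget_id] = Conn(widget_id, worker_id, now, now)
        self._worker_connections.setdefault(worker_id, set()).add(widget_id)

    def get_owner(self, widget_id: str) -> Optional[str]:
        conn = self._connections.get(widget_id)
        return conn.worker_id if conn is not None else None

    def unregister(self, widget_id: str) -> bool:
        conn = self._connections.pop(widget_id, None)
        if conn is None:
            return False
        peers = self._worker_connections.get(conn.worker_id, set())
        peers.discard(widget_id)
        if not peers:
            # Drop empty sets so the reverse index does not grow unbounded.
            self._worker_connections.pop(conn.worker_id, None)
        return True

    def list_worker_connections(self, worker_id: str) -> list[str]:
        return sorted(self._worker_connections.get(worker_id, set()))
```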

pywry.state.MemorySessionStore

MemorySessionStore()

Bases: SessionStore

In-memory session store for RBAC support.

For single-process deployments and development.

Attributes:

Name Type Description
_sessions dict[str, UserSession]

Active sessions keyed by session ID.

_user_sessions dict[str, set[str]]

Reverse index of session IDs by user.

_role_permissions dict[str, set[str]]

Simple in-memory role-to-permission mapping.

_lock Lock

Synchronizes session and permission updates.

Initialize the in-memory session store.

Functions

create_session async

create_session(session_id: str, user_id: str, roles: list[str] | None = None, ttl: int | None = None, metadata: dict[str, Any] | None = None) -> UserSession

Create a new user session.

Parameters:

Name Type Description Default
session_id str

Session identifier.

required
user_id str

User identifier.

required
roles list[str] | None

Roles granted to the user.

None
ttl int | None

Session lifetime in seconds.

None
metadata dict[str, Any] | None

Arbitrary session metadata.

None

Returns:

Type Description
UserSession

Newly created in-memory session record.

get_session async

get_session(session_id: str) -> UserSession | None

Get a session by ID.

Parameters:

Name Type Description Default
session_id str

Session identifier.

required

Returns:

Type Description
UserSession | None

Active session record, or None when absent or expired.

validate_session async

validate_session(session_id: str) -> bool

Validate that a session is active and not expired.

Parameters:

Name Type Description Default
session_id str

Session identifier.

required

Returns:

Type Description
bool

True when the session exists and has not expired.

delete_session async

delete_session(session_id: str) -> bool

Delete a session.

Parameters:

Name Type Description Default
session_id str

Session identifier.

required

Returns:

Type Description
bool

True when the session existed and was removed.

refresh_session async

refresh_session(session_id: str, extend_ttl: int | None = None) -> bool

Refresh a session's expiry time.

Parameters:

Name Type Description Default
session_id str

Session identifier.

required
extend_ttl int | None

New TTL in seconds. When omitted, the original TTL duration is reused.

None

Returns:

Type Description
bool

True when the session exists and remains active after refresh.
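
The refresh rule (use `extend_ttl` when given, otherwise reuse the original TTL) can be sketched as pure functions over a session record. `Session`, `is_expired`, and `refresh` are hypothetical names; storing the original TTL on the record and treating a session as expired once the clock reaches `expires_at` are both assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Session:
    """Hypothetical stand-in; the ttl field is an assumption of this sketch."""

    session_id: str
    created_at: float
    expires_at: Optional[float] = None
    ttl: Optional[int] = None


def is_expired(session: Session, now: float) -> bool:
    # A session without expires_at never expires.
    return session.expires_at is not None and now >= session.expires_at


def refresh(session: Session, now: float, extend_ttl: Optional[int] = None) -> bool:
    if is_expired(session, now):
        return False  # an expired session cannot be revived
    ttl = extend_ttl if extend_ttl is not None else session.ttl
    if ttl is not None:
        session.expires_at = now + ttl
    return True
```

Passing `now` explicitly, rather than calling `time.time()` inside, keeps the logic deterministic under test.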

list_user_sessions async

list_user_sessions(user_id: str) -> list[UserSession]

List all sessions for a user.

Parameters:

Name Type Description Default
user_id str

User identifier.

required

Returns:

Type Description
list[UserSession]

Active sessions currently associated with the user.

check_permission async

check_permission(session_id: str, _resource_type: str, _resource_id: str, permission: str) -> bool

Check if a session has permission to access a resource.

Currently implements simple role-based access control. Resource-specific permissions can be added via metadata.

Parameters:

Name Type Description Default
session_id str

Session identifier.

required
_resource_type str

Resource type placeholder for interface compatibility.

required
_resource_id str

Resource identifier placeholder for interface compatibility.

required
permission str

Permission to test.

required

Returns:

Type Description
bool

True when any of the session's roles grants the requested permission.

set_role_permissions

set_role_permissions(role: str, permissions: set[str]) -> None

Configure permissions for a role.

Parameters:

Name Type Description Default
role str

Role name to update.

required
permissions set[str]

Permissions granted to the role.

required

Notes

This method is synchronous because it is intended for startup-time setup.
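
The role-based check described for this store reduces to a set lookup per role. The class below is a minimal sketch of that idea; `SketchPermissions` and its `check` method are hypothetical and ignore the session lookup and resource arguments of the real interface.

```python
class SketchPermissions:
    def __init__(self) -> None:
        self._role_permissions: dict[str, set[str]] = {}

    def set_role_permissions(self, role: str, permissions: set[str]) -> None:
        # Copy the set so later changes by the caller do not leak in.
        self._role_permissions[role] = set(permissions)

    def check(self, roles: list[str], permission: str) -> bool:
        # Granted when any one of the roles carries the permission.
        return any(
            permission in self._role_permissions.get(role, set())
            for role in roles
        )
```

Unknown roles and an empty role list both yield False, since no role grants anything.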


Server State

pywry.state.ServerStateManager

ServerStateManager()

Unified state manager for PyWry server.

Automatically selects the appropriate storage backend based on deploy mode configuration. Provides both sync and async interfaces where appropriate.

Attributes:

Name Type Description
deploy_mode bool

True if running in horizontally scaled deploy mode.

worker_id str

Unique identifier for this worker process.

Notes

This manager bridges local in-process WebSocket state with the configured distributed backend so the rest of PyWry can use one state API.

Initialize the state manager.

Notes

Backend stores are initialized lazily to avoid circular imports and to defer environment-dependent setup until the manager is first used.
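
Lazy initialization of this kind is commonly written as a property that builds its value on first access. The sketch below shows the pattern only; `LazyManager` and its `factory` argument are hypothetical and do not reflect how ServerStateManager is actually wired.

```python
from typing import Callable, Optional


class LazyManager:
    def __init__(self, factory: Callable[[], object]) -> None:
        self._factory = factory
        self._store: Optional[object] = None  # nothing is built yet

    @property
    def store(self) -> object:
        # Build the backend on first access, then reuse it.
        if self._store is None:
            self._store = self._factory()
        return self._store
```

Constructing the manager is cheap and has no side effects; the factory runs only when `store` is first read.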

Attributes

deploy_mode property

deploy_mode: bool

Check if running in deploy mode.

Returns:

Type Description
bool

True when distributed state and routing features are enabled.

worker_id property

worker_id: str

Get this worker's unique ID.

Returns:

Type Description
str

Stable identifier for this process instance.

widgets property

widgets: dict[str, dict[str, Any]]

Get local widgets dict.

Note: In deploy mode, this only returns widgets that were created by this worker and may not reflect the full state. Use async methods for accurate state in deploy mode.

Returns:

Type Description
dict[str, dict[str, Any]]

Process-local widget mapping.

widget_tokens property

widget_tokens: dict[str, str]

Get local widget tokens.

Returns:

Type Description
dict[str, str]

Process-local map of widget IDs to authentication tokens.

connections property

connections: dict[str, Any]

Get local WebSocket connections.

Returns:

Type Description
dict[str, Any]

Process-local map of widget IDs to WebSocket objects.

event_queues property

event_queues: dict[str, Queue[Any]]

Get local event queues.

Returns:

Type Description
dict[str, Queue[Any]]

Process-local map of widget IDs to outbound event queues.

Functions

register_widget async

register_widget(widget_id: str, html: str, token: str | None = None, metadata: dict[str, Any] | None = None) -> None

Register a widget with its HTML content.

In deploy mode, the widget is stored in Redis. In local mode, it is stored in memory.

Parameters:

Name Type Description Default
widget_id str

Unique widget identifier.

required
html str

The widget's HTML content.

required
token str | None

Optional per-widget authentication token.

None
metadata dict | None

Additional metadata (title, theme, etc.).

None

get_widget async

get_widget(widget_id: str) -> WidgetData | None

Get widget data by ID.

Parameters:

Name Type Description Default
widget_id str

The widget ID to retrieve.

required

Returns:

Type Description
WidgetData | None

Widget data if found.

get_widget_html async

get_widget_html(widget_id: str) -> str | None

Get widget HTML content.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
str | None

The HTML content if widget exists.

update_widget_html async

update_widget_html(widget_id: str, html: str) -> bool

Update widget HTML content.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
html str

New HTML content.

required

Returns:

Type Description
bool

True if update succeeded.

widget_exists async

widget_exists(widget_id: str) -> bool

Check if a widget exists.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
bool

True if widget exists.

remove_widget async

remove_widget(widget_id: str) -> bool

Remove a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
bool

True if widget was removed.

list_widgets async

list_widgets() -> list[str]

List all widget IDs.

Returns:

Type Description
list[str]

List of widget IDs.

register_connection async

register_connection(widget_id: str, websocket: WebSocket) -> Queue[Any]

Register a WebSocket connection for a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
websocket WebSocket

The WebSocket connection.

required

Returns:

Type Description
Queue

Event queue for this connection.

unregister_connection async

unregister_connection(widget_id: str) -> None

Unregister a WebSocket connection.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Notes

Local connection state is cleared before any distributed router entry is removed.

get_connection async

get_connection(widget_id: str) -> WebSocket | None

Get the WebSocket connection for a widget (local only).

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
WebSocket | None

The connection if it exists on this worker.

get_event_queue async

get_event_queue(widget_id: str) -> Queue[Any] | None

Get the event queue for a widget.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required

Returns:

Type Description
Queue | None

The event queue if widget is connected on this worker.

register_callback async

register_callback(widget_id: str, event_type: str, callback: Any) -> None

Register a callback for widget events.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type to handle.

required
callback callable

The callback function.

required

Notes

Callback registrations are always stored locally because Python callables are not serializable across workers.

get_callback async

get_callback(widget_id: str, event_type: str) -> Any | None

Get a callback for a widget event.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type.

required

Returns:

Type Description
callable | None

The callback if registered.

invoke_callback async

invoke_callback(widget_id: str, event_type: str, data: dict[str, Any]) -> tuple[bool, Any]

Invoke a callback for a widget event.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type.

required
data dict

Event data.

required

Returns:

Type Description
tuple[bool, Any]

(success, result) tuple.

broadcast_event async

broadcast_event(widget_id: str, event_type: str, data: dict[str, Any]) -> None

Broadcast an event to a widget.

In deploy mode, uses Redis Pub/Sub to reach the correct worker. In local mode, sends directly to the local event queue.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event_type str

The event type.

required
data dict

Event data.

required

Notes

In local mode the event is enqueued directly; in deploy mode it is wrapped in an EventMessage and published to the widget channel.

send_to_widget async

send_to_widget(widget_id: str, event: dict[str, Any]) -> bool

Send an event to a specific widget's WebSocket.

Parameters:

Name Type Description Default
widget_id str

The widget ID.

required
event dict

The event to send.

required

Returns:

Type Description
bool

True if event was queued for sending.

Notes

When the widget is not connected locally and deploy mode is active, the event is published so the owning worker can deliver it.
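The routing rule described for `send_to_widget` (deliver locally when possible, otherwise publish in deploy mode) can be sketched as follows. `RoutingSketch` is hypothetical, the `published` list stands in for a Pub/Sub backend, the `widget:<id>` channel name is invented for illustration, and returning `True` after publishing is an assumption.

```python
import asyncio
from typing import Any


class RoutingSketch:
    """Hypothetical stand-in showing local-first delivery with a publish fallback."""

    def __init__(self, deploy_mode: bool) -> None:
        self.deploy_mode = deploy_mode
        self.event_queues: dict[str, asyncio.Queue[Any]] = {}
        # Stand-in for a Pub/Sub backend: (channel, event) pairs.
        self.published: list[tuple[str, dict[str, Any]]] = []

    async def send_to_widget(self, widget_id: str, event: dict[str, Any]) -> bool:
        queue = self.event_queues.get(widget_id)
        if queue is not None:
            await queue.put(event)  # widget is connected to this worker
            return True
        if self.deploy_mode:
            # Hypothetical channel name; the owning worker would consume it.
            self.published.append((f"widget:{widget_id}", event))
            return True  # assumption: publishing counts as "queued"
        return False  # local mode and not connected: nothing to deliver to


async def demo() -> dict[str, Any]:
    local = RoutingSketch(deploy_mode=False)
    local.event_queues["w1"] = asyncio.Queue()
    sent_local = await local.send_to_widget("w1", {"type": "ping"})
    queued = local.event_queues["w1"].get_nowait()
    missed = await local.send_to_widget("w2", {"type": "ping"})

    deployed = RoutingSketch(deploy_mode=True)
    forwarded = await deployed.send_to_widget("w2", {"type": "ping"})
    return {
        "sent_local": sent_local,
        "queued": queued,
        "missed": missed,
        "forwarded": forwarded,
        "published": deployed.published,
    }


routing = asyncio.run(demo())
```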

create_session async

create_session(user_id: str, roles: list[str] | None = None, metadata: dict[str, Any] | None = None) -> str

Create a new user session.

Parameters:

Name Type Description Default
user_id str

The user identifier.

required
roles list[str] | None

User roles for RBAC.

None
metadata dict | None

Additional session metadata.

None

Returns:

Type Description
str

The session ID.

get_session async

get_session(session_id: str) -> Any | None

Get a session by ID.

Parameters:

Name Type Description Default
session_id str

The session ID.

required

Returns:

Type Description
UserSession | None

The session if found.

cleanup async

cleanup() -> None

Clean up all state and connections.

Notes

This clears only process-local state and unregisters active local connections; persisted backend data remains in its configured store.

pywry.state.get_server_state

get_server_state() -> ServerStateManager

Get the global server state manager.

Returns:

Type Description
ServerStateManager

The singleton state manager instance.

Notes

The returned manager is process-local and lazily initialized.

pywry.state.reset_server_state

reset_server_state() -> None

Reset the server state manager.

Notes

Primarily intended for tests that need a fresh singleton manager.


Sync Helpers

pywry.state.run_async

run_async(coro: Coroutine[Any, Any, T], timeout: float | None = 5.0) -> T

Run an async coroutine from sync code.

Uses the server's event loop if available, otherwise creates a temporary one. NOTE: Do not call this function from within an async context on the server loop. Blocking there would deadlock, so a RuntimeError is raised instead. Use await directly in async code.

Parameters:

Name Type Description Default
coro Coroutine

The coroutine to run.

required
timeout float | None

Timeout in seconds. Default is 5.0.

5.0

Returns:

Type Description
T

The result of the coroutine.

Raises:

Type Description
TimeoutError

If the operation times out.

RuntimeError

If called from within the server's event loop (would deadlock).

pywry.state.run_async_fire_and_forget

run_async_fire_and_forget(coro: Coroutine[Any, Any, Any]) -> None

Schedule an async coroutine without waiting for its result.

Parameters:

Name Type Description Default
coro Coroutine

The coroutine to run.

required

Notes

The coroutine is submitted to the running server loop when available, otherwise to the shared fallback loop.
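Fire-and-forget scheduling can be sketched with the same `asyncio.run_coroutine_threadsafe` call, simply without waiting on the returned future. This is an illustration under assumptions: `fire_and_forget_sketch` is a hypothetical name, a daemon-thread loop stands in for the server or fallback loop, and logging failures from a done-callback is an added choice that the documentation does not describe.

```python
import asyncio
import concurrent.futures
import logging
import threading
from typing import Any, Coroutine

logger = logging.getLogger(__name__)

# Hypothetical stand-in for the server (or shared fallback) loop.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def _log_failure(future: concurrent.futures.Future) -> None:
    # Assumption: failures are logged so that they are not silently lost.
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        logger.warning("background coroutine failed: %r", exc)


def fire_and_forget_sketch(coro: Coroutine[Any, Any, Any]) -> None:
    """Submit `coro` to the background loop and return immediately."""
    future = asyncio.run_coroutine_threadsafe(coro, _loop)
    future.add_done_callback(_log_failure)


done = threading.Event()


async def mark_done() -> None:
    await asyncio.sleep(0.01)
    done.set()


fire_and_forget_sketch(mark_done())
finished = done.wait(timeout=2)  # only the demo waits; real callers would not
```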


Additional Types

pywry.state.callbacks.CallbackRegistration dataclass

CallbackRegistration(widget_id: str, event_type: str, callback: CallbackFunc | AsyncCallbackFunc, is_async: bool = False, created_at: float = time(), invoke_count: int = 0, last_invoked: float | None = None)

Tracks a callback registration.

Attributes:

Name Type Description
widget_id str

Widget that owns the callback.

event_type str

Event type that triggers the callback.

callback CallbackFunc | AsyncCallbackFunc

Registered Python callable.

is_async bool

Indicates whether the callback is awaitable.

created_at float

Unix timestamp when the callback was registered.

invoke_count int

Number of successful invocation attempts.

last_invoked float | None

Unix timestamp of the most recent invocation.

Attributes

widget_id instance-attribute

widget_id: str

event_type instance-attribute

event_type: str

callback instance-attribute

callback: CallbackFunc | AsyncCallbackFunc

is_async class-attribute instance-attribute

is_async: bool = False

created_at class-attribute instance-attribute

created_at: float = field(default_factory=time)

invoke_count class-attribute instance-attribute

invoke_count: int = 0

last_invoked class-attribute instance-attribute

last_invoked: float | None = None


Memory Store Factory

pywry.state.memory.create_memory_stores

Create all in-memory state stores.

Returns:

Type Description
tuple

(widget_store, event_bus, connection_router, session_store)

Notes

Chat storage is created separately because it is not required by every runtime path that uses the basic widget state stores.