Skip to content

pywry.state.redis

Redis-backed state implementations for multi-worker production deployments.


Widget Store

pywry.state.redis.RedisWidgetStore

RedisWidgetStore(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', widget_ttl: int = 86400, pool_size: int = 10, *, redis_client: Redis | None = None)

Bases: WidgetStore

Redis-backed widget store for horizontal scaling.

Uses Redis hashes for widget data with automatic TTL expiry.

Initialize the Redis widget store.

PARAMETER DESCRIPTION
redis_url

Redis connection URL.

TYPE: str DEFAULT: 'redis://localhost:6379/0'

prefix

Key prefix for all Redis keys.

TYPE: str DEFAULT: 'pywry'

widget_ttl

Widget data TTL in seconds.

TYPE: int DEFAULT: 86400

pool_size

Connection pool size.

TYPE: int DEFAULT: 10

redis_client

Pre-configured Redis client (for testing with fakeredis).

TYPE: Redis DEFAULT: None

Functions

register async

register(widget_id: str, html: str, token: str | None = None, owner_worker_id: str | None = None, metadata: dict[str, Any] | None = None) -> None

Register a widget with its HTML content.

get async

get(widget_id: str) -> WidgetData | None

Get complete widget data.

get_html async

get_html(widget_id: str) -> str | None

Get widget HTML content.

get_token async

get_token(widget_id: str) -> str | None

Get widget authentication token.

exists async

exists(widget_id: str) -> bool

Check if a widget exists.

delete async

delete(widget_id: str) -> bool

Delete a widget.

list_active async

list_active() -> list[str]

List all active widget IDs.

update_html async

update_html(widget_id: str, html: str) -> bool

Update widget HTML content.

update_token async

update_token(widget_id: str, token: str) -> bool

Update widget authentication token.

count async

count() -> int

Get the number of active widgets.

close async

close() -> None

Close any resources (no-op for connection-per-call pattern).


Event Bus

pywry.state.redis.RedisEventBus

RedisEventBus(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', pool_size: int = 10, *, redis_client: Redis | None = None)

Bases: EventBus

Redis Pub/Sub for cross-worker event delivery.

Uses Redis Pub/Sub for real-time event distribution. Events are fire-and-forget (no persistence).

Initialize the Redis event bus.

PARAMETER DESCRIPTION
redis_url

Redis connection URL.

TYPE: str DEFAULT: 'redis://localhost:6379/0'

prefix

Key prefix for channel names.

TYPE: str DEFAULT: 'pywry'

pool_size

Connection pool size.

TYPE: int DEFAULT: 10

redis_client

Pre-configured Redis client (for testing with fakeredis).

TYPE: Redis DEFAULT: None

Functions

publish async

publish(channel: str, event: EventMessage) -> None

Publish an event to a channel.

subscribe async

subscribe(channel: str) -> AsyncIterator[EventMessage]

Subscribe to events on a channel.

unsubscribe async

unsubscribe(channel: str) -> None

Unsubscribe from a channel.

Note: Subscriptions are managed by the subscribe generator. This is a no-op for cleanup.

close async

close() -> None

Close any resources (no-op for connection-per-call pattern).


Connection Router

pywry.state.redis.RedisConnectionRouter

RedisConnectionRouter(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', connection_ttl: int = 300, pool_size: int = 10, *, redis_client: Redis | None = None)

Bases: ConnectionRouter

Track which worker owns which WebSocket connection.

Uses Redis hashes with TTL for connection tracking. Heartbeat refresh extends the TTL.

Initialize the Redis connection router.

PARAMETER DESCRIPTION
redis_url

Redis connection URL.

TYPE: str DEFAULT: 'redis://localhost:6379/0'

prefix

Key prefix for Redis keys.

TYPE: str DEFAULT: 'pywry'

connection_ttl

Connection TTL in seconds (refresh via heartbeat).

TYPE: int DEFAULT: 300

pool_size

Connection pool size.

TYPE: int DEFAULT: 10

redis_client

Pre-configured Redis client (for testing with fakeredis).

TYPE: Redis DEFAULT: None

Functions

register_connection async

register_connection(widget_id: str, worker_id: str, user_id: str | None = None, session_id: str | None = None) -> None

Register that a widget is connected to a specific worker.

get_connection_info async

get_connection_info(widget_id: str) -> ConnectionInfo | None

Get connection information for a widget.

get_owner async

get_owner(widget_id: str) -> str | None

Get the worker ID that owns this widget's connection.

refresh_heartbeat async

refresh_heartbeat(widget_id: str) -> bool

Refresh the heartbeat timestamp for a connection.

unregister_connection async

unregister_connection(widget_id: str) -> bool

Unregister a connection.

list_worker_connections async

list_worker_connections(worker_id: str) -> list[str]

List all widget IDs connected to a specific worker.

close async

close() -> None

Close any resources (no-op for connection-per-call pattern).


Session Store

pywry.state.redis.RedisSessionStore

RedisSessionStore(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', default_ttl: int = 86400, pool_size: int = 10, *, redis_client: Redis | None = None)

Bases: SessionStore

Redis-backed session store for RBAC support.

Uses Redis hashes for session data with automatic TTL expiry.

Initialize the Redis session store.

PARAMETER DESCRIPTION
redis_url

Redis connection URL.

TYPE: str DEFAULT: 'redis://localhost:6379/0'

prefix

Key prefix for Redis keys.

TYPE: str DEFAULT: 'pywry'

default_ttl

Default session TTL in seconds.

TYPE: int DEFAULT: 86400

pool_size

Connection pool size.

TYPE: int DEFAULT: 10

redis_client

Pre-configured Redis client (for testing with fakeredis).

TYPE: Redis DEFAULT: None

Functions

create_session async

create_session(session_id: str, user_id: str, roles: list[str] | None = None, ttl: int | None = None, metadata: dict[str, Any] | None = None) -> UserSession

Create a new user session.

get_session async

get_session(session_id: str) -> UserSession | None

Get a session by ID.

validate_session async

validate_session(session_id: str) -> bool

Validate a session is active and not expired.

delete_session async

delete_session(session_id: str) -> bool

Delete a session.

refresh_session async

refresh_session(session_id: str, extend_ttl: int | None = None) -> bool

Refresh a session's expiry time.

list_user_sessions async

list_user_sessions(user_id: str) -> list[UserSession]

List all sessions for a user.

check_permission async

check_permission(session_id: str, resource_type: str, resource_id: str, permission: str) -> bool

Check if a session has permission to access a resource.

set_role_permissions async

set_role_permissions(role: str, permissions: list[str] | set[str]) -> None

Configure permissions for a role.

close async

close() -> None

Close any resources (no-op for connection-per-call pattern).


Factory

pywry.state.redis.create_redis_stores

create_redis_stores(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', widget_ttl: int = 86400, connection_ttl: int = 300, session_ttl: int = 86400, pool_size: int = 10) -> tuple[RedisWidgetStore, RedisEventBus, RedisConnectionRouter, RedisSessionStore]

Create all Redis state stores with shared configuration.

PARAMETER DESCRIPTION
redis_url

Redis connection URL.

TYPE: str DEFAULT: 'redis://localhost:6379/0'

prefix

Key prefix for all Redis keys.

TYPE: str DEFAULT: 'pywry'

widget_ttl

Widget data TTL in seconds.

TYPE: int DEFAULT: 86400

connection_ttl

Connection routing TTL in seconds.

TYPE: int DEFAULT: 300

session_ttl

Session TTL in seconds.

TYPE: int DEFAULT: 86400

pool_size

Connection pool size per store.

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
tuple

(widget_store, event_bus, connection_router, session_store)