pywry.state.redis¶
Redis-backed state implementations for multi-worker production deployments.
Widget Store¶
pywry.state.redis.RedisWidgetStore
¶
RedisWidgetStore(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', widget_ttl: int = 86400, pool_size: int = 10, *, redis_client: Redis | None = None)
Bases: WidgetStore
Redis-backed widget store for horizontal scaling.
Uses Redis hashes for widget data with automatic TTL expiry.
Initialize the Redis widget store.
| PARAMETER | DESCRIPTION |
|---|---|
redis_url
|
Redis connection URL.
TYPE:
|
prefix
|
Key prefix for all Redis keys.
TYPE:
|
widget_ttl
|
Widget data TTL in seconds.
TYPE:
|
pool_size
|
Connection pool size.
TYPE:
|
redis_client
|
Pre-configured Redis client (for testing with fakeredis).
TYPE:
|
Event Bus¶
pywry.state.redis.RedisEventBus
¶
RedisEventBus(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', pool_size: int = 10, *, redis_client: Redis | None = None)
Bases: EventBus
Redis Pub/Sub for cross-worker event delivery.
Uses Redis Pub/Sub for real-time event distribution. Events are fire-and-forget (no persistence).
Initialize the Redis event bus.
| PARAMETER | DESCRIPTION |
|---|---|
redis_url
|
Redis connection URL.
TYPE:
|
prefix
|
Key prefix for channel names.
TYPE:
|
pool_size
|
Connection pool size.
TYPE:
|
redis_client
|
Pre-configured Redis client (for testing with fakeredis).
TYPE:
|
Functions¶
subscribe
async
¶
subscribe(channel: str) -> AsyncIterator[EventMessage]
Subscribe to events on a channel.
unsubscribe
async
¶
Unsubscribe from a channel.
Note: Subscriptions are managed by the subscribe generator. This is a no-op for cleanup.
Connection Router¶
pywry.state.redis.RedisConnectionRouter
¶
RedisConnectionRouter(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', connection_ttl: int = 300, pool_size: int = 10, *, redis_client: Redis | None = None)
Bases: ConnectionRouter
Track which worker owns which WebSocket connection.
Uses Redis hashes with TTL for connection tracking. Heartbeat refresh extends the TTL.
Initialize the Redis connection router.
| PARAMETER | DESCRIPTION |
|---|---|
redis_url
|
Redis connection URL.
TYPE:
|
prefix
|
Key prefix for Redis keys.
TYPE:
|
connection_ttl
|
Connection TTL in seconds (refresh via heartbeat).
TYPE:
|
pool_size
|
Connection pool size.
TYPE:
|
redis_client
|
Pre-configured Redis client (for testing with fakeredis).
TYPE:
|
Functions¶
register_connection
async
¶
register_connection(widget_id: str, worker_id: str, user_id: str | None = None, session_id: str | None = None) -> None
Register that a widget is connected to a specific worker.
get_connection_info
async
¶
get_connection_info(widget_id: str) -> ConnectionInfo | None
Get connection information for a widget.
get_owner
async
¶
Get the worker ID that owns this widget's connection.
refresh_heartbeat
async
¶
Refresh the heartbeat timestamp for a connection.
unregister_connection
async
¶
Unregister a connection.
list_worker_connections
async
¶
List all widget IDs connected to a specific worker.
Session Store¶
pywry.state.redis.RedisSessionStore
¶
RedisSessionStore(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', default_ttl: int = 86400, pool_size: int = 10, *, redis_client: Redis | None = None)
Bases: SessionStore
Redis-backed session store for RBAC support.
Uses Redis hashes for session data with automatic TTL expiry.
Initialize the Redis session store.
| PARAMETER | DESCRIPTION |
|---|---|
redis_url
|
Redis connection URL.
TYPE:
|
prefix
|
Key prefix for Redis keys.
TYPE:
|
default_ttl
|
Default session TTL in seconds.
TYPE:
|
pool_size
|
Connection pool size.
TYPE:
|
redis_client
|
Pre-configured Redis client (for testing with fakeredis).
TYPE:
|
Functions¶
create_session
async
¶
create_session(session_id: str, user_id: str, roles: list[str] | None = None, ttl: int | None = None, metadata: dict[str, Any] | None = None) -> UserSession
Create a new user session.
validate_session
async
¶
Validate a session is active and not expired.
refresh_session
async
¶
Refresh a session's expiry time.
list_user_sessions
async
¶
list_user_sessions(user_id: str) -> list[UserSession]
List all sessions for a user.
check_permission
async
¶
Check if a session has permission to access a resource.
set_role_permissions
async
¶
Configure permissions for a role.
Factory¶
pywry.state.redis.create_redis_stores
¶
create_redis_stores(redis_url: str = 'redis://localhost:6379/0', prefix: str = 'pywry', widget_ttl: int = 86400, connection_ttl: int = 300, session_ttl: int = 86400, pool_size: int = 10) -> tuple[RedisWidgetStore, RedisEventBus, RedisConnectionRouter, RedisSessionStore]
Create all Redis state stores with shared configuration.
| PARAMETER | DESCRIPTION |
|---|---|
redis_url
|
Redis connection URL.
TYPE:
|
prefix
|
Key prefix for all Redis keys.
TYPE:
|
widget_ttl
|
Widget data TTL in seconds.
TYPE:
|
connection_ttl
|
Connection routing TTL in seconds.
TYPE:
|
session_ttl
|
Session TTL in seconds.
TYPE:
|
pool_size
|
Connection pool size per store.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple
|
(widget_store, event_bus, connection_router, session_store) |