Source code for tapestry.engine

import asyncio
import logging
import contextlib
from dataclasses import dataclass, field
from typing import Any, Optional

from collections.abc import AsyncIterator
from surrealdb import AsyncSurreal, AsyncHttpSurrealConnection, AsyncWsSurrealConnection


# Module-level logger, named after this module via ``__name__``. The engine
# reports pool start-up, shutdown, and client failures through it.
logger = logging.getLogger(__name__)


class PooledConnection:
    """
    Immutable wrapper around a preconfigured SurrealDB connection.

    Instances of this class expose safe query/data operations while
    deliberately hiding session-mutating methods such as ``use()``. This
    ensures pooled connections cannot be reconfigured after engine startup.

    The wrapped client is expected to already be authenticated and bound to a
    fixed namespace/database.

    Every explicitly defined method forwards its arguments unchanged to the
    method of the same name on the wrapped client and returns its result.
    Any other attribute is looked up on the wrapped client via
    ``__getattr__``, except for the blocked names ``use``, ``signin``,
    ``authenticate`` and ``invalidate``.
    """

    __slots__ = ("_client",)

    def __init__(self, client: AsyncWsSurrealConnection | AsyncHttpSurrealConnection):
        """
        Wrap ``client``.

        Args:
            client: The connection every call is forwarded to.
        """
        # Go through ``object`` because our own ``__setattr__`` rejects
        # every assignment.
        object.__setattr__(self, "_client", client)

    def __setattr__(self, name: str, value: Any) -> None:
        """Reject all attribute assignment.

        Raises:
            AttributeError: Always.
        """
        raise AttributeError("PooledConnection is immutable")

    def __delattr__(self, name: str) -> None:
        """Reject all attribute deletion.

        Without this, ``del conn._client`` would silently strip the wrapped
        client from a wrapper that is documented as immutable.

        Raises:
            AttributeError: Always.
        """
        raise AttributeError("PooledConnection is immutable")

    async def query(self, query: str, *args, **kwargs):
        """Forward to ``client.query`` and return its result."""
        return await self._client.query(query, *args, **kwargs)

    async def query_raw(self, query: str, *args, **kwargs):
        """Forward to ``client.query_raw`` and return its result."""
        return await self._client.query_raw(query, *args, **kwargs)

    async def select(self, record, *args, **kwargs):
        """Forward to ``client.select`` and return its result."""
        return await self._client.select(record, *args, **kwargs)

    async def create(self, record, data=None, *args, **kwargs):
        """Forward to ``client.create`` and return its result."""
        return await self._client.create(record, data, *args, **kwargs)

    async def insert(self, record, data=None, *args, **kwargs):
        """Forward to ``client.insert`` and return its result."""
        return await self._client.insert(record, data, *args, **kwargs)

    async def update(self, record, data=None, *args, **kwargs):
        """Forward to ``client.update`` and return its result."""
        return await self._client.update(record, data, *args, **kwargs)

    async def merge(self, record, data, *args, **kwargs):
        """Forward to ``client.merge`` and return its result."""
        return await self._client.merge(record, data, *args, **kwargs)

    async def patch(self, record, data, *args, **kwargs):
        """Forward to ``client.patch`` and return its result."""
        return await self._client.patch(record, data, *args, **kwargs)

    async def delete(self, record, *args, **kwargs):
        """Forward to ``client.delete`` and return its result."""
        return await self._client.delete(record, *args, **kwargs)

    async def let(self, key: str, value, *args, **kwargs):
        """Forward to ``client.let`` and return its result."""
        return await self._client.let(key, value, *args, **kwargs)

    async def unset(self, key: str, *args, **kwargs):
        """Forward to ``client.unset`` and return its result."""
        return await self._client.unset(key, *args, **kwargs)

    async def run(self, name: str, *args, **kwargs):
        """Forward to ``client.run`` and return its result."""
        return await self._client.run(name, *args, **kwargs)

    async def version(self, *args, **kwargs):
        """Forward to ``client.version`` and return its result."""
        return await self._client.version(*args, **kwargs)

    async def info(self, *args, **kwargs):
        """Forward to ``client.info`` and return its result."""
        return await self._client.info(*args, **kwargs)

    def __getattr__(self, name: str) -> Any:
        """
        Resolve attributes that are not defined on the wrapper itself.

        Args:
            name: The attribute being looked up.

        Returns:
            The attribute of the same name on the wrapped client.

        Raises:
            AttributeError: If ``name`` is one of the blocked session
                methods, if the wrapper holds no client, or if the wrapped
                client has no such attribute.
        """
        if name in {"use", "signin", "authenticate", "invalidate"}:
            raise AttributeError(
                f"{self.__class__.__name__} does not expose '{name}'. "
                "Pooled connections are preconfigured and immutable."
            )
        # Read the slot through ``object`` rather than ``self._client``: when
        # the slot is unset (instance created via ``__new__`` only), a plain
        # attribute access would re-enter ``__getattr__`` and recurse until
        # RecursionError instead of reporting a missing attribute.
        try:
            client = object.__getattribute__(self, "_client")
        except AttributeError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None
        return getattr(client, name)
[docs] @dataclass(slots=True) class Engine: """ SurrealDB connection pool engine with immutable configuration. The engine represents a pool of authenticated, preconfigured SurrealDB connections. Each connection is bound once at startup to a fixed namespace/database pair and then exposed through an immutable wrapper that does not allow callers to mutate session configuration. Configuration fields are frozen after initialization. Runtime lifecycle fields remain internal and mutable. Example: >>> engine = create_engine( ... url="ws://localhost:8000/rpc", ... auth_payload={"username": "root", "password": "root"}, ... namespace="my_ns", ... database="my_db", ... pool_size=5, ... ) >>> await engine.start() >>> async with engine.connect() as db: ... results = await db.select("person") >>> await engine.close() """ url: str auth_payload: dict[str, Any] namespace: str database: str pool_size: int = 4 connect_timeout: float = 5.0 _queue: Optional[asyncio.Queue[AsyncWsSurrealConnection | AsyncHttpSurrealConnection]] = field( default=None, init=False, repr=False ) _clients: list[AsyncWsSurrealConnection | AsyncHttpSurrealConnection] = field(default_factory=list, init=False, repr=False) _closed: bool = field(default=False, init=False, repr=False) _started: bool = field(default=False, init=False, repr=False) _frozen: bool = field(default=False, init=False, repr=False) def __post_init__(self) -> None: if not self.url: raise ValueError("url must be provided") if not self.auth_payload: raise ValueError("auth_payload must be provided") if not self.namespace: raise ValueError("namespace must be provided") if not self.database: raise ValueError("database must be provided") if self.pool_size <= 0: raise ValueError("pool_size must be greater than 0") if self.connect_timeout <= 0: raise ValueError("connect_timeout must be greater than 0") object.__setattr__(self, "auth_payload", dict(self.auth_payload)) object.__setattr__(self, "_frozen", True) def __setattr__(self, name: str, value: Any) 
-> None: if getattr(self, "_frozen", False) and name in { "url", "auth_payload", "namespace", "database", "pool_size", "connect_timeout", }: raise AttributeError(f"Engine configuration is immutable: cannot set '{name}'") object.__setattr__(self, name, value) async def _make_client(self) -> AsyncWsSurrealConnection | AsyncHttpSurrealConnection: """ Create, authenticate, and preconfigure a single SurrealDB client. Returns: An authenticated AsyncSurreal client already bound to the engine's namespace and database. """ client = AsyncSurreal(self.url) await client.signin(self.auth_payload) await client.use(self.namespace, self.database) return client
[docs] async def start(self) -> "Engine": """ Initialize the connection pool with authenticated, preconfigured clients. Returns: self: The engine instance for chaining. Raises: RuntimeError: If the engine is already started or closed. Exception: If unable to create the required number of clients. """ if self._started: raise RuntimeError("Engine already started") if self._closed: raise RuntimeError("Engine is closed") self._queue = asyncio.Queue[AsyncWsSurrealConnection | AsyncHttpSurrealConnection]() for i in range(self.pool_size): try: client = await self._make_client() except Exception as e: logger.exception("Failed to create surreal client #%d: %s", i, e) await asyncio.sleep(0.5) client = await self._make_client() self._clients.append(client) self._queue.put_nowait(client) self._started = True logger.info( "SurrealDB engine started with %d connections for %s/%s", len(self._clients), self.namespace, self.database, ) return self
[docs] async def close(self) -> None: """ Close all connections and shut down the engine. """ if self._closed: return self._closed = True if self._queue is not None: while not self._queue.empty(): try: _ = self._queue.get_nowait() self._queue.task_done() except asyncio.QueueEmpty: break for client in self._clients: try: await client.close() except Exception: logger.exception("Error closing surreal client") self._clients.clear() self._started = False logger.info("SurrealDB engine closed")
[docs] @contextlib.asynccontextmanager async def connect(self) -> AsyncIterator[PooledConnection]: """ Acquire a preconfigured immutable connection wrapper from the pool. Yields: PooledConnection: A connection wrapper that does not expose session-mutating operations like ``use()``. Raises: RuntimeError: If the engine is not started or is closed. """ if self._closed: raise RuntimeError("Engine is closed") if not self._started: raise RuntimeError("Engine not started. Call await engine.start() first") if self._queue is None: raise RuntimeError("Engine queue is not initialized") client = await self._queue.get() try: yield PooledConnection(client) except Exception as exc: if self._is_fatal_client_error(exc): logger.warning("Client had fatal error, replacing: %s", exc) await self._replace_client(client) raise finally: if not self._closed and self._queue is not None: self._queue.put_nowait(client)
acquire = connect
[docs] async def execute(self, query: str, *args, retries: int = 1, backoff: float = 0.2, **kwargs): """ Execute a query using a pooled connection with automatic retry. Args: query: The query string to execute *args: Arguments forwarded to the query method retries: Number of retry attempts for transient errors backoff: Base backoff time in seconds between retries **kwargs: Keyword arguments forwarded to the query method """ if self._closed: raise RuntimeError("Engine is closed") if not self._started: raise RuntimeError("Engine not started. Call await engine.start() first") for attempt in range(retries + 1): async with self.connect() as client: try: return await client.query(query, *args, **kwargs) except Exception as exc: logger.exception("Query failed on attempt %d: %s", attempt, exc) if attempt < retries and self._is_retryable_error(exc): await asyncio.sleep(backoff * (2**attempt)) continue raise
async def _replace_client( self, dead_client: AsyncWsSurrealConnection | AsyncHttpSurrealConnection, ) -> None: """ Replace a failed client with a new authenticated preconfigured connection. """ try: await dead_client.close() except Exception: pass try: new_client = await self._make_client() except Exception as e: logger.exception("Failed to create replacement client: %s", e) if self._queue is not None and not self._closed: self._queue.put_nowait(dead_client) return try: idx = self._clients.index(dead_client) self._clients[idx] = new_client except ValueError: self._clients.append(new_client) if self._queue is not None and not self._closed: self._queue.put_nowait(new_client) def _is_retryable_error(self, exc: Exception) -> bool: return isinstance(exc, (asyncio.TimeoutError, ConnectionError)) def _is_fatal_client_error(self, exc: Exception) -> bool: return isinstance(exc, (ConnectionError, RuntimeError))
def create_engine(
    url: str,
    auth_payload: dict[str, Any],
    namespace: str,
    database: str,
    pool_size: int = 4,
    connect_timeout: float = 5.0,
) -> Engine:
    """
    Build an ``Engine`` from the given settings.

    The engine is only constructed here; every argument is passed through
    to ``Engine`` by keyword.

    Args:
        url: SurrealDB connection URL
        auth_payload: Authentication credentials
        namespace: Fixed namespace configured once for every pooled connection
        database: Fixed database configured once for every pooled connection
        pool_size: Number of connections to maintain in the pool
        connect_timeout: Timeout for connection attempts in seconds

    Returns:
        Engine: An engine instance with immutable configuration.
    """
    settings = {
        "url": url,
        "auth_payload": auth_payload,
        "namespace": namespace,
        "database": database,
        "pool_size": pool_size,
        "connect_timeout": connect_timeout,
    }
    return Engine(**settings)
@contextlib.asynccontextmanager
async def create_engine_context(
    url: str,
    auth_payload: dict[str, Any],
    namespace: str,
    database: str,
    pool_size: int = 4,
    connect_timeout: float = 5.0,
) -> AsyncIterator[Engine]:
    """
    Create a connection pool engine with automatic lifecycle management.

    The returned engine has immutable configuration, and all pooled
    connections are preconfigured once with the provided namespace and
    database.

    The engine is started on entry and closed on exit. It is also closed
    when ``start()`` itself fails, so connections opened before the failure
    are released.

    Args:
        url: SurrealDB connection URL
        auth_payload: Authentication credentials
        namespace: Fixed namespace configured once for every pooled connection
        database: Fixed database configured once for every pooled connection
        pool_size: Number of connections to maintain in the pool
        connect_timeout: Timeout for connection attempts in seconds

    Yields:
        Engine: The started engine.

    Raises:
        Exception: Whatever ``engine.start()`` raises.
    """
    engine = create_engine(
        url=url,
        auth_payload=auth_payload,
        namespace=namespace,
        database=database,
        pool_size=pool_size,
        connect_timeout=connect_timeout,
    )
    try:
        # start() sits inside the try so that a failed start-up still runs
        # close().
        await engine.start()
        yield engine
    finally:
        await engine.close()