
Septum FSM API Reference

Core Module

mycorrhizal.septum.core

Septum - Asyncio Finite State Machine Framework

A decorator-based DSL for defining and executing state machines with support for asyncio, timeouts, message passing, and hierarchical state composition.

Usage

from mycorrhizal.septum.core import septum, StateMachine, LabeledTransition
from enum import Enum, auto

@septum.state()
def IdleState():
    class Events(Enum):
        START = auto()
        QUIT = auto()

    @septum.on_state
    async def on_state(ctx):
        if ctx.msg == "start":
            return Events.START
        return None

    @septum.transitions
    def transitions():
        return [
            LabeledTransition(Events.START, ProcessingState),
            LabeledTransition(Events.QUIT, DoneState),
        ]

# Create and run the FSM
fsm = StateMachine(initial_state=IdleState, common_data={})
await fsm.initialize()
fsm.send_message("start")
await fsm.tick()

Key Classes

StateMachine - Main FSM runtime with message queue and tick-based execution
StateConfiguration - Configuration for states (timeout, retries, terminal, can_dwell)
SharedContext - Context object passed to state handlers with msg and common data
LabeledTransition - Maps events to target states

Transition Types

State references - Direct transition to another state
Again - Re-execute current state immediately
Unhandled - Wait for next message
Retry - Re-enter state with retry counter
Restart - Reset retry counter and wait for message
Repeat - Re-enter state from on_enter
Push(state1, state2, ...) - Push states onto stack
Pop - Pop and return to previous state

State Handlers

on_state(ctx) - Main state logic, return transition or None to wait
on_enter(ctx) - Called when entering state
on_leave(ctx) - Called when leaving state
on_timeout(ctx) - Called if timeout expires
on_fail(ctx) - Called if exception occurs

StateConfiguration dataclass

StateConfiguration(timeout: Optional[float] = None, retries: Optional[int] = None, terminal: bool = False, can_dwell: bool = False)

Configuration options for states.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | Optional[float] | Optional timeout in seconds. If no message is received within this time, the on_timeout handler is called. | None |
| retries | Optional[int] | Optional number of retries allowed. If the state returns Retry, the retry counter increments. When the maximum number of retries is exceeded, the state fails. | None |
| terminal | bool | If True, reaching this state completes the FSM (raises StateMachineComplete exception). | False |
| can_dwell | bool | If True, the state can wait indefinitely without returning a transition (returning None from on_state is allowed). | False |

Example

@septum.state(config=StateConfiguration(timeout=5.0, retries=3))
def MyState():
    # State with timeout and retry handling
    pass

SharedContext dataclass

SharedContext(send_message: Callable, log: Callable, common: T, msg: Optional[Any] = None)

Bases: Generic[T]

Context passed to state handler methods.

The SharedContext provides access to the current message, shared data, and utilities for state handlers.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| send_message | Callable | Function to send messages to the state machine |
| log | Callable | Function to log messages (default: prints with [FSM] prefix) |
| common | T | Shared data passed to StateMachine (accessible as ctx.common) |
| msg | Optional[Any] | The current message being processed (if any) |

Type parameter T represents the type of the 'common' field for type safety.

Example

@septum.on_state
async def on_state(ctx: SharedContext):
    # Access shared data
    counter = ctx.common.get("counter", 0)

    # Check for messages
    if ctx.msg == "start":
        return Events.START

    # Send new messages
    ctx.send_message("ping")

TransitionType dataclass

TransitionType(AWAITS: bool = False)

Base type for all transitions

StateTransition dataclass

StateTransition(AWAITS: bool = False)

Bases: TransitionType

Represents a transition to a different state

StateContinuation dataclass

StateContinuation(AWAITS: bool = False)

Bases: TransitionType

Represents staying in the same state

StateRenewal dataclass

StateRenewal(AWAITS: bool = False)

Bases: TransitionType

Represents a transition back into the current state

Again dataclass

Again(AWAITS: bool = False)

Bases: StateContinuation

Execute current state immediately without affecting retry counter

Unhandled dataclass

Unhandled(AWAITS: bool = True)

Bases: StateContinuation

Wait for next message/event without affecting retry counter

Retry dataclass

Retry(AWAITS: bool = False)

Bases: StateContinuation

Retry current state, decrementing retry counter, starting from on_enter

Restart dataclass

Restart(AWAITS: bool = True)

Bases: StateRenewal

Restart current state, reset retry counter, await message

Repeat dataclass

Repeat(AWAITS: bool = False)

Bases: StateRenewal

Repeat current state, reset retry counter, execute on_enter immediately

StateRef dataclass

StateRef(state: str)

Bases: StateTransition

String-based state reference for breaking circular imports

Source code in src/mycorrhizal/septum/core.py
def __init__(self, state: str):
    self.state = state
    object.__setattr__(self, "AWAITS", False)

LabeledTransition dataclass

LabeledTransition(label: Enum, transition: TransitionType)

A transition with a label (typically an enum value)

Push dataclass

Push(*states: StateTransition)

Bases: StateTransition

Push one or more states onto the stack

Source code in src/mycorrhizal/septum/core.py
def __init__(self, *states: StateTransition):
    object.__setattr__(self, "push_states", list(states))

Pop dataclass

Pop(AWAITS: bool = False)

Bases: StateTransition

Pop from the stack to return to previous state

StateSpec dataclass

StateSpec(AWAITS: bool = False, name: str = '', qualname: str = '', module: str = '', config: StateConfiguration = StateConfiguration(), on_state: Optional[Callable[[SharedContext], Awaitable[TransitionType]]] = None, on_enter: Optional[Callable[[SharedContext], Awaitable[None]]] = None, on_leave: Optional[Callable[[SharedContext], Awaitable[None]]] = None, on_timeout: Optional[Callable[[SharedContext], Awaitable[TransitionType]]] = None, on_fail: Optional[Callable[[SharedContext], Awaitable[TransitionType]]] = None, transitions: Optional[Callable[[], List[Union[LabeledTransition, TransitionType]]]] = None, Events: Optional[type[Enum]] = None, group_name: str = '', parent_state: Optional[StateSpec] = None, _is_root: bool = False)

Bases: StateTransition

A state defined via decorators.

This is a StateTransition, so it can be used directly in transition lists. It has the same interface as the old State class but is constructed by decorators rather than metaclass magic.

base_name property

base_name: str

Get just the state name without module path

CONFIG property

Alias for config (for compatibility with State class API)

get_transitions

get_transitions() -> List[Union[LabeledTransition, TransitionType]]

Get the transition list for this state

Source code in src/mycorrhizal/septum/core.py
def get_transitions(self) -> List[Union[LabeledTransition, TransitionType]]:
    """Get the transition list for this state"""
    if self.transitions is None:
        return []
    result = self.transitions()
    # Handle single transition returned
    if isinstance(result, (TransitionType, LabeledTransition)):
        return [result]
    return result

ValidationError

Bases: Exception

Raised when state machine validation fails

ValidationResult dataclass

ValidationResult(valid: bool, errors: list[str], warnings: list[str], discovered_states: set[str])

Results of state machine validation

StateRegistry

StateRegistry()

Manages state resolution and validation for StateSpec-based states.

This registry handles:

- Resolving StateRef strings to StateSpec objects
- Validating all states in a state machine
- Getting transition mappings for states
- Detecting circular references and invalid transitions
- Per-instance state registries for better modularity

Source code in src/mycorrhizal/septum/core.py
def __init__(self):
    self._state_cache: Dict[str, StateSpec] = {}
    self._transition_cache: Dict[str, Dict] = {}
    self._resolved_modules: Dict[str, Any] = {}
    self._validation_errors: list[str] = []
    self._validation_warnings: list[str] = []

resolve_state

resolve_state(state_ref: Union[str, StateRef, StateSpec], validate_only: bool = False) -> Optional[StateSpec]

Resolve a state reference to an actual StateSpec object.

Parameters:

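| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state_ref | Union[str, StateRef, StateSpec] | Can be a StateSpec, a StateRef, or a fully qualified state name string | required |
| validate_only | bool | If True, don't cache the result, and record a validation error instead of raising when the state is not found (used during validation) | False |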

Returns:

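| Type | Description |
| --- | --- |
| Optional[StateSpec] | The resolved StateSpec. Returns None if state_ref has an unsupported type, or if the state is not found and validate_only=True. Raises ValidationError if the state is not found and validate_only=False. |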

Source code in src/mycorrhizal/septum/core.py
def resolve_state(
    self, state_ref: Union[str, StateRef, StateSpec], validate_only: bool = False
) -> Optional[StateSpec]:
    """
    Resolve a state reference to an actual StateSpec object.

    Args:
        state_ref: Can be a StateSpec, a StateRef, or a fully qualified state name string
        validate_only: If True, don't cache the result (used during validation)

    Returns:
        The resolved StateSpec, or None if not found (when validate_only=True)
    """
    # If it's already a StateSpec, return it
    if isinstance(state_ref, StateSpec):
        # Track resolved module
        if state_ref.module and state_ref.module not in self._resolved_modules:
            mod = sys.modules.get(state_ref.module)
            self._resolved_modules[state_ref.module] = mod

            # Store short name for easier local file lookup
            if mod and hasattr(mod, "__file__"):
                fp = Path(getattr(mod, "__file__"))
                self._resolved_modules[fp.stem] = mod

        return state_ref

    # If it's a StateRef, resolve the string
    if isinstance(state_ref, StateRef):
        state_name = state_ref.state
    elif isinstance(state_ref, str):
        state_name = state_ref
    else:
        return None

    # Check cache first
    if state_name in self._state_cache and not validate_only:
        return self._state_cache[state_name]

    # Use global registry
    state = get_state(state_name)
    if state:
        if not validate_only:
            self._state_cache[state_name] = state
        return state

    # State not found in registry
    if validate_only:
        self._validation_errors.append(
            f"State '{state_name}' not found in registry"
        )
        return None
    else:
        raise ValidationError(f"State '{state_name}' not found in registry")
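
The lookup order in resolve_state (cache, then global registry, then either a recorded error or an exception) can be sketched without the library. The block below is a stand-alone illustration under stated assumptions: MiniRegistry and GLOBAL_STATES are hypothetical stand-ins, not Septum APIs, and a plain KeyError stands in for ValidationError.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for the global state registry.
GLOBAL_STATES: Dict[str, object] = {"pkg.IdleState": object()}


class MiniRegistry:
    """Illustrative cache-then-registry resolver; not the Septum class."""

    def __init__(self) -> None:
        self.cache: Dict[str, object] = {}
        self.errors: List[str] = []

    def resolve(self, name: str, validate_only: bool = False) -> Optional[object]:
        # Cached results are only consulted outside validation.
        if not validate_only and name in self.cache:
            return self.cache[name]
        state = GLOBAL_STATES.get(name)
        if state is not None:
            if not validate_only:
                self.cache[name] = state
            return state
        if validate_only:
            # Validation collects problems instead of raising.
            self.errors.append(f"State '{name}' not found in registry")
            return None
        raise KeyError(f"State '{name}' not found in registry")


registry = MiniRegistry()
found = registry.resolve("pkg.IdleState")
missing = registry.resolve("pkg.Nope", validate_only=True)
```

Collecting errors in validation mode lets a single pass report every unresolved name at once, while normal resolution fails fast on the first one.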

validate_all_states

validate_all_states(initial_state: Union[str, StateRef, StateSpec], error_state: Optional[Union[str, StateRef, StateSpec]] = None) -> ValidationResult

Validate all states reachable from the initial state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| initial_state | Union[str, StateRef, StateSpec] | The starting state | required |
| error_state | Optional[Union[str, StateRef, StateSpec]] | Optional error state | None |

Returns:

| Type | Description |
| --- | --- |
| ValidationResult | ValidationResult with errors, warnings, and discovered states |

Source code in src/mycorrhizal/septum/core.py
def validate_all_states(
    self,
    initial_state: Union[str, StateRef, StateSpec],
    error_state: Optional[Union[str, StateRef, StateSpec]] = None,
) -> ValidationResult:
    """
    Validate all states reachable from the initial state.

    Args:
        initial_state: The starting state
        error_state: Optional error state

    Returns:
        ValidationResult with errors, warnings, and discovered states
    """
    self._validation_errors = []
    self._validation_warnings = []

    # Discover all reachable states
    discovered_states = set()
    state_references = set()
    resolved_references = set()
    states_to_visit = []

    # Resolve initial state
    initial_resolved = self.resolve_state(initial_state, validate_only=True)
    if initial_resolved:
        states_to_visit.append(initial_resolved)
        discovered_states.add(initial_resolved.name)

    # Resolve error state if provided
    if error_state:
        error_resolved = self.resolve_state(error_state, validate_only=True)
        if error_resolved:
            states_to_visit.append(error_resolved)
            discovered_states.add(error_resolved.name)

    # Traverse all reachable states
    visited = set()
    while states_to_visit:
        current_state = states_to_visit.pop()
        state_name = current_state.name

        if state_name in visited:
            continue
        visited.add(state_name)

        def handle_state(s):
            discovered_states.add(s.name if hasattr(s, 'name') else str(s))
            states_to_visit.append(s)

        def handle_reference(r):
            state_references.add(r)

        def handle_push(p):
            for state in p.push_states:
                match state:
                    case s if isinstance(s, StateSpec):
                        handle_state(s)
                    case r if isinstance(r, StateRef):
                        handle_reference(r)
                    case invalid:
                        self._validation_errors.append(
                            f"State '{state_name}' attempted to push an invalid transition: {invalid}"
                        )

        try:
            # Get transitions
            raw_transitions = current_state.get_transitions()

            # Handle different return types
            match raw_transitions:
                case list() | tuple():
                    pass  # Already a list
                case _ if isinstance(raw_transitions, (TransitionType, LabeledTransition)):
                    raw_transitions = [raw_transitions]
                case _:
                    raw_transitions = []

            # Validate each transition
            for transition in raw_transitions:
                match transition:
                    case _ if isinstance(transition, Push):
                        # Push transitions must have labels
                        self._validation_errors.append(
                            f"State '{state_name}' has a push transition without a label"
                        )
                        handle_push(transition)
                    case LabeledTransition():
                        match transition.transition:
                            case st if isinstance(st, StateSpec):
                                handle_state(st)
                            case r if isinstance(r, StateRef):
                                handle_reference(r)
                            case p if isinstance(p, Push):
                                handle_push(p)
                    case s if isinstance(s, StateSpec):
                        handle_state(s)
                    case r if isinstance(r, StateRef):
                        self._validation_warnings.append(
                            f"State '{state_name}' has a StateReference transition without a label"
                        )
                        handle_reference(r)
                    case _ if isinstance(transition, (StateContinuation, StateRenewal)):
                        # Valid continuation/renewal
                        pass
                    case invalid:
                        self._validation_errors.append(
                            f"State '{state_name}' has an invalid transition: {invalid}"
                        )

            # Validate that the state has an on_state handler
            if current_state.on_state is None:
                self._validation_errors.append(
                    f"State {state_name} does not have an on_state handler defined"
                )

            # Make sure non-terminal states define transitions
            if not current_state.config.terminal:
                if current_state.transitions is None:
                    self._validation_errors.append(
                        f"Non-terminal state {state_name} does not define any transitions"
                    )

            # Validate timeout configuration
            if current_state.config.timeout is not None:
                if (
                    not isinstance(current_state.config.timeout, (int, float))
                    or current_state.config.timeout <= 0
                ):
                    self._validation_errors.append(
                        f"State '{state_name}' has invalid timeout: {current_state.config.timeout}"
                    )

                has_timeout_handler = current_state.on_timeout is not None
                if not has_timeout_handler:
                    self._validation_warnings.append(
                        f"State '{state_name}' has timeout but no on_timeout handler defined"
                    )

            # Pass 2: Validate all string references
            for state_ref in list(state_references - resolved_references):
                resolved = self.resolve_state(state_ref, validate_only=True)
                if resolved:
                    resolved_references.add(state_ref)
                    if resolved.name not in discovered_states:
                        discovered_states.add(resolved.name)
                        states_to_visit.append(resolved)

        except Exception:
            self._validation_errors.append(
                f"Error processing transitions for state '{state_name}': {traceback.format_exc()}"
            )

    # Run PDA-specific validation
    pda_result = self.validate_pda_properties(initial_state, error_state)
    self._validation_errors.extend(pda_result.errors)
    self._validation_warnings.extend(pda_result.warnings)

    # Run determinism validation
    determ_result = self.validate_determinism(initial_state, error_state)
    self._validation_errors.extend(determ_result.errors)
    self._validation_warnings.extend(determ_result.warnings)

    return ValidationResult(
        valid=len(self._validation_errors) == 0,
        errors=self._validation_errors.copy(),
        warnings=self._validation_warnings.copy(),
        discovered_states=discovered_states,
    )
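
The discovery phase of validate_all_states is a worklist traversal with a visited set. As a minimal sketch of that technique only, the block below walks a hypothetical graph of state names; GRAPH and discover are illustrative names, and the real method additionally resolves string references and checks handlers, timeouts, and labels along the way.

```python
from typing import Dict, List, Set

# Hypothetical transition graph: state name -> names of target states.
GRAPH: Dict[str, List[str]] = {
    "Idle": ["Processing", "Done"],
    "Processing": ["Idle", "Done"],
    "Done": [],
    "Orphan": ["Idle"],  # never targeted, so never discovered
}


def discover(initial: str, graph: Dict[str, List[str]]) -> Set[str]:
    """Return every state reachable from `initial` using a worklist."""
    discovered: Set[str] = {initial}
    to_visit: List[str] = [initial]
    visited: Set[str] = set()
    while to_visit:
        current = to_visit.pop()
        if current in visited:
            continue  # cycles such as Idle <-> Processing are visited once
        visited.add(current)
        for target in graph.get(current, []):
            discovered.add(target)
            to_visit.append(target)
    return discovered


reachable = discover("Idle", GRAPH)
```

The visited set is what keeps the loop finite when states refer back to each other.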

get_transitions

get_transitions(state: StateSpec) -> Dict

Get the transition mapping for a state, with caching.

Returns a dict mapping event labels/enum values to transition targets.

Source code in src/mycorrhizal/septum/core.py
def get_transitions(self, state: StateSpec) -> Dict:
    """
    Get the transition mapping for a state, with caching.

    Returns a dict mapping event labels/enum values to transition targets.
    """
    state_name = state.name

    if state_name in self._transition_cache:
        return self._transition_cache[state_name]

    resolved_transitions = {}

    raw_transitions = state.get_transitions()
    match raw_transitions:
        case list() | tuple():
            pass  # Already a list
        case _ if isinstance(raw_transitions, (TransitionType, LabeledTransition)):
            raw_transitions = [raw_transitions]
        case _:
            raw_transitions = []

    for transition in raw_transitions:
        match transition:
            case LabeledTransition(label, target):
                # Resolve target if it's a StateRef
                if isinstance(target, StateRef):
                    resolved_target = self.resolve_state(target)
                    if not resolved_target:
                        # Can't resolve - skip this transition
                        continue
                    resolved = resolved_target
                else:
                    resolved = target

                # Use the label enum itself as the key
                resolved_transitions[label] = resolved
                # Also allow lookup by label name
                resolved_transitions[label.name] = resolved
                # Also allow lookup by label value
                if hasattr(label, 'value'):
                    resolved_transitions[label.value] = resolved
            case s if isinstance(s, StateSpec):
                # Direct state reference
                resolved_transitions[s.name] = s
                resolved_transitions[s] = s
            case r if isinstance(r, StateRef):
                # String reference - resolve it
                resolved = self.resolve_state(r)
                if resolved:
                    resolved_transitions[r.state] = resolved
            case _ if isinstance(transition, (StateContinuation, StateRenewal)):
                # Special transitions - allow lookup by type
                resolved_transitions[transition] = transition

    self._transition_cache[state_name] = resolved_transitions
    return resolved_transitions
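
For labeled transitions, the mapping above stores each target under three keys: the enum member, its name, and its value. A stand-alone sketch of that indexing follows; build_lookup and the string targets are hypothetical stand-ins for the registry method and StateSpec objects.

```python
from enum import Enum, auto
from typing import Any, Dict, List, Tuple


class Events(Enum):
    START = auto()  # value 1
    QUIT = auto()   # value 2


def build_lookup(pairs: List[Tuple[Enum, Any]]) -> Dict[Any, Any]:
    """Index each target by the enum member, its name, and its value."""
    table: Dict[Any, Any] = {}
    for label, target in pairs:
        table[label] = target
        table[label.name] = target
        table[label.value] = target
    return table


lookup = build_lookup([
    (Events.START, "ProcessingState"),
    (Events.QUIT, "DoneState"),
])
```

With this table, a handler result of Events.START, "START", or 1 all lead to the same target. One consequence worth keeping in mind: if labels from two different enum classes shared a value, the value key of the later entry would overwrite the earlier one.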

validate_pda_properties

validate_pda_properties(initial_state: Union[str, StateRef, StateSpec], error_state: Optional[Union[str, StateRef, StateSpec]] = None) -> ValidationResult

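Validate pushdown-automaton (PDA) properties related to push/pop balance. The checks emit warnings for states that push themselves without a Pop transition and for states that can Pop without being the target of a Push from another state.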

Source code in src/mycorrhizal/septum/core.py
def validate_pda_properties(
    self,
    initial_state: Union[str, StateRef, StateSpec],
    error_state: Optional[Union[str, StateRef, StateSpec]] = None,
) -> ValidationResult:
    """Validate PDA-specific properties (stack depth, push/pop balance)."""
    errors = []
    warnings = []

    # Get all states from global registry
    all_states = list(get_all_states().values())

    # Build a map of state name to state
    state_map = {state.name: state for state in all_states}

    for state_name, state in state_map.items():
        transitions = state.get_transitions()

        # Check for unbounded push (state pushes itself without Pop)
        for t in transitions:
            if isinstance(t, Push):
                for pushed_state in t.push_states:
                    # Check if pushed state is the same as current state
                    if isinstance(pushed_state, StateSpec) and pushed_state.name == state_name:
                        # Check if there's a Pop transition
                        has_pop = any(isinstance(tr, (Pop, type(Pop))) for tr in transitions)
                        if not has_pop:
                            warnings.append(
                                f"State '{state_name}' pushes itself without Pop. "
                                "This may cause unbounded stack growth."
                            )

        # Check for unreachable pop (pop without matching push)
        has_pop = any(isinstance(t, (Pop, type(Pop))) for t in transitions)
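        # Resolve the initial state's name once; unrecognized types yield "".
        if isinstance(initial_state, str):
            initial_name = initial_state
        elif isinstance(initial_state, StateSpec):
            initial_name = initial_state.name
        else:
            initial_name = ""
        # Parenthesization fix: has_pop must hold for every initial_state type.
        if has_pop and state_name != initial_name: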
            # Check if this state can be reached via a Push transition
            can_be_pushed = self._check_if_state_can_be_pushed(state, state_map)
            if not can_be_pushed:
                warnings.append(
                    f"State '{state_name}' can Pop without being pushed. "
                    "This may cause PopFromEmptyStack exception."
                )

    return ValidationResult(
        valid=len(errors) == 0,
        errors=errors,
        warnings=warnings,
        discovered_states=set(state_map.keys()),
    )

_check_if_state_can_be_pushed

_check_if_state_can_be_pushed(state: StateSpec, state_map: Dict[str, StateSpec]) -> bool

Check if a state can be reached via a Push transition from any other state.

Source code in src/mycorrhizal/septum/core.py
def _check_if_state_can_be_pushed(self, state: StateSpec, state_map: Dict[str, StateSpec]) -> bool:
    """Check if a state can be reached via a Push transition from any other state."""
    for other_state_name, other_state in state_map.items():
        if other_state_name == state.name:
            continue
        transitions = other_state.get_transitions()
        for t in transitions:
            if isinstance(t, Push):
                for pushed_state in t.push_states:
                    if isinstance(pushed_state, StateSpec) and pushed_state.name == state.name:
                        return True
    return False

validate_determinism

validate_determinism(initial_state: Union[str, StateRef, StateSpec], error_state: Optional[Union[str, StateRef, StateSpec]] = None) -> ValidationResult

Check that no state has multiple transitions for the same event.

Source code in src/mycorrhizal/septum/core.py
def validate_determinism(
    self,
    initial_state: Union[str, StateRef, StateSpec],
    error_state: Optional[Union[str, StateRef, StateSpec]] = None,
) -> ValidationResult:
    """Check that no state has multiple transitions for same event."""
    errors = []

    # Get all states from global registry
    all_states = list(get_all_states().values())

    for state in all_states:
        event_transitions = {}

        transitions = state.get_transitions()
        for transition in transitions:
            if isinstance(transition, LabeledTransition):
                event = transition.label
                if event in event_transitions:
                    errors.append(
                        f"State '{state.name}' has multiple transitions for event '{event}'. "
                        "This creates non-deterministic behavior."
                    )
                event_transitions[event] = transition

    return ValidationResult(
        valid=len(errors) == 0,
        errors=errors,
        warnings=[],
        discovered_states=set(s.name for s in all_states),
    )
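
The determinism check reduces to finding a label that appears more than once in one state's transition list. A minimal sketch of that idea, using hypothetical (label, target) tuples in place of LabeledTransition objects:

```python
from enum import Enum, auto
from typing import Any, List, Tuple


class Events(Enum):
    START = auto()
    QUIT = auto()


def duplicate_labels(pairs: List[Tuple[Enum, Any]]) -> List[Enum]:
    """Return labels that appear more than once, in order of detection."""
    seen = set()
    duplicates = []
    for label, _target in pairs:
        if label in seen:
            duplicates.append(label)
        seen.add(label)
    return duplicates


ambiguous = duplicate_labels([
    (Events.START, "ProcessingState"),
    (Events.QUIT, "DoneState"),
    (Events.START, "OtherState"),  # second target for START
])
```

Here ambiguous contains Events.START, the one label that would make the next state depend on list order rather than on the event alone.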

PrioritizedMessage dataclass

PrioritizedMessage(priority: int, item: Any)

__lt__

__lt__(other)

Compare based on priority only

Source code in src/mycorrhizal/septum/core.py
def __lt__(self, other):
    """Compare based on priority only"""
    if not isinstance(other, PrioritizedMessage):
        return NotImplemented
    return self.priority < other.priority
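
Comparing on priority alone matters because message payloads are arbitrary objects that may not support ordering. The following stand-alone sketch shows the same pattern with heapq; Prioritized is a hypothetical stand-in for PrioritizedMessage, and the numeric priorities are made up for illustration.

```python
import heapq
from dataclasses import dataclass
from typing import Any


@dataclass
class Prioritized:
    """Stand-in for a priority wrapper; ordering ignores the payload."""
    priority: int
    item: Any

    def __lt__(self, other):
        if not isinstance(other, Prioritized):
            return NotImplemented
        return self.priority < other.priority


heap = []
# Payloads are dicts, which do not support "<"; only priority is compared.
heapq.heappush(heap, Prioritized(2, {"kind": "user"}))
heapq.heappush(heap, Prioritized(0, {"kind": "error"}))
heapq.heappush(heap, Prioritized(1, {"kind": "timeout"}))

order = [heapq.heappop(heap).item["kind"] for _ in range(3)]
```

A plain (priority, item) tuple would fall back to comparing the items whenever two priorities tie, which raises TypeError for unorderable payloads such as dicts. The wrapper avoids that, at the cost of leaving the relative order of equal-priority entries unspecified.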

SeptumInternalMessage dataclass

SeptumInternalMessage()

Base class for internal FSM messages

TimeoutMessage dataclass

TimeoutMessage(state_name: str, timeout_id: int, timeout_duration: float)

Bases: SeptumInternalMessage

Internal message for timeout events

StateMachineComplete

Bases: Exception

Raised when the state machine has reached a terminal state

BlockedInUntimedState

Bases: Exception

Raised when a non-dwelling state blocks without a timeout

PopFromEmptyStack

Bases: Exception

Raised when we attempt to pop from an empty stack

NoStateToTick

Bases: Exception

Raised when the FSM object doesn't have a state attached

StateMachine

StateMachine(initial_state: Union[str, StateRef, StateSpec], max_queue_size: int = 1000, queue_overflow_policy: str = 'block', error_state: Optional[Union[str, StateRef, StateSpec]] = None, filter_fn: Optional[Callable] = None, trap_fn: Optional[Callable] = None, on_error_fn: Optional[Callable] = None, common_data: Optional[Any] = None)

Asyncio-native finite state machine for executing StateSpec-based states.

The StateMachine manages state execution, transitions, message passing, and lifecycle handling. States are defined using the @septum.state decorator. Each state must define an on_state handler (validation reports an error otherwise) and may also define on_enter, on_leave, on_timeout, and on_fail handlers.

Parameters:

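| Name | Type | Description | Default |
| --- | --- | --- | --- |
| initial_state | Union[str, StateRef, StateSpec] | The initial state (state function, StateRef, or StateSpec) | required |
| max_queue_size | int | Maximum number of messages held in the message queue | 1000 |
| queue_overflow_policy | str | Behavior when the queue is full. The source handles "block", "drop_newest", "drop_oldest", and "fail". | 'block' |
| error_state | Optional[Union[str, StateRef, StateSpec]] | Optional error state for handling exceptions | None |
| filter_fn | Optional[Callable] | Optional function to filter messages (ctx, msg) -> bool. A truthy return value drops the message before it is queued. | None |
| trap_fn | Optional[Callable] | Optional function to trap exceptions (exc) -> None | None |
| on_error_fn | Optional[Callable] | Optional function called on errors (ctx, exc) -> None | None |
| common_data | Optional[Any] | Shared data accessible via ctx.common in all states | None |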

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| current_state | Optional[StateSpec] | The currently executing state |
| context | | SharedContext containing msg and common data |
| state_stack | List[StateSpec] | Stack for Push/Pop hierarchical state management |

Methods:

| Name | Description |
| --- | --- |
| initialize | Initialize the state machine (must be called before tick) |
| tick | Process one state machine tick |
| send_message | Send a message to the state machine |
| reset | Reset to initial state |

Example

fsm = StateMachine(
    initial_state=IdleState,
    error_state=ErrorState,
    common_data={"counter": 0}
)
await fsm.initialize()
fsm.send_message("start")
await fsm.tick()

Source code in src/mycorrhizal/septum/core.py
def __init__(
    self,
    initial_state: Union[str, StateRef, StateSpec],
    max_queue_size: int = 1000,
    queue_overflow_policy: str = "block",
    error_state: Optional[Union[str, StateRef, StateSpec]] = None,
    filter_fn: Optional[Callable] = None,
    trap_fn: Optional[Callable] = None,
    on_error_fn: Optional[Callable] = None,
    common_data: Optional[Any] = None,
):

    # Use global state registry
    self.registry = StateRegistry()

    # Store queue configuration
    self.max_queue_size = max_queue_size
    self.queue_overflow_policy = queue_overflow_policy

    # The filter and trap functions
    self._filter_fn = filter_fn or (lambda x, y: None)
    self._trap_fn = trap_fn or (lambda x: None)
    self._on_err_fn = on_error_fn or (lambda x, y: None)

    # Validate the state machine structure
    validation_result = self.registry.validate_all_states(
        initial_state, error_state
    )

    if not validation_result.valid:
        error_msg = "State machine validation failed:\n" + "\n".join(
            validation_result.errors
        )
        if validation_result.warnings:
            error_msg += "\n\nWarnings:\n" + "\n".join(validation_result.warnings)
        raise ValidationError(error_msg)

    # Log warnings
    if validation_result.warnings:
        for warning in validation_result.warnings:
            self.log(f"WARNING: {warning}")

    # Resolve initial and error states
    self.initial_state = self.registry.resolve_state(initial_state)
    self.error_state = (
        self.registry.resolve_state(error_state) if error_state else None
    )

    # Asyncio-based infrastructure
    self._message_queue = PriorityQueue(maxsize=max_queue_size)
    self._timeout_task = None
    self._timeout_counter = 0
    self._current_timeout_id = None

    # Runtime state
    self.current_state: Optional[StateSpec] = None
    self.state_stack: List[StateSpec] = []
    self.context = SharedContext(
        send_message=self.send_message,
        log=self.log,
        common=common_data if common_data is not None else dict(),
    )

    self.retry_counters: Dict[str, int] = {}
    self.state_enter_times: Dict[str, float] = {}

    # Initialize asynchronously
    self._initialized = False

queue_depth property

queue_depth: int

Get current message queue depth.

initialize async

initialize()

Initialize the state machine asynchronously

Source code in src/mycorrhizal/septum/core.py
async def initialize(self):
    """Initialize the state machine asynchronously"""
    if not self._initialized:
        await self.reset()
        self._initialized = True
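
The guard in initialize makes repeated calls harmless: only the first call performs the reset. A small stand-alone sketch of the pattern, where Machine and reset_calls are hypothetical names used only to make the effect observable:

```python
import asyncio


class Machine:
    """Stand-in object showing a guarded async initializer."""

    def __init__(self) -> None:
        self._initialized = False
        self.reset_calls = 0

    async def reset(self) -> None:
        self.reset_calls += 1

    async def initialize(self) -> None:
        # Only the first call performs the reset.
        if not self._initialized:
            await self.reset()
            self._initialized = True


async def main() -> int:
    machine = Machine()
    await machine.initialize()
    await machine.initialize()  # no-op on the second call
    return machine.reset_calls


reset_calls = asyncio.run(main())
```

Note that the flag is set after the awaited reset, so two tasks calling initialize concurrently could both enter the guarded branch if reset yields control; this sketch, like the listing above, does not add a lock.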

send_message

send_message(message: Any)

Send a message to the state machine

Source code in src/mycorrhizal/septum/core.py
def send_message(self, message: Any):
    """Send a message to the state machine"""
    try:
        if message and self._filter_fn(self.context, message):
            return
        self._send_message_internal(message)
    except asyncio.QueueFull:
        # Handle overflow based on policy
        if self.queue_overflow_policy == "block":
            # Wait for space (synchronous, so raise)
            raise RuntimeError(
                "Queue full, would block. Use async send_message_async or check queue depth."
            )
        elif self.queue_overflow_policy == "drop_newest":
            # Drop this message (silently)
            pass
        elif self.queue_overflow_policy == "fail":
            raise RuntimeError(
                f"Queue full (max={self.max_queue_size}), message dropped"
            )
        # Note: "drop_oldest" is handled in _send_message_internal
    except Exception as e:
        self._send_message_internal(e)
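
The overflow policies named in the handler above can be illustrated with a stand-alone sketch. It uses a plain asyncio.Queue instead of the library's classes; the helper name `offer` and its boolean return value are hypothetical, introduced only for this illustration, and the real StateMachine may differ in detail.

```python
import asyncio


def offer(queue: asyncio.Queue, message: object, policy: str) -> bool:
    """Try to enqueue without blocking; return True if the message was queued."""
    try:
        queue.put_nowait(message)
        return True
    except asyncio.QueueFull:
        if policy == "drop_newest":
            # Silently discard the message that did not fit.
            return False
        if policy == "block":
            # A synchronous caller cannot wait for space, so it fails loudly.
            raise RuntimeError("Queue full, would block")
        if policy == "fail":
            raise RuntimeError(f"Queue full (max={queue.maxsize}), message dropped")
        raise ValueError(f"Unknown policy: {policy!r}")


async def demo() -> tuple:
    # The queue is created inside the coroutine so it belongs to the running loop.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    queued = offer(queue, "a", "drop_newest")
    dropped = offer(queue, "b", "drop_newest")
    try:
        offer(queue, "c", "fail")
        failed = False
    except RuntimeError:
        failed = True
    return queued, dropped, failed, queue.qsize()


result = asyncio.run(demo())
```

A caller that prefers not to rely on exceptions could compare queue_depth against the configured maximum before sending.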

_get_message_priority

_get_message_priority(message: Any) -> int

Get message priority for queuing.

Source code in src/mycorrhizal/septum/core.py
def _get_message_priority(self, message: Any) -> int:
    """Get message priority for queuing."""
    if isinstance(message, SeptumInternalMessage):
        return self.MessagePriorities.INTERNAL_MESSAGE
    elif isinstance(message, Exception):
        return self.MessagePriorities.ERROR
    else:
        return self.MessagePriorities.MESSAGE

_send_message_internal

_send_message_internal(message: Any) -> None

Internal message sending with priority handling.

Source code in src/mycorrhizal/septum/core.py
def _send_message_internal(self, message: Any) -> None:
    """Internal message sending with priority handling."""
    priority = self._get_message_priority(message)

    match message:
        case _ if isinstance(message, SeptumInternalMessage):
            try:
                self._message_queue.put_nowait(
                    PrioritizedMessage(priority, message)
                )
            except asyncio.QueueFull:
                if self.queue_overflow_policy == "drop_oldest":
                    # Drop oldest message
                    try:
                        self._message_queue.get_nowait()
                        self._message_queue.put_nowait(PrioritizedMessage(priority, message))
                    except asyncio.QueueEmpty:
                        pass  # Queue was actually empty, race condition
                # Other policies handled by send_message
        case _ if isinstance(message, Exception):
            try:
                self._message_queue.put_nowait(
                    PrioritizedMessage(priority, message)
                )
            except asyncio.QueueFull:
                if self.queue_overflow_policy == "drop_oldest":
                    # Drop oldest message
                    try:
                        self._message_queue.get_nowait()
                        self._message_queue.put_nowait(PrioritizedMessage(priority, message))
                    except asyncio.QueueEmpty:
                        pass  # Queue was actually empty, race condition
        case _:
            try:
                self._message_queue.put_nowait(
                    PrioritizedMessage(priority, message)
                )
            except asyncio.QueueFull:
                if self.queue_overflow_policy == "drop_oldest":
                    # Drop oldest message
                    try:
                        self._message_queue.get_nowait()
                        self._message_queue.put_nowait(PrioritizedMessage(priority, message))
                    except asyncio.QueueEmpty:
                        pass  # Queue was actually empty, race condition
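
The "drop_oldest" branch can be tried out in isolation. The following is a minimal sketch built on asyncio.PriorityQueue; `Prioritized` and `put_drop_oldest` are hypothetical stand-ins for PrioritizedMessage and the logic above, and the priority values 0 and 2 are assumptions chosen for the demonstration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class Prioritized:
    """Hypothetical stand-in: ordered by priority only, payload ignored."""
    priority: int
    item: Any = field(compare=False)


def put_drop_oldest(queue: asyncio.PriorityQueue, entry: Prioritized) -> None:
    """Enqueue; when full, remove one entry with get_nowait() and try again."""
    try:
        queue.put_nowait(entry)
    except asyncio.QueueFull:
        try:
            queue.get_nowait()
            queue.put_nowait(entry)
        except asyncio.QueueEmpty:
            pass


async def demo() -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize=2)
    put_drop_oldest(queue, Prioritized(2, "first user message"))
    put_drop_oldest(queue, Prioritized(0, "internal message"))
    put_drop_oldest(queue, Prioritized(2, "second user message"))
    # Drain whatever survived the overflow.
    return [queue.get_nowait().item for _ in range(queue.qsize())]


remaining = asyncio.run(demo())
```

In this sketch the evicted entry is the internal message, because get_nowait() on an asyncio.PriorityQueue returns the lowest-valued entry rather than the longest-queued one. Whether the same holds for the library depends on how PrioritizedMessage and MessagePriorities are ordered, which this section does not show.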

reset async

reset()

Reset the state machine to initial state

Source code in src/mycorrhizal/septum/core.py
async def reset(self):
    """Reset the state machine to initial state"""
    self._cancel_timeout()
    self.state_stack = []
    self.retry_counters = {}
    self.state_enter_times = {}

    # Clear message queue
    while not self._message_queue.empty():
        try:
            self._message_queue.get_nowait()
        except Exception:
            break

    # Transition into our initial state
    await self._transition_to_state(self.initial_state)

log

log(message: str)

Log a message

Source code in src/mycorrhizal/septum/core.py
def log(self, message: str):
    """Log a message"""
    print(f"[FSM] {message}")

tick async

tick(timeout: Optional[Union[float, int]] = 0)

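Process one state machine tick. A timeout of 0 polls the message queue without blocking, None waits indefinitely for the next message, and a positive value waits at most that many seconds.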

Source code in src/mycorrhizal/septum/core.py
async def tick(self, timeout: Optional[Union[float, int]] = 0):
    """Process one state machine tick"""

    if not self._initialized:
        await self.initialize()

    if not self.current_state:
        raise NoStateToTick()

    # Validate timeout parameter
    match timeout:
        case x if x and not isinstance(x, (int, float)):
            raise ValueError("Tick timeout must be None, or an int/float >= 0")
        case 0:
            block = False
        case None:
            block = True
        case x if x > 0:
            block = True
        case _:
            raise ValueError("Tick timeout must be None or >= 0")

    while True:
        try:
            if isinstance(self.context.msg, Exception):
                raise self.context.msg

            # Handle timeout messages specially
            if isinstance(self.context.msg, TimeoutMessage):
                if not self._handle_timeout_message():
                    return
                transition = await self._call_on_timeout(self.current_state)
            else:
                transition = await self._call_on_state(self.current_state)

            if self.current_state.config.terminal:
                raise StateMachineComplete()

            if transition in (None, Unhandled):
                self._trap_fn(self.context)

            self.context.msg = None

            should_check_queue = await self._process_transition(transition, block)

            # In manual mode (block=False), stop after processing one transition if not checking queue
            if not block and not should_check_queue:
                break

            if should_check_queue:
                if block:
                    message = await asyncio.wait_for(
                        self._message_queue.get(), timeout=timeout
                    )
                    message = message.item
                else:
                    message = self._message_queue.get_nowait().item
                self.context.msg = message
        except asyncio.QueueEmpty:
            break
        except asyncio.TimeoutError:
            break
        except Exception as e:
            if (
                isinstance(e, StateMachineComplete)
                or self.current_state.config.terminal
            ):
                raise

            next_transition = self._on_err_fn(self.context, e)
            if next_transition:
                await self._process_transition(next_transition, block)
            elif self.error_state and self.current_state != self.error_state:
                # Clear the exception message before transitioning
                self.context.msg = None
                await self._transition_to_state(self.error_state)
            elif self.current_state == self.error_state:
                # Already in error state and got another exception - stop
                break
            else:
                raise

    return None
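
The timeout handling in tick can be approximated outside the library. This is a sketch under stated assumptions: `blocks_for_message` and `wait_for_message` are hypothetical helpers that approximate the validation and the blocking wait shown above, using only asyncio from the standard library.

```python
import asyncio
from typing import Optional, Union


def blocks_for_message(timeout: Optional[Union[float, int]]) -> bool:
    """Approximate the validation above: 0 polls, None or a positive number waits."""
    if timeout is None:
        return True
    if not isinstance(timeout, (int, float)):
        raise ValueError("Tick timeout must be None, or an int/float >= 0")
    if timeout == 0:
        return False
    if timeout > 0:
        return True
    raise ValueError("Tick timeout must be None or >= 0")


async def wait_for_message(queue: asyncio.Queue, timeout: Optional[float]):
    """Return the next message, or None if nothing arrives in time."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    nothing = await wait_for_message(queue, 0.01)    # empty queue: times out
    queue.put_nowait("start")
    something = await wait_for_message(queue, 0.01)  # message already waiting
    return nothing, something


modes = [blocks_for_message(0), blocks_for_message(None), blocks_for_message(0.5)]
outcome = asyncio.run(demo())
```

A timed-out wait simply ends the tick in the listing above, so callers that poll in a loop do not need to handle asyncio.TimeoutError themselves.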

_call_on_state async

_call_on_state(state: StateSpec)

Call the state's on_state handler

Source code in src/mycorrhizal/septum/core.py
async def _call_on_state(self, state: StateSpec):
    """Call the state's on_state handler"""
    if state.on_state is None:
        raise ValueError(f"State {state.name} has no on_state handler")
    # Create interface view if handler has interface type hint
    ctx_to_pass = _create_interface_view_for_context(self.context, state.on_state)
    return await state.on_state(ctx_to_pass)

_call_on_enter async

_call_on_enter(state: StateSpec)

Call the state's on_enter handler

Source code in src/mycorrhizal/septum/core.py
async def _call_on_enter(self, state: StateSpec):
    """Call the state's on_enter handler"""
    if state.on_enter is not None:
        self.log(f"  [DEBUG] Calling on_enter for {state.name}")
        # Create interface view if handler has interface type hint
        ctx_to_pass = _create_interface_view_for_context(self.context, state.on_enter)
        await state.on_enter(ctx_to_pass)
        self.log(f"  [DEBUG] on_enter completed for {state.name}")

_call_on_leave async

_call_on_leave(state: StateSpec)

Call the state's on_leave handler

Source code in src/mycorrhizal/septum/core.py
async def _call_on_leave(self, state: StateSpec):
    """Call the state's on_leave handler"""
    if state.on_leave is not None:
        # Create interface view if handler has interface type hint
        ctx_to_pass = _create_interface_view_for_context(self.context, state.on_leave)
        await state.on_leave(ctx_to_pass)

_call_on_timeout async

_call_on_timeout(state: StateSpec)

Call the state's on_timeout handler

Source code in src/mycorrhizal/septum/core.py
async def _call_on_timeout(self, state: StateSpec):
    """Call the state's on_timeout handler"""
    if state.on_timeout is None:
        raise ValueError(f"State {state.name} has no on_timeout handler")
    # Create interface view if handler has interface type hint
    ctx_to_pass = _create_interface_view_for_context(self.context, state.on_timeout)
    return await state.on_timeout(ctx_to_pass)

_call_on_fail async

_call_on_fail(state: StateSpec)

Call the state's on_fail handler

Source code in src/mycorrhizal/septum/core.py
async def _call_on_fail(self, state: StateSpec):
    """Call the state's on_fail handler"""
    if state.on_fail is None:
        raise ValueError(f"State {state.name} has no on_fail handler")
    # Create interface view if handler has interface type hint
    ctx_to_pass = _create_interface_view_for_context(self.context, state.on_fail)
    return await state.on_fail(ctx_to_pass)

_process_transition async

_process_transition(transition: Any, block: bool = False) -> bool

Process a state transition

Source code in src/mycorrhizal/septum/core.py
async def _process_transition(self, transition: Any, block: bool = False) -> bool:
    """Process a state transition"""

    valid_transitions = self.registry.get_transitions(self.current_state)

    # Try to get the transition target
    target = valid_transitions.get(transition)

    # If not found by the transition itself, try by value
    if target is None and transition in valid_transitions.values():
        target = transition
    elif transition is not None and target is None:
        raise ValueError(f"Unknown transition {transition}: {valid_transitions}")

    # Process the transition

    if transition in (None, Unhandled):
        if (
            self.current_state.config.can_dwell
        ) or self.current_state.config.timeout:
            return True
        raise BlockedInUntimedState(
            f"{self.current_state.name} cannot dwell and does not have a timeout"
        )
    elif isinstance(target, StateSpec):
        await self._transition_to_state(target)
    elif isinstance(target, Push):
        await self._handle_push(target)
    elif isinstance(target, Pop) or target is Pop:
        await self._handle_pop()
    elif target == Again:
        # Re-execute current state immediately
        # In manual tick mode (block=False), stop after one execution
        # In automatic mode (block=True), continue the loop
        if block:
            # Automatic mode: continue the loop
            return True
        else:
            # Manual mode: stop after this execution
            return False
    elif target == Repeat:
        await self._transition_to_state(self.current_state)
    elif target == Restart:
        self._reset_state_context()
        await self._transition_to_state(self.current_state)
        return True
    elif target == Retry:
        await self._handle_retry()
    elif isinstance(target, (StateContinuation, StateRenewal)):
        # Other continuations/renewals - handle appropriately
        if target == Again:
            return True  # Continue the loop
        elif target == Repeat:
            await self._transition_to_state(self.current_state)
        elif target == Restart:
            self._reset_state_context()
            await self._transition_to_state(self.current_state)
            return True
        elif target == Retry:
            await self._handle_retry()

    return False
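
The rule applied when a handler returns None or Unhandled can be sketched on its own. `ConfigSketch` and `BlockedSketch` are hypothetical stand-ins for StateConfiguration and BlockedInUntimedState; only the two fields used by the check are modelled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConfigSketch:
    """Stand-in for the StateConfiguration fields used by the dwell check."""
    timeout: Optional[float] = None
    can_dwell: bool = False


class BlockedSketch(Exception):
    """Stand-in for BlockedInUntimedState."""


def may_wait(config: ConfigSketch, state_name: str) -> bool:
    """Return True if the state is allowed to wait for another message."""
    # Same truthiness test as the listing: a timeout of None or 0 does not count.
    if config.can_dwell or config.timeout:
        return True
    raise BlockedSketch(f"{state_name} cannot dwell and does not have a timeout")


dwelling = may_wait(ConfigSketch(can_dwell=True), "Idle")
timed = may_wait(ConfigSketch(timeout=5.0), "Fetch")

try:
    may_wait(ConfigSketch(), "Stuck")
    blocked = False
except BlockedSketch:
    blocked = True
```

Under this reading, a state whose on_state may return None needs either can_dwell=True or a non-zero timeout in its configuration.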

_transition_to_state async

_transition_to_state(next_state: StateSpec)

Transition to a new state

Source code in src/mycorrhizal/septum/core.py
async def _transition_to_state(self, next_state: StateSpec):
    """Transition to a new state"""

    if self.current_state:
        await self._call_on_leave(self.current_state)
        self._cancel_timeout()

    self.current_state = next_state
    self.state_enter_times[next_state.name] = time.time()
    self.log(f"Transitioned to {next_state.name}")
    self.log(f"  [DEBUG] common object id: {id(self.context.common)}")
    self.log(f"  [DEBUG] on_enter is None? {next_state.on_enter is None}")

    self._start_timeout(next_state)
    await self._call_on_enter(next_state)
    self.log(f"  [DEBUG] After on_enter, common: {self.context.common}")

_start_timeout

_start_timeout(state: StateSpec) -> Optional[int]

Start a timeout for the given state

Source code in src/mycorrhizal/septum/core.py
def _start_timeout(self, state: StateSpec) -> Optional[int]:
    """Start a timeout for the given state"""

    if state.config.timeout is None:
        return None

    self._cancel_timeout()

    self._timeout_counter += 1
    timeout_id = self._timeout_counter
    self._current_timeout_id = timeout_id

    async def timeout_handler():
        try:
            await asyncio.sleep(state.config.timeout)
            self._send_timeout_message(state.name, timeout_id, state.config.timeout)
        except asyncio.CancelledError:
            pass

    self._timeout_task = asyncio.create_task(timeout_handler())
    return timeout_id

_cancel_timeout

_cancel_timeout()

Cancel any active timeout

Source code in src/mycorrhizal/septum/core.py
def _cancel_timeout(self):
    """Cancel any active timeout"""
    if self._timeout_task is not None:
        self._timeout_task.cancel()
        self._timeout_task = None
    self._current_timeout_id = None

_send_timeout_message

_send_timeout_message(state_name: str, timeout_id: int, duration: float)

Internal method to send timeout messages

Source code in src/mycorrhizal/septum/core.py
def _send_timeout_message(self, state_name: str, timeout_id: int, duration: float):
    """Internal method to send timeout messages"""
    timeout_msg = TimeoutMessage(state_name, timeout_id, duration)
    self._send_message_internal(timeout_msg)

_handle_timeout_message

_handle_timeout_message() -> bool

Handle a timeout message

Source code in src/mycorrhizal/septum/core.py
def _handle_timeout_message(self) -> bool:
    """Handle a timeout message"""

    timeout_msg = self.context.msg
    self.context.msg = None

    if timeout_msg.timeout_id != self._current_timeout_id:
        self.log(f"Ignoring stale timeout {timeout_msg.timeout_id}")
        return False

    if timeout_msg.state_name != self.current_state.name:
        self.log(
            f"Ignoring timeout for {timeout_msg.state_name}, current state is {self.current_state.name}"
        )
        return False

    self.log(
        f"Handling timeout for {timeout_msg.state_name} ({timeout_msg.timeout_duration}s)"
    )
    self._cancel_timeout()
    return True
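
The timeout id scheme used by _start_timeout, _cancel_timeout and _handle_timeout_message can be sketched without the rest of the machine. `TimeoutTracker` is a hypothetical class written for this illustration; it records fired ids in a list instead of sending a TimeoutMessage.

```python
import asyncio
from typing import List, Optional


class TimeoutTracker:
    """Hypothetical sketch: each timer gets a fresh id, older ids become stale."""

    def __init__(self) -> None:
        self.counter = 0
        self.current_id: Optional[int] = None
        self.task: Optional[asyncio.Task] = None
        self.fired: List[int] = []

    def start(self, delay: float) -> int:
        self.cancel()  # at most one timer is active at a time
        self.counter += 1
        timeout_id = self.counter
        self.current_id = timeout_id

        async def handler() -> None:
            try:
                await asyncio.sleep(delay)
                self.fired.append(timeout_id)
            except asyncio.CancelledError:
                pass

        self.task = asyncio.create_task(handler())
        return timeout_id

    def cancel(self) -> None:
        if self.task is not None:
            self.task.cancel()
            self.task = None
        self.current_id = None

    def is_current(self, timeout_id: int) -> bool:
        return timeout_id == self.current_id


async def demo() -> tuple:
    tracker = TimeoutTracker()
    first = tracker.start(0.01)
    second = tracker.start(0.01)  # replaces and cancels the first timer
    await asyncio.sleep(0.05)
    return first, second, tracker.fired, tracker.is_current(first), tracker.is_current(second)


first, second, fired, first_current, second_current = asyncio.run(demo())
```

Comparing ids guards against a timeout message that was already queued before its timer was cancelled, which is the case the "Ignoring stale timeout" log line above reports.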

_handle_retry async

_handle_retry()

Handle retry logic with counter

Source code in src/mycorrhizal/septum/core.py
async def _handle_retry(self):
    """Handle retry logic with counter"""
    state_name = self.current_state.name

    if state_name not in self.retry_counters:
        self.retry_counters[state_name] = 0

    self.retry_counters[state_name] += 1

    if (
        self.current_state.config.retries is not None
        and self.retry_counters[state_name] > self.current_state.config.retries
    ):
        self.log(f"Retry limit exceeded for {state_name}")
        transition = await self._call_on_fail(self.current_state)
        await self._process_transition(transition)
    else:
        await self._transition_to_state(self.current_state)
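
The counter arithmetic in _handle_retry can be checked in isolation. `record_retry` is a hypothetical helper that reproduces only the counting and the limit comparison, not the on_fail call or the state re-entry.

```python
from typing import Dict, Optional


def record_retry(counters: Dict[str, int], state_name: str, retries: Optional[int]) -> bool:
    """Count one retry; return True once the configured limit is exceeded."""
    counters[state_name] = counters.get(state_name, 0) + 1
    return retries is not None and counters[state_name] > retries


counters: Dict[str, int] = {}
# With retries=2, the first two Retry transitions re-enter the state;
# the third one exceeds the limit.
exceeded = [record_retry(counters, "Fetch", retries=2) for _ in range(3)]

# retries=None means the limit is never exceeded.
unlimited = record_retry({}, "Fetch", retries=None)
```

Because the comparison is strictly greater-than, a configuration of retries=N allows N re-entries before on_fail is consulted.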

_reset_state_context

_reset_state_context()

Reset context for current state

Source code in src/mycorrhizal/septum/core.py
def _reset_state_context(self):
    """Reset context for current state"""
    state_name = self.current_state.name
    if state_name in self.retry_counters:
        del self.retry_counters[state_name]

_handle_push async

_handle_push(push: Push)

Handle push transition

Source code in src/mycorrhizal/septum/core.py
async def _handle_push(self, push: Push):
    """Handle push transition"""
    resolved_states = [self.registry.resolve_state(s) for s in push.push_states]

    for state in reversed(resolved_states[1:]):
        self.state_stack.append(state)

    await self._transition_to_state(resolved_states[0])

_handle_pop async

_handle_pop()

Handle pop transition

Source code in src/mycorrhizal/septum/core.py
async def _handle_pop(self):
    """Handle pop transition"""
    if not self.state_stack:
        raise PopFromEmptyStack()

    next_state = self.state_stack.pop()
    await self._transition_to_state(next_state)
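
The ordering produced by _handle_push and _handle_pop can be traced with plain lists. This sketch uses state names as strings; `push` and `pop` are hypothetical helpers that mirror only the stack manipulation.

```python
from typing import List


def push(stack: List[str], *states: str) -> str:
    """Enter the first state now; stack the rest so they pop in listed order."""
    for state in reversed(states[1:]):
        stack.append(state)
    return states[0]


def pop(stack: List[str]) -> str:
    """Return the next stacked state, failing if none is left."""
    if not stack:
        raise IndexError("pop from empty state stack")
    return stack.pop()


stack: List[str] = []
visited = [push(stack, "A", "B", "C")]  # enter A, stack holds C then B
while stack:
    visited.append(pop(stack))          # B first, then C
```

Pushing the remaining states in reverse is what makes Push(A, B, C) visit A, B and C in the order written.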

run async

run(max_iterations: Optional[int] = None, timeout: Optional[float] = 1)

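Run the state machine until it reaches a terminal state or, if max_iterations is set, until that many ticks have completed. Each tick waits up to timeout seconds for a message.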

Source code in src/mycorrhizal/septum/core.py
async def run(
    self, max_iterations: Optional[int] = None, timeout: Optional[float] = 1
):
    """Run the state machine until terminal state"""
    if not self._initialized:
        await self.initialize()

    iteration = 0

    while True:
        if max_iterations is not None and iteration >= max_iterations:
            break

        if self.current_state.config.terminal:
            self.log(f"Reached terminal state: {self.current_state.name}")
            break

        try:
            await self.tick(timeout=timeout)
            iteration += 1
        except StateMachineComplete:
            raise
        except Exception as e:
            self.log(f"Error in FSM: {e}")
            if self.error_state:
                await self._transition_to_state(self.error_state)
            else:
                raise

_SeptumDecoratorAPI

_SeptumDecoratorAPI()

Decorator API for defining states

Source code in src/mycorrhizal/septum/core.py
def __init__(self):
    self._tracking_stack: List[List[Tuple[str, Any]]] = []

state

state(config: StateConfiguration = StateConfiguration(), name: Optional[str] = None, group: Optional[str] = None) -> Callable[[Callable], StateSpec]

Decorator to define a state.

Parameters:

Name | Type | Description | Default
config | StateConfiguration | StateConfiguration for this state | StateConfiguration()
name | Optional[str] | Optional name (auto-generated if not provided) | None
group | Optional[str] | Optional group name for organization | None

The decorated function should contain nested decorated functions (@on_state, @on_enter, @transitions, etc.). No need to return a dict!

Source code in src/mycorrhizal/septum/core.py
def state(
    self,
    config: StateConfiguration = StateConfiguration(),
    name: Optional[str] = None,
    group: Optional[str] = None,
) -> Callable[[Callable], StateSpec]:
    """
    Decorator to define a state.

    Args:
        config: StateConfiguration for this state
        name: Optional name (auto-generated if not provided)
        group: Optional group name for organization

    The decorated function should contain nested decorated functions
    (@on_state, @on_enter, @transitions, etc.). No need to return a dict!
    """

    def decorator(func: Callable[..., Any]) -> StateSpec:
        # Get function metadata
        module = func.__module__
        qualname = func.__qualname__

        # Auto-generate name if not provided
        if name is None:
            # Handle __main__ module
            if module == "__main__":
                filename = inspect.getfile(func)
                module = os.path.splitext(os.path.basename(filename))[0]
            state_name = f"{module}.{qualname}"
        else:
            state_name = name

        # Set up tracking for this state
        tracked_items = []
        self._tracking_stack.append(tracked_items)

        try:
            # Call the function to execute inner decorators
            func()
        finally:
            # Always pop the tracking stack
            self._tracking_stack.pop()

        # Extract decorated methods from tracked items
        on_state = None
        on_enter = None
        on_leave = None
        on_timeout = None
        on_fail = None
        transitions = None
        Events = None

        for item_name, item_fn in tracked_items:
            if hasattr(item_fn, "_septum_on_state"):
                on_state = item_fn
            elif hasattr(item_fn, "_septum_on_enter"):
                on_enter = item_fn
            elif hasattr(item_fn, "_septum_on_leave"):
                on_leave = item_fn
            elif hasattr(item_fn, "_septum_on_timeout"):
                on_timeout = item_fn
            elif hasattr(item_fn, "_septum_on_fail"):
                on_fail = item_fn
            elif hasattr(item_fn, "_septum_transitions"):
                transitions = item_fn
            elif hasattr(item_fn, "_septum_events"):
                Events = item_fn

        # Validate required methods
        if on_state is None:
            raise ValueError(f"State '{state_name}' must have an @on_state decorated method")

        # Create the StateSpec
        state_spec = StateSpec(
            name=state_name,
            qualname=qualname,
            module=module,
            config=config,
            on_state=on_state,
            on_enter=on_enter,
            on_leave=on_leave,
            on_timeout=on_timeout,
            on_fail=on_fail,
            transitions=transitions,
            Events=Events,
            group_name=group or "",
        )

        # Register the state
        register_state(state_spec)

        return state_spec

    return decorator
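
The tracking-stack technique behind the state decorator can be shown at small scale. `MiniDSL` is a hypothetical, heavily reduced sketch: it collects handlers into a dict rather than building a StateSpec, and it supports only on_state and on_enter.

```python
from typing import Any, Callable, Dict, List, Tuple


class MiniDSL:
    """Hypothetical sketch of collecting nested decorated functions."""

    def __init__(self) -> None:
        self._tracking_stack: List[List[Tuple[str, Any]]] = []

    def state(self, func: Callable[[], None]) -> Dict[str, Any]:
        tracked: List[Tuple[str, Any]] = []
        self._tracking_stack.append(tracked)
        try:
            func()  # running the body executes the inner decorators
        finally:
            self._tracking_stack.pop()
        handlers = dict(tracked)
        if "on_state" not in handlers:
            raise ValueError(f"State '{func.__qualname__}' must define on_state")
        return {"name": func.__qualname__, "handlers": handlers}

    def on_state(self, func: Callable) -> Callable:
        if self._tracking_stack:
            self._tracking_stack[-1].append(("on_state", func))
        return func

    def on_enter(self, func: Callable) -> Callable:
        if self._tracking_stack:
            self._tracking_stack[-1].append(("on_enter", func))
        return func


dsl = MiniDSL()


@dsl.state
def Idle():
    @dsl.on_state
    async def on_state(ctx):
        return None

    @dsl.on_enter
    async def on_enter(ctx):
        pass
```

After decoration, Idle is the collected specification rather than a function, which matches the way the decorator above returns a StateSpec.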

on_state

on_state(func: Callable) -> Callable

Decorator for the main state logic method

Source code in src/mycorrhizal/septum/core.py
def on_state(self, func: Callable) -> Callable:
    """Decorator for the main state logic method"""
    func._septum_on_state = func
    if self._tracking_stack:
        self._tracking_stack[-1].append((func.__name__, func))
    return func

on_enter

on_enter(func: Callable) -> Callable

Decorator for on_enter lifecycle method

Source code in src/mycorrhizal/septum/core.py
def on_enter(self, func: Callable) -> Callable:
    """Decorator for on_enter lifecycle method"""
    func._septum_on_enter = func
    if self._tracking_stack:
        self._tracking_stack[-1].append((func.__name__, func))
    return func

on_leave

on_leave(func: Callable) -> Callable

Decorator for on_leave lifecycle method

Source code in src/mycorrhizal/septum/core.py
def on_leave(self, func: Callable) -> Callable:
    """Decorator for on_leave lifecycle method"""
    func._septum_on_leave = func
    if self._tracking_stack:
        self._tracking_stack[-1].append((func.__name__, func))
    return func

on_timeout

on_timeout(func: Callable) -> Callable

Decorator for on_timeout lifecycle method

Source code in src/mycorrhizal/septum/core.py
def on_timeout(self, func: Callable) -> Callable:
    """Decorator for on_timeout lifecycle method"""
    func._septum_on_timeout = func
    if self._tracking_stack:
        self._tracking_stack[-1].append((func.__name__, func))
    return func

on_fail

on_fail(func: Callable) -> Callable

Decorator for on_fail lifecycle method

Source code in src/mycorrhizal/septum/core.py
def on_fail(self, func: Callable) -> Callable:
    """Decorator for on_fail lifecycle method"""
    func._septum_on_fail = func
    if self._tracking_stack:
        self._tracking_stack[-1].append((func.__name__, func))
    return func

transitions

transitions(func: Callable) -> Callable

Decorator for declaring transitions

Source code in src/mycorrhizal/septum/core.py
def transitions(self, func: Callable) -> Callable:
    """Decorator for declaring transitions"""
    func._septum_transitions = func
    if self._tracking_stack:
        self._tracking_stack[-1].append((func.__name__, func))
    return func

events

events(events_class: type) -> type

Decorator for registering the Events enum.

Optional decorator that makes the Events enum accessible via state.Events for introspection and testing. The Events enum works via closure even without this decorator, but state.Events will be None.

Use this decorator when:
- You need to access state.Events programmatically (e.g., in tests)
- You want to inspect what events a state defines
- Registry methods need to reference the Events enum

Skip this decorator when:
- You only use Events within on_state/transitions (closure handles it)
- You don't need external access to the Events enum

Example

@septum.state()
def MyState():
    @septum.events  # Optional - makes MyState.Events accessible
    class Events(Enum):
        GO = auto()

    @septum.on_state
    async def on_state(ctx):
        return Events.GO  # Works via closure even without @septum.events

Source code in src/mycorrhizal/septum/core.py
def events(self, events_class: type) -> type:
    """
    Decorator for registering the Events enum.

    Optional decorator that makes the Events enum accessible via state.Events
    for introspection and testing. The Events enum works via closure even
    without this decorator, but state.Events will be None.

    Use this decorator when:
    - You need to access state.Events programmatically (e.g., in tests)
    - You want to inspect what events a state defines
    - Registry methods need to reference the Events enum

    Skip this decorator when:
    - You only use Events within on_state/transitions (closure handles it)
    - You don't need external access to the Events enum

    Example:
        @septum.state()
        def MyState():
            @septum.events  # Optional - makes MyState.Events accessible
            class Events(Enum):
                GO = auto()

            @septum.on_state
            async def on_state(ctx):
                return Events.GO  # Works via closure even without @septum.events
    """
    events_class._septum_events = events_class
    if self._tracking_stack:
        self._tracking_stack[-1].append(("Events", events_class))
    return events_class

root

root(func: Callable) -> Callable

Decorator to mark a state as the initial/root state

Source code in src/mycorrhizal/septum/core.py
def root(self, func: Callable) -> Callable:
    """Decorator to mark a state as the initial/root state"""
    func._septum_is_root = True
    return func

_clear_interface_view_cache

_clear_interface_view_cache() -> None

Clear the interface view cache. Useful for testing.

Source code in src/mycorrhizal/septum/core.py
def _clear_interface_view_cache() -> None:
    """Clear the interface view cache. Useful for testing."""
    global _interface_view_cache
    _interface_view_cache.clear()
    # Also clear the compilation cache from common module
    _clear_compilation_cache()

_create_interface_view_for_context

_create_interface_view_for_context(context: SharedContext, handler: Callable) -> SharedContext

Create a constrained view of context.common if the handler has an interface type hint.

This enables type-safe, constrained access to blackboard state based on interface definitions created with @blackboard_interface.

The function signature can use an interface type:

async def on_state(ctx: SharedContext[MyInterface]):
    # ctx.common is automatically a constrained view
    return Events.DONE

Parameters:

Name | Type | Description | Default
context | SharedContext | The SharedContext object | required
handler | Callable | The state handler function to check for interface type hints | required

Returns:

Type | Description
SharedContext | The same context object with context.common potentially updated to a constrained view

Raises:

Type | Description
TypeError | If handler is not callable or type hints are malformed
AttributeError | If type hints reference undefined types

Note

Uses id() for the cache key, which is safe as long as the blackboard object stays alive, because id() is unique only among simultaneously existing objects.

Mutates context.common in-place, which is safe because:
1. ConstrainedView wraps the underlying blackboard and forwards mutations to it
2. All interface views for the same blackboard share the same underlying state
3. Mutations through interface views PERSIST and are visible to subsequent states (this is intentional!)
4. Each handler call creates a fresh interface view wrapping self.context.common
5. Different handlers can have different interfaces, but all mutations go to the same blackboard
6. Readonly enforcement happens in ConstrainedView.__setattr__, not via context isolation

Source code in src/mycorrhizal/septum/core.py
def _create_interface_view_for_context(context: SharedContext, handler: Callable) -> SharedContext:
    """
    Create a constrained view of context.common if the handler has an interface type hint.

    This enables type-safe, constrained access to blackboard state based on
    interface definitions created with @blackboard_interface.

    The function signature can use an interface type:
        async def on_state(ctx: SharedContext[MyInterface]):
            # ctx.common is automatically a constrained view
            return Events.DONE

    Args:
        context: The SharedContext object
        handler: The state handler function to check for interface type hints

    Returns:
        The same context object with context.common potentially updated to a constrained view

    Raises:
        TypeError: If handler is not callable or type hints are malformed
        AttributeError: If type hints reference undefined types

    Note:
        Uses id() for the cache key, which is safe as long as the blackboard object stays alive,
        because id() is unique only among simultaneously existing objects.

        Mutates context.common in-place which is safe because:
        1. ConstrainedView wraps the underlying blackboard and forwards mutations to it
        2. All interface views for the same blackboard share the same underlying state
        3. Mutations through interface views PERSIST and are visible to subsequent states (this is intentional!)
        4. Each handler call creates a fresh interface view wrapping self.context.common
        5. Different handlers can have different interfaces, but all mutations go to the same blackboard
        6. Readonly enforcement happens in ConstrainedView.__setattr__, not via context isolation
    """
    # Get compiled metadata (uses EAFP pattern internally)
    # Raises specific exceptions if compilation fails
    metadata = _get_compiled_metadata(handler)

    # If handler has interface type hint, create constrained view
    if metadata.has_interface and metadata.interface_type:
        # Use id() as cache key for singleton instance caching
        cache_key = (id(context.common), metadata.interface_type)

        # Check cache
        if cache_key in _interface_view_cache:
            constrained_common = _interface_view_cache[cache_key]
        else:
            # Create view
            constrained_common = create_view_from_protocol(
                context.common,
                metadata.interface_type,
                readonly_fields=metadata.readonly_fields
            )
            # Cache for reuse
            _interface_view_cache[cache_key] = constrained_common

        # Update context.common in-place (safe, see Note above)
        context.common = constrained_common

    return context
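
The behaviour described in the Note (mutations forwarded to the blackboard, read-only fields enforced in __setattr__, views cached per blackboard and interface) can be sketched with the standard library alone. `ViewSketch` and `view_for` are hypothetical stand-ins for ConstrainedView and the caching logic; the interface is identified by a string label here, whereas the listing above uses the interface type.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, Tuple


class ViewSketch:
    """Hypothetical stand-in for a constrained view over a blackboard object."""

    def __init__(self, target: Any, allowed: Iterable[str], readonly: Iterable[str] = ()):
        # Bypass our own __setattr__ while storing the view's bookkeeping.
        object.__setattr__(self, "_target", target)
        object.__setattr__(self, "_allowed", frozenset(allowed))
        object.__setattr__(self, "_readonly", frozenset(readonly))

    def __getattr__(self, name: str) -> Any:
        # Only reached for names that are not attributes of the view itself.
        if name not in self._allowed:
            raise AttributeError(f"{name!r} is not part of this interface")
        return getattr(self._target, name)

    def __setattr__(self, name: str, value: Any) -> None:
        if name not in self._allowed:
            raise AttributeError(f"{name!r} is not part of this interface")
        if name in self._readonly:
            raise AttributeError(f"{name!r} is read-only")
        setattr(self._target, name, value)  # forwarded to the blackboard


_view_cache: Dict[Tuple[int, str], ViewSketch] = {}


def view_for(blackboard: Any, interface: str, allowed: Iterable[str],
             readonly: Iterable[str] = ()) -> ViewSketch:
    """Return the cached view for this blackboard and interface label."""
    key = (id(blackboard), interface)
    if key not in _view_cache:
        _view_cache[key] = ViewSketch(blackboard, allowed, readonly)
    return _view_cache[key]


blackboard = SimpleNamespace(count=0, limit=3)
view = view_for(blackboard, "Counter", {"count", "limit"}, readonly={"limit"})
view.count = 5  # persists on the underlying blackboard

try:
    view.limit = 10
    readonly_blocked = False
except AttributeError:
    readonly_blocked = True

same_view = view_for(blackboard, "Counter", {"count", "limit"}, readonly={"limit"}) is view
```

Because the cached view holds a reference to its blackboard, the id() in the key cannot be reused by another object while the cache entry exists in this sketch.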

register_state

register_state(state: StateSpec) -> None

Register a state in the global registry

Source code in src/mycorrhizal/septum/core.py
def register_state(state: StateSpec) -> None:
    """Register a state in the global registry"""
    _state_registry[state.name] = state

get_state

get_state(name: str) -> Optional[StateSpec]

Get a state from the global registry

Source code in src/mycorrhizal/septum/core.py
def get_state(name: str) -> Optional[StateSpec]:
    """Get a state from the global registry"""
    return _state_registry.get(name)

get_all_states

get_all_states() -> Dict[str, StateSpec]

Get all registered states

Source code in src/mycorrhizal/septum/core.py
def get_all_states() -> Dict[str, StateSpec]:
    """Get all registered states"""
    return _state_registry.copy()
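
The registry functions above follow a simple module-level dict pattern, sketched here with strings in place of StateSpec objects. The names `register`, `get` and `get_all` are hypothetical shortened versions of the functions listed above.

```python
from typing import Dict, Optional

_registry: Dict[str, str] = {}


def register(name: str, spec: str) -> None:
    """Store a spec under its name; an existing entry with that name is replaced."""
    _registry[name] = spec


def get(name: str) -> Optional[str]:
    """Look up a spec, returning None when the name is unknown."""
    return _registry.get(name)


def get_all() -> Dict[str, str]:
    """Return a shallow copy so callers cannot alter the registry by accident."""
    return _registry.copy()


register("app.Idle", "idle-spec")
register("app.Idle", "idle-spec-v2")   # same name: the later registration wins

snapshot = get_all()
snapshot["app.Rogue"] = "not registered"  # only the copy changes
```

Since names are the only key, two states that resolve to the same name overwrite each other, which is one reason the test helper that clears the registry is useful between tests.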

_clear_state_registry

_clear_state_registry() -> None

Clear the global state registry. Useful for testing.

This function should be called in test fixtures to ensure tests don't interfere with each other through shared state registrations.

Example

@pytest.fixture(autouse=True)
def clear_registries():
    _clear_state_registry()
    yield
    _clear_state_registry()

Source code in src/mycorrhizal/septum/core.py
def _clear_state_registry() -> None:
    """Clear the global state registry. Useful for testing.

    This function should be called in test fixtures to ensure tests
    don't interfere with each other through shared state registrations.

    Example:
        @pytest.fixture(autouse=True)
        def clear_registries():
            _clear_state_registry()
            yield
            _clear_state_registry()
    """
    global _state_registry
    _state_registry.clear()