Spores Logging API Reference

Core Module

mycorrhizal.spores

Spores - Event and Object Logging for Observability

OCEL (Object-Centric Event Log) compatible logging system for tracking events and objects in Mycorrhizal systems. Provides automatic extraction of events and objects from DSL execution.

Usage (Standalone Event and Object Logging):

from mycorrhizal.spores import configure, get_spore_sync, get_spore_async
from mycorrhizal.spores.transport import SyncFileTransport, AsyncFileTransport
from mycorrhizal.spores.models import SporesAttr
from pydantic import BaseModel
from typing import Annotated

# Mark domain model fields for object attribute logging
class Order(BaseModel):
    id: str
    status: Annotated[str, SporesAttr]
    total: Annotated[float, SporesAttr]

# Sync code
configure(transport=SyncFileTransport("logs/ocel.jsonl"))
spore = get_spore_sync(__name__)

@spore.log_event(
    event_type="OrderCreated",
    relationships={"order": ("return", "Order")},
)
def create_order(customer: Customer, items: list) -> Order:
    return Order(...)

# Async code
configure(transport=AsyncFileTransport("logs/ocel.jsonl"))
aspore = get_spore_async(__name__)

@aspore.log_event(
    event_type="OrderCreated",
    relationships={"order": ("return", "Order")},
)
async def create_order_async(customer: Customer, items: list) -> Order:
    return Order(...)

Usage (DSL Adapters):

from mycorrhizal.spores import configure, spore
from mycorrhizal.spores.models import EventAttr, ObjectRef, ObjectScope
from pydantic import BaseModel
from typing import Annotated

# Mark object types for logging (defined first so MissionContext can reference Robot)
@spore.object(object_type="Robot")
class Robot(BaseModel):
    id: str
    name: str

# Mark blackboard fields for event attribute extraction
class MissionContext(BaseModel):
    mission_id: Annotated[str, EventAttr]
    robot: Annotated[Robot, ObjectRef(qualifier="actor", scope=ObjectScope.GLOBAL)]

DSL Adapters

  • HyphaAdapter - Log Petri net transitions with token relationships
  • RhizomorphAdapter - Log behavior tree node execution with status
  • SeptumAdapter - Log state machine execution and lifecycle events

Annotation Types

  • EventAttr - Mark blackboard fields for automatic event attribute extraction (DSL adapters)
  • SporesAttr - Mark domain model fields for automatic object attribute logging (log_event)
  • ObjectRef - Mark blackboard fields as object references with scope

Key Concepts

  • Event - Something that happens at a point in time, with attributes
  • Object - An entity with a type and attributes
  • Relationship - Link between an event and an object, labeled with a qualifier (e.g., "actor", "target")

Object Scopes

  • EVENT - Object exists only for this event (default)
  • GLOBAL - Object exists across all events (deduplicated by ID)

SporesConfig dataclass

SporesConfig(enabled: bool = True, object_cache_size: int = 128, encoder: Optional[Encoder] = None, transport: Optional[Transport] = None, eviction_policy: EvictionPolicy = EVICT_AND_LOG, touch_resend_n: int = 100)

Global configuration for the Spores logging system.

Attributes:

  • enabled (bool): Whether Spores logging is enabled
  • object_cache_size (int): Maximum number of objects in the LRU cache
  • encoder (Optional[Encoder]): Encoder instance to use
  • transport (Optional[Transport]): Transport instance to use (required for logging to work)
  • eviction_policy (EvictionPolicy): Policy for handling cache eviction
  • touch_resend_n (int): Resend object every N touches (0 to disable periodic resend)

__post_init__

__post_init__()

Set default encoder if not provided.

Source code in src/mycorrhizal/spores/core.py
def __post_init__(self):
    """Set default encoder if not provided."""
    if self.encoder is None:
        self.encoder = JSONEncoder()
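
The hook above fills in a default after the dataclass-generated `__init__` has run. A minimal standalone sketch of that pattern follows; `DemoEncoder` and `DemoConfig` are hypothetical stand-ins used only for illustration and are not part of Spores.

```python
from dataclasses import dataclass
from typing import Optional


class DemoEncoder:
    """Hypothetical stand-in for an encoder; not the Spores JSONEncoder."""

    def encode(self, record: dict) -> str:
        return repr(record)


@dataclass
class DemoConfig:
    """Hypothetical config that mirrors the default-in-__post_init__ pattern."""

    enabled: bool = True
    encoder: Optional[DemoEncoder] = None

    def __post_init__(self) -> None:
        # Created per instance, so two configs never share one default encoder.
        if self.encoder is None:
            self.encoder = DemoEncoder()


default_cfg = DemoConfig()
custom_encoder = DemoEncoder()
custom_cfg = DemoConfig(encoder=custom_encoder)
```

Resolving the default in `__post_init__` rather than in the field declaration keeps `None` available as an explicit "use the default" value for callers.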

EvictionPolicy

Bases: str, Enum

Cache eviction policy for object logging.

Attributes:

  • EVICT_AND_LOG: Evict from cache when full, log immediately via sync/async path
  • EVICT_AND_BUFFER: Evict from cache, buffer for later logging (future)
  • NO_EVICT: Keep in cache until explicit flush (future)

CacheMetrics dataclass

CacheMetrics(evictions: int = 0, eviction_failures: int = 0, first_sights: int = 0)

Track cache eviction statistics.
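
To make the interplay of `object_cache_size`, `touch_resend_n`, the EVICT_AND_LOG policy and these counters more concrete, here is a rough sketch of one way such a cache could work. Everything in it (`DemoMetrics`, `DemoObjectCache`, the `emit` callback, and the exact moments at which objects are emitted) is an assumption made for illustration; the actual Spores cache may behave differently.

```python
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DemoMetrics:
    """Hypothetical counters named after the CacheMetrics fields."""

    evictions: int = 0
    eviction_failures: int = 0
    first_sights: int = 0


class DemoObjectCache:
    """Hypothetical LRU object cache; not the Spores implementation."""

    def __init__(self, max_size: int, touch_resend_n: int, emit: Callable[[str, Any], None]):
        self.max_size = max_size
        self.touch_resend_n = touch_resend_n
        self.emit = emit  # called whenever an object should be logged
        self.metrics = DemoMetrics()
        self._entries: OrderedDict = OrderedDict()  # obj_id -> (obj, touches)

    def touch(self, obj_id: str, obj: Any) -> None:
        if obj_id in self._entries:
            _, touches = self._entries.pop(obj_id)
            touches += 1
            # Periodic resend: every N-th touch re-emits the object (0 disables it).
            if self.touch_resend_n and touches % self.touch_resend_n == 0:
                self.emit(obj_id, obj)
            self._entries[obj_id] = (obj, touches)  # re-insert as most recently used
            return
        # First sight: emit immediately and start tracking the object.
        self.metrics.first_sights += 1
        self.emit(obj_id, obj)
        self._entries[obj_id] = (obj, 0)
        if len(self._entries) > self.max_size:
            old_id, (old_obj, _) = self._entries.popitem(last=False)  # least recently used
            try:
                self.emit(old_id, old_obj)  # assumed "evict and log" behaviour
                self.metrics.evictions += 1
            except Exception:
                self.metrics.eviction_failures += 1


emitted = []
cache = DemoObjectCache(max_size=2, touch_resend_n=2, emit=lambda i, o: emitted.append(i))
for obj_id in ["a", "b", "a", "a", "c"]:
    cache.touch(obj_id, {"id": obj_id})
```

In this run `a` is emitted on first sight and again on its second touch, and inserting `c` pushes out `b`, the least recently used entry.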

EventLogger

Bases: ABC

Abstract base class for event loggers.

Implementations can be sync or async, but the interface is the same.

event abstractmethod

event(event_type: str, **kwargs)

Log an event.

Source code in src/mycorrhizal/spores/core.py
@abstractmethod
def event(self, event_type: str, **kwargs):
    """Log an event."""
    pass

log_object abstractmethod

log_object(obj_type: str, obj_id: str, **kwargs)

Log an object.

Source code in src/mycorrhizal/spores/core.py
@abstractmethod
def log_object(self, obj_type: str, obj_id: str, **kwargs):
    """Log an object."""
    pass
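
As an illustration of the two-method interface, the sketch below defines a local copy of it and an in-memory implementation of the kind that can be handy in tests. `DemoEventLogger` and `InMemoryLogger` are hypothetical names; nothing here is imported from Spores.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple


class DemoEventLogger(ABC):
    """Local copy of the two-method interface, for illustration only."""

    @abstractmethod
    def event(self, event_type: str, **kwargs):
        """Log an event."""

    @abstractmethod
    def log_object(self, obj_type: str, obj_id: str, **kwargs):
        """Log an object."""


class InMemoryLogger(DemoEventLogger):
    """Hypothetical implementation that keeps records in plain lists."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Dict[str, Any]]] = []
        self.objects: List[Tuple[str, str, Dict[str, Any]]] = []

    def event(self, event_type: str, **kwargs) -> None:
        self.events.append((event_type, kwargs))

    def log_object(self, obj_type: str, obj_id: str, **kwargs) -> None:
        self.objects.append((obj_type, obj_id, kwargs))


logger = InMemoryLogger()
logger.log_object("Order", "o-1", status="new")
logger.event("OrderCreated", item_count=3)
```

Because both methods are abstract, instantiating the base class directly raises `TypeError`; only subclasses that implement both can be created.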

AsyncEventLogger

AsyncEventLogger(name: str)

Bases: EventLogger

Async event logger for use in async contexts.

Source code in src/mycorrhizal/spores/core.py
def __init__(self, name: str):
    self.name = name

event async

event(event_type: str, relationships: Dict[str, Relationship] | None = None, **kwargs) -> None

Log an event asynchronously.

Parameters:

  • event_type (str, required): The type of event
  • relationships (Dict[str, Relationship] | None, default None): Optional dict of qualifier -> Relationship for OCEL object relationships
  • **kwargs (default {}): Event attributes

Source code in src/mycorrhizal/spores/core.py
async def event(self, event_type: str, relationships: Dict[str, Relationship] | None = None, **kwargs) -> None:
    """
    Log an event asynchronously.

    Args:
        event_type: The type of event
        relationships: Optional dict of qualifier -> Relationship for OCEL object relationships
        **kwargs: Event attributes
    """
    config = get_config()
    if not config.enabled:
        return

    timestamp = datetime.now()

    attr_values = {}
    for key, value in kwargs.items():
        attr_values[key] = attribute_value_from_python(value)

    event = Event(
        id=generate_event_id(),
        type=event_type,
        time=timestamp,
        attributes=attr_values,
        relationships=relationships or {}
    )

    record = LogRecord(event=event)
    await _send_log_record(record)

log_object async

log_object(obj_type: str, obj_id: str, **kwargs) -> None

Log an object asynchronously.

Source code in src/mycorrhizal/spores/core.py
async def log_object(self, obj_type: str, obj_id: str, **kwargs) -> None:
    """Log an object asynchronously."""
    config = get_config()
    if not config.enabled:
        return

    timestamp = datetime.now()

    attr_values = {}
    for key, value in kwargs.items():
        attr_values[key] = object_attribute_from_python(value, time=timestamp)

    obj = Object(
        id=obj_id,
        type=obj_type,
        attributes=attr_values
    )

    record = LogRecord(object=obj)
    await _send_log_record(record)

log_event

log_event(event_type: str, relationships: Dict[str, tuple] | None = None, attributes: Dict[str, Any] | None = None)

Decorator factory that logs events with auto-logged object relationships (async version).

Parameters:

  • event_type (str, required): The type of event to log
  • relationships (Dict[str, tuple] | None, default None): Dict mapping qualifiers to relationship specs:
      - qualifier: Relationship qualifier (e.g., "order", "customer")
      - spec: 2-tuple (source, obj_type) or 3-tuple (source, obj_type, attrs)
          - source: Where to get the object ("return", "ret", a parameter name, or "self")
          - obj_type: OCEL object type
          - attrs: Optional list of attribute names to log (omitted = auto-detect SporesAttr)
  • attributes (Dict[str, Any] | None, default None): Dict mapping event attribute names to values or callables

Returns:

  • Decorator function

Source code in src/mycorrhizal/spores/core.py
def log_event(self, event_type: str, relationships: Dict[str, tuple] | None = None, attributes: Dict[str, Any] | None = None):
    """
    Decorator factory that logs events with auto-logged object relationships (async version).

    Args:
        event_type: The type of event to log
        relationships: Dict mapping qualifiers to relationship specs:
            - qualifier: Relationship qualifier (e.g., "order", "customer")
            - spec: 2-tuple (source, obj_type) or 3-tuple (source, obj_type, attrs)
                - source: Where to get object ("return", "ret", param name, "self")
                - obj_type: OCEL object type
                - attrs: Optional list of attribute names to log (omitted = auto-detect SporesAttr)
        attributes: Dict mapping event attribute names to values or callables

    Returns:
        Decorator function
    """
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Execute the function
            result = await func(*args, **kwargs)

            # Build context for resolving objects and attributes
            context = self._build_context(func, args, kwargs, result)

            # Process relationships - log objects and build event relationships
            event_relationships = {}
            if relationships:
                for qualifier, rel_spec in relationships.items():
                    # Handle both 2-tuple and 3-tuple formats
                    if len(rel_spec) == 2:
                        source, obj_type = rel_spec
                        attrs = None  # Auto-detect SporesAttr
                    else:
                        source, obj_type, attrs = rel_spec

                    # Resolve object from source
                    obj = self._resolve_source(source, context, obj_type)
                    if obj is None:
                        continue

                    # Extract object ID
                    obj_id = self._get_object_id(obj)
                    if obj_id is None:
                        continue

                    # Extract and log object attributes
                    obj_attrs = self._extract_object_attrs(obj, attrs)
                    await self.log_object(obj_type, obj_id, **obj_attrs)

                    # Add to event relationships
                    event_relationships[qualifier] = Relationship(object_id=obj_id, qualifier=qualifier)

            # Extract event attributes
            event_attrs = {}
            if attributes:
                for attr_name, attr_value in attributes.items():
                    event_attrs[attr_name] = self._evaluate_expression(attr_value, context)

            # Log the event with relationships
            await self.event(event_type, relationships=event_relationships, **event_attrs)

            return result

        return wrapper  # type: ignore
    return decorator
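
The wrapper above awaits the decorated coroutine first and logs afterwards, so an exception raised by the function propagates before any object or event is logged. The sketch below reproduces only that control flow; `log_after` and the `sink` list are hypothetical stand-ins for the Spores logger and transport.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, List, Tuple


def log_after(event_type: str, sink: List[Tuple[str, Any]]):
    """Hypothetical decorator factory: record (event_type, result) after the call."""

    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)  # an exception here skips the logging
            sink.append((event_type, result))
            return result

        return wrapper

    return decorator


sink: List[Tuple[str, Any]] = []


@log_after("OrderCreated", sink)
async def create_order(order_id: str) -> str:
    return order_id


@log_after("OrderFailed", sink)
async def broken_order() -> str:
    raise RuntimeError("no stock")


async def main() -> str:
    created = await create_order("o-1")
    try:
        await broken_order()
    except RuntimeError:
        pass  # the error reaches the caller and nothing is recorded for it
    return created


created = asyncio.run(main())
```

If failed calls also need to appear in the log, one option is to log them explicitly inside the function or in the caller's exception handler.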

_build_context

_build_context(func: Callable, args: tuple, kwargs: dict, result: Any) -> Dict[str, Any]

Build context dict for async version.

Source code in src/mycorrhizal/spores/core.py
def _build_context(self, func: Callable, args: tuple, kwargs: dict, result: Any) -> Dict[str, Any]:
    """Build context dict for async version."""
    context = {
        'return': result,
        'ret': result,
    }

    sig = inspect.signature(func)
    bound = sig.bind(*args, **kwargs)
    bound.apply_defaults()

    for param_name, param_value in bound.arguments.items():
        context[param_name] = param_value

    return context
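
The binding step relies on `inspect.signature(...).bind(...)` followed by `apply_defaults()`, which maps positional and keyword arguments to parameter names and fills in defaults the caller omitted. A standalone sketch of the same logic, with a hypothetical `create_order` function as input:

```python
import inspect
from typing import Any, Callable, Dict


def build_context(func: Callable, args: tuple, kwargs: dict, result: Any) -> Dict[str, Any]:
    """Standalone copy of the binding logic, for illustration only."""
    context: Dict[str, Any] = {"return": result, "ret": result}
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()  # parameters the caller omitted get their default values
    context.update(bound.arguments)
    return context


def create_order(customer: str, items: list, priority: int = 0) -> str:
    """Hypothetical function used only to demonstrate the binding."""
    return f"order-for-{customer}"


ctx = build_context(create_order, ("alice",), {"items": ["pen"]}, "order-for-alice")
```

Here `ctx` contains `customer`, `items` and `priority` (filled from its default) next to the `return` and `ret` entries. For a method, the first positional argument is bound to the parameter named `self`, which is how a `"self"` source can be resolved from the same context.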

_resolve_source

_resolve_source(source: str, context: Dict[str, Any], obj_type: str | None = None) -> Any

Resolve source for async version.

If source is "bb" (blackboard) and obj_type is provided, extract the field of that type from the blackboard instead of returning the entire blackboard.

Source code in src/mycorrhizal/spores/core.py
def _resolve_source(self, source: str, context: Dict[str, Any], obj_type: str | None = None) -> Any:
    """Resolve source for async version.

    If source is "bb" (blackboard) and obj_type is provided, extract the field
    of that type from the blackboard instead of returning the entire blackboard.
    """
    if source == "return" or source == "ret":
        return context.get("return")
    elif source == "self":
        return context.get("self")
    else:
        obj = context.get(source)

        # If source is blackboard and we need a specific type, extract that field
        if obj_type and source == "bb" and hasattr(obj, '__annotations__'):
            return self._find_object_by_type(obj, obj_type)

        return obj

_find_object_by_type

_find_object_by_type(blackboard: Any, obj_type: str) -> Any

Find a field in the blackboard that matches the requested object type.

Scans the blackboard's fields and returns the first field whose type annotation matches obj_type.

Source code in src/mycorrhizal/spores/core.py
def _find_object_by_type(self, blackboard: Any, obj_type: str) -> Any:
    """Find a field in the blackboard that matches the requested object type.

    Scans the blackboard's fields and returns the first field whose type
    annotation matches obj_type.
    """
    # Get the class to check annotations
    obj_class = blackboard if isinstance(blackboard, type) else type(blackboard)

    if not hasattr(obj_class, '__annotations__'):
        return blackboard

    # Check each field's type annotation
    for field_name, field_type in obj_class.__annotations__.items():
        # Handle Annotated types
        if get_origin(field_type) is Annotated:
            args = get_args(field_type)
            if args:
                actual_type = args[0]
                # Check if type name matches (handle both str and type)
                type_name = actual_type if isinstance(actual_type, str) else actual_type.__name__
                if type_name == obj_type:
                    field_value = getattr(blackboard, field_name, None)
                    # Return the actual value, not None
                    if field_value is not None:
                        return field_value
        # Handle Union types (e.g., Sample | None)
        elif get_origin(field_type) is Union:
            args = get_args(field_type)
            for arg in args:
                # Skip None
                if arg is type(None):
                    continue
                # Check if this arg matches our target type
                if get_origin(arg) is Annotated:
                    annotated_args = get_args(arg)
                    if annotated_args:
                        actual_type = annotated_args[0]
                        type_name = actual_type if isinstance(actual_type, str) else actual_type.__name__
                        if type_name == obj_type:
                            field_value = getattr(blackboard, field_name, None)
                            if field_value is not None:
                                return field_value
                else:
                    type_name = arg if isinstance(arg, str) else arg.__name__
                    if type_name == obj_type:
                        field_value = getattr(blackboard, field_name, None)
                        if field_value is not None:
                            return field_value
        else:
            # Check if type name matches
            type_name = field_type if isinstance(field_type, str) else field_type.__name__
            if type_name == obj_type:
                field_value = getattr(blackboard, field_name, None)
                if field_value is not None:
                    return field_value

    # Fallback: return blackboard if no matching field found
    return blackboard
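
One detail worth keeping in mind when matching annotations by name: on Python 3.10 through 3.13, `typing.get_origin(Sample | None)` returns `types.UnionType` rather than `typing.Union`, so a check against `Union` alone may miss fields written with the `|` syntax. The sketch below shows a version-tolerant way to collect the candidate type names from an annotation. `annotation_type_names`, `Robot` and `Marker` are hypothetical and not part of Spores.

```python
import types
from typing import Annotated, Optional, Union, get_args, get_origin

# Origins that denote a union. types.UnionType exists on Python 3.10+.
_UNION_ORIGINS = [Union]
if hasattr(types, "UnionType"):
    _UNION_ORIGINS.append(types.UnionType)


def annotation_type_names(annotation) -> list:
    """Return the type names an annotation can refer to (hypothetical helper)."""
    origin = get_origin(annotation)
    if origin is Annotated:
        # Annotated[T, ...]: only the base type T matters here.
        return annotation_type_names(get_args(annotation)[0])
    if any(origin is union_origin for union_origin in _UNION_ORIGINS):
        names = []
        for arg in get_args(annotation):
            if arg is not type(None):  # skip the None member of Optional[...]
                names.extend(annotation_type_names(arg))
        return names
    if isinstance(annotation, str):
        return [annotation]  # unevaluated forward reference
    return [getattr(annotation, "__name__", repr(annotation))]


class Robot:
    """Hypothetical domain class."""


class Marker:
    """Hypothetical metadata class."""


names = annotation_type_names(Optional[Annotated[Robot, Marker]])
```

A field lookup can then compare the requested `obj_type` against each returned name, which handles plain, `Annotated`, and both union spellings with one code path.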

_get_object_id

_get_object_id(obj: Any) -> str | None

Extract object ID for async version.

Source code in src/mycorrhizal/spores/core.py
def _get_object_id(self, obj: Any) -> str | None:
    """Extract object ID for async version."""
    if obj is None:
        return None

    if hasattr(obj, 'id'):
        return str(getattr(obj, 'id'))
    else:
        return str(obj)

_extract_object_attrs

_extract_object_attrs(obj: Any, attrs_spec: list | dict) -> Dict[str, Any]

Extract attributes for async version.

Source code in src/mycorrhizal/spores/core.py
def _extract_object_attrs(self, obj: Any, attrs_spec: list | dict) -> Dict[str, Any]:
    """Extract attributes for async version."""
    if attrs_spec is None:
        return self._extract_spores_attrs(obj)

    if isinstance(attrs_spec, list):
        result = {}
        for attr_name in attrs_spec:
            if hasattr(obj, attr_name):
                result[attr_name] = getattr(obj, attr_name)
        return result

    elif isinstance(attrs_spec, dict):
        result = {}
        for attr_name, expr in attrs_spec.items():
            if isinstance(expr, str) and expr.startswith(("return.", "ret.", "self.")):
                parts = expr.split(".", 1)
                source = self._resolve_source(parts[0], {})
                if source and len(parts) > 1:
                    result[attr_name] = getattr(source, parts[1])
            elif isinstance(expr, str) and hasattr(obj, expr):
                result[attr_name] = getattr(obj, expr)
            elif callable(expr):
                result[attr_name] = expr(obj)
            else:
                result[attr_name] = expr
        return result

    return {}

_extract_spores_attrs

_extract_spores_attrs(obj: Any) -> Dict[str, Any]

Extract SporesAttr-marked fields for async version.

Source code in src/mycorrhizal/spores/core.py
def _extract_spores_attrs(self, obj: Any) -> Dict[str, Any]:
    """Extract SporesAttr-marked fields for async version."""
    result = {}
    obj_class = obj if isinstance(obj, type) else type(obj)

    if hasattr(obj_class, '__annotations__'):
        for field_name, field_type in obj_class.__annotations__.items():
            if get_origin(field_type) is Annotated:
                args = get_args(field_type)
                for arg in args:
                    if arg is SporesAttr:
                        if hasattr(obj, field_name):
                            value = getattr(obj, field_name)
                            result[field_name] = value
                        break

    return result
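
The marker-based extraction can be tried without Spores or pydantic. In the sketch below, `DemoAttr` plays the role of `SporesAttr` and `Order` is a plain dataclass; both are hypothetical. Unlike the listing above, the sketch inspects only the metadata part of each `Annotated` type, which is a simplification chosen for this example.

```python
from dataclasses import dataclass
from typing import Annotated, Any, Dict, get_args, get_origin


class DemoAttr:
    """Hypothetical marker class playing the role of SporesAttr."""


@dataclass
class Order:
    id: str
    status: Annotated[str, DemoAttr]
    total: Annotated[float, DemoAttr]
    note: Annotated[str, "plain metadata, not the marker"] = ""


def extract_marked(obj: Any, marker: type) -> Dict[str, Any]:
    """Collect the values of fields whose Annotated metadata contains the marker."""
    result: Dict[str, Any] = {}
    for name, annotation in type(obj).__annotations__.items():
        if get_origin(annotation) is not Annotated:
            continue
        # get_args returns (base_type, *metadata); only the metadata is checked.
        metadata = get_args(annotation)[1:]
        if any(meta is marker for meta in metadata) and hasattr(obj, name):
            result[name] = getattr(obj, name)
    return result


attrs = extract_marked(Order(id="o-1", status="new", total=9.5), DemoAttr)
```

Only `status` and `total` are picked up; `id` is unannotated and `note` carries metadata that is not the marker.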

_evaluate_expression

_evaluate_expression(expr: Any, context: Dict[str, Any]) -> Any

Evaluate expression for async version.

Source code in src/mycorrhizal/spores/core.py
def _evaluate_expression(self, expr: Any, context: Dict[str, Any]) -> Any:
    """Evaluate expression for async version."""
    if callable(expr):
        sig = inspect.signature(expr)
        params = sig.parameters

        # A lone **kwargs parameter receives the whole context as keyword arguments
        if len(params) == 1 and list(params.values())[0].kind == inspect.Parameter.VAR_KEYWORD:
            return expr(**context)
        elif len(params) == 0:
            return expr()
        else:
            kwargs = {}
            for param_name in params:
                if param_name in context:
                    kwargs[param_name] = context[param_name]
            return expr(**kwargs)
    elif isinstance(expr, str):
        # Check if it's a simple parameter reference
        if expr in context:
            return context[expr]
        # Check if it's a dotted attribute access
        elif "." in expr:
            parts = expr.split(".", 1)
            if parts[0] in context:
                obj = context[parts[0]]
                if hasattr(obj, parts[1]):
                    return getattr(obj, parts[1])
    return expr
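
For callables, the idea is to pass only the context entries that the callable's signature names, which is what lets an attribute such as `lambda items: len(items)` work. A standalone sketch of that dispatch follows. `call_with_context` is a hypothetical helper, and forwarding the whole context to a `**kwargs` parameter is this sketch's own choice.

```python
import inspect
from typing import Any, Callable, Dict


def call_with_context(func: Callable, context: Dict[str, Any]) -> Any:
    """Call func with only the context entries its signature asks for."""
    params = inspect.signature(func).parameters
    # A **kwargs parameter means the callable accepts the whole context.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return func(**context)
    wanted = {name: context[name] for name in params if name in context}
    return func(**wanted)


context = {"return": "o-1", "ret": "o-1", "customer": "alice", "items": ["pen", "ink"]}

item_count = call_with_context(lambda items: len(items), context)
constant = call_with_context(lambda: "web", context)
keys = call_with_context(lambda **ctx: sorted(ctx), context)
```

Keys such as `"return"` are not valid Python parameter names, so they can only reach a callable through `**kwargs`; named parameters should use `ret` instead.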

SyncEventLogger

SyncEventLogger(name: str)

Bases: EventLogger

Sync event logger for use in synchronous contexts.

Each call starts a daemon thread for fire-and-forget logging, so business logic does not wait on the transport. The thread writes through the sync transport's blocking send(), so no event loop is needed.

Source code in src/mycorrhizal/spores/core.py
def __init__(self, name: str):
    self.name = name

event

event(event_type: str, relationships: Dict[str, Relationship] | None = None, **kwargs) -> None

Log an event in background daemon thread (fire-and-forget).

Parameters:

  • event_type (str, required): The type of event
  • relationships (Dict[str, Relationship] | None, default None): Optional dict of qualifier -> Relationship for OCEL object relationships
  • **kwargs (default {}): Event attributes

Source code in src/mycorrhizal/spores/core.py
def event(self, event_type: str, relationships: Dict[str, Relationship] | None = None, **kwargs) -> None:
    """
    Log an event in background daemon thread (fire-and-forget).

    Args:
        event_type: The type of event
        relationships: Optional dict of qualifier -> Relationship for OCEL object relationships
        **kwargs: Event attributes
    """
    config = get_config()
    if not config.enabled:
        return

    def log_in_thread():
        timestamp = datetime.now()

        attr_values = {}
        for key, value in kwargs.items():
            attr_values[key] = attribute_value_from_python(value)

        event = Event(
            id=generate_event_id(),
            type=event_type,
            time=timestamp,
            attributes=attr_values,
            relationships=relationships or {}
        )

        record = LogRecord(event=event)
        # Blocking send - no event loop needed
        _send_log_record_sync(record)

    thread = threading.Thread(target=log_in_thread, daemon=True)
    thread.start()
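
The fire-and-forget pattern has two consequences that follow from how Python threads work: daemon threads are stopped abruptly at interpreter shutdown, so a record still in flight at exit can be lost, and separate threads give no ordering guarantee between records. The sketch below shows the pattern with a hypothetical `slow_send` in place of a real transport, and joins the threads only to make the outcome observable.

```python
import threading
import time
from typing import List

records: List[str] = []
records_lock = threading.Lock()


def slow_send(record: str) -> None:
    """Hypothetical stand-in for a blocking transport send."""
    time.sleep(0.05)
    with records_lock:
        records.append(record)


def fire_and_forget(record: str) -> threading.Thread:
    # daemon=True: the thread does not keep the interpreter alive at exit.
    thread = threading.Thread(target=slow_send, args=(record,), daemon=True)
    thread.start()
    return thread  # returned only so this demo can wait for completion


threads = [fire_and_forget(f"event-{i}") for i in range(3)]

# Joining is needed only when the records must be written before exit.
for thread in threads:
    thread.join()
```

A short-lived script that logs and exits immediately may therefore want some way to wait for outstanding writes before it terminates.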

log_object

log_object(obj_type: str, obj_id: str, **kwargs) -> None

Log an object in background daemon thread (fire-and-forget).

Source code in src/mycorrhizal/spores/core.py
def log_object(self, obj_type: str, obj_id: str, **kwargs) -> None:
    """Log an object in background daemon thread (fire-and-forget)."""
    config = get_config()
    if not config.enabled:
        return

    def log_in_thread():
        timestamp = datetime.now()

        attr_values = {}
        for key, value in kwargs.items():
            attr_values[key] = object_attribute_from_python(value, time=timestamp)

        obj = Object(
            id=obj_id,
            type=obj_type,
            attributes=attr_values
        )

        record = LogRecord(object=obj)
        # Blocking send - no event loop needed
        _send_log_record_sync(record)

    thread = threading.Thread(target=log_in_thread, daemon=True)
    thread.start()

log_event

log_event(event_type: str, relationships: Dict[str, tuple] | None = None, attributes: Dict[str, Any] | None = None)

Decorator factory that logs events with auto-logged object relationships.

Parameters:

  • event_type (str, required): The type of event to log
  • relationships (Dict[str, tuple] | None, default None): Dict mapping qualifiers to relationship specs:
      - qualifier: Relationship qualifier (e.g., "order", "customer")
      - spec: 2-tuple (source, obj_type) or 3-tuple (source, obj_type, attrs)
          - source: Where to get the object ("return", "ret", a parameter name, or "self")
          - obj_type: OCEL object type
          - attrs: Optional list of attribute names to log (omitted = auto-detect SporesAttr)
  • attributes (Dict[str, Any] | None, default None): Dict mapping event attribute names to values or callables

Returns:

  • Decorator function

Example
@spore.log_event(
    event_type="OrderCreated",
    relationships={
        "order": ("return", "Order"),  # 2-tuple: auto-detect SporesAttr
        "customer": ("customer", "Customer"),  # 2-tuple: auto-detect SporesAttr
    },
    attributes={
        "item_count": lambda items: len(items),
    },
)
def create_order(customer: Customer, items: list) -> Order:
    order = Order(...)
    return order

Source code in src/mycorrhizal/spores/core.py
def log_event(self, event_type: str, relationships: Dict[str, tuple] | None = None, attributes: Dict[str, Any] | None = None):
    """
    Decorator factory that logs events with auto-logged object relationships.

    Args:
        event_type: The type of event to log
        relationships: Dict mapping qualifiers to relationship specs:
            - qualifier: Relationship qualifier (e.g., "order", "customer")
            - spec: 2-tuple (source, obj_type) or 3-tuple (source, obj_type, attrs)
                - source: Where to get object ("return", "ret", param name, "self")
                - obj_type: OCEL object type
                - attrs: Optional list of attribute names to log (omitted = auto-detect SporesAttr)
        attributes: Dict mapping event attribute names to values or callables

    Returns:
        Decorator function

    Example:
        ```python
        @spore.log_event(
            event_type="OrderCreated",
            relationships={
                "order": ("return", "Order"),  # 2-tuple: auto-detect SporesAttr
                "customer": ("customer", "Customer"),  # 2-tuple: auto-detect SporesAttr
            },
            attributes={
                "item_count": lambda items: len(items),
            },
        )
        def create_order(customer: Customer, items: list) -> Order:
            order = Order(...)
            return order
        ```
    """
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Execute the function
            result = func(*args, **kwargs)

            # Build context for resolving objects and attributes
            context = self._build_context(func, args, kwargs, result)

            # Process relationships - log objects and build event relationships
            event_relationships = {}
            if relationships:
                for qualifier, rel_spec in relationships.items():
                    # Handle both 2-tuple and 3-tuple formats
                    if len(rel_spec) == 2:
                        source, obj_type = rel_spec
                        attrs = None  # Auto-detect SporesAttr
                    else:
                        source, obj_type, attrs = rel_spec

                    # Resolve object from source
                    obj = self._resolve_source(source, context, obj_type)
                    if obj is None:
                        continue

                    # Extract object ID
                    obj_id = self._get_object_id(obj)
                    if obj_id is None:
                        continue

                    # Extract and log object attributes
                    obj_attrs = self._extract_object_attrs(obj, attrs)
                    self.log_object(obj_type, obj_id, **obj_attrs)

                    # Add to event relationships
                    event_relationships[qualifier] = Relationship(object_id=obj_id, qualifier=qualifier)

            # Extract event attributes
            event_attrs = {}
            if attributes:
                for attr_name, attr_value in attributes.items():
                    event_attrs[attr_name] = self._evaluate_expression(attr_value, context)

            # Log the event with relationships
            self.event(event_type, relationships=event_relationships, **event_attrs)

            return result

        return wrapper  # type: ignore
    return decorator
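
The wrapper accepts relationship specs in two shapes, a 2-tuple and a 3-tuple with an explicit attribute list. The sketch below normalizes both into one form and rejects other lengths with a clear message. `normalize_rel_spec` is a hypothetical helper and the explicit length check is an addition of this sketch.

```python
from typing import Dict, Optional, Tuple


def normalize_rel_spec(spec: tuple) -> Tuple[str, str, Optional[list]]:
    """Return (source, obj_type, attrs); attrs is None when it should be auto-detected."""
    if len(spec) == 2:
        source, obj_type = spec
        return source, obj_type, None
    if len(spec) == 3:
        source, obj_type, attrs = spec
        return source, obj_type, attrs
    raise ValueError(f"relationship spec must have 2 or 3 items, got {len(spec)}")


relationships: Dict[str, tuple] = {
    "order": ("return", "Order"),                          # 2-tuple: auto-detect attributes
    "customer": ("customer", "Customer", ["name", "tier"]),  # 3-tuple: explicit attribute list
}

normalized = {qualifier: normalize_rel_spec(spec) for qualifier, spec in relationships.items()}
```

Validating the tuple length up front gives a more descriptive error than the unpacking failure a malformed spec would otherwise trigger inside the wrapper.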

_build_context

_build_context(func: Callable, args: tuple, kwargs: dict, result: Any) -> Dict[str, Any]

Build a context dict for resolving sources and evaluating expressions.

Returns a dict with:
  • 'return' and 'ret': The return value
  • 'self': For methods, the self parameter
  • All function parameters by name

Source code in src/mycorrhizal/spores/core.py
def _build_context(self, func: Callable, args: tuple, kwargs: dict, result: Any) -> Dict[str, Any]:
    """
    Build a context dict for resolving sources and evaluating expressions.

    Returns dict with:
        - 'return' and 'ret': The return value
        - 'self': For methods, the self parameter
        - All function parameters by name
    """
    context = {
        'return': result,
        'ret': result,
    }

    # Bind arguments to parameter names
    sig = inspect.signature(func)
    bound = sig.bind(*args, **kwargs)
    bound.apply_defaults()

    for param_name, param_value in bound.arguments.items():
        context[param_name] = param_value

    return context

_resolve_source

_resolve_source(source: str, context: Dict[str, Any], obj_type: str | None = None) -> Any

Resolve an object from a source expression.

Sources:
  • "return" or "ret": Return value
  • "self": For methods
  • Any other string: Parameter name from context
  • If source is "bb" (blackboard) and obj_type is provided, extract the field of that type

Source code in src/mycorrhizal/spores/core.py
def _resolve_source(self, source: str, context: Dict[str, Any], obj_type: str | None = None) -> Any:
    """
    Resolve an object from a source expression.

    Sources:
        - "return" or "ret": Return value
        - "self": For methods
        - Any other string: Parameter name from context
        - If source is "bb" (blackboard) and obj_type is provided, extract the field of that type
    """
    if source == "return" or source == "ret":
        return context.get("return")
    elif source == "self":
        return context.get("self")
    else:
        obj = context.get(source)

        # If source is blackboard and we need a specific type, extract that field
        if obj_type and source == "bb" and hasattr(obj, '__annotations__'):
            return self._find_object_by_type(obj, obj_type)

        return obj

_find_object_by_type

_find_object_by_type(blackboard: Any, obj_type: str) -> Any

Find a field in the blackboard that matches the requested object type.

Scans the blackboard's fields and returns the first field whose type annotation matches obj_type.

Source code in src/mycorrhizal/spores/core.py
def _find_object_by_type(self, blackboard: Any, obj_type: str) -> Any:
    """Find a field in the blackboard that matches the requested object type.

    Scans the blackboard's fields and returns the first field whose type
    annotation matches obj_type.
    """
    # Get the class to check annotations
    obj_class = blackboard if isinstance(blackboard, type) else type(blackboard)

    if not hasattr(obj_class, '__annotations__'):
        return blackboard

    # Check each field's type annotation
    for field_name, field_type in obj_class.__annotations__.items():
        # Handle Annotated types
        if get_origin(field_type) is Annotated:
            args = get_args(field_type)
            if args:
                actual_type = args[0]
                # Check if type name matches (handle both str and type)
                type_name = actual_type if isinstance(actual_type, str) else actual_type.__name__
                if type_name == obj_type:
                    field_value = getattr(blackboard, field_name, None)
                    # Return the actual value, not None
                    if field_value is not None:
                        return field_value
        # Handle Union types (e.g., Sample | None)
        elif get_origin(field_type) is Union:
            args = get_args(field_type)
            for arg in args:
                # Skip None
                if arg is type(None):
                    continue
                # Check if this arg matches our target type
                if get_origin(arg) is Annotated:
                    annotated_args = get_args(arg)
                    if annotated_args:
                        actual_type = annotated_args[0]
                        type_name = actual_type if isinstance(actual_type, str) else actual_type.__name__
                        if type_name == obj_type:
                            field_value = getattr(blackboard, field_name, None)
                            if field_value is not None:
                                return field_value
                else:
                    type_name = arg if isinstance(arg, str) else arg.__name__
                    if type_name == obj_type:
                        field_value = getattr(blackboard, field_name, None)
                        if field_value is not None:
                            return field_value
        else:
            # Check if type name matches
            type_name = field_type if isinstance(field_type, str) else field_type.__name__
            if type_name == obj_type:
                field_value = getattr(blackboard, field_name, None)
                if field_value is not None:
                    return field_value

    # Fallback: return blackboard if no matching field found
    return blackboard

_get_object_id

_get_object_id(obj: Any) -> str | None

Extract object ID from an object.

Tries
  1. obj.id attribute
  2. str(obj)
Source code in src/mycorrhizal/spores/core.py
def _get_object_id(self, obj: Any) -> str | None:
    """
    Extract object ID from an object.

    Tries:
        1. obj.id attribute
        2. str(obj)
    """
    if obj is None:
        return None

    if hasattr(obj, 'id'):
        return str(getattr(obj, 'id'))
    else:
        return str(obj)

_extract_object_attrs

_extract_object_attrs(obj: Any, attrs_spec: list | dict) -> Dict[str, Any]

Extract attributes from an object for logging.

Parameters:

Name Type Description Default
obj Any

The object to extract from

required
attrs_spec list | dict

Either a list of attribute names or a dict of {name: expression}. If None, the fields marked with SporesAttr on the object's class are extracted instead.

required

Returns:

Type Description
Dict[str, Any]

Dict of attribute name -> value

Source code in src/mycorrhizal/spores/core.py
def _extract_object_attrs(self, obj: Any, attrs_spec: list | dict) -> Dict[str, Any]:
    """
    Extract attributes from an object for logging.

    Args:
        obj: The object to extract from
        attrs_spec: Either a list of attribute names or a dict of {name: expression}. If None, the fields marked with SporesAttr on the object's class are extracted instead.

    Returns:
        Dict of attribute name -> value
    """
    if attrs_spec is None:
        # Check for SporesAttr annotations on the object's class
        return self._extract_spores_attrs(obj)

    if isinstance(attrs_spec, list):
        # Simple list of attribute names
        result = {}
        for attr_name in attrs_spec:
            if hasattr(obj, attr_name):
                result[attr_name] = getattr(obj, attr_name)
        return result

    elif isinstance(attrs_spec, dict):
        # Dict with custom names or callables
        result = {}
        for attr_name, expr in attrs_spec.items():
            if isinstance(expr, str) and expr.startswith(("return.", "ret.", "self.")):
                # Special handling for return/ret/self references
                parts = expr.split(".", 1)
                source = self._resolve_source(parts[0], {})
                if source and len(parts) > 1:
                    result[attr_name] = getattr(source, parts[1])
            elif isinstance(expr, str) and hasattr(obj, expr):
                # Simple attribute access
                result[attr_name] = getattr(obj, expr)
            elif callable(expr):
                # Callable expression
                result[attr_name] = expr(obj)
            else:
                # Static value
                result[attr_name] = expr
        return result

    return {}
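
As a rough illustration of the two spec shapes, the sketch below reimplements the list and dict branches as a standalone function. The names extract_attrs and Order are hypothetical stand-ins rather than part of the library, and the handling of the "return.", "ret." and "self." prefixes is left out.

```python
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class Order:
    """Hypothetical domain object used only for this example."""
    id: str
    status: str
    total: float


def extract_attrs(obj: Any, attrs_spec: Union[list, dict]) -> Dict[str, Any]:
    """Simplified stand-in for the list and dict branches described above."""
    result: Dict[str, Any] = {}
    if isinstance(attrs_spec, list):
        # List form: copy each named attribute that exists on the object.
        for name in attrs_spec:
            if hasattr(obj, name):
                result[name] = getattr(obj, name)
    elif isinstance(attrs_spec, dict):
        for name, expr in attrs_spec.items():
            if isinstance(expr, str) and hasattr(obj, expr):
                result[name] = getattr(obj, expr)   # log an attribute under a new name
            elif callable(expr):
                result[name] = expr(obj)            # computed value
            else:
                result[name] = expr                 # static value
    return result


order = Order(id="o-1", status="paid", total=12.5)
by_list = extract_attrs(order, ["status", "missing"])
by_dict = extract_attrs(order, {
    "state": "status",
    "total_cents": lambda o: int(o.total * 100),
    "source": "web",
})
```

In this sketch a name that is missing from the object is skipped in the list form, while a string that does not name an attribute falls through to the static-value branch in the dict form.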

_extract_spores_attrs

_extract_spores_attrs(obj: Any) -> Dict[str, Any]

Extract attributes marked with SporesAttr from a Pydantic model.

Checks the object's class annotations for Annotated[type, SporesAttr] fields.

Source code in src/mycorrhizal/spores/core.py
def _extract_spores_attrs(self, obj: Any) -> Dict[str, Any]:
    """
    Extract attributes marked with SporesAttr from a Pydantic model.

    Checks the object's class __annotations__ for Annotated[type, SporesAttr] fields.
    """
    result = {}

    # Get the class (handle both instances and classes)
    obj_class = obj if isinstance(obj, type) else type(obj)

    if hasattr(obj_class, '__annotations__'):
        for field_name, field_type in obj_class.__annotations__.items():
            # Check if this is an Annotated type with SporesAttr
            if get_origin(field_type) is Annotated:
                args = get_args(field_type)
                for arg in args:
                    if arg is SporesAttr:
                        # Found a SporesAttr marker - log this field
                        if hasattr(obj, field_name):
                            value = getattr(obj, field_name)
                            result[field_name] = value
                        break

    return result
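
The marker scan can be tried in isolation. The sketch below uses a hypothetical Marker class in place of SporesAttr and a plain class in place of a Pydantic model, so it shows the typing mechanics only, not the library's behavior.

```python
from typing import Annotated, Any, Dict, get_args, get_origin


class Marker:
    """Hypothetical stand-in for SporesAttr."""


class Order:
    id: str
    status: Annotated[str, Marker]
    total: Annotated[float, Marker]

    def __init__(self, id: str, status: str, total: float) -> None:
        self.id, self.status, self.total = id, status, total


def marked_attrs(obj: Any) -> Dict[str, Any]:
    """Collect the values of fields whose annotation carries Marker."""
    result: Dict[str, Any] = {}
    annotations = getattr(type(obj), "__annotations__", {})
    for field_name, field_type in annotations.items():
        # For Annotated[T, m1, m2, ...], get_args returns (T, m1, m2, ...).
        if get_origin(field_type) is Annotated and Marker in get_args(field_type)[1:]:
            if hasattr(obj, field_name):
                result[field_name] = getattr(obj, field_name)
    return result


attrs = marked_attrs(Order("o-1", "paid", 12.5))
```

One caveat with this approach: if annotations are stored as strings (for example under `from __future__ import annotations`), `get_origin` returns None for them and no field is detected.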

_evaluate_expression

_evaluate_expression(expr: Any, context: Dict[str, Any]) -> Any

Evaluate an expression for event or object attributes.

Supports
  • Static values (strings, numbers, etc.)
  • Callables (called with context)
  • Strings that are parameter references or attribute accesses
Source code in src/mycorrhizal/spores/core.py
def _evaluate_expression(self, expr: Any, context: Dict[str, Any]) -> Any:
    """
    Evaluate an expression for event or object attributes.

    Supports:
        - Static values (strings, numbers, etc.)
        - Callables (called with context)
        - Strings that are parameter references or attribute accesses
    """
    if callable(expr):
        # Callable - try to detect what params it wants
        sig = inspect.signature(expr)
        params = sig.parameters

        if len(params) == 1 and list(params.values())[0].kind == inspect.Parameter.VAR_KEYWORD:
            # **kwargs style
            return expr(**context)
        elif len(params) == 0:
            # No params
            return expr()
        else:
            # Named params - pass relevant context
            kwargs = {}
            for param_name in params:
                if param_name in context:
                    kwargs[param_name] = context[param_name]
            return expr(**kwargs)
    elif isinstance(expr, str):
        # Check if it's a simple parameter reference
        if expr in context:
            return context[expr]
        # Check if it's an attribute access like "order.id"
        elif "." in expr:
            parts = expr.split(".", 1)
            if parts[0] in context:
                obj = context[parts[0]]
                if hasattr(obj, parts[1]):
                    return getattr(obj, parts[1])
    return expr
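
A minimal sketch of signature-based dispatch for callables follows. The helper name call_with_context is hypothetical, and it checks for any `**kwargs` parameter rather than exactly one parameter, so it is a variation on the logic above, not a copy of it.

```python
import inspect
from typing import Any, Callable, Dict


def call_with_context(fn: Callable[..., Any], context: Dict[str, Any]) -> Any:
    """Call fn with as much of the context as its signature accepts."""
    params = inspect.signature(fn).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        # fn declares **kwargs, so it can accept the whole context.
        return fn(**context)
    # Otherwise pass only the context entries that fn names as parameters.
    return fn(**{name: context[name] for name in params if name in context})


context = {"order_id": "o-1", "total": 12.5, "customer": "c-9"}

no_args = call_with_context(lambda: "static", context)
named = call_with_context(lambda total: total * 2, context)
everything = call_with_context(lambda **kw: sorted(kw), context)
```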

Event dataclass

Event(id: str, type: str, activity: Optional[str] = None, time: datetime = now(), attributes: Dict[str, EventAttributeValue] = dict(), relationships: Dict[str, Relationship] = dict())

An OCEL event representing something that happened.

Attributes:

Name Type Description
id str

Unique event identifier

type str

Event type/category (event class)

activity Optional[str]

Optional semantic activity label (defaults to type if null)

time datetime

When the event occurred

attributes Dict[str, EventAttributeValue]

Event attributes (name -> EventAttributeValue)

relationships Dict[str, Relationship]

Objects related to this event (qualifier -> Relationship)

Object dataclass

Object(id: str, type: str, attributes: Dict[str, ObjectAttributeValue] = dict(), relationships: Dict[str, Relationship] = dict())

An OCEL object representing an entity in the system.

Attributes:

Name Type Description
id str

Unique object identifier

type str

Object type/category

attributes Dict[str, ObjectAttributeValue]

Object attributes (name -> ObjectAttributeValue)

relationships Dict[str, Relationship]

Other objects related to this object (qualifier -> Relationship)

LogRecord dataclass

LogRecord(event: Optional[Event] = None, object: Optional[Object] = None)

A log record containing either an event or an object (not both).

This is the union type used for streaming to OCEL consumers.

Attributes:

Name Type Description
event Optional[Event]

An event record (if logging an event)

object Optional[Object]

An object record (if logging an object)

Note

Exactly one of event or object must be set.

__post_init__

__post_init__()

Validate that exactly one of event or object is set.

Source code in src/mycorrhizal/spores/models.py
def __post_init__(self):
    """Validate that exactly one of event or object is set."""
    if self.event is None and self.object is None:
        raise ValueError("LogRecord must have either event or object set")
    if self.event is not None and self.object is not None:
        raise ValueError("LogRecord cannot have both event and object set")
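
The exactly-one-of rule can be illustrated with a small stand-in dataclass. Record below is hypothetical and uses plain strings where LogRecord holds Event and Object instances.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    """Hypothetical stand-in for LogRecord with string payloads."""
    event: Optional[str] = None
    object: Optional[str] = None

    def __post_init__(self) -> None:
        # Both unset and both set are rejected; exactly one must be given.
        if (self.event is None) == (self.object is None):
            raise ValueError("exactly one of event or object must be set")


ok = Record(event="evt-1")

errors = []
for kwargs in ({}, {"event": "evt-1", "object": "obj-1"}):
    try:
        Record(**kwargs)
    except ValueError as exc:
        errors.append(str(exc))
```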

Relationship dataclass

Relationship(object_id: str, qualifier: str)

A relationship from an event or object to a related object.

Attributes:

Name Type Description
object_id str

The ID of the related object

qualifier str

How the object relates to the event (e.g., "input", "output", "actor")

EventAttributeValue dataclass

EventAttributeValue(name: str, value: str, type: str)

An attribute value for an event.

Event attributes don't have timestamps because the event time itself is sufficient.

Attributes:

Name Type Description
name str

The attribute name

value str

The string representation of the value

type str

The data type ("string", "integer", "float", "boolean", "timestamp")

ObjectAttributeValue dataclass

ObjectAttributeValue(name: str, value: str, type: str, time: Optional[datetime] = None)

An attribute value for an object.

Unlike event attributes, OCEL object attributes carry a timestamp so that successive values can be versioned.

Attributes:

Name Type Description
name str

The attribute name

value str

The string representation of the value

type str

The data type ("string", "integer", "float", "boolean", "timestamp")

time Optional[datetime]

When this attribute value was set

ObjectRef dataclass

ObjectRef(qualifier: str, scope: Union[ObjectScope, str] = EVENT)

Metadata annotation for object references in blackboard interfaces.

Used with typing.Annotated to mark fields as OCEL objects:

robot: Annotated[Robot, ObjectRef(qualifier="actor", scope=ObjectScope.GLOBAL)]

Attributes:

Name Type Description
qualifier str

How the object relates to events ("input", "output", "actor", etc.)

scope Union[ObjectScope, str]

When to include this object in events

__post_init__

__post_init__()

Convert string scope to ObjectScope enum if needed.

Source code in src/mycorrhizal/spores/models.py
def __post_init__(self):
    """Convert string scope to ObjectScope enum if needed."""
    if isinstance(self.scope, str):
        object.__setattr__(self, 'scope', ObjectScope(self.scope))
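
The string-to-enum coercion can be sketched as follows. It assumes the dataclass is frozen, which the use of `object.__setattr__` suggests but the text does not state. Scope and Ref are hypothetical stand-ins, and the enum's string values are assumed for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Union


class Scope(Enum):
    """Hypothetical stand-in for ObjectScope; the string values are assumed."""
    EVENT = "event"
    GLOBAL = "global"


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for ObjectRef."""
    qualifier: str
    scope: Union[Scope, str] = Scope.EVENT

    def __post_init__(self) -> None:
        if isinstance(self.scope, str):
            # Frozen dataclasses block normal assignment, so go through
            # object.__setattr__ to store the converted enum member.
            object.__setattr__(self, "scope", Scope(self.scope))


ref = Ref(qualifier="actor", scope="global")
```

Under these assumptions, a string that does not match any enum value raises ValueError at construction time, which surfaces typos in annotations early.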

ObjectScope

Bases: Enum

Scope for object associations with events.

EventAttr dataclass

EventAttr(name: Optional[str] = None)

Metadata annotation for event attributes in blackboard interfaces.

Used with typing.Annotated to mark fields that should be logged as event attributes:

mission_id: Annotated[str, EventAttr]
battery_level: Annotated[int, EventAttr]

Attributes:

Name Type Description
name Optional[str]

Optional custom name for the attribute (defaults to field name)

SporesAttr dataclass

SporesAttr(name: Optional[str] = None)

Metadata annotation for object attributes in OCEL object logging.

Used with typing.Annotated to mark fields that should be logged when an object is logged:

class Order(BaseModel):
    id: str
    status: Annotated[str, SporesAttr]  # Logged
    total: Annotated[float, SporesAttr]  # Logged
    items: list[dict]  # Not marked - not logged

When using @spore.log_event(relationships={...}), fields marked with SporesAttr are logged automatically. If an explicit attributes list is provided, it overrides SporesAttr.

Attributes:

Name Type Description
name Optional[str]

Optional custom name for the attribute (defaults to field name)

ObjectLRUCache

ObjectLRUCache(maxsize: int = 128, needs_logged: Optional[Callable[[K, Object], None]] = None, touch_resend_n: int = 100, metrics: Optional['CacheMetrics'] = None)

Bases: Generic[K, V]

LRU cache with unified callback for object logging.

The cache fires the needs_logged callback when an object needs to be logged:
  • On first sight (new object added to cache)
  • On eviction (object removed from cache to make room)
  • When attributes change (detected via hash comparison)
  • Every N touches (configurable via touch_resend_n)

This ensures OCEL consumers see object evolution:
  • First sight: initial object state
  • Attribute changes: updated state
  • Eviction: final state before being removed from cache
  • Periodic resend: long-lived objects are re-logged periodically

Example
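# transport and encoder are assumed to exist already,
# e.g. a SyncFileTransport and a JSONEncoder
def needs_logged(object_id: str, obj: Object):
    # Encode the object record and send it to the transport
    record = LogRecord(object=obj)
    transport.send(encoder.encode(record), encoder.content_type())

cache = ObjectLRUCache(maxsize=128, needs_logged=needs_logged, touch_resend_n=100)

# Check if object exists, add if not
if not cache.contains_or_add(object_id, obj):
    # Object was not in the cache; it has already been logged by needs_logged
    pass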

Parameters:

Name Type Description Default
maxsize int

Maximum number of objects to cache

128
needs_logged Optional[Callable[[K, Object], None]]

Callback when object needs logging (object_id, object) -> None

None
touch_resend_n int

Resend object every N touches (0 to disable periodic resend)

100
metrics Optional['CacheMetrics']

Optional metrics collector; its eviction and first-sight counters are incremented when provided

None
Source code in src/mycorrhizal/spores/cache.py
def __init__(
    self,
    maxsize: int = 128,
    needs_logged: Optional[Callable[[K, Object], None]] = None,
    touch_resend_n: int = 100,
    metrics: Optional["CacheMetrics"] = None
):
    self.maxsize = maxsize
    self.needs_logged = needs_logged
    self.touch_resend_n = touch_resend_n
    self.metrics = metrics
    self._cache: OrderedDict[K, CacheEntry] = OrderedDict()

_evict_if_needed

_evict_if_needed() -> None

Evict oldest entry if cache is full.

Source code in src/mycorrhizal/spores/cache.py
def _evict_if_needed(self) -> None:
    """Evict oldest entry if cache is full."""
    while len(self._cache) >= self.maxsize:
        # Pop from the front of the OrderedDict: the least recently used entry
        object_id, entry = self._cache.popitem(last=False)
        if self.metrics is not None:
            self.metrics.evictions += 1
        if self.needs_logged:
            self.needs_logged(object_id, entry.object)

get

get(key: K) -> Optional[Object]

Get an object from the cache without updating its position.

Parameters:

Name Type Description Default
key K

The object ID

required

Returns:

Type Description
Optional[Object]

The Object if found, None otherwise

Source code in src/mycorrhizal/spores/cache.py
def get(self, key: K) -> Optional[Object]:
    """
    Get an object from the cache without updating its position.

    Args:
        key: The object ID

    Returns:
        The Object if found, None otherwise
    """
    entry = self._cache.get(key)
    return entry.object if entry else None

get_and_touch

get_and_touch(key: K) -> Optional[Object]

Get an object from the cache and mark it as recently used.

Parameters:

Name Type Description Default
key K

The object ID

required

Returns:

Type Description
Optional[Object]

The Object if found, None otherwise

Source code in src/mycorrhizal/spores/cache.py
def get_and_touch(self, key: K) -> Optional[Object]:
    """
    Get an object from the cache and mark it as recently used.

    Args:
        key: The object ID

    Returns:
        The Object if found, None otherwise
    """
    entry = self._cache.get(key)
    if entry:
        # Move to end (most recently used)
        self._cache.move_to_end(key)
        return entry.object
    return None
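
The difference between a plain read and a touch comes down to two standard-library OrderedDict operations. The snippet below shows them on a bare OrderedDict rather than on ObjectLRUCache itself.

```python
from collections import OrderedDict

cache: "OrderedDict[str, str]" = OrderedDict()
for key in ("a", "b", "c"):
    cache[key] = key.upper()

_ = cache.get("a")                  # plain read: the order is unchanged
order_after_get = list(cache)

cache.move_to_end("a")              # touch: "a" becomes the most recently used
order_after_touch = list(cache)

# popitem(last=False) removes the entry at the front, i.e. the least recently used.
evicted_key, _ = cache.popitem(last=False)
```

This is why a key that is only ever read with get() can be evicted before keys that were touched more recently.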

contains_or_add

contains_or_add(key: K, obj: Object) -> bool

Check if key exists, add if not.

This is the primary method for object tracking:
  • If key exists: check for attribute changes, periodic resend, mark as recently used
  • If key doesn't exist: add object, potentially evict, log first sight

The needs_logged callback is fired when:
  • First sight: new object added to cache
  • Attribute change: object attributes changed (hash comparison)
  • Every N touches: periodic resend for long-lived objects

Parameters:

Name Type Description Default
key K

The object ID

required
obj Object

The OCEL Object to add if not present

required

Returns:

Type Description
bool

True if object was already in cache, False if it was just added

Source code in src/mycorrhizal/spores/cache.py
def contains_or_add(self, key: K, obj: Object) -> bool:
    """
    Check if key exists, add if not.

    This is the primary method for object tracking:
    - If key exists: check for attribute changes, periodic resend, mark as recently used
    - If key doesn't exist: add object, potentially evict, log first sight

    The `needs_logged` callback is fired when:
    - First sight: new object added to cache
    - Attribute change: object attributes changed (hash comparison)
    - Every N touches: periodic resend for long-lived objects

    Args:
        key: The object ID
        obj: The OCEL Object to add if not present

    Returns:
        True if object was already in cache, False if it was just added
    """
    if key in self._cache:
        # Object exists - check if we need to log
        entry = self._cache[key]
        entry.sight_count += 1

        # Compute current attribute hash
        current_hash = _compute_attributes_hash(obj)

        # Check if attributes changed
        attrs_changed = (entry.attributes_hash is not None and
                        current_hash != entry.attributes_hash)

        # Update if attributes changed
        if attrs_changed:
            entry.object = obj
            entry.attributes_hash = current_hash

        # Fire callback if ANY condition met
        should_log = any([
            attrs_changed,
            self.touch_resend_n > 0 and entry.sight_count % self.touch_resend_n == 0,
        ])
        if should_log and self.needs_logged:
            self.needs_logged(key, entry.object)

        self._cache.move_to_end(key)
        return True
    else:
        # First sight - store hash, fire callback
        self._evict_if_needed()
        if self.metrics is not None:
            self.metrics.first_sights += 1
        attr_hash = _compute_attributes_hash(obj)
        entry = CacheEntry(object=obj, attributes_hash=attr_hash)
        self._cache[key] = entry
        if self.needs_logged:
            self.needs_logged(key, obj)
        return False
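
To see when the callback fires, here is a simplified, self-contained sketch of the same flow. TinyObjectCache and Entry are hypothetical; objects are reduced to attribute dicts, the hash is a plain `hash(frozenset(...))` rather than the library's `_compute_attributes_hash`, and the initial sight count is an assumption.

```python
from collections import OrderedDict
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass
class Entry:
    attrs: Dict[str, str]
    attrs_hash: int
    sight_count: int = 1   # assumed starting value


class TinyObjectCache:
    """Hypothetical, simplified cache keyed by object ID."""

    def __init__(self, maxsize: int,
                 needs_logged: Callable[[str, Dict[str, str]], None],
                 touch_resend_n: int = 0) -> None:
        self.maxsize = maxsize
        self.needs_logged = needs_logged
        self.touch_resend_n = touch_resend_n
        self._cache: "OrderedDict[str, Entry]" = OrderedDict()

    def contains_or_add(self, key: str, attrs: Dict[str, str]) -> bool:
        current_hash = hash(frozenset(attrs.items()))
        if key in self._cache:
            entry = self._cache[key]
            entry.sight_count += 1
            changed = current_hash != entry.attrs_hash
            if changed:
                entry.attrs, entry.attrs_hash = attrs, current_hash
            resend = (self.touch_resend_n > 0
                      and entry.sight_count % self.touch_resend_n == 0)
            if changed or resend:
                self.needs_logged(key, entry.attrs)
            self._cache.move_to_end(key)
            return True
        # First sight: make room, store the entry, and log it.
        while len(self._cache) >= self.maxsize:
            old_key, old_entry = self._cache.popitem(last=False)
            self.needs_logged(old_key, old_entry.attrs)   # final state on eviction
        self._cache[key] = Entry(attrs=attrs, attrs_hash=current_hash)
        self.needs_logged(key, attrs)
        return False


logged: List[Tuple[str, Dict[str, str]]] = []
cache = TinyObjectCache(maxsize=2, needs_logged=lambda k, a: logged.append((k, a)))

first = cache.contains_or_add("o-1", {"status": "new"})    # first sight: logged
second = cache.contains_or_add("o-1", {"status": "new"})   # unchanged: not logged
third = cache.contains_or_add("o-1", {"status": "paid"})   # changed: logged
cache.contains_or_add("o-2", {"status": "new"})            # first sight: logged
cache.contains_or_add("o-3", {"status": "new"})            # evicts o-1, then logs o-3
```

In this run the callback fires five times: twice for first sights of o-2 and o-3, once for the first sight of o-1, once for its attribute change, and once more when o-1 is evicted.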

add

add(key: K, obj: Object) -> None

Add an object to the cache (or update if exists).

Note: This method fires needs_logged for new objects, but not for updates. For attribute change detection and periodic resend, use contains_or_add().

Parameters:

Name Type Description Default
key K

The object ID

required
obj Object

The OCEL Object

required
Source code in src/mycorrhizal/spores/cache.py
def add(self, key: K, obj: Object) -> None:
    """
    Add an object to the cache (or update if exists).

    Note: This method fires needs_logged for new objects, but not for updates.
    For attribute change detection and periodic resend, use contains_or_add().

    Args:
        key: The object ID
        obj: The OCEL Object
    """
    if key in self._cache:
        # Update existing entry
        entry = self._cache[key]
        entry.object = obj
        entry.sight_count += 1
        self._cache.move_to_end(key)
    else:
        # New entry
        self._evict_if_needed()

        attr_hash = _compute_attributes_hash(obj)
        entry = CacheEntry(object=obj, attributes_hash=attr_hash)
        self._cache[key] = entry

        if self.needs_logged:
            self.needs_logged(key, obj)

remove

remove(key: K) -> Optional[Object]

Remove an object from the cache.

Parameters:

Name Type Description Default
key K

The object ID

required

Returns:

Type Description
Optional[Object]

The removed Object, or None if key wasn't in cache

Source code in src/mycorrhizal/spores/cache.py
def remove(self, key: K) -> Optional[Object]:
    """
    Remove an object from the cache.

    Args:
        key: The object ID

    Returns:
        The removed Object, or None if key wasn't in cache
    """
    return self._cache.pop(key, None)

clear

clear() -> None

Clear all objects from the cache without logging.

Source code in src/mycorrhizal/spores/cache.py
def clear(self) -> None:
    """Clear all objects from the cache without logging."""
    self._cache.clear()

__len__

__len__() -> int

Return the number of objects in the cache.

Source code in src/mycorrhizal/spores/cache.py
def __len__(self) -> int:
    """Return the number of objects in the cache."""
    return len(self._cache)

__contains__

__contains__(key: K) -> bool

Check if key is in cache.

Source code in src/mycorrhizal/spores/cache.py
def __contains__(self, key: K) -> bool:
    """Check if key is in cache."""
    return key in self._cache

keys

keys() -> list[K]

Return all keys in the cache.

Source code in src/mycorrhizal/spores/cache.py
def keys(self) -> list[K]:
    """Return all keys in the cache."""
    return list(self._cache.keys())

values

values() -> list[Object]

Return all objects in the cache.

Source code in src/mycorrhizal/spores/cache.py
def values(self) -> list[Object]:
    """Return all objects in the cache."""
    return [entry.object for entry in self._cache.values()]

items

items() -> list[tuple[K, Object]]

Return all (key, object) pairs in the cache.

Source code in src/mycorrhizal/spores/cache.py
def items(self) -> list[tuple[K, Object]]:
    """Return all (key, object) pairs in the cache."""
    return [(key, entry.object) for key, entry in self._cache.items()]

Encoder

Bases: Protocol

Protocol for encoders that convert LogRecords to bytes.

Encoders serialize OCEL LogRecords for transport to OCEL consumers.

encode

encode(record: LogRecord) -> bytes

Encode a LogRecord to bytes.

Parameters:

Name Type Description Default
record LogRecord

The LogRecord to encode

required

Returns:

Type Description
bytes

Serialized bytes representation

Source code in src/mycorrhizal/spores/encoder/base.py
def encode(self, record: LogRecord) -> bytes:
    """
    Encode a LogRecord to bytes.

    Args:
        record: The LogRecord to encode

    Returns:
        Serialized bytes representation
    """
    ...

content_type

content_type() -> str

Return the MIME content type for this encoding.

Returns:

Type Description
str

Content type string (e.g., "application/json")

Source code in src/mycorrhizal/spores/encoder/base.py
def content_type(self) -> str:
    """
    Return the MIME content type for this encoding.

    Returns:
        Content type string (e.g., "application/json")
    """
    ...

JSONEncoder

JSONEncoder()

Bases: Encoder

JSON encoder for OCEL LogRecords.

Produces OCEL-compatible JSON with Unix float64 timestamps.

Example output

{
  "event": {
    "id": "evt-123",
    "type": "process_item",
    "activity": null,
    "time": 1705689296.123456,
    "attributes": [
      {"name": "priority", "value": "high", "type": "string"}
    ],
    "relationships": [
      {"objectId": "obj-456", "qualifier": "input"}
    ]
  },
  "object": null
}

Or for objects

{
  "event": null,
  "object": {
    "id": "obj-456",
    "type": "WorkItem",
    "attributes": [
      {"name": "status", "value": "pending", "type": "string", "time": 1705689296.123456}
    ],
    "relationships": []
  }
}

Initialize JSONEncoder.

Timestamps are always encoded as Unix float64 timestamps.

Source code in src/mycorrhizal/spores/encoder/json.py
def __init__(self):
    """
    Initialize JSONEncoder.

    Timestamps are always encoded as Unix float64 timestamps.
    """
    pass

encode

encode(record: LogRecord) -> bytes

Encode a LogRecord to JSON bytes.

Parameters:

Name Type Description Default
record LogRecord

The LogRecord to encode

required

Returns:

Type Description
bytes

JSON-encoded bytes

Source code in src/mycorrhizal/spores/encoder/json.py
def encode(self, record: LogRecord) -> bytes:
    """
    Encode a LogRecord to JSON bytes.

    Args:
        record: The LogRecord to encode

    Returns:
        JSON-encoded bytes
    """
    # Convert LogRecord to dict with union structure (both fields, one null)
    if record.event is not None:
        data = {"event": self._event_to_dict(record.event), "object": None}
    else:
        data = {"event": None, "object": self._object_to_dict(record.object)}

    # Encode to JSON
    return json.dumps(data, separators=(',', ':')).encode('utf-8')
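
The envelope and the compact separators can be reproduced with the standard json module alone. The helper encode_record below is hypothetical and works on plain dicts instead of Event and Object instances.

```python
import json
from typing import Any, Dict, Optional


def encode_record(event: Optional[Dict[str, Any]] = None,
                  obj: Optional[Dict[str, Any]] = None) -> bytes:
    """Hypothetical helper: wrap one payload in the event/object envelope."""
    if (event is None) == (obj is None):
        raise ValueError("exactly one of event or obj must be given")
    envelope = {"event": event, "object": obj}
    # separators=(',', ':') drops the spaces json.dumps inserts by default,
    # which keeps each JSONL line as short as possible.
    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")


line = encode_record(event={"id": "evt-123", "type": "process_item"})
decoded = json.loads(line)
```

Because both keys are always present and one is null, a consumer can decide which kind of record it received by checking which field is non-null.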

content_type

content_type() -> str

Return the JSON content type.

Source code in src/mycorrhizal/spores/encoder/json.py
def content_type(self) -> str:
    """Return the JSON content type."""
    return "application/json"

_event_to_dict

_event_to_dict(event: Event) -> Dict[str, Any]

Convert an Event to dict for JSON serialization.

Source code in src/mycorrhizal/spores/encoder/json.py
def _event_to_dict(self, event: Event) -> Dict[str, Any]:
    """Convert an Event to dict for JSON serialization."""
    return {
        "id": event.id,
        "type": event.type,
        "activity": event.activity,
        "time": self._format_datetime(event.time),
        "attributes": [
            self._event_attr_to_dict(name, attr)
            for name, attr in event.attributes.items()
        ],
        "relationships": [
            self._relationship_to_dict(qualifier, rel)
            for qualifier, rel in event.relationships.items()
        ]
    }

_object_to_dict

_object_to_dict(obj: Object) -> Dict[str, Any]

Convert an Object to dict for JSON serialization.

Source code in src/mycorrhizal/spores/encoder/json.py
def _object_to_dict(self, obj: Object) -> Dict[str, Any]:
    """Convert an Object to dict for JSON serialization."""
    return {
        "id": obj.id,
        "type": obj.type,
        "attributes": [
            self._object_attr_to_dict(name, attr)
            for name, attr in obj.attributes.items()
        ],
        "relationships": [
            self._relationship_to_dict(qualifier, rel)
            for qualifier, rel in obj.relationships.items()
        ]
    }

_event_attr_to_dict

_event_attr_to_dict(name: str, attr: EventAttributeValue) -> Dict[str, Any]

Convert EventAttributeValue to dict.

Source code in src/mycorrhizal/spores/encoder/json.py
def _event_attr_to_dict(self, name: str, attr: EventAttributeValue) -> Dict[str, Any]:
    """Convert EventAttributeValue to dict."""
    return {
        "name": attr.name or name,
        "value": attr.value,
        "type": attr.type
    }

_object_attr_to_dict

_object_attr_to_dict(name: str, attr: ObjectAttributeValue) -> Dict[str, Any]

Convert ObjectAttributeValue to dict.

Source code in src/mycorrhizal/spores/encoder/json.py
def _object_attr_to_dict(self, name: str, attr: ObjectAttributeValue) -> Dict[str, Any]:
    """Convert ObjectAttributeValue to dict."""
    result = {
        "name": attr.name or name,
        "value": attr.value,
        "type": attr.type
    }
    if attr.time is not None:
        result["time"] = self._format_datetime(attr.time)
    else:
        result["time"] = None
    return result

_relationship_to_dict

_relationship_to_dict(qualifier: str, rel: Relationship) -> Dict[str, str]

Convert Relationship to dict.

Source code in src/mycorrhizal/spores/encoder/json.py
def _relationship_to_dict(self, qualifier: str, rel: Relationship) -> Dict[str, str]:
    """Convert Relationship to dict."""
    return {
        "objectId": rel.object_id,
        "qualifier": qualifier
    }

_format_datetime

_format_datetime(dt: datetime) -> float

Format datetime as Unix float64 timestamp.

Parameters:

Name Type Description Default
dt datetime

The datetime to format

required

Returns:

Type Description
float

Unix float64 timestamp (seconds since epoch)

Source code in src/mycorrhizal/spores/encoder/json.py
def _format_datetime(self, dt: datetime) -> float:
    """
    Format datetime as Unix float64 timestamp.

    Args:
        dt: The datetime to format

    Returns:
        Unix float64 timestamp (seconds since epoch)
    """
    return dt.timestamp()
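
As a quick check of what this conversion produces, the snippet below converts the instant used in the JSONEncoder example output and converts it back. Note that `datetime.timestamp()` treats a naive datetime as local time, so timezone-aware values (as used here) give results that do not depend on the machine's timezone.

```python
from datetime import datetime, timezone

# 2024-01-19 18:34:56.123456 UTC, the instant shown in the example output.
aware = datetime(2024, 1, 19, 18, 34, 56, 123456, tzinfo=timezone.utc)
ts = aware.timestamp()   # seconds since the Unix epoch, as a float

# Convert the float back to an aware datetime in UTC.
restored = datetime.fromtimestamp(ts, tz=timezone.utc)
```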

SyncTransport

Bases: Protocol

Protocol for synchronous transports (blocking I/O).

Sync transports use blocking send() - they write data immediately and block until complete. Use these with get_spore_sync().

Examples: file writes, blocking HTTP requests, etc.

send

send(data: bytes, content_type: str) -> None

Send encoded data to the transport destination (blocking).

Parameters:

Name Type Description Default
data bytes

The encoded log record data

required
content_type str

MIME content type (e.g., "application/json")

required
Source code in src/mycorrhizal/spores/transport/base.py
def send(self, data: bytes, content_type: str) -> None:
    """
    Send encoded data to the transport destination (blocking).

    Args:
        data: The encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    ...

close

close() -> None

Close the transport and release resources.

Source code in src/mycorrhizal/spores/transport/base.py
def close(self) -> None:
    """Close the transport and release resources."""
    ...
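
Because the protocol only requires send() and close(), any class with those two methods fits its shape. The sketch below defines a local copy of that shape and a hypothetical in-memory transport, which can be handy in tests. Whether such a class can be handed to configure() directly is an assumption not confirmed here.

```python
from typing import List, Protocol, Tuple


class SyncTransportLike(Protocol):
    """Local copy of the two-method shape described above."""

    def send(self, data: bytes, content_type: str) -> None: ...

    def close(self) -> None: ...


class MemoryTransport:
    """Hypothetical transport that keeps records in a list."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, bytes]] = []
        self.closed = False

    def send(self, data: bytes, content_type: str) -> None:
        if self.closed:
            raise RuntimeError("transport is closed")
        self.records.append((content_type, data))

    def close(self) -> None:
        self.closed = True


def ship(transport: SyncTransportLike, payload: bytes) -> None:
    """Accepts anything that structurally matches the protocol."""
    transport.send(payload, "application/json")


transport = MemoryTransport()
ship(transport, b'{"id":"evt-1"}')
transport.close()
```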

AsyncTransport

Bases: Protocol

Protocol for asynchronous transports (async I/O).

Async transports expose send() as a coroutine: callers await it instead of blocking the event loop. Use these with get_spore_async().

Examples: async file writes, async HTTP requests, message queues, etc.

send async

send(data: bytes, content_type: str) -> None

Send encoded data to the transport destination (async).

Parameters:

Name Type Description Default
data bytes

The encoded log record data

required
content_type str

MIME content type (e.g., "application/json")

required
Source code in src/mycorrhizal/spores/transport/base.py
async def send(self, data: bytes, content_type: str) -> None:
    """
    Send encoded data to the transport destination (async).

    Args:
        data: The encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    ...

close async

close() -> None

Close the transport and release resources.

Source code in src/mycorrhizal/spores/transport/base.py
async def close(self) -> None:
    """Close the transport and release resources."""
    ...

SyncFileTransport

SyncFileTransport(filepath: str | Path)

Synchronous file transport using blocking I/O.

Writes log records to a file in JSONL format (one JSON object per line). Thread-safe with a lock for concurrent writes.

Initialize the file transport.

Parameters:

Name Type Description Default
filepath str | Path

Path to the log file. Will be created if it doesn't exist.

required
Source code in src/mycorrhizal/spores/transport/file.py
def __init__(self, filepath: str | Path):
    """
    Initialize the file transport.

    Args:
        filepath: Path to the log file. Will be created if it doesn't exist.
    """
    self.filepath = Path(filepath)
    self.filepath.parent.mkdir(parents=True, exist_ok=True)
    self._file = open(self.filepath, 'a', encoding='utf-8')
    self._lock = threading.Lock()

send

send(data: bytes, content_type: str) -> None

Write data to file (blocking call).

Parameters:

Name Type Description Default
data bytes

Encoded log record data

required
content_type str

MIME content type (e.g., "application/json")

required
Source code in src/mycorrhizal/spores/transport/file.py
def send(self, data: bytes, content_type: str) -> None:
    """
    Write data to file (blocking call).

    Args:
        data: Encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    with self._lock:
        self._file.write(data.decode('utf-8') + '\n')
        self._file.flush()

close

close() -> None

Close the file handle.

Source code in src/mycorrhizal/spores/transport/file.py
def close(self) -> None:
    """Close the file handle."""
    with self._lock:
        if self._file and not self._file.closed:
            self._file.close()
            self._file = None

__enter__

__enter__()

Context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
def __enter__(self):
    """Context manager support."""
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager support."""
    self.close()
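
The lock-and-flush pattern and the context-manager support can be exercised with a small stand-in. JsonlWriter below is hypothetical and mirrors the structure shown above; the example writes from four threads into a temporary directory and then reads the lines back.

```python
import json
import tempfile
import threading
from pathlib import Path


class JsonlWriter:
    """Hypothetical stand-in mirroring the lock-and-flush pattern above."""

    def __init__(self, filepath: Path) -> None:
        filepath.parent.mkdir(parents=True, exist_ok=True)
        self._file = open(filepath, "a", encoding="utf-8")
        self._lock = threading.Lock()

    def send(self, data: bytes, content_type: str) -> None:
        with self._lock:   # only one thread writes a line at a time
            self._file.write(data.decode("utf-8") + "\n")
            self._file.flush()

    def close(self) -> None:
        with self._lock:
            if not self._file.closed:
                self._file.close()

    def __enter__(self) -> "JsonlWriter":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


def worker(writer: JsonlWriter, worker_id: int) -> None:
    for seq in range(50):
        payload = json.dumps({"worker": worker_id, "seq": seq}).encode("utf-8")
        writer.send(payload, "application/json")


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "logs" / "ocel.jsonl"
    with JsonlWriter(path) as writer:
        threads = [threading.Thread(target=worker, args=(writer, i)) for i in range(4)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    lines = path.read_text(encoding="utf-8").splitlines()
    records = [json.loads(line) for line in lines]
```

Every line parses as a complete JSON object because each write and flush happens while the lock is held.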

AsyncFileTransport

AsyncFileTransport(filepath: str | Path)

Asynchronous file transport using async I/O.

Writes log records to a file in JSONL format (one JSON object per line). Uses asyncio.to_thread() for non-blocking async file I/O.

This is the async version of SyncFileTransport - use it with get_spore_async().

Initialize the async file transport.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filepath | str \| Path | Path to the log file. Will be created if it doesn't exist. | required |

Source code in src/mycorrhizal/spores/transport/file.py
def __init__(self, filepath: str | Path):
    """
    Initialize the async file transport.

    Args:
        filepath: Path to the log file. Will be created if it doesn't exist.
    """
    self.filepath = Path(filepath)
    self.filepath.parent.mkdir(parents=True, exist_ok=True)
    # Open file in append mode, will be managed by asyncio.to_thread
    self._file = open(self.filepath, 'a', encoding='utf-8')
    self._lock = asyncio.Lock()

send async

send(data: bytes, content_type: str) -> None

Write data to file asynchronously.

Uses asyncio.to_thread() to run blocking I/O in a thread pool, avoiding the need for external dependencies like aiofiles.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | bytes | Encoded log record data | required |
| content_type | str | MIME content type (e.g., "application/json") | required |

Source code in src/mycorrhizal/spores/transport/file.py
async def send(self, data: bytes, content_type: str) -> None:
    """
    Write data to file asynchronously.

    Uses asyncio.to_thread() to run blocking I/O in a thread pool,
    avoiding the need for external dependencies like aiofiles.

    Args:
        data: Encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    async with self._lock:
        # Run blocking file I/O in thread pool
        await asyncio.to_thread(self._write_sync, data)

_write_sync

_write_sync(data: bytes) -> None

Synchronous write helper - runs in thread pool.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | bytes | Encoded log record data | required |

Source code in src/mycorrhizal/spores/transport/file.py
def _write_sync(self, data: bytes) -> None:
    """
    Synchronous write helper - runs in thread pool.

    Args:
        data: Encoded log record data
    """
    self._file.write(data.decode('utf-8') + '\n')
    self._file.flush()

close async

close() -> None

Close the file handle asynchronously.

Source code in src/mycorrhizal/spores/transport/file.py
async def close(self) -> None:
    """Close the file handle asynchronously."""
    async with self._lock:
        if self._file and not self._file.closed:
            await asyncio.to_thread(self._file.close)
            self._file = None

__aenter__ async

__aenter__()

Async context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
async def __aenter__(self):
    """Async context manager support."""
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager support."""
    await self.close()
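
The combination of an `asyncio.Lock` and `asyncio.to_thread()` described above can be sketched as follows. `DemoAsyncFileTransport` is a hypothetical stand-in that mirrors the listings, and the record payloads are made up for illustration. The sketch issues several `send()` calls concurrently and returns the lines that reach the file, each of which should be a complete record.

```python
import asyncio
import tempfile
from pathlib import Path


class DemoAsyncFileTransport:
    """Stand-in mirroring the AsyncFileTransport listing (hypothetical name)."""

    def __init__(self, filepath):
        self.filepath = Path(filepath)
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        self._file = open(self.filepath, "a", encoding="utf-8")
        self._lock = asyncio.Lock()

    async def send(self, data: bytes, content_type: str) -> None:
        # The lock serializes writers; the blocking write runs in a worker thread.
        async with self._lock:
            await asyncio.to_thread(self._write_sync, data)

    def _write_sync(self, data: bytes) -> None:
        self._file.write(data.decode("utf-8") + "\n")
        self._file.flush()

    async def close(self) -> None:
        async with self._lock:
            if self._file and not self._file.closed:
                await asyncio.to_thread(self._file.close)
                self._file = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()


async def concurrent_writes(count: int) -> list[str]:
    """Send `count` records concurrently and return the lines written."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "ocel.jsonl"
        async with DemoAsyncFileTransport(path) as transport:
            payloads = [f'{{"n": {i}}}'.encode("utf-8") for i in range(count)]
            await asyncio.gather(
                *(transport.send(p, "application/json") for p in payloads)
            )
        return path.read_text(encoding="utf-8").splitlines()
```

Calling `asyncio.run(concurrent_writes(20))` returns twenty lines. Because only one writer holds the lock at a time, records are not interleaved within a line.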

HyphaAdapter

HyphaAdapter()

Adapter for Hypha Petri net logging.

Provides decorators and helpers for logging:

- Transition execution with consumed/produced tokens
- Place token arrivals/departures
- Token object lifecycle tracking

Usage
from mycorrhizal.spores.dsl import HyphaAdapter

adapter = HyphaAdapter()

@pn.net
def MyNet(builder):
    @builder.transition()
    @adapter.log_transition(event_type="process_item")
    async def process(consumed, bb, timebase):
        # Event automatically logged with:
        # - token_count attribute
        # - relationships to consumed tokens
        yield {output: consumed[0]}

    @builder.place()
    @adapter.log_place(event_type="item_arrived")
    def input_place(bb):
        return None

Initialize the Hypha adapter.

Source code in src/mycorrhizal/spores/dsl/hypha.py
def __init__(self):
    """Initialize the Hypha adapter."""
    self._enabled = True

enable

enable()

Enable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/hypha.py
def enable(self):
    """Enable logging for this adapter."""
    self._enabled = True

disable

disable()

Disable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/hypha.py
def disable(self):
    """Disable logging for this adapter."""
    self._enabled = False

log_transition

log_transition(event_type: str, attributes: Optional[Union[Dict[str, Any], List[str]]] = None, log_inputs: bool = True, log_outputs: bool = False) -> Callable

Decorator to log Hypha transition execution.

Automatically captures:

- Token count from consumed tokens
- Relationships to consumed/produced tokens
- Attributes from blackboard (if specified)
- Objects from blackboard with ObjectRef metadata

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | str | Type of event to log | required |
| attributes | Optional[Union[Dict[str, Any], List[str]]] | Static attributes or param names to extract | None |
| log_inputs | bool | Whether to log input tokens as related objects | True |
| log_outputs | bool | Whether to log output tokens as related objects | False |

Returns:

| Type | Description |
| --- | --- |
| Callable | Decorator function |

Source code in src/mycorrhizal/spores/dsl/hypha.py
def log_transition(
    self,
    event_type: str,
    attributes: Optional[Union[Dict[str, Any], List[str]]] = None,
    log_inputs: bool = True,
    log_outputs: bool = False,
) -> Callable:
    """
    Decorator to log Hypha transition execution.

    Automatically captures:
    - Token count from consumed tokens
    - Relationships to consumed/produced tokens
    - Attributes from blackboard (if specified)
    - Objects from blackboard with ObjectRef metadata

    Args:
        event_type: Type of event to log
        attributes: Static attributes or param names to extract
        log_inputs: Whether to log input tokens as related objects
        log_outputs: Whether to log output tokens as related objects

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(consumed: List[Any], bb: Any, timebase: Any, state: Any = None):
            # Call original transition
            if state is not None:
                result = func(consumed, bb, timebase, state)
            else:
                result = func(consumed, bb, timebase)

            # Handle async generator
            if inspect.isasyncgen(result):
                # Log before processing
                await _log_transition_event(
                    func, consumed, bb, timebase, event_type, attributes, log_inputs, None
                )

                # Collect outputs
                outputs = []
                async for yielded in result:
                    outputs.append(yielded)
                    yield yielded

                # Log outputs if requested
                if log_outputs:
                    await _log_outputs(func, outputs, bb, timebase, event_type)
            else:
                # Handle coroutine
                result = await result

                # Log event
                await _log_transition_event(
                    func, consumed, bb, timebase, event_type, attributes, log_inputs, result
                )

                # Yield the result if it's not None
                if result is not None:
                    yield result

        @functools.wraps(func)
        def sync_wrapper(consumed: List[Any], bb: Any, timebase: Any, state: Any = None):
            # For sync transitions, log asynchronously
            if state is not None:
                result = func(consumed, bb, timebase, state)
            else:
                result = func(consumed, bb, timebase)

            # Schedule logging
            asyncio.create_task(_log_transition_event(
                func, consumed, bb, timebase, event_type, attributes, log_inputs, result
            ))

            return result

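        # Return appropriate wrapper. Async generator functions (async def
        # with yield) are not coroutine functions, so check for them too.
        if asyncio.iscoroutinefunction(func) or inspect.isasyncgenfunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore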

    return decorator
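
Wrapper selection depends on what kind of callable is decorated. The sketch below uses only the standard `inspect` module to tell the three kinds of transition apart. The function names and the `classify` helper are hypothetical. Note that `iscoroutinefunction` reports `False` for an `async def` that contains `yield`, so async generator transitions need the separate `inspect.isasyncgenfunction` check.

```python
import inspect


async def coroutine_transition(consumed, bb, timebase):
    return consumed[0]


async def generator_transition(consumed, bb, timebase):
    yield consumed[0]


def sync_transition(consumed, bb, timebase):
    return consumed[0]


def classify(func) -> str:
    """Return which kind of callable `func` is (hypothetical helper)."""
    if inspect.isasyncgenfunction(func):
        return "async generator"
    if inspect.iscoroutinefunction(func):
        return "coroutine"
    return "sync"
```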

log_place

log_place(event_type: str = 'token_arrived') -> Callable

Decorator to log place token arrivals.

Note: This requires instrumenting the place handler or using a custom place runtime wrapper. For most cases, transition logging provides sufficient coverage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | str | Type of event to log | 'token_arrived' |

Returns:

| Type | Description |
| --- | --- |
| Callable | Decorator function |

Source code in src/mycorrhizal/spores/dsl/hypha.py
def log_place(self, event_type: str = "token_arrived") -> Callable:
    """
    Decorator to log place token arrivals.

    Note: This requires instrumenting the place handler or using
    a custom place runtime wrapper. For most cases, transition
    logging provides sufficient coverage.

    Args:
        event_type: Type of event to log

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(bb: Any, timebase: Any):
            result = await func(bb, timebase)

            # Log token arrival
            config = get_config()
            if config.enabled:
                await _log_place_event(func, bb, timebase, event_type)

            return result

        @functools.wraps(func)
        def sync_wrapper(bb: Any, timebase: Any):
            result = func(bb, timebase)

            # Schedule logging
            config = get_config()
            if config.enabled:
                asyncio.create_task(_log_place_event(func, bb, timebase, event_type))

            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator
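
The synchronous wrappers schedule logging with `asyncio.create_task()`, which needs a running event loop and raises `RuntimeError` without one. The following sketch shows one way to guard such a call. `record_event` and `schedule_logging` are hypothetical names, and the guard is an illustration rather than something the adapters are documented to do.

```python
import asyncio

logged = []


async def record_event(name: str) -> None:
    """Stand-in for an async logging coroutine (hypothetical)."""
    logged.append(name)


def schedule_logging(name: str) -> bool:
    """Schedule logging if an event loop is running; report whether it was scheduled."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: asyncio.create_task() would raise here.
        return False
    loop.create_task(record_event(name))
    return True


async def main() -> bool:
    scheduled = schedule_logging("token_arrived")
    # Yield control once so the scheduled task gets a chance to run.
    await asyncio.sleep(0)
    return scheduled
```

Outside a running loop, `schedule_logging()` returns `False` and creates no coroutine. Inside `asyncio.run(main())`, the event is recorded once the task has had a chance to run.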

RhizomorphAdapter

RhizomorphAdapter()

Adapter for Rhizomorph behavior tree logging.

Provides decorators and helpers for logging:

- Node tick execution with status results
- Tree-level events
- Blackboard object lifecycle tracking

Usage
from mycorrhizal.spores.dsl import RhizomorphAdapter

adapter = RhizomorphAdapter()

@bt.tree
def MyTree():
    @bt.action
    @adapter.log_node(event_type="check_threat")
    async def check_threat(bb: MissionContext) -> Status:
        # Event automatically logged with:
        # - status result attribute
        # - attributes from bb (with EventAttr annotations)
        # - relationships to objects (with ObjectRef annotations)
        return Status.SUCCESS

Initialize the Rhizomorph adapter.

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def __init__(self):
    """Initialize the Rhizomorph adapter."""
    self._enabled = True

enable

enable()

Enable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def enable(self):
    """Enable logging for this adapter."""
    self._enabled = True

disable

disable()

Disable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def disable(self):
    """Disable logging for this adapter."""
    self._enabled = False

log_node

log_node(event_type: str, attributes: Optional[Union[Dict[str, Any], List[str]]] = None, log_status: bool = True) -> Callable

Decorator to log Rhizomorph node execution.

Automatically captures:

- Node execution status (SUCCESS, FAILURE, RUNNING, etc.)
- Attributes from blackboard (with EventAttr annotations)
- Objects from blackboard (with ObjectRef annotations)
- Node name

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | str | Type of event to log | required |
| attributes | Optional[Union[Dict[str, Any], List[str]]] | Static attributes or param names to extract | None |
| log_status | bool | Whether to include status in event attributes | True |

Returns:

| Type | Description |
| --- | --- |
| Callable | Decorator function |

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def log_node(
    self,
    event_type: str,
    attributes: Optional[Union[Dict[str, Any], List[str]]] = None,
    log_status: bool = True,
) -> Callable:
    """
    Decorator to log Rhizomorph node execution.

    Automatically captures:
    - Node execution status (SUCCESS, FAILURE, RUNNING, etc.)
    - Attributes from blackboard (with EventAttr annotations)
    - Objects from blackboard (with ObjectRef annotations)
    - Node name

    Args:
        event_type: Type of event to log
        attributes: Static attributes or param names to extract
        log_status: Whether to include status in event attributes

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(bb: Any, tb: Any = None):
            # Call original node function
            if _supports_timebase(func):
                result = await func(bb=bb, tb=tb)
            else:
                result = await func(bb=bb)

            # Log event
            await _log_node_event(
                func, bb, tb, event_type, attributes, log_status, result
            )

            return result

        @functools.wraps(func)
        def sync_wrapper(bb: Any, tb: Any = None):
            # Call original node function
            if _supports_timebase(func):
                result = func(bb=bb, tb=tb)
            else:
                result = func(bb=bb)

            # Schedule logging
            asyncio.create_task(_log_node_event(
                func, bb, tb, event_type, attributes, log_status, result
            ))

            return result

        # Return appropriate wrapper
        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator
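
The wrappers above call `_supports_timebase(func)`, whose implementation is not shown in this reference. One plausible approach, sketched here under that assumption, is to inspect the function signature for a `tb` parameter. `supports_timebase`, `call_node`, and the two example nodes are hypothetical names, not the library's.

```python
import inspect


def supports_timebase(func) -> bool:
    """Guess whether `func` accepts a `tb` keyword argument (hypothetical helper)."""
    params = inspect.signature(func).parameters
    if "tb" in params:
        return True
    # A **kwargs parameter also accepts tb=...
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


def call_node(func, bb, tb=None):
    """Call a node with or without the timebase, as the wrappers above do."""
    if supports_timebase(func):
        return func(bb=bb, tb=tb)
    return func(bb=bb)


def node_without_tb(bb):
    return ("no tb", bb)


def node_with_tb(bb, tb):
    return ("tb", bb, tb)
```

Because the wrappers pass `bb` and `tb` by keyword, a decorated node must name its parameters exactly `bb` and, optionally, `tb`.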

SeptumAdapter

SeptumAdapter()

Adapter for Septum state machine logging.

Provides decorators and helpers for logging:

- State entry/exit/timeout events
- State execution events
- Transition events
- Message/context object lifecycle tracking

Usage
from mycorrhizal.spores.dsl import SeptumAdapter

adapter = SeptumAdapter()

@septum.state()
def MyState():
    @septum.on_state
    @adapter.log_state(event_type="state_execute")
    async def on_state(ctx: SharedContext):
        # Event automatically logged with:
        # - state_name attribute
        # - attributes from ctx.common (with EventAttr annotations)
        # - relationships to objects (with ObjectRef annotations)
        return Events.DONE

    @septum.on_enter
    @adapter.log_state_lifecycle(event_type="state_enter")
    async def on_enter(ctx: SharedContext):
        pass

Initialize the Septum adapter.

Source code in src/mycorrhizal/spores/dsl/septum.py
def __init__(self):
    """Initialize the Septum adapter."""
    self._enabled = True

enable

enable()

Enable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/septum.py
def enable(self):
    """Enable logging for this adapter."""
    self._enabled = True

disable

disable()

Disable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/septum.py
def disable(self):
    """Disable logging for this adapter."""
    self._enabled = False

log_state

log_state(event_type: str, attributes: Optional[Dict[str, Any]] = None, log_state_name: bool = True, log_transition: bool = True) -> Callable

Decorator to log Septum state execution.

Automatically captures:

- State name
- Transition result (if log_transition=True)
- Attributes from ctx.common (with EventAttr annotations)
- Objects from ctx.common (with ObjectRef annotations)
- Message information (if present in ctx)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | str | Type of event to log | required |
| attributes | Optional[Dict[str, Any]] | Static attributes to include | None |
| log_state_name | bool | Whether to include state name in attributes | True |
| log_transition | bool | Whether to include transition result | True |

Returns:

| Type | Description |
| --- | --- |
| Callable | Decorator function |

Source code in src/mycorrhizal/spores/dsl/septum.py
def log_state(
    self,
    event_type: str,
    attributes: Optional[Dict[str, Any]] = None,
    log_state_name: bool = True,
    log_transition: bool = True,
) -> Callable:
    """
    Decorator to log Septum state execution.

    Automatically captures:
    - State name
    - Transition result (if log_transition=True)
    - Attributes from ctx.common (with EventAttr annotations)
    - Objects from ctx.common (with ObjectRef annotations)
    - Message information (if present in ctx)

    Args:
        event_type: Type of event to log
        attributes: Static attributes to include
        log_state_name: Whether to include state name in attributes
        log_transition: Whether to include transition result

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(ctx: SharedContext):
            # Call original state handler
            result = await func(ctx)

            # Log event
            await _log_state_event(
                func, ctx, event_type, attributes, log_state_name, log_transition, result
            )

            return result

        @functools.wraps(func)
        def sync_wrapper(ctx: SharedContext):
            # Call original state handler
            result = func(ctx)

            # Schedule logging
            asyncio.create_task(_log_state_event(
                func, ctx, event_type, attributes, log_state_name, log_transition, result
            ))

            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator

log_state_lifecycle

log_state_lifecycle(event_type: str, attributes: Optional[Dict[str, Any]] = None) -> Callable

Decorator to log state lifecycle events (on_enter, on_leave, on_timeout).

Use this for logging lifecycle hooks rather than main state execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | str | Type of event to log (e.g., "state_enter", "state_leave") | required |
| attributes | Optional[Dict[str, Any]] | Static attributes to include | None |

Returns:

| Type | Description |
| --- | --- |
| Callable | Decorator function |

Source code in src/mycorrhizal/spores/dsl/septum.py
def log_state_lifecycle(
    self,
    event_type: str,
    attributes: Optional[Dict[str, Any]] = None,
) -> Callable:
    """
    Decorator to log state lifecycle events (on_enter, on_leave, on_timeout).

    Use this for logging lifecycle hooks rather than main state execution.

    Args:
        event_type: Type of event to log (e.g., "state_enter", "state_leave")
        attributes: Static attributes to include

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(ctx: SharedContext):
            # Log before execution
            config = get_config()
            if config.enabled:
                await _log_lifecycle_event(func, ctx, event_type, attributes, "enter")

            # Call original lifecycle handler
            result = await func(ctx)

            return result

        @functools.wraps(func)
        def sync_wrapper(ctx: SharedContext):
            # Schedule logging
            config = get_config()
            if config.enabled:
                asyncio.create_task(_log_lifecycle_event(
                    func, ctx, event_type, attributes, "enter"
                ))

            return func(ctx)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator
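
The two Septum decorators differ in when the event is recorded: `log_state` logs after the handler returns, so the result is available, while `log_state_lifecycle` logs before the handler runs. The sketch below reproduces only that ordering with two simplified decorators. `log_after`, `log_before`, and the `trace` list are hypothetical and stand in for the real logging calls.

```python
import asyncio
import functools

trace = []


def log_after(event_type):
    """Record the event after the handler returns, including its result."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(ctx):
            result = await func(ctx)
            trace.append((event_type, result))
            return result
        return wrapper
    return decorator


def log_before(event_type):
    """Record the event before the handler runs; no result is available yet."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(ctx):
            trace.append((event_type, None))
            return await func(ctx)
        return wrapper
    return decorator


@log_before("state_enter")
async def on_enter(ctx):
    trace.append(("handler", "on_enter"))


@log_after("state_execute")
async def on_state(ctx):
    trace.append(("handler", "on_state"))
    return "DONE"


async def run():
    await on_enter({})
    return await on_state({})
```

One consequence of the log-after ordering is that a handler that raises produces no event, whereas the log-before ordering records the event even if the handler then fails.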

configure

configure(enabled: bool = True, object_cache_size: int = 128, encoder: Optional[Encoder] = None, transport: Optional[Transport] = None, eviction_policy: Union[EvictionPolicy, str] = EVICT_AND_LOG, touch_resend_n: int = 100) -> None

Configure the Spores logging system (thread-safe).

Call this once at application startup. If it is called again, the last call wins: a warning is logged, and the object cache and cache metrics are recreated.

Thread-safe: Can be called from multiple threads concurrently.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| enabled | bool | Whether spores logging is enabled (default: True) | True |
| object_cache_size | int | Maximum objects in LRU cache (default: 128) | 128 |
| encoder | Optional[Encoder] | Encoder instance (defaults to JSONEncoder) | None |
| transport | Optional[Transport] | Transport instance (required for logging to work) | None |
| eviction_policy | Union[EvictionPolicy, str] | Policy for cache eviction (default: evict_and_log) | EVICT_AND_LOG |
| touch_resend_n | int | Resend object every N touches (default: 100, 0 to disable) | 100 |

Example
from mycorrhizal.spores import configure
from mycorrhizal.spores.transport import SyncFileTransport

configure(
    transport=SyncFileTransport("logs/ocel.jsonl"),
    object_cache_size=256,
    touch_resend_n=100,
)
Source code in src/mycorrhizal/spores/core.py
def configure(
    enabled: bool = True,
    object_cache_size: int = 128,
    encoder: Optional[Encoder] = None,
    transport: Optional[Transport] = None,
    eviction_policy: Union[EvictionPolicy, str] = EvictionPolicy.EVICT_AND_LOG,
    touch_resend_n: int = 100
) -> None:
    """
    Configure the Spores logging system (thread-safe).

    This should be called once at application startup.
    If called multiple times, the last call wins.

    Thread-safe: Can be called from multiple threads concurrently.

    Args:
        enabled: Whether spores logging is enabled (default: True)
        object_cache_size: Maximum objects in LRU cache (default: 128)
        encoder: Encoder instance (defaults to JSONEncoder)
        transport: Transport instance (required for logging to work)
        eviction_policy: Policy for cache eviction (default: evict_and_log)
        touch_resend_n: Resend object every N touches (default: 100, 0 to disable)

    Example:
        ```python
        from mycorrhizal.spores import configure
        from mycorrhizal.spores.transport import SyncFileTransport

        configure(
            transport=SyncFileTransport("logs/ocel.jsonl"),
            object_cache_size=256,
            touch_resend_n=100,
        )
        ```
    """
    global _config, _object_cache, _config_initialized, _cache_metrics

    # Convert string to EvictionPolicy if needed
    if isinstance(eviction_policy, str):
        eviction_policy = EvictionPolicy(eviction_policy)

    with _config_lock:
        # Warn if already configured (but allow reconfiguration)
        if _config_initialized:
            logger.warning(
                "Spores already configured. Reconfiguring may cause issues. "
                "Ensure configure() is called only once at startup."
            )

        # Create new config (atomic under lock)
        _config = SporesConfig(
            enabled=enabled,
            object_cache_size=object_cache_size,
            encoder=encoder,
            transport=transport,
            eviction_policy=eviction_policy,
            touch_resend_n=touch_resend_n
        )

        # Reset metrics on reconfiguration
        _cache_metrics = CacheMetrics()

        # Create object cache with unified callback
        def needs_logged(object_id: str, obj: Object):
            """Log object when it needs to be logged (first sight, eviction, change, or Nth touch)."""
            global _cache_metrics

            try:
                loop = asyncio.get_event_loop()
                if loop.is_running():
                    # Async context: schedule task
                    asyncio.create_task(_send_log_record(LogRecord(object=obj)))
                else:
                    # No running loop, use sync path
                    _send_log_record_sync(LogRecord(object=obj))
            except RuntimeError:
                # No event loop at all, use sync path
                try:
                    _send_log_record_sync(LogRecord(object=obj))
                except Exception as e:
                    logger.error(f"Failed to log object {object_id}: {e}")

        _object_cache = ObjectLRUCache(
            maxsize=object_cache_size,
            needs_logged=needs_logged,
            touch_resend_n=touch_resend_n,
            metrics=_cache_metrics
        )

        _config_initialized = True

    logger.info(f"Spores configured: enabled={enabled}, cache_size={object_cache_size}, eviction_policy={eviction_policy.value}")
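
`configure()` accepts `eviction_policy` either as an enum member or as a string, and converts strings with `EvictionPolicy(eviction_policy)`, which is an enum lookup by value. The sketch below illustrates that conversion with a stand-in enum. `DemoEvictionPolicy`, its single member, and the value `"evict_and_log"` are assumptions inferred from the default shown above; the real enum may define more members.

```python
from enum import Enum


class DemoEvictionPolicy(Enum):
    """Stand-in enum; the member name and value are assumed."""
    EVICT_AND_LOG = "evict_and_log"


def normalize_policy(policy):
    """Accept either an enum member or its string value."""
    if isinstance(policy, str):
        # Enum lookup by value; raises ValueError for unknown strings.
        return DemoEvictionPolicy(policy)
    return policy
```

An unrecognized string raises `ValueError` at configuration time rather than failing later during eviction.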

get_config

get_config() -> SporesConfig

Get the current spores configuration (thread-safe).

If no configuration exists, creates a default one. Thread-safe: Can be called from multiple threads concurrently.

Source code in src/mycorrhizal/spores/core.py
def get_config() -> SporesConfig:
    """
    Get the current spores configuration (thread-safe).

    If no configuration exists, creates a default one.
    Thread-safe: Can be called from multiple threads concurrently.
    """
    global _config, _config_initialized

    # Fast path: read without lock (GIL protects single read)
    if _config is not None:
        return _config

    # Slow path: needs initialization
    with _config_lock:
        # Double-check: another thread may have initialized while we waited
        if _config is None:
            # Initialize with defaults
            _config = SporesConfig()
            _config_initialized = True

    return _config

get_object_cache

get_object_cache() -> ObjectLRUCache[str, Object]

Get the object cache (thread-safe).

If no cache exists, initializes with default configuration. Thread-safe: Can be called from multiple threads concurrently.

Source code in src/mycorrhizal/spores/core.py
def get_object_cache() -> ObjectLRUCache[str, Object]:
    """
    Get the object cache (thread-safe).

    If no cache exists, initializes with default configuration.
    Thread-safe: Can be called from multiple threads concurrently.
    """
    global _object_cache

    # Fast path: read without lock (GIL protects single read)
    if _object_cache is not None:
        return _object_cache

    # Slow path: needs initialization
    with _config_lock:
        # Double-check: another thread may have initialized while we waited
        if _object_cache is None:
            # Trigger full initialization
            configure()  # Re-acquires _config_lock; safe only if it is a re-entrant lock (threading.RLock)

    return _object_cache
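
`get_object_cache()` calls `configure()` while holding `_config_lock`, and `configure()` acquires the same lock again. That nesting only works with a re-entrant lock. The type of `_config_lock` is not shown in this reference, so the sketch below only illustrates the difference between `threading.RLock` and `threading.Lock`; `can_reacquire` is a hypothetical helper.

```python
import threading


def can_reacquire(lock) -> bool:
    """Return True if the thread holding `lock` can acquire it a second time."""
    with lock:
        # Non-blocking attempt so a plain Lock fails instead of deadlocking.
        acquired = lock.acquire(blocking=False)
        if acquired:
            lock.release()
        return acquired
```

With a plain `threading.Lock`, a blocking second acquisition from the same thread would never return.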

flush_object_cache

flush_object_cache() -> None

Flush all objects from the cache to the log.

This forces all cached objects to be written, even if they haven't been evicted yet. Use this before application shutdown to ensure all objects are logged.

Thread-safe: Can be called from multiple threads concurrently.

Example
from mycorrhizal.spores import configure, flush_object_cache
from mycorrhizal.spores.transport import SyncFileTransport

configure(transport=SyncFileTransport("logs/ocel.jsonl"))

# ... application logic ...

flush_object_cache()  # Ensure all objects logged
Source code in src/mycorrhizal/spores/core.py
def flush_object_cache() -> None:
    """
    Flush all objects from the cache to the log.

    This forces all cached objects to be written, even if they haven't
    been evicted yet. Use this before application shutdown to ensure
    all objects are logged.

    Thread-safe: Can be called from multiple threads concurrently.

    Example:
        ```python
        from mycorrhizal.spores import configure, flush_object_cache
        from mycorrhizal.spores.transport import SyncFileTransport

        configure(transport=SyncFileTransport("logs/ocel.jsonl"))

        # ... application logic ...

        flush_object_cache()  # Ensure all objects logged
        ```
    """
    cache = get_object_cache()
    config = get_config()

    if not config.enabled:
        return

    # Get all objects currently in cache
    all_objects = list(cache._cache.values())

    # Log each object
    flushed_count = 0
    for entry in all_objects:
        try:
            # Use sync path to ensure it's written
            _send_log_record_sync(LogRecord(object=entry.object))
            flushed_count += 1
        except Exception as e:
            logger.error(f"Failed to flush object {entry.object.id}: {e}")

    logger.info(f"Flushed {flushed_count} objects from cache")
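
The flush logic follows a snapshot-then-iterate pattern: it copies the cached values into a list, sends each one, and logs failures without stopping. A minimal sketch of that pattern, using a plain `dict` as the cache and a caller-supplied `send` function, is shown below. `flush_all` and its parameters are hypothetical stand-ins for the library's internals.

```python
import logging

logger = logging.getLogger("flush_demo")


def flush_all(cache: dict, send) -> int:
    """Send every cached value; log and skip failures. Returns the number sent."""
    # Snapshot the values so the loop is unaffected by later cache changes.
    snapshot = list(cache.values())
    flushed = 0
    for obj in snapshot:
        try:
            send(obj)
            flushed += 1
        except Exception as exc:
            logger.error("Failed to flush object %r: %s", obj, exc)
    return flushed
```

A single failing object does not prevent the remaining objects from being written, which matters when the flush runs during shutdown.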

get_cache_metrics

get_cache_metrics() -> CacheMetrics

Get cache eviction metrics.

Returns statistics about cache evictions and failures.

Returns:

| Type | Description |
| --- | --- |
| CacheMetrics | CacheMetrics with eviction statistics |

Example
from mycorrhizal.spores import get_cache_metrics

metrics = get_cache_metrics()
print(f"Evictions: {metrics.evictions}")
print(f"Failures: {metrics.eviction_failures}")
Source code in src/mycorrhizal/spores/core.py
def get_cache_metrics() -> CacheMetrics:
    """
    Get cache eviction metrics.

    Returns statistics about cache evictions and failures.

    Returns:
        CacheMetrics with eviction statistics

    Example:
        ```python
        from mycorrhizal.spores import get_cache_metrics

        metrics = get_cache_metrics()
        print(f"Evictions: {metrics.evictions}")
        print(f"Failures: {metrics.eviction_failures}")
        ```
    """
    global _cache_metrics
    return _cache_metrics

get_spore_sync

get_spore_sync(name: str) -> SyncEventLogger

Get a synchronous spore logger.

Use this in synchronous code. The logger uses daemon threads for fire-and-forget logging - business logic never blocks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Spore name (typically __module__ or __name__) | required |

Returns:

| Type | Description |
| --- | --- |
| SyncEventLogger | A SyncEventLogger instance |

Example
from mycorrhizal.spores import configure, get_spore_sync
from mycorrhizal.spores.transport import SyncFileTransport

configure(transport=SyncFileTransport("logs/ocel.jsonl"))
spore = get_spore_sync(__name__)

@spore.log_event(event_type="OrderCreated", order_id="order.id")
def create_order(order: Order) -> Order:
    return order
Source code in src/mycorrhizal/spores/core.py
def get_spore_sync(name: str) -> SyncEventLogger:
    """
    Get a synchronous spore logger.

    Use this in synchronous code. The logger uses daemon threads for
    fire-and-forget logging - business logic never blocks.

    Args:
        name: Spore name (typically __module__ or __name__)

    Returns:
        A SyncEventLogger instance

    Example:
        ```python
        from mycorrhizal.spores import configure, get_spore_sync
        from mycorrhizal.spores.transport import SyncFileTransport

        configure(transport=SyncFileTransport("logs/ocel.jsonl"))
        spore = get_spore_sync(__name__)

        @spore.log_event(event_type="OrderCreated", order_id="order.id")
        def create_order(order: Order) -> Order:
            return order
        ```
    """
    return SyncEventLogger(name)

get_spore_async

get_spore_async(name: str) -> AsyncEventLogger

Get an asynchronous spore logger.

Use this in asynchronous code. The logger uses async I/O.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Spore name (typically __module__ or __name__) | required |

Returns:

| Type | Description |
| --- | --- |
| AsyncEventLogger | An AsyncEventLogger instance |

Example
from mycorrhizal.spores import configure, get_spore_async
from mycorrhizal.spores.transport import AsyncFileTransport

configure(transport=AsyncFileTransport("logs/ocel.jsonl"))
spore = get_spore_async(__name__)

@spore.log_event(event_type="OrderCreated", order_id="order.id")
async def create_order(order: Order) -> Order:
    return order
Source code in src/mycorrhizal/spores/core.py
def get_spore_async(name: str) -> AsyncEventLogger:
    """
    Get an asynchronous spore logger.

    Use this in asynchronous code. The logger uses async I/O.

    Args:
        name: Spore name (typically __module__ or __name__)

    Returns:
        An AsyncEventLogger instance

    Example:
        ```python
        from mycorrhizal.spores import configure, get_spore_async
        from mycorrhizal.spores.transport import AsyncFileTransport

        configure(transport=AsyncFileTransport("logs/ocel.jsonl"))
        spore = get_spore_async(__name__)

        @spore.log_event(event_type="OrderCreated", order_id="order.id")
        async def create_order(order: Order) -> Order:
            return order
        ```
    """
    return AsyncEventLogger(name)

generate_event_id

generate_event_id() -> str

Generate a unique event ID.

The ID is a random version 4 UUID (uuid.uuid4()) with an "evt-" prefix.

Returns:

Type Description
str

A unique event identifier

Source code in src/mycorrhizal/spores/models.py
def generate_event_id() -> str:
    """
    Generate a unique event ID.

    The ID is a random version 4 UUID (uuid.uuid4()) with an "evt-" prefix.

    Returns:
        A unique event identifier
    """
    import uuid
    return f"evt-{uuid.uuid4()}"

generate_object_id

generate_object_id(obj: Any) -> str

Generate an object ID from an object.

Tries to use obj.id if available, otherwise generates a UUID.

Parameters:

Name Type Description Default
obj Any

The object to generate an ID for

required

Returns:

Type Description
str

A unique object identifier

Source code in src/mycorrhizal/spores/models.py
def generate_object_id(obj: Any) -> str:
    """
    Generate an object ID from an object.

    Tries to use obj.id if available, otherwise generates a UUID.

    Args:
        obj: The object to generate an ID for

    Returns:
        A unique object identifier
    """
    # Try to get id attribute
    if hasattr(obj, 'id'):
        return str(obj.id)

    # Fallback: read id from the instance __dict__
    # (normally already covered by the hasattr check above)
    if hasattr(obj, '__dict__') and 'id' in obj.__dict__:
        return str(obj.__dict__['id'])

    # Generate UUID-based ID
    import uuid
    return f"obj-{uuid.uuid4()}"
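The lookup order above can be illustrated with a small standalone sketch. The function below is a local copy of the first and last branches of the listing, so the snippet runs without the library installed; `Robot` and `Anonymous` are hypothetical classes used only for illustration.

```python
import uuid
from dataclasses import dataclass
from typing import Any


def generate_object_id(obj: Any) -> str:
    """Local copy of the listing's logic so the sketch is self-contained."""
    if hasattr(obj, "id"):
        return str(obj.id)
    return f"obj-{uuid.uuid4()}"


@dataclass
class Robot:  # hypothetical domain object that carries its own id
    id: int
    name: str


class Anonymous:  # hypothetical object without an id attribute
    pass


robot_id = generate_object_id(Robot(id=7, name="r2"))
anon = Anonymous()
first = generate_object_id(anon)
second = generate_object_id(anon)

print(robot_id)         # 7 (the existing id, converted to str)
print(first != second)  # True: every call without an id draws a fresh UUID
```

One consequence of this design: an object without an `id` attribute receives a different identifier on every call, so giving domain objects a stable `id` is probably the safer choice when the same instance is logged more than once.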

attribute_value_from_python

attribute_value_from_python(value: Any) -> EventAttributeValue

Convert a Python value to an OCEL EventAttributeValue.

Parameters:

Name Type Description Default
value Any

The Python value to convert

required

Returns:

Type Description
EventAttributeValue

An EventAttributeValue with type information

Source code in src/mycorrhizal/spores/models.py
def attribute_value_from_python(value: Any) -> EventAttributeValue:
    """
    Convert a Python value to an OCEL EventAttributeValue.

    Args:
        value: The Python value to convert

    Returns:
        An EventAttributeValue with type information
    """
    # Infer type
    attr_type = infer_type(value)

    # Handle None
    if value is None:
        str_value = "null"

    # Handle enums (use name)
    elif isinstance(value, Enum):
        str_value = value.name

    # Handle datetime (ISO format)
    elif isinstance(value, datetime):
        str_value = value.isoformat()

    # Handle bool (lowercase true/false)
    elif isinstance(value, bool):
        str_value = "true" if value else "false"

    # Handle primitives
    elif isinstance(value, (str, int, float)):
        str_value = str(value)

    # Everything else: convert to string
    else:
        str_value = str(value)

    return EventAttributeValue(
        name="",  # Name set by caller
        value=str_value,
        type=attr_type
    )
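To make the conversion rules concrete, here is a minimal sketch that mirrors the branches of the listing in a standalone form. `to_ocel` is a hypothetical helper that returns a `(value, type)` tuple instead of an `EventAttributeValue`, and `Color` and `Priority` are made-up enums.

```python
from datetime import datetime, timezone
from enum import Enum, IntEnum
from typing import Any, Tuple


def infer_type(value: Any) -> str:
    # Same branch order as the library listing.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "float"
    if isinstance(value, datetime):
        return "timestamp"
    return "string"


def to_ocel(value: Any) -> Tuple[str, str]:
    """Hypothetical helper: returns (string value, OCEL type name)."""
    if value is None:
        text = "null"
    elif isinstance(value, Enum):
        text = value.name
    elif isinstance(value, datetime):
        text = value.isoformat()
    elif isinstance(value, bool):
        text = "true" if value else "false"
    else:
        text = str(value)
    return text, infer_type(value)


class Color(Enum):
    RED = "red"


class Priority(IntEnum):
    HIGH = 1


print(to_ocel(True))           # ('true', 'boolean')
print(to_ocel(3))              # ('3', 'integer')
print(to_ocel(None))           # ('null', 'string')
print(to_ocel(Color.RED))      # ('RED', 'string')
print(to_ocel(Priority.HIGH))  # ('HIGH', 'integer')
```

Note the last line: because the type is inferred before the enum branch runs, an `IntEnum` member is serialized by name while its type is reported as "integer". Consumers that parse values by type may want to use plain `Enum` classes for logged fields.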

object_attribute_from_python

object_attribute_from_python(value: Any, time: Optional[datetime] = None) -> ObjectAttributeValue

Convert a Python value to an OCEL ObjectAttributeValue.

Parameters:

Name Type Description Default
value Any

The Python value to convert

required
time Optional[datetime]

The timestamp for this attribute value (optional)

None

Returns:

Type Description
ObjectAttributeValue

An ObjectAttributeValue with type information

Source code in src/mycorrhizal/spores/models.py
def object_attribute_from_python(value: Any, time: Optional[datetime] = None) -> ObjectAttributeValue:
    """
    Convert a Python value to an OCEL ObjectAttributeValue.

    Args:
        value: The Python value to convert
        time: The timestamp for this attribute value (optional)

    Returns:
        An ObjectAttributeValue with type information
    """
    # Use same type inference logic
    event_attr = attribute_value_from_python(value)

    # Default to current time if not provided
    if time is None:
        time = datetime.now()

    return ObjectAttributeValue(
        name=event_attr.name,
        value=event_attr.value,
        type=event_attr.type,
        time=time
    )

Models

mycorrhizal.spores.models

Spores Data Models

OCEL-compatible data models for event and object logging.

Relationship dataclass

Relationship(object_id: str, qualifier: str)

A relationship from an event to an object.

Attributes:

Name Type Description
object_id str

The ID of the related object

qualifier str

How the object relates to the event (e.g., "input", "output", "actor")

EventAttributeValue dataclass

EventAttributeValue(name: str, value: str, type: str)

An attribute value for an event.

Event attributes don't have timestamps because the event time itself is sufficient.

Attributes:

Name Type Description
name str

The attribute name

value str

The string representation of the value

type str

The data type ("string", "integer", "float", "boolean", "timestamp")

ObjectAttributeValue dataclass

ObjectAttributeValue(name: str, value: str, type: str, time: Optional[datetime] = None)

An attribute value for an object.

OCEL object attributes carry a timestamp so that successive values of the same attribute can be versioned over time.

Attributes:

Name Type Description
name str

The attribute name

value str

The string representation of the value

type str

The data type ("string", "integer", "float", "boolean", "timestamp")

time Optional[datetime]

When this attribute value was set

Event dataclass

Event(id: str, type: str, activity: Optional[str] = None, time: datetime = now(), attributes: Dict[str, EventAttributeValue] = dict(), relationships: Dict[str, Relationship] = dict())

An OCEL event representing something that happened.

Attributes:

Name Type Description
id str

Unique event identifier

type str

Event type/category (event class)

activity Optional[str]

Optional semantic activity label (defaults to type if null)

time datetime

When the event occurred

attributes Dict[str, EventAttributeValue]

Event attributes (name -> EventAttributeValue)

relationships Dict[str, Relationship]

Objects related to this event (qualifier -> Relationship)

Object dataclass

Object(id: str, type: str, attributes: Dict[str, ObjectAttributeValue] = dict(), relationships: Dict[str, Relationship] = dict())

An OCEL object representing an entity in the system.

Attributes:

Name Type Description
id str

Unique object identifier

type str

Object type/category

attributes Dict[str, ObjectAttributeValue]

Object attributes (name -> ObjectAttributeValue)

relationships Dict[str, Relationship]

Other objects related to this object (qualifier -> Relationship)

LogRecord dataclass

LogRecord(event: Optional[Event] = None, object: Optional[Object] = None)

A log record containing either an event or an object (not both).

This is the union type used for streaming to OCEL consumers.

Attributes:

Name Type Description
event Optional[Event]

An event record (if logging an event)

object Optional[Object]

An object record (if logging an object)

Note

Exactly one of event or object must be set.

__post_init__

__post_init__()

Validate that exactly one of event or object is set.

Source code in src/mycorrhizal/spores/models.py
def __post_init__(self):
    """Validate that exactly one of event or object is set."""
    if self.event is None and self.object is None:
        raise ValueError("LogRecord must have either event or object set")
    if self.event is not None and self.object is not None:
        raise ValueError("LogRecord cannot have both event and object set")
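The exactly-one-of rule can be tried out with a standalone sketch. The `Event` and `Object` classes below are simplified stand-ins (only `id` and `type`), not the library's full models; the validation logic is copied from the listing above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:  # simplified stand-in for the library's Event
    id: str
    type: str


@dataclass
class Object:  # simplified stand-in for the library's Object
    id: str
    type: str


@dataclass
class LogRecord:
    event: Optional[Event] = None
    object: Optional[Object] = None

    def __post_init__(self):
        if self.event is None and self.object is None:
            raise ValueError("LogRecord must have either event or object set")
        if self.event is not None and self.object is not None:
            raise ValueError("LogRecord cannot have both event and object set")


# Valid: exactly one side is set.
ok = LogRecord(event=Event(id="evt-1", type="OrderCreated"))

# Invalid: neither side set, then both sides set.
errors = []
bad_inputs = (
    {},
    {"event": Event("evt-2", "OrderPaid"), "object": Object("obj-1", "Order")},
)
for kwargs in bad_inputs:
    try:
        LogRecord(**kwargs)
    except ValueError as exc:
        errors.append(str(exc))

print(errors)
```

Validating in `__post_init__` means an invalid record can never be constructed, so encoders can assume that one of the two fields is always present.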

ObjectScope

Bases: Enum

Scope for object associations with events.

ObjectRef dataclass

ObjectRef(qualifier: str, scope: Union[ObjectScope, str] = EVENT)

Metadata annotation for object references in blackboard interfaces.

Used with typing.Annotated to mark fields as OCEL objects:

robot: Annotated[Robot, ObjectRef(qualifier="actor", scope=ObjectScope.GLOBAL)]

Attributes:

Name Type Description
qualifier str

How the object relates to events ("input", "output", "actor", etc.)

scope Union[ObjectScope, str]

When to include this object in events

__post_init__

__post_init__()

Convert string scope to ObjectScope enum if needed.

Source code in src/mycorrhizal/spores/models.py
def __post_init__(self):
    """Convert string scope to ObjectScope enum if needed."""
    if isinstance(self.scope, str):
        object.__setattr__(self, 'scope', ObjectScope(self.scope))
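The string-to-enum coercion can be sketched as follows. Two things here are assumptions, because this reference does not show them: the member values of `ObjectScope` ("event" and "global" are guesses), and that the dataclass is frozen, which the use of `object.__setattr__` suggests but does not prove.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Union


class ObjectScope(Enum):
    # Assumed member values; the real enum's values are not shown in this reference.
    EVENT = "event"
    GLOBAL = "global"


@dataclass(frozen=True)  # frozen is an assumption
class ObjectRef:
    qualifier: str
    scope: Union[ObjectScope, str] = ObjectScope.EVENT

    def __post_init__(self):
        if isinstance(self.scope, str):
            # A frozen dataclass rejects normal assignment, hence object.__setattr__.
            object.__setattr__(self, "scope", ObjectScope(self.scope))


from_string = ObjectRef(qualifier="actor", scope="global")
from_enum = ObjectRef(qualifier="actor", scope=ObjectScope.GLOBAL)

print(from_string.scope is ObjectScope.GLOBAL)  # True
print(from_string == from_enum)                 # True

try:
    ObjectRef(qualifier="actor", scope="unknown")
except ValueError as exc:
    print(f"rejected: {exc}")
```

Because the lookup goes through the enum constructor, a misspelled scope string fails at annotation time rather than silently at logging time.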

EventAttr dataclass

EventAttr(name: Optional[str] = None)

Metadata annotation for event attributes in blackboard interfaces.

Used with typing.Annotated to mark fields that should be logged as event attributes:

mission_id: Annotated[str, EventAttr]
battery_level: Annotated[int, EventAttr]

Attributes:

Name Type Description
name Optional[str]

Optional custom name for the attribute (defaults to field name)

SporesAttr dataclass

SporesAttr(name: Optional[str] = None)

Metadata annotation for object attributes in OCEL object logging.

Used with typing.Annotated to mark fields that should be logged when an object is logged:

class Order(BaseModel):
    id: str
    status: Annotated[str, SporesAttr]  # Logged
    total: Annotated[float, SporesAttr]  # Logged
    items: list[dict]  # Not marked - not logged

When using @spore.log_event(relationships={...}), fields marked with SporesAttr are logged automatically. If an explicit attributes list is provided, that list takes precedence over the SporesAttr markings.

Attributes:

Name Type Description
name Optional[str]

Optional custom name for the attribute (defaults to field name)
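How such markers can be discovered at runtime is sketched below using only the standard library. This is not the library's extraction code: `SporesAttr` is a local stand-in with the same single `name` field, `marked_attributes` is a hypothetical helper, and a plain dataclass replaces the Pydantic model so the snippet has no dependencies.

```python
from dataclasses import dataclass
from typing import Annotated, Any, Dict, Optional, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class SporesAttr:  # local stand-in for the library marker
    name: Optional[str] = None


@dataclass
class Order:
    id: str
    status: Annotated[str, SporesAttr]                       # bare class as marker
    total: Annotated[float, SporesAttr(name="order_total")]  # instance with custom name
    items: list                                              # not marked


def marked_attributes(obj: Any) -> Dict[str, Any]:
    """Hypothetical helper: collect values of fields annotated with SporesAttr."""
    result: Dict[str, Any] = {}
    hints = get_type_hints(type(obj), include_extras=True)
    for field_name, hint in hints.items():
        if get_origin(hint) is not Annotated:
            continue
        # get_args returns (base_type, *metadata) for Annotated hints.
        for meta in get_args(hint)[1:]:
            if meta is SporesAttr:
                result[field_name] = getattr(obj, field_name)
            elif isinstance(meta, SporesAttr):
                result[meta.name or field_name] = getattr(obj, field_name)
    return result


order = Order(id="o-1", status="pending", total=19.99, items=[])
print(marked_attributes(order))  # {'status': 'pending', 'order_total': 19.99}
```

The helper accepts both the bare class and an instance as metadata, since the examples in this reference use the bare form (`Annotated[str, SporesAttr]`) while the constructor signature also allows a custom name.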

infer_type

infer_type(value: Any) -> str

Infer the OCEL type for a Python value.

Parameters:

Name Type Description Default
value Any

The Python value to infer type for

required

Returns:

Type Description
str

One of: "string", "integer", "float", "boolean", "timestamp"

Source code in src/mycorrhizal/spores/models.py
def infer_type(value: Any) -> str:
    """
    Infer the OCEL type for a Python value.

    Args:
        value: The Python value to infer type for

    Returns:
        One of: "string", "integer", "float", "boolean", "timestamp"
    """
    if isinstance(value, bool):
        return "boolean"
    elif isinstance(value, int):
        return "integer"
    elif isinstance(value, float):
        return "float"
    elif isinstance(value, datetime):
        return "timestamp"
    else:
        return "string"
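The order of the checks matters because bool is a subclass of int in Python. A minimal sketch, using a hypothetical `int_first` variant that is not part of the library, shows what would happen if the integer check came first:

```python
from typing import Any


def int_first(value: Any) -> str:
    """Hypothetical variant with the int check placed before the bool check."""
    if isinstance(value, int):
        return "integer"
    if isinstance(value, bool):  # never reached for True/False
        return "boolean"
    return "string"


def bool_first(value: Any) -> str:
    """The same two checks in the order used by infer_type."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    return "string"


print(isinstance(True, int))  # True
print(int_first(True))        # integer
print(bool_first(True))       # boolean
```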

attribute_value_from_python

attribute_value_from_python(value: Any) -> EventAttributeValue

Convert a Python value to an OCEL EventAttributeValue.

Parameters:

Name Type Description Default
value Any

The Python value to convert

required

Returns:

Type Description
EventAttributeValue

An EventAttributeValue with type information

Source code in src/mycorrhizal/spores/models.py
def attribute_value_from_python(value: Any) -> EventAttributeValue:
    """
    Convert a Python value to an OCEL EventAttributeValue.

    Args:
        value: The Python value to convert

    Returns:
        An EventAttributeValue with type information
    """
    # Infer type
    attr_type = infer_type(value)

    # Handle None
    if value is None:
        str_value = "null"

    # Handle enums (use name)
    elif isinstance(value, Enum):
        str_value = value.name

    # Handle datetime (ISO format)
    elif isinstance(value, datetime):
        str_value = value.isoformat()

    # Handle bool (lowercase true/false)
    elif isinstance(value, bool):
        str_value = "true" if value else "false"

    # Handle primitives
    elif isinstance(value, (str, int, float)):
        str_value = str(value)

    # Everything else: convert to string
    else:
        str_value = str(value)

    return EventAttributeValue(
        name="",  # Name set by caller
        value=str_value,
        type=attr_type
    )

object_attribute_from_python

object_attribute_from_python(value: Any, time: Optional[datetime] = None) -> ObjectAttributeValue

Convert a Python value to an OCEL ObjectAttributeValue.

Parameters:

Name Type Description Default
value Any

The Python value to convert

required
time Optional[datetime]

The timestamp for this attribute value (optional)

None

Returns:

Type Description
ObjectAttributeValue

An ObjectAttributeValue with type information

Source code in src/mycorrhizal/spores/models.py
def object_attribute_from_python(value: Any, time: Optional[datetime] = None) -> ObjectAttributeValue:
    """
    Convert a Python value to an OCEL ObjectAttributeValue.

    Args:
        value: The Python value to convert
        time: The timestamp for this attribute value (optional)

    Returns:
        An ObjectAttributeValue with type information
    """
    # Use same type inference logic
    event_attr = attribute_value_from_python(value)

    # Default to current time if not provided
    if time is None:
        time = datetime.now()

    return ObjectAttributeValue(
        name=event_attr.name,
        value=event_attr.value,
        type=event_attr.type,
        time=time
    )

generate_event_id

generate_event_id() -> str

Generate a unique event ID.

The ID is a random version 4 UUID (uuid.uuid4()) with an "evt-" prefix.

Returns:

Type Description
str

A unique event identifier

Source code in src/mycorrhizal/spores/models.py
def generate_event_id() -> str:
    """
    Generate a unique event ID.

    The ID is a random version 4 UUID (uuid.uuid4()) with an "evt-" prefix.

    Returns:
        A unique event identifier
    """
    import uuid
    return f"evt-{uuid.uuid4()}"

generate_object_id

generate_object_id(obj: Any) -> str

Generate an object ID from an object.

Tries to use obj.id if available, otherwise generates a UUID.

Parameters:

Name Type Description Default
obj Any

The object to generate an ID for

required

Returns:

Type Description
str

A unique object identifier

Source code in src/mycorrhizal/spores/models.py
def generate_object_id(obj: Any) -> str:
    """
    Generate an object ID from an object.

    Tries to use obj.id if available, otherwise generates a UUID.

    Args:
        obj: The object to generate an ID for

    Returns:
        A unique object identifier
    """
    # Try to get id attribute
    if hasattr(obj, 'id'):
        return str(obj.id)

    # Fallback: read id from the instance __dict__
    # (normally already covered by the hasattr check above)
    if hasattr(obj, '__dict__') and 'id' in obj.__dict__:
        return str(obj.__dict__['id'])

    # Generate UUID-based ID
    import uuid
    return f"obj-{uuid.uuid4()}"

Encoder

mycorrhizal.spores.encoder

Spores Encoders

Encoders for serializing OCEL LogRecords to various formats.

Encoder

Bases: Protocol

Protocol for encoders that convert LogRecords to bytes.

Encoders serialize OCEL LogRecords for transport to OCEL consumers.

encode

encode(record: LogRecord) -> bytes

Encode a LogRecord to bytes.

Parameters:

Name Type Description Default
record LogRecord

The LogRecord to encode

required

Returns:

Type Description
bytes

Serialized bytes representation

Source code in src/mycorrhizal/spores/encoder/base.py
def encode(self, record: LogRecord) -> bytes:
    """
    Encode a LogRecord to bytes.

    Args:
        record: The LogRecord to encode

    Returns:
        Serialized bytes representation
    """
    ...

content_type

content_type() -> str

Return the MIME content type for this encoding.

Returns:

Type Description
str

Content type string (e.g., "application/json")

Source code in src/mycorrhizal/spores/encoder/base.py
def content_type(self) -> str:
    """
    Return the MIME content type for this encoding.

    Returns:
        Content type string (e.g., "application/json")
    """
    ...

JSONEncoder

JSONEncoder()

Bases: Encoder

JSON encoder for OCEL LogRecords.

Produces OCEL-compatible JSON with Unix float64 timestamps.

Example output for an event (pretty-printed here for readability; encode() emits compact single-line JSON):

{
  "event": {
    "id": "evt-123",
    "type": "process_item",
    "activity": null,
    "time": 1705689296.123456,
    "attributes": [
      {"name": "priority", "value": "high", "type": "string"}
    ],
    "relationships": [
      {"objectId": "obj-456", "qualifier": "input"}
    ]
  },
  "object": null
}

Example output for an object:

{
  "event": null,
  "object": {
    "id": "obj-456",
    "type": "WorkItem",
    "attributes": [
      {"name": "status", "value": "pending", "type": "string", "time": 1705689296.123456}
    ],
    "relationships": []
  }
}

Initialize JSONEncoder.

Timestamps are always encoded as Unix float64 timestamps.

Source code in src/mycorrhizal/spores/encoder/json.py
def __init__(self):
    """
    Initialize JSONEncoder.

    Timestamps are always encoded as Unix float64 timestamps.
    """
    pass

encode

encode(record: LogRecord) -> bytes

Encode a LogRecord to JSON bytes.

Parameters:

Name Type Description Default
record LogRecord

The LogRecord to encode

required

Returns:

Type Description
bytes

JSON-encoded bytes

Source code in src/mycorrhizal/spores/encoder/json.py
def encode(self, record: LogRecord) -> bytes:
    """
    Encode a LogRecord to JSON bytes.

    Args:
        record: The LogRecord to encode

    Returns:
        JSON-encoded bytes
    """
    # Convert LogRecord to dict with union structure (both fields, one null)
    if record.event is not None:
        data = {"event": self._event_to_dict(record.event), "object": None}
    else:
        data = {"event": None, "object": self._object_to_dict(record.object)}

    # Encode to JSON
    return json.dumps(data, separators=(',', ':')).encode('utf-8')
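On the consumer side, the union structure (both keys present, one of them null) is straightforward to unpack. The following sketch assumes the field names shown in the example output; `decode_record` is a hypothetical helper, not something the library provides.

```python
import json
from datetime import datetime, timezone

# One compact line, shaped like the encoder's output for an event.
line = (
    b'{"event":{"id":"evt-123","type":"process_item","activity":null,'
    b'"time":1705689296.123456,'
    b'"attributes":[{"name":"priority","value":"high","type":"string"}],'
    b'"relationships":[{"objectId":"obj-456","qualifier":"input"}]},'
    b'"object":null}'
)


def decode_record(raw: bytes) -> dict:
    """Hypothetical consumer-side helper: summarise one encoded record."""
    data = json.loads(raw)
    kind = "event" if data["event"] is not None else "object"
    body = data[kind]
    summary = {"kind": kind, "id": body["id"], "type": body["type"]}
    if kind == "event":
        # Unix float timestamp back to an aware datetime.
        summary["time"] = datetime.fromtimestamp(body["time"], tz=timezone.utc)
        summary["objects"] = {
            rel["qualifier"]: rel["objectId"] for rel in body["relationships"]
        }
    return summary


record = decode_record(line)
print(record["kind"], record["id"], record["objects"])
```

Since each record is a single line, a JSONL log file can be processed by applying a helper like this to every line in turn.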

content_type

content_type() -> str

Return the JSON content type.

Source code in src/mycorrhizal/spores/encoder/json.py
def content_type(self) -> str:
    """Return the JSON content type."""
    return "application/json"

_event_to_dict

_event_to_dict(event: Event) -> Dict[str, Any]

Convert an Event to dict for JSON serialization.

Source code in src/mycorrhizal/spores/encoder/json.py
def _event_to_dict(self, event: Event) -> Dict[str, Any]:
    """Convert an Event to dict for JSON serialization."""
    return {
        "id": event.id,
        "type": event.type,
        "activity": event.activity,
        "time": self._format_datetime(event.time),
        "attributes": [
            self._event_attr_to_dict(name, attr)
            for name, attr in event.attributes.items()
        ],
        "relationships": [
            self._relationship_to_dict(qualifier, rel)
            for qualifier, rel in event.relationships.items()
        ]
    }

_object_to_dict

_object_to_dict(obj: Object) -> Dict[str, Any]

Convert an Object to dict for JSON serialization.

Source code in src/mycorrhizal/spores/encoder/json.py
def _object_to_dict(self, obj: Object) -> Dict[str, Any]:
    """Convert an Object to dict for JSON serialization."""
    return {
        "id": obj.id,
        "type": obj.type,
        "attributes": [
            self._object_attr_to_dict(name, attr)
            for name, attr in obj.attributes.items()
        ],
        "relationships": [
            self._relationship_to_dict(qualifier, rel)
            for qualifier, rel in obj.relationships.items()
        ]
    }

_event_attr_to_dict

_event_attr_to_dict(name: str, attr: EventAttributeValue) -> Dict[str, Any]

Convert EventAttributeValue to dict.

Source code in src/mycorrhizal/spores/encoder/json.py
def _event_attr_to_dict(self, name: str, attr: EventAttributeValue) -> Dict[str, Any]:
    """Convert EventAttributeValue to dict."""
    return {
        "name": attr.name or name,
        "value": attr.value,
        "type": attr.type
    }

_object_attr_to_dict

_object_attr_to_dict(name: str, attr: ObjectAttributeValue) -> Dict[str, Any]

Convert ObjectAttributeValue to dict.

Source code in src/mycorrhizal/spores/encoder/json.py
def _object_attr_to_dict(self, name: str, attr: ObjectAttributeValue) -> Dict[str, Any]:
    """Convert ObjectAttributeValue to dict."""
    result = {
        "name": attr.name or name,
        "value": attr.value,
        "type": attr.type
    }
    if attr.time is not None:
        result["time"] = self._format_datetime(attr.time)
    else:
        result["time"] = None
    return result

_relationship_to_dict

_relationship_to_dict(qualifier: str, rel: Relationship) -> Dict[str, str]

Convert Relationship to dict.

Source code in src/mycorrhizal/spores/encoder/json.py
def _relationship_to_dict(self, qualifier: str, rel: Relationship) -> Dict[str, str]:
    """Convert Relationship to dict."""
    return {
        "objectId": rel.object_id,
        "qualifier": qualifier
    }

_format_datetime

_format_datetime(dt: datetime) -> float

Format datetime as Unix float64 timestamp.

Parameters:

Name Type Description Default
dt datetime

The datetime to format

required

Returns:

Type Description
float

Unix float64 timestamp (seconds since epoch)

Source code in src/mycorrhizal/spores/encoder/json.py
def _format_datetime(self, dt: datetime) -> float:
    """
    Format datetime as Unix float64 timestamp.

    Args:
        dt: The datetime to format

    Returns:
        Unix float64 timestamp (seconds since epoch)
    """
    return dt.timestamp()
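A point worth keeping in mind: `datetime.timestamp()` interprets a naive datetime as local time, whereas an aware datetime is converted using its own offset. The sketch below uses an aware UTC value so the result does not depend on the machine's timezone; the specific date is just the instant from the example output above.

```python
from datetime import datetime, timezone

# Aware datetime: the resulting timestamp is the same on every machine.
aware = datetime(2024, 1, 19, 18, 34, 56, 123456, tzinfo=timezone.utc)
ts = aware.timestamp()

# Round trip back to an aware datetime in UTC.
restored = datetime.fromtimestamp(ts, tz=timezone.utc)

# Naive datetime: timestamp() assumes local time, so this value
# varies with the timezone of the machine running the code.
naive = aware.replace(tzinfo=None)
local_ts = naive.timestamp()

print(ts)
print(restored.isoformat())
print(local_ts - ts)  # offset of the local timezone in seconds (0.0 on UTC hosts)
```

Values produced by `datetime.now()` are naive, so they encode to the correct instant on the host that created them; trouble only arises if naive datetimes created in one timezone are encoded on a host in another.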

Transport

mycorrhizal.spores.transport

Spores Transports

Transports for sending OCEL LogRecords to various destinations.

SyncTransport

Bases: Protocol

Protocol for synchronous transports (blocking I/O).

Sync transports use blocking send() - they write data immediately and block until complete. Use these with get_spore_sync().

Examples: file writes, blocking HTTP requests, etc.

send

send(data: bytes, content_type: str) -> None

Send encoded data to the transport destination (blocking).

Parameters:

Name Type Description Default
data bytes

The encoded log record data

required
content_type str

MIME content type (e.g., "application/json")

required
Source code in src/mycorrhizal/spores/transport/base.py
def send(self, data: bytes, content_type: str) -> None:
    """
    Send encoded data to the transport destination (blocking).

    Args:
        data: The encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    ...

close

close() -> None

Close the transport and release resources.

Source code in src/mycorrhizal/spores/transport/base.py
def close(self) -> None:
    """Close the transport and release resources."""
    ...
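Because SyncTransport is a Protocol, any class with matching `send` and `close` methods should satisfy it structurally, without inheriting from anything. A minimal sketch of a custom transport follows; `InMemoryTransport` is a hypothetical class that may be handy in tests, and whether `configure()` accepts it depends on the library relying only on these two methods.

```python
from typing import List, Tuple


class InMemoryTransport:
    """Hypothetical transport that keeps encoded records in a list."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, bytes]] = []
        self.closed = False

    def send(self, data: bytes, content_type: str) -> None:
        # Same signature as SyncTransport.send.
        if self.closed:
            raise RuntimeError("transport is closed")
        self.records.append((content_type, data))

    def close(self) -> None:
        # Same signature as SyncTransport.close.
        self.closed = True


transport = InMemoryTransport()
transport.send(b'{"event":null,"object":null}', "application/json")
transport.close()

print(len(transport.records))  # 1
print(transport.closed)        # True
```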

AsyncTransport

Bases: Protocol

Protocol for asynchronous transports (async I/O).

Async transports expose an async send() that is awaited until the write completes, without blocking the event loop. Use these with get_spore_async().

Examples: async file writes, async HTTP requests, message queues, etc.

send async

send(data: bytes, content_type: str) -> None

Send encoded data to the transport destination (async).

Parameters:

Name Type Description Default
data bytes

The encoded log record data

required
content_type str

MIME content type (e.g., "application/json")

required
Source code in src/mycorrhizal/spores/transport/base.py
async def send(self, data: bytes, content_type: str) -> None:
    """
    Send encoded data to the transport destination (async).

    Args:
        data: The encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    ...

close async

close() -> None

Close the transport and release resources.

Source code in src/mycorrhizal/spores/transport/base.py
async def close(self) -> None:
    """Close the transport and release resources."""
    ...

SyncFileTransport

SyncFileTransport(filepath: str | Path)

Synchronous file transport using blocking I/O.

Writes log records to a file in JSONL format (one JSON object per line). Thread-safe with a lock for concurrent writes.

Initialize the file transport.

Parameters:

Name Type Description Default
filepath str | Path

Path to the log file. Will be created if it doesn't exist.

required
Source code in src/mycorrhizal/spores/transport/file.py
def __init__(self, filepath: str | Path):
    """
    Initialize the file transport.

    Args:
        filepath: Path to the log file. Will be created if it doesn't exist.
    """
    self.filepath = Path(filepath)
    self.filepath.parent.mkdir(parents=True, exist_ok=True)
    self._file = open(self.filepath, 'a', encoding='utf-8')
    self._lock = threading.Lock()

send

send(data: bytes, content_type: str) -> None

Write data to file (blocking call).

Parameters:

Name Type Description Default
data bytes

Encoded log record data

required
content_type str

MIME content type (e.g., "application/json")

required
Source code in src/mycorrhizal/spores/transport/file.py
def send(self, data: bytes, content_type: str) -> None:
    """
    Write data to file (blocking call).

    Args:
        data: Encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    with self._lock:
        self._file.write(data.decode('utf-8') + '\n')
        self._file.flush()

close

close() -> None

Close the file handle.

Source code in src/mycorrhizal/spores/transport/file.py
def close(self) -> None:
    """Close the file handle."""
    with self._lock:
        if self._file and not self._file.closed:
            self._file.close()
            self._file = None

__enter__

__enter__()

Context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
def __enter__(self):
    """Context manager support."""
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager support."""
    self.close()

AsyncFileTransport

AsyncFileTransport(filepath: str | Path)

Asynchronous file transport using async I/O.

Writes log records to a file in JSONL format (one JSON object per line). Uses asyncio.to_thread() for non-blocking async file I/O.

This is the async version of SyncFileTransport - use it with get_spore_async().

Initialize the async file transport.

Parameters:

Name Type Description Default
filepath str | Path

Path to the log file. Will be created if it doesn't exist.

required
Source code in src/mycorrhizal/spores/transport/file.py
def __init__(self, filepath: str | Path):
    """
    Initialize the async file transport.

    Args:
        filepath: Path to the log file. Will be created if it doesn't exist.
    """
    self.filepath = Path(filepath)
    self.filepath.parent.mkdir(parents=True, exist_ok=True)
    # Open file in append mode, will be managed by asyncio.to_thread
    self._file = open(self.filepath, 'a', encoding='utf-8')
    self._lock = asyncio.Lock()

send async

send(data: bytes, content_type: str) -> None

Write data to file asynchronously.

Uses asyncio.to_thread() to run blocking I/O in a thread pool, avoiding the need for external dependencies like aiofiles.

Parameters:

Name Type Description Default
data bytes

Encoded log record data

required
content_type str

MIME content type (e.g., "application/json")

required
Source code in src/mycorrhizal/spores/transport/file.py
async def send(self, data: bytes, content_type: str) -> None:
    """
    Write data to file asynchronously.

    Uses asyncio.to_thread() to run blocking I/O in a thread pool,
    avoiding the need for external dependencies like aiofiles.

    Args:
        data: Encoded log record data
        content_type: MIME content type (e.g., "application/json")
    """
    async with self._lock:
        # Run blocking file I/O in thread pool
        await asyncio.to_thread(self._write_sync, data)
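The combination of `asyncio.Lock` and `asyncio.to_thread()` can be exercised in isolation. The sketch below is a stripped-down stand-in (the hypothetical `LineWriter` class), not the library's transport: the lock serializes writers so lines never interleave, while the blocking write runs in a worker thread and leaves the event loop free.

```python
import asyncio
import tempfile
from pathlib import Path


class LineWriter:
    """Hypothetical stand-in mirroring the lock + to_thread pattern."""

    def __init__(self, path: Path) -> None:
        self._file = open(path, "a", encoding="utf-8")
        self._lock = asyncio.Lock()

    async def send(self, data: bytes) -> None:
        async with self._lock:
            # Blocking write happens in the default thread pool.
            await asyncio.to_thread(self._write_sync, data)

    def _write_sync(self, data: bytes) -> None:
        self._file.write(data.decode("utf-8") + "\n")
        self._file.flush()

    async def close(self) -> None:
        async with self._lock:
            await asyncio.to_thread(self._file.close)


async def main(path: Path) -> None:
    # Created inside the running loop so the lock works on older Python versions too.
    writer = LineWriter(path)
    payloads = [f'{{"n":{i}}}'.encode("utf-8") for i in range(20)]
    await asyncio.gather(*(writer.send(p) for p in payloads))
    await writer.close()


with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "ocel.jsonl"
    asyncio.run(main(log_path))
    lines = log_path.read_text(encoding="utf-8").splitlines()

print(len(lines))  # 20
```

`asyncio.to_thread()` requires Python 3.9 or later. The lock is held across the `await`, so throughput is bounded by one write at a time; that is the trade-off for keeping each JSONL line intact.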

_write_sync

_write_sync(data: bytes) -> None

Synchronous write helper - runs in thread pool.

Parameters:

Name Type Description Default
data bytes

Encoded log record data

required
Source code in src/mycorrhizal/spores/transport/file.py
def _write_sync(self, data: bytes) -> None:
    """
    Synchronous write helper - runs in thread pool.

    Args:
        data: Encoded log record data
    """
    self._file.write(data.decode('utf-8') + '\n')
    self._file.flush()

close async

close() -> None

Close the file handle asynchronously.

Source code in src/mycorrhizal/spores/transport/file.py
async def close(self) -> None:
    """Close the file handle asynchronously."""
    async with self._lock:
        if self._file and not self._file.closed:
            await asyncio.to_thread(self._file.close)
            self._file = None

__aenter__ async

__aenter__()

Async context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
async def __aenter__(self):
    """Async context manager support."""
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Async context manager support.

Source code in src/mycorrhizal/spores/transport/file.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager support."""
    await self.close()

DSL Adapters

mycorrhizal.spores.dsl

Spores DSL Adapters

DSL-specific adapters for integrating spores logging with:

- Hypha (Petri nets)
- Rhizomorph (Behavior trees)
- Septum (State machines)

Usage
from mycorrhizal.spores.dsl import HyphaAdapter, RhizomorphAdapter, SeptumAdapter

# For Hypha (Petri nets)
hypha_adapter = HyphaAdapter()

@pn.transition()
@hypha_adapter.log_transition(event_type="process")
async def process(consumed, bb, timebase):
    yield {output: consumed[0]}

# For Rhizomorph (Behavior trees)
rhizo_adapter = RhizomorphAdapter()

@bt.action
@rhizo_adapter.log_node(event_type="check")
async def check(bb: Blackboard) -> Status:
    return Status.SUCCESS

# For Septum (State machines)
septum_adapter = SeptumAdapter()

@septum.on_state
@septum_adapter.log_state(event_type="state_execute")
async def on_state(ctx: SharedContext):
    return Events.DONE

HyphaAdapter

HyphaAdapter()

Adapter for Hypha Petri net logging.

Provides decorators and helpers for logging:

- Transition execution with consumed/produced tokens
- Place token arrivals/departures
- Token object lifecycle tracking

Usage
from mycorrhizal.spores.dsl import HyphaAdapter

adapter = HyphaAdapter()

@pn.net
def MyNet(builder):
    @builder.transition()
    @adapter.log_transition(event_type="process_item")
    async def process(consumed, bb, timebase):
        # Event automatically logged with:
        # - token_count attribute
        # - relationships to consumed tokens
        yield {output: consumed[0]}

    @builder.place()
    @adapter.log_place(event_type="item_arrived")
    def input_place(bb):
        return None

Initialize the Hypha adapter.

Source code in src/mycorrhizal/spores/dsl/hypha.py
def __init__(self):
    """Initialize the Hypha adapter."""
    self._enabled = True

enable

enable()

Enable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/hypha.py
def enable(self):
    """Enable logging for this adapter."""
    self._enabled = True

disable

disable()

Disable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/hypha.py
def disable(self):
    """Disable logging for this adapter."""
    self._enabled = False

log_transition

log_transition(event_type: str, attributes: Optional[Union[Dict[str, Any], List[str]]] = None, log_inputs: bool = True, log_outputs: bool = False) -> Callable

Decorator to log Hypha transition execution.

Automatically captures:

- Token count from consumed tokens
- Relationships to consumed/produced tokens
- Attributes from blackboard (if specified)
- Objects from blackboard with ObjectRef metadata

Parameters:

Name Type Description Default
event_type str

Type of event to log

required
attributes Optional[Union[Dict[str, Any], List[str]]]

Static attributes or param names to extract

None
log_inputs bool

Whether to log input tokens as related objects

True
log_outputs bool

Whether to log output tokens as related objects

False

Returns:

Type Description
Callable

Decorator function
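The core pattern of such a decorator, wrapping a transition that may be either an async generator or a coroutine and re-yielding its outputs, can be sketched in a few lines. This is a simplified, hypothetical version: it takes only `consumed`, and a plain list named `log` stands in for the real event sink.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, List

log: List[str] = []  # stands in for the real event sink


def log_transition(event_type: str) -> Callable:
    """Simplified, hypothetical version of the decorator pattern."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(consumed: List[Any]):
            result = func(consumed)
            if inspect.isasyncgen(result):
                # Async generator: log first, then pass every output through.
                log.append(f"{event_type}: consumed {len(consumed)} token(s)")
                async for item in result:
                    yield item
            else:
                # Coroutine: await it, log, then yield its single result.
                value = await result
                log.append(f"{event_type}: consumed {len(consumed)} token(s)")
                if value is not None:
                    yield value

        return wrapper

    return decorator


@log_transition("process_item")
async def process(consumed):
    for token in consumed:
        yield token.upper()


async def collect() -> List[str]:
    return [item async for item in process(["a", "b"])]


outputs = asyncio.run(collect())
print(outputs)  # ['A', 'B']
print(log)      # ['process_item: consumed 2 token(s)']
```

Because the wrapper contains `yield`, it is itself an async generator regardless of what the wrapped function is, which gives callers a single uniform way to iterate over a transition's outputs.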

Source code in src/mycorrhizal/spores/dsl/hypha.py
def log_transition(
    self,
    event_type: str,
    attributes: Optional[Union[Dict[str, Any], List[str]]] = None,
    log_inputs: bool = True,
    log_outputs: bool = False,
) -> Callable:
    """
    Decorator to log Hypha transition execution.

    Automatically captures:
    - Token count from consumed tokens
    - Relationships to consumed/produced tokens
    - Attributes from blackboard (if specified)
    - Objects from blackboard with ObjectRef metadata

    Args:
        event_type: Type of event to log
        attributes: Static attributes or param names to extract
        log_inputs: Whether to log input tokens as related objects
        log_outputs: Whether to log output tokens as related objects

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(consumed: List[Any], bb: Any, timebase: Any, state: Any = None):
            # Call original transition
            if state is not None:
                result = func(consumed, bb, timebase, state)
            else:
                result = func(consumed, bb, timebase)

            # Handle async generator
            if inspect.isasyncgen(result):
                # Log before processing
                await _log_transition_event(
                    func, consumed, bb, timebase, event_type, attributes, log_inputs, None
                )

                # Collect outputs
                outputs = []
                async for yielded in result:
                    outputs.append(yielded)
                    yield yielded

                # Log outputs if requested
                if log_outputs:
                    await _log_outputs(func, outputs, bb, timebase, event_type)
            else:
                # Handle coroutine
                result = await result

                # Log event
                await _log_transition_event(
                    func, consumed, bb, timebase, event_type, attributes, log_inputs, result
                )

                # Yield the result if it's not None
                if result is not None:
                    yield result

        @functools.wraps(func)
        def sync_wrapper(consumed: List[Any], bb: Any, timebase: Any, state: Any = None):
            # For sync transitions, log asynchronously
            if state is not None:
                result = func(consumed, bb, timebase, state)
            else:
                result = func(consumed, bb, timebase)

            # Schedule logging
            asyncio.create_task(_log_transition_event(
                func, consumed, bb, timebase, event_type, attributes, log_inputs, result
            ))

            return result

        # Return appropriate wrapper
        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator
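The wrapper selection at the end of the listing above depends on asyncio.iscoroutinefunction(func). As a point of reference, Python classifies an async def that contains yield as an async generator function, not a coroutine function, so a coroutine-function check alone reports False for it. The sketch below shows that distinction with the inspect equivalents; the three transition functions and classify are illustrative names only.

```python
import inspect


async def coroutine_transition(consumed, bb, timebase):
    return consumed[0]


async def generator_transition(consumed, bb, timebase):
    yield consumed[0]


def sync_transition(consumed, bb, timebase):
    return consumed[0]


def classify(func) -> str:
    """Report how Python categorizes a callable for dispatch purposes."""
    if inspect.isasyncgenfunction(func):
        return "async generator"
    if inspect.iscoroutinefunction(func):
        return "coroutine"
    return "sync"


kinds = {
    f.__name__: classify(f)
    for f in (coroutine_transition, generator_transition, sync_transition)
}

# An async generator function does not count as a coroutine function
generator_is_coroutine = inspect.iscoroutinefunction(generator_transition)
```

A dispatcher that needs to treat async generators separately can test inspect.isasyncgenfunction before falling back to the coroutine check.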

log_place

log_place(event_type: str = 'token_arrived') -> Callable

Decorator to log place token arrivals.

Note: This requires instrumenting the place handler or using a custom place runtime wrapper. For most cases, transition logging provides sufficient coverage.

Parameters:

Name        Type  Description           Default
event_type  str   Type of event to log  'token_arrived'

Returns:

Type      Description
Callable  Decorator function

Source code in src/mycorrhizal/spores/dsl/hypha.py
def log_place(self, event_type: str = "token_arrived") -> Callable:
    """
    Decorator to log place token arrivals.

    Note: This requires instrumenting the place handler or using
    a custom place runtime wrapper. For most cases, transition
    logging provides sufficient coverage.

    Args:
        event_type: Type of event to log

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(bb: Any, timebase: Any):
            result = await func(bb, timebase)

            # Log token arrival
            config = get_config()
            if config.enabled:
                await _log_place_event(func, bb, timebase, event_type)

            return result

        @functools.wraps(func)
        def sync_wrapper(bb: Any, timebase: Any):
            result = func(bb, timebase)

            # Schedule logging
            config = get_config()
            if config.enabled:
                asyncio.create_task(_log_place_event(func, bb, timebase, event_type))

            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator
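The sync wrappers in these adapters schedule logging with asyncio.create_task, which only works while an event loop is running in the current thread. The following sketch, using stand-in names (log_event, sync_handler), shows both cases: scheduling from inside a running loop, and the RuntimeError raised when no loop is running.

```python
import asyncio

events = []


async def log_event(name: str) -> None:
    """Stand-in for an async logging helper."""
    events.append(name)


def sync_handler() -> str:
    # Schedule logging without awaiting it; requires a running event loop
    asyncio.create_task(log_event("token_arrived"))
    return "handled"


async def main() -> str:
    result = sync_handler()
    # Yield control once so the scheduled task gets a chance to run
    await asyncio.sleep(0)
    return result


def call_without_loop() -> bool:
    """Return True if scheduling fails outside an event loop."""
    coro = log_event("never_logged")
    try:
        asyncio.create_task(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return True
    return False


inside = asyncio.run(main())
failed_outside = call_without_loop()
```

In longer-lived code it is usually worth keeping a reference to the task returned by create_task, since the event loop itself only holds a weak reference to it.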

RhizomorphAdapter

RhizomorphAdapter()

Adapter for Rhizomorph behavior tree logging.

Provides decorators and helpers for logging:

- Node tick execution with status results
- Tree-level events
- Blackboard object lifecycle tracking

Usage
from mycorrhizal.spores.dsl import RhizomorphAdapter

adapter = RhizomorphAdapter()

@bt.tree
def MyTree():
    @bt.action
    @adapter.log_node(event_type="check_threat")
    async def check_threat(bb: MissionContext) -> Status:
        # Event automatically logged with:
        # - status result attribute
        # - attributes from bb (with EventAttr annotations)
        # - relationships to objects (with ObjectRef annotations)
        return Status.SUCCESS

Initialize the Rhizomorph adapter.

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def __init__(self):
    """Initialize the Rhizomorph adapter."""
    self._enabled = True

enable

enable()

Enable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def enable(self):
    """Enable logging for this adapter."""
    self._enabled = True

disable

disable()

Disable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def disable(self):
    """Disable logging for this adapter."""
    self._enabled = False

log_node

log_node(event_type: str, attributes: Optional[Union[Dict[str, Any], List[str]]] = None, log_status: bool = True) -> Callable

Decorator to log Rhizomorph node execution.

Automatically captures:

- Node execution status (SUCCESS, FAILURE, RUNNING, etc.)
- Attributes from blackboard (with EventAttr annotations)
- Objects from blackboard (with ObjectRef annotations)
- Node name
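Attribute capture relies on fields annotated with EventAttr. As a rough illustration of how Annotated markers can be discovered at runtime, the sketch below uses typing.get_type_hints with include_extras=True. The EventAttr class here is a local stand-in for the marker in mycorrhizal.spores.models, a dataclass replaces the pydantic model to keep the example dependency-free, and extract_event_attrs is a hypothetical helper, not the library's implementation.

```python
from dataclasses import dataclass
from typing import Annotated, Any, Dict, get_type_hints


class EventAttr:
    """Stand-in marker; the real EventAttr lives in mycorrhizal.spores.models."""


@dataclass
class MissionContext:
    mission_id: Annotated[str, EventAttr]
    battery: Annotated[float, EventAttr]
    scratch: str  # unmarked, so it is not extracted


def extract_event_attrs(bb: Any) -> Dict[str, Any]:
    """Hypothetical helper: collect values of fields marked with EventAttr."""
    hints = get_type_hints(type(bb), include_extras=True)
    attrs: Dict[str, Any] = {}
    for name, hint in hints.items():
        # Annotated[...] hints expose their markers through __metadata__
        if EventAttr in getattr(hint, "__metadata__", ()):
            attrs[name] = getattr(bb, name)
    return attrs


attrs = extract_event_attrs(MissionContext("m-7", 0.8, "tmp"))
```

Only the two marked fields end up in attrs; the unmarked scratch field is ignored.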

Parameters:

Name        Type                                        Description                                    Default
event_type  str                                         Type of event to log                           required
attributes  Optional[Union[Dict[str, Any], List[str]]]  Static attributes or param names to extract    None
log_status  bool                                        Whether to include status in event attributes  True

Returns:

Type      Description
Callable  Decorator function

Source code in src/mycorrhizal/spores/dsl/rhizomorph.py
def log_node(
    self,
    event_type: str,
    attributes: Optional[Union[Dict[str, Any], List[str]]] = None,
    log_status: bool = True,
) -> Callable:
    """
    Decorator to log Rhizomorph node execution.

    Automatically captures:
    - Node execution status (SUCCESS, FAILURE, RUNNING, etc.)
    - Attributes from blackboard (with EventAttr annotations)
    - Objects from blackboard (with ObjectRef annotations)
    - Node name

    Args:
        event_type: Type of event to log
        attributes: Static attributes or param names to extract
        log_status: Whether to include status in event attributes

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(bb: Any, tb: Any = None):
            # Call original node function
            if _supports_timebase(func):
                result = await func(bb=bb, tb=tb)
            else:
                result = await func(bb=bb)

            # Log event
            await _log_node_event(
                func, bb, tb, event_type, attributes, log_status, result
            )

            return result

        @functools.wraps(func)
        def sync_wrapper(bb: Any, tb: Any = None):
            # Call original node function
            if _supports_timebase(func):
                result = func(bb=bb, tb=tb)
            else:
                result = func(bb=bb)

            # Schedule logging
            asyncio.create_task(_log_node_event(
                func, bb, tb, event_type, attributes, log_status, result
            ))

            return result

        # Return appropriate wrapper
        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator
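The listing above calls _supports_timebase(func) to decide whether to pass tb to the node, but that helper's body is not shown on this page. One plausible way to make such a decision is to inspect the function signature, as in the sketch below. supports_timebase and the three node functions are hypothetical and may differ from the library's actual check.

```python
import inspect
from typing import Callable


def supports_timebase(func: Callable) -> bool:
    """Hypothetical check: can `func` be called with a `tb=` keyword argument?"""
    params = inspect.signature(func).parameters
    tb = params.get("tb")
    if tb is not None and tb.kind is not inspect.Parameter.POSITIONAL_ONLY:
        return True
    # A **kwargs parameter would also accept tb=...
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


def node_with_tb(bb, tb):
    return "SUCCESS"


def node_without_tb(bb):
    return "SUCCESS"


def node_with_kwargs(bb, **extra):
    return "SUCCESS"


results = [
    supports_timebase(node_with_tb),
    supports_timebase(node_without_tb),
    supports_timebase(node_with_kwargs),
]
```

Because the wrappers call the node with keyword arguments (bb=bb, tb=tb), a check along these lines looks at parameter names rather than positions.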

SeptumAdapter

SeptumAdapter()

Adapter for Septum state machine logging.

Provides decorators and helpers for logging:

- State entry/exit/timeout events
- State execution events
- Transition events
- Message/context object lifecycle tracking

Usage
from mycorrhizal.spores.dsl import SeptumAdapter

adapter = SeptumAdapter()

@septum.state()
def MyState():
    @septum.on_state
    @adapter.log_state(event_type="state_execute")
    async def on_state(ctx: SharedContext):
        # Event automatically logged with:
        # - state_name attribute
        # - attributes from ctx.common (with EventAttr annotations)
        # - relationships to objects (with ObjectRef annotations)
        return Events.DONE

    @septum.on_enter
    @adapter.log_state_lifecycle(event_type="state_enter")
    async def on_enter(ctx: SharedContext):
        pass

Initialize the Septum adapter.

Source code in src/mycorrhizal/spores/dsl/septum.py
def __init__(self):
    """Initialize the Septum adapter."""
    self._enabled = True

enable

enable()

Enable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/septum.py
def enable(self):
    """Enable logging for this adapter."""
    self._enabled = True

disable

disable()

Disable logging for this adapter.

Source code in src/mycorrhizal/spores/dsl/septum.py
def disable(self):
    """Disable logging for this adapter."""
    self._enabled = False

log_state

log_state(event_type: str, attributes: Optional[Dict[str, Any]] = None, log_state_name: bool = True, log_transition: bool = True) -> Callable

Decorator to log Septum state execution.

Automatically captures:

- State name
- Transition result (if log_transition=True)
- Attributes from ctx.common (with EventAttr annotations)
- Objects from ctx.common (with ObjectRef annotations)
- Message information (if present in ctx)

Parameters:

Name            Type                      Description                                  Default
event_type      str                       Type of event to log                         required
attributes      Optional[Dict[str, Any]]  Static attributes to include                 None
log_state_name  bool                      Whether to include state name in attributes  True
log_transition  bool                      Whether to include transition result         True

Returns:

Type      Description
Callable  Decorator function

Source code in src/mycorrhizal/spores/dsl/septum.py
def log_state(
    self,
    event_type: str,
    attributes: Optional[Dict[str, Any]] = None,
    log_state_name: bool = True,
    log_transition: bool = True,
) -> Callable:
    """
    Decorator to log Septum state execution.

    Automatically captures:
    - State name
    - Transition result (if log_transition=True)
    - Attributes from ctx.common (with EventAttr annotations)
    - Objects from ctx.common (with ObjectRef annotations)
    - Message information (if present in ctx)

    Args:
        event_type: Type of event to log
        attributes: Static attributes to include
        log_state_name: Whether to include state name in attributes
        log_transition: Whether to include transition result

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(ctx: SharedContext):
            # Call original state handler
            result = await func(ctx)

            # Log event
            await _log_state_event(
                func, ctx, event_type, attributes, log_state_name, log_transition, result
            )

            return result

        @functools.wraps(func)
        def sync_wrapper(ctx: SharedContext):
            # Call original state handler
            result = func(ctx)

            # Schedule logging
            asyncio.create_task(_log_state_event(
                func, ctx, event_type, attributes, log_state_name, log_transition, result
            ))

            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator

log_state_lifecycle

log_state_lifecycle(event_type: str, attributes: Optional[Dict[str, Any]] = None) -> Callable

Decorator to log state lifecycle events (on_enter, on_leave, on_timeout).

Use this for lifecycle hooks rather than main state execution. The event is logged before the wrapped handler runs.

Parameters:

Name        Type                      Description                                                Default
event_type  str                       Type of event to log (e.g., "state_enter", "state_leave")  required
attributes  Optional[Dict[str, Any]]  Static attributes to include                               None

Returns:

Type      Description
Callable  Decorator function

Source code in src/mycorrhizal/spores/dsl/septum.py
def log_state_lifecycle(
    self,
    event_type: str,
    attributes: Optional[Dict[str, Any]] = None,
) -> Callable:
    """
    Decorator to log state lifecycle events (on_enter, on_leave, on_timeout).

    Use this for logging lifecycle hooks rather than main state execution.

    Args:
        event_type: Type of event to log (e.g., "state_enter", "state_leave")
        attributes: Static attributes to include

    Returns:
        Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(ctx: SharedContext):
            # Log before execution
            config = get_config()
            if config.enabled:
                await _log_lifecycle_event(func, ctx, event_type, attributes, "enter")

            # Call original lifecycle handler
            result = await func(ctx)

            return result

        @functools.wraps(func)
        def sync_wrapper(ctx: SharedContext):
            # Schedule logging
            config = get_config()
            if config.enabled:
                asyncio.create_task(_log_lifecycle_event(
                    func, ctx, event_type, attributes, "enter"
                ))

            return func(ctx)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore
        else:
            return sync_wrapper  # type: ignore

    return decorator
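The two Septum decorators differ in when the event is emitted: in the listings above, log_state logs after the handler returns (so the result is available), while log_state_lifecycle logs before the handler is called. The sketch below reproduces just that ordering with simplified stand-in decorators (log_before, log_after) that append to a list instead of writing OCEL events.

```python
import asyncio
import functools

trace = []


def log_after(event_type):
    """Stand-in for the log_state pattern: run the handler, then record the event."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(ctx):
            result = await func(ctx)
            trace.append((event_type, result))
            return result
        return wrapper
    return decorator


def log_before(event_type):
    """Stand-in for the log_state_lifecycle pattern: record the event, then run the handler."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(ctx):
            trace.append((event_type, None))
            return await func(ctx)
        return wrapper
    return decorator


@log_before("state_enter")
async def on_enter(ctx):
    trace.append(("handler:on_enter", None))


@log_after("state_execute")
async def on_state(ctx):
    trace.append(("handler:on_state", None))
    return "DONE"


async def main():
    await on_enter({})
    return await on_state({})


final = asyncio.run(main())
```

One consequence of the before-handler ordering is that a lifecycle event is recorded even if the hook itself later raises, whereas the after-handler ordering records nothing when the state handler fails.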