Hypha Petri Nets API Reference

Core Module

mycorrhizal.hypha.core

Hypha - Decorator-based Colored Petri Net Framework

A DSL for defining and executing colored Petri nets with support for asyncio, multi-set places, guards, and hierarchical subnet composition.

Usage

from mycorrhizal.hypha.core import pn, Runner

@pn.net
def ProcessingNet(builder):
    # Define places (all are multi-sets/bags)
    pending = builder.place("pending")
    processed = builder.place("processed")

    # Define transitions
    @builder.transition()
    async def process(consumed, bb, timebase):
        for token in consumed:
            result = await handle(token)
            yield {processed: result}

    # Wire the net
    builder.arc(pending, process).arc(processed)

# Run the Petri net (Runner takes the blackboard as its second argument)
runner = Runner(ProcessingNet, blackboard)
await runner.start(timebase)

Key Classes

NetBuilder - Builder for constructing Petri nets
PlaceSpec - Specification for a place
TransitionSpec - Specification for a transition
Runner - Runtime for executing Petri nets

All places are multi-sets (bags) with unordered token storage that supports multiplicity.
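
To make "multi-set with multiplicity" concrete, here is a minimal sketch that uses Python's collections.Counter as a stand-in bag. It only illustrates the semantics; it is not a claim about how Hypha stores tokens internally.

```python
from collections import Counter

# A bag keeps a count per distinct token and has no ordering.
bag = Counter()
bag.update(["job-a", "job-b", "job-a"])  # add three tokens, one of them twice
bag.subtract(["job-a"])                  # consume one occurrence of "job-a"
bag += Counter()                         # drop entries whose count is not positive

print(bag)
```

Equal tokens are not distinguished from one another: the bag only records how many of each are present.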

Transitions

Transitions consume tokens from input places and produce tokens for output places. They are async functions that yield dictionaries mapping place references to tokens.
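
As a minimal sketch of this contract, the snippet below drives a transition handler by hand, without the Hypha runtime. The place key is a plain string standing in for a PlaceRef, and fire is a hypothetical helper that is not part of the library; in a real net the Runner decides when a handler is invoked.

```python
import asyncio
from collections import Counter

# Stand-in for a PlaceRef; in Hypha this would come from builder.place(...).
PROCESSED = "processed"

async def process(consumed, bb, timebase):
    """Transition handler: an async generator yielding {place: token} dicts."""
    for token in consumed:
        yield {PROCESSED: token.upper()}

async def fire(handler, consumed, bb=None, timebase=None):
    """Hypothetical driver: collect the yielded tokens into per-place bags."""
    bags = {}
    async for produced in handler(consumed, bb, timebase):
        for place, token in produced.items():
            bags.setdefault(place, Counter())[token] += 1
    return bags

bags = asyncio.run(fire(process, ["a", "b", "a"]))
print(bags)
```

Because the handler is a generator, it can yield zero, one, or many dictionaries per firing, and each dictionary may target a different place.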

Guards

Guards are conditions that must be satisfied for a transition to fire. They can check token values, blackboard state, or external conditions.

PlaceSpec dataclass

PlaceSpec(name: str, handler: Optional[Callable] = None, state_factory: Optional[Callable] = None, is_io_input: bool = False, is_io_output: bool = False)

Specification for a place in the Petri net.

All places are multi-sets (bags) that support token storage with multiplicity. Tokens can be added and removed from places.

GuardSpec dataclass

GuardSpec(func: Callable)

Specification for a guard function

TransitionSpec dataclass

TransitionSpec(name: str, handler: Callable, guard: Optional[GuardSpec] = None, state_factory: Optional[Callable] = None, delay: float = 0.0)

Specification for a transition in the Petri net

ArcSpec dataclass

ArcSpec(source: PlaceRef | TransitionRef, target: PlaceRef | TransitionRef, weight: int = 1, name: Optional[str] = None)

Specification for an arc connecting places and transitions

NetSpec dataclass

NetSpec(name: str, parent: Optional[NetSpec] = None, places: Dict[str, PlaceSpec] = dict(), transitions: Dict[str, TransitionSpec] = dict(), arcs: List[ArcSpec] = list(), subnets: Dict[str, NetSpec] = dict())

Complete specification of a Petri net

get_fqn

get_fqn(local_name: Optional[str] = None) -> str

Compute fully qualified name by walking parent chain

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self, local_name: Optional[str] = None) -> str:
    """Compute fully qualified name by walking parent chain"""
    return '.'.join(self.get_parts(local_name))

get_parts

get_parts(local_name: Optional[str] = None) -> List[str]

Return the fully qualified name as a list of path components.

If local_name is provided, returns the parts for that member under this spec (e.g. ['Parent', 'Spec', 'local']). Otherwise returns the parts for this spec itself.

Source code in src/mycorrhizal/hypha/core/specs.py
def get_parts(self, local_name: Optional[str] = None) -> List[str]:
    """Return the fully qualified name as a list of path components.

    If local_name is provided, returns the parts for that member
    under this spec (e.g. ['Parent', 'Spec', 'local']). Otherwise
    returns the parts for this spec itself.
    """
    parts: List[str] = []
    if self.parent:
        parts.extend(self.parent.get_parts())

    parts.append(self.name)

    if local_name is not None:
        parts.append(local_name)

    return parts
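
The parent-chain walk can be tried in isolation. The sketch below uses a hypothetical Spec class that keeps only the name and parent fields of NetSpec; the method bodies follow the listings above.

```python
from typing import List, Optional

class Spec:
    """Minimal stand-in for NetSpec: only a name and an optional parent."""

    def __init__(self, name: str, parent: Optional["Spec"] = None):
        self.name = name
        self.parent = parent

    def get_parts(self, local_name: Optional[str] = None) -> List[str]:
        # Ancestors first, then this spec, then the optional member name.
        parts: List[str] = []
        if self.parent:
            parts.extend(self.parent.get_parts())
        parts.append(self.name)
        if local_name is not None:
            parts.append(local_name)
        return parts

    def get_fqn(self, local_name: Optional[str] = None) -> str:
        return ".".join(self.get_parts(local_name))

root = Spec("Pipeline")
stage = Spec("stage_a", parent=root)

print(stage.get_fqn())         # Pipeline.stage_a
print(stage.get_fqn("queue"))  # Pipeline.stage_a.queue
```

Since the name is recomputed on every call, re-parenting a spec changes the fully qualified names of everything beneath it.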

to_mermaid

to_mermaid() -> str

Generate Mermaid diagram of the net

Source code in src/mycorrhizal/hypha/core/specs.py
def to_mermaid(self) -> str:
    """Generate Mermaid diagram of the net"""
    lines = ["graph TD"]

    def add_subnet(spec: NetSpec, indent: str = "    "):
        if spec.subnets:
            for subnet_name, subnet_spec in spec.subnets.items():
                subnet_fqn = subnet_spec.get_fqn()
                lines.append(f"{indent}subgraph {subnet_fqn}")
                add_subnet(subnet_spec, indent + "    ")
                lines.append(f"{indent}end")

        for place_name, place_spec in spec.places.items():
            place_fqn = spec.get_fqn(place_name)
            shape = "((" 
            close = "))" 
            prefix = "[INPUT]</br>" if place_spec.is_io_input else "[OUTPUT]</br>" if place_spec.is_io_output else "" 
            lines.append(f"{indent}{place_fqn}{shape}\"{prefix}{place_fqn}\"{close}")

        for trans_name in spec.transitions.keys():
            trans_fqn = spec.get_fqn(trans_name)
            lines.append(f"{indent}{trans_fqn}[{trans_fqn}]")

        for arc in spec.arcs:
            source_fqn = arc.source.get_fqn()
            target_fqn = arc.target.get_fqn()
            weight_label = f"|weight={arc.weight}|" if arc.weight > 1 else ""
            lines.append(f"{indent}{source_fqn} -->{weight_label} {target_fqn}")

    add_subnet(self)

    return "\n".join(lines)
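
To show the shape of the generated text, here is a sketch that reproduces the line formatting for a flat net with no subnets. mermaid_lines and its arguments are hypothetical and exist only for this illustration; the real method reads everything from the NetSpec.

```python
def mermaid_lines(net, places, transitions, arcs):
    """Hypothetical flat-net version of the formatting used by to_mermaid.

    places maps name -> "input" | "output" | None; arcs are (src, dst, weight).
    """
    lines = ["graph TD"]
    indent = "    "
    for name, io in places.items():
        fqn = f"{net}.{name}"
        prefix = {"input": "[INPUT]</br>", "output": "[OUTPUT]</br>"}.get(io, "")
        lines.append(f'{indent}{fqn}(("{prefix}{fqn}"))')  # places are circles
    for name in transitions:
        fqn = f"{net}.{name}"
        lines.append(f"{indent}{fqn}[{fqn}]")              # transitions are boxes
    for src, dst, weight in arcs:
        label = f"|weight={weight}|" if weight > 1 else ""  # weight 1 is unlabeled
        lines.append(f"{indent}{net}.{src} -->{label} {net}.{dst}")
    return "\n".join(lines)

diagram = mermaid_lines(
    "Net",
    places={"pending": None, "processed": None},
    transitions=["process"],
    arcs=[("pending", "process", 2), ("process", "processed", 1)],
)
print(diagram)
```

The result is a "graph TD" header followed by one line per place, transition, and arc, with only arcs of weight greater than 1 carrying a label.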

PlaceRef

PlaceRef(local_name: str, parent_spec: NetSpec)

Reference to a place for use in arc definitions

Source code in src/mycorrhizal/hypha/core/specs.py
def __init__(self, local_name: str, parent_spec: NetSpec):
    self.local_name = local_name
    self.parent_spec = parent_spec

get_fqn

get_fqn() -> str

Compute FQN dynamically

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self) -> str:
    """Compute FQN dynamically"""
    return '.'.join(self.get_parts())

get_parts

get_parts() -> List[str]

Return the FQN as list of parts for this place ref.

Source code in src/mycorrhizal/hypha/core/specs.py
def get_parts(self) -> List[str]:
    """Return the FQN as list of parts for this place ref."""
    return self.parent_spec.get_parts(self.local_name)

__call__

__call__(func: Callable) -> PlaceRef

Allow PlaceRef to be used as a decorator to register a handler.

Example

@builder.place("queue")
def queue_handler(bb):
    return bb.tokens

The PlaceRef is returned for use in arcs, and the handler is registered.

Source code in src/mycorrhizal/hypha/core/specs.py
def __call__(self, func: Callable) -> "PlaceRef":
    """Allow PlaceRef to be used as a decorator to register a handler.

    Example:
        @builder.place("queue")
        def queue_handler(bb):
            return bb.tokens

    The PlaceRef is returned for use in arcs, and the handler is registered.
    """
    # Get the place spec and register the handler
    place_spec = self.parent_spec.places.get(self.local_name)
    if place_spec is not None:
        place_spec.handler = func
    return self

TransitionRef

TransitionRef(local_name: str, parent_spec: NetSpec)

Reference to a transition for use in arc definitions

Source code in src/mycorrhizal/hypha/core/specs.py
def __init__(self, local_name: str, parent_spec: NetSpec):
    self.local_name = local_name
    self.parent_spec = parent_spec

get_fqn

get_fqn() -> str

Compute FQN dynamically

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self) -> str:
    """Compute FQN dynamically"""
    return '.'.join(self.get_parts())

SubnetRef

SubnetRef(spec: NetSpec)

Reference to a subnet instance for accessing its places/transitions

Source code in src/mycorrhizal/hypha/core/specs.py
def __init__(self, spec: NetSpec):
    self.spec = spec

get_fqn

get_fqn() -> str

Compute FQN dynamically

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self) -> str:
    """Compute FQN dynamically"""
    return '.'.join(self.get_parts())

NetBuilder

NetBuilder(name: str, parent: Optional[NetSpec] = None)

Builder for constructing Petri net specifications

Source code in src/mycorrhizal/hypha/core/builder.py
def __init__(self, name: str, parent: Optional[NetSpec] = None):
    self.spec = NetSpec(name, parent=parent)
    # Track name counters for generating unique names when there are collisions
    self._transition_name_counters: Dict[str, int] = {}
    self._place_name_counters: Dict[str, int] = {}

place

place(name_or_func: Union[str, Callable, None] = None, state_factory: Optional[Callable] = None) -> Union[PlaceRef, Callable]

Declare a regular place.

All places are multi-sets (bags) that support token storage with multiplicity. Tokens can be added and removed from places.

Can be used in three ways:

  1. As a method call (returns PlaceRef for use in arcs):

     place = builder.place("my_place")

  2. As a decorator without parens (infers name from function):

     @builder.place
     def my_place(bb):
         return bb.tokens

  3. As a decorator with custom name:

     @builder.place("custom_name")
     def my_func(bb):
         return bb.tokens

Calling it with empty parentheses (@builder.place()) also returns a decorator that infers the place name from the function, as in form 2.

Source code in src/mycorrhizal/hypha/core/builder.py
def place(
    self,
    name_or_func: Union[str, Callable, None] = None,
    state_factory: Optional[Callable] = None,
) -> Union[PlaceRef, Callable]:
    """Declare a regular place.

    All places are multi-sets (bags) that support token storage with
    multiplicity. Tokens can be added and removed from places.

    Can be used in three ways:

    1. As a method call (returns PlaceRef for use in arcs):
       place = builder.place("my_place")

    2. As a decorator without parens (infers name from function):
       @builder.place
       def my_place(bb):
           return bb.tokens

    3. As a decorator with custom name:
       @builder.place("custom_name")
       def my_func(bb):
           return bb.tokens
    """
    # Case 1: Used as decorator without parens - first arg is the function
    if callable(name_or_func):
        func = name_or_func
        place_name = func.__name__
        place_spec = PlaceSpec(place_name, handler=func, state_factory=state_factory)
        self.spec.places[place_name] = place_spec
        return PlaceRef(place_name, self.spec)

    # Case 2 & 3: Used with explicit name (as method call or decorator with args)
    # or called without any args (need to return decorator)
    if isinstance(name_or_func, str):
        name = name_or_func
        place_spec = PlaceSpec(name, state_factory=state_factory)
        self.spec.places[name] = place_spec
        # Return PlaceRef - it's callable via __call__ for decorator use
        return PlaceRef(name, self.spec)

    # Case 4: Called without any args - shouldn't happen but handle gracefully
    # This would be like @builder.place() with empty parens
    def decorator(func: Callable) -> PlaceRef:
        place_name = func.__name__
        place_spec = PlaceSpec(place_name, handler=func, state_factory=state_factory)
        self.spec.places[place_name] = place_spec
        return PlaceRef(place_name, self.spec)
    return decorator
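
The dispatch on the first argument can be exercised without the library. In the sketch below, Ref and MiniBuilder are hypothetical stand-ins for PlaceRef and NetBuilder that keep only the handler bookkeeping; the branch order follows the listing above.

```python
from typing import Callable, Dict, Optional, Union

class Ref:
    """Stand-in for PlaceRef: callable so it can also act as a decorator."""

    def __init__(self, name: str, handlers: Dict[str, Optional[Callable]]):
        self.name = name
        self._handlers = handlers

    def __call__(self, func: Callable) -> "Ref":
        self._handlers[self.name] = func  # register the handler, keep the ref
        return self

class MiniBuilder:
    """Hypothetical reduction of NetBuilder.place to its dispatch logic."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Optional[Callable]] = {}

    def place(self, name_or_func: Union[str, Callable, None] = None):
        if callable(name_or_func):          # @builder.place
            self.handlers[name_or_func.__name__] = name_or_func
            return Ref(name_or_func.__name__, self.handlers)
        if isinstance(name_or_func, str):   # builder.place("x") or @builder.place("x")
            self.handlers[name_or_func] = None
            return Ref(name_or_func, self.handlers)
        # @builder.place(): return a decorator that names the place after func
        return lambda func: self.place(func)

builder = MiniBuilder()
plain = builder.place("pending")

@builder.place
def inbox(bb):
    return bb.tokens

@builder.place("custom_name")
def my_func(bb):
    return bb.tokens

print(sorted(builder.handlers))
```

Note that after decoration the names inbox and my_func are bound to references rather than to the original functions, which is what allows them to be passed straight to arc definitions.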

io_input_place

io_input_place()

Decorator that registers a function as an IO input place, named after the function. The handler is expected to be an async generator.

Source code in src/mycorrhizal/hypha/core/builder.py
def io_input_place(self):
    """Decorator for IOInputPlace with async generator"""

    def decorator(func: Callable) -> PlaceRef:
        name = func.__name__
        place_spec = PlaceSpec(
            name, handler=func, is_io_input=True
        )
        self.spec.places[name] = place_spec
        return PlaceRef(name, self.spec)

    return decorator

io_output_place

io_output_place()

Decorator that registers a function as an IO output place, named after the function. The handler is expected to be an async function.

Source code in src/mycorrhizal/hypha/core/builder.py
def io_output_place(self):
    """Decorator for IOOutputPlace with async handler"""

    def decorator(func: Callable) -> PlaceRef:
        name = func.__name__
        place_spec = PlaceSpec(
            name, handler=func, is_io_output=True
        )
        self.spec.places[name] = place_spec
        return PlaceRef(name, self.spec)

    return decorator

guard

guard(func: Callable) -> GuardSpec

Create a guard specification from a function

Source code in src/mycorrhizal/hypha/core/builder.py
def guard(self, func: Callable) -> GuardSpec:
    """Create a guard specification from a function"""
    return GuardSpec(func)

transition

transition(guard: Optional[GuardSpec] = None, state_factory: Optional[Callable] = None, delay: float = 0.0)

Decorator for transition function.

Parameters:

  guard (Optional[GuardSpec], default: None) - Optional guard specification
  state_factory (Optional[Callable], default: None) - Optional state factory for transition state
  delay (float, default: 0.0) - Static delay in seconds before firing after becoming enabled

Usage

@builder.transition()
async def my_transition(consumed, bb, timebase):
    yield {output: token}

@builder.transition(delay=0.1)  # 100ms delay
async def delayed_transition(consumed, bb, timebase):
    yield {output: token}

Note: When creating multiple transitions in a loop with the same function name, the builder automatically generates unique names by appending a counter suffix.

Source code in src/mycorrhizal/hypha/core/builder.py
def transition(
    self,
    guard: Optional[GuardSpec] = None,
    state_factory: Optional[Callable] = None,
    delay: float = 0.0,
):
    """Decorator for transition function.

    Args:
        guard: Optional guard specification
        state_factory: Optional state factory for transition state
        delay: Static delay in seconds before firing after becoming enabled (default: 0.0)

    Usage:
        @builder.transition()
        async def my_transition(consumed, bb, timebase):
            yield {output: token}

        @builder.transition(delay=0.1)  # 100ms delay
        async def delayed_transition(consumed, bb, timebase):
            yield {output: token}

    Note: When creating multiple transitions in a loop with the same function name,
    the builder automatically generates unique names by appending a counter suffix.
    """

    def decorator(func: Callable) -> TransitionRef:
        base_name = func.__name__
        # Generate unique name to handle collisions (e.g., when creating transitions in loops)
        if base_name in self._transition_name_counters:
            self._transition_name_counters[base_name] += 1
            name = f"{base_name}_{self._transition_name_counters[base_name]}"
        else:
            self._transition_name_counters[base_name] = 0
            name = base_name

        trans_spec = TransitionSpec(name, func, guard, state_factory, delay)
        self.spec.transitions[name] = trans_spec
        return TransitionRef(name, self.spec)

    return decorator
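
The collision rule is easiest to see on its own. The sketch below lifts the counter logic into a hypothetical free function, unique_name, which is not part of the builder's API.

```python
from typing import Dict

def unique_name(base_name: str, counters: Dict[str, int]) -> str:
    """Mirror of the collision rule used by the transition decorator."""
    if base_name in counters:
        counters[base_name] += 1
        return f"{base_name}_{counters[base_name]}"
    counters[base_name] = 0  # first use keeps the bare function name
    return base_name

counters: Dict[str, int] = {}
names = [unique_name("worker", counters) for _ in range(3)]
print(names)  # ['worker', 'worker_1', 'worker_2']
```

The first registration keeps the function's own name, and suffixes start at _1 for the second one.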

arc

arc(source: Any, target: Any, name: Optional[str] = None, weight: int = 1) -> ArcChain

Create an arc and return chainable ArcChain

Source code in src/mycorrhizal/hypha/core/builder.py
def arc(
    self, source: Any, target: Any, name: Optional[str] = None, weight: int = 1
) -> ArcChain:
    """Create an arc and return chainable ArcChain"""
    source_type = type(source)
    target_type = type(target)

    if source_type == target_type:
        raise ValueError(
            f"Cannot connect {source_type.__name__} to {target_type.__name__} directly. "
            f"Arcs must alternate between places and transitions."
        )

    arc_spec = ArcSpec(source, target, weight, name)
    self.spec.arcs.append(arc_spec)

    return ArcChain(self, target)
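
The alternation rule and the chaining behavior can be sketched with stand-in types. Place, Transition, Chain, and the free function arc below are hypothetical; they mimic the type comparison shown above, not the library's classes.

```python
class Place:
    def __init__(self, name):
        self.name = name

class Transition:
    def __init__(self, name):
        self.name = name

class Chain:
    """Hypothetical stand-in combining NetBuilder.arc and ArcChain.arc."""

    def __init__(self, arcs, last):
        self.arcs = arcs
        self.last = last

    def arc(self, target, weight=1):
        # Same rule as the listing: endpoints of the same type are rejected.
        if type(self.last) is type(target):
            raise ValueError("Arcs must alternate between places and transitions.")
        self.arcs.append((self.last.name, target.name, weight))
        return Chain(self.arcs, target)  # the target becomes the next source

def arc(arcs, source, target, weight=1):
    return Chain(arcs, source).arc(target, weight)

arcs = []
pending, done = Place("pending"), Place("done")
process = Transition("process")

arc(arcs, pending, process, weight=2).arc(done)

try:
    arc(arcs, pending, done)  # place -> place is not allowed
    rejected = False
except ValueError:
    rejected = True

print(arcs, rejected)
```

Each call in a chain records one arc and hands back an object whose source is the previous target, so a path through the net reads left to right.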

subnet

subnet(net_func: Callable, instance_name: str) -> SubnetRef

Instantiate a subnet with given instance name

Source code in src/mycorrhizal/hypha/core/builder.py
def subnet(self, net_func: Callable, instance_name: str) -> SubnetRef:
    """Instantiate a subnet with given instance name"""
    # 1. Validation
    if not hasattr(net_func, "_spec"):
        raise ValueError(
            f"{net_func.__name__} is not a valid net. "
            f"Did you forget to decorate it with @pn.net?"
        )

    original_spec = net_func._spec

    # 2. Create subnet spec and copy places/transitions
    subnet_spec = NetSpec(instance_name, parent=self.spec)

    # Copy places and transitions (PlaceSpec/TransitionSpec are plain
    # dataclasses holding callables/state factories; keeping the same
    # instances is acceptable since they are stateless descriptors).
    subnet_spec.places = dict(original_spec.places)
    subnet_spec.transitions = dict(original_spec.transitions)

    # 3. Remap arcs to point into the new subnet
    subnet_spec.arcs = self._remap_arcs_to_subnet(original_spec.arcs, subnet_spec)

    # 4. Ensure nested subnets are copied
    self._ensure_nested_subnets_copied(original_spec, subnet_spec)

    # 5. Register and return
    self.spec.subnets[instance_name] = subnet_spec
    return SubnetRef(subnet_spec)

_remap_arcs_to_subnet

_remap_arcs_to_subnet(arcs: List[ArcSpec], subnet_spec: NetSpec) -> List[ArcSpec]

Remap arc references to point into target subnet.

For each PlaceRef/TransitionRef in the arcs, creates a new reference bound to the target subnet. Handles nested SubnetRefs by recursively copying their specs into the target subnet.

Parameters:

  arcs (List[ArcSpec], required) - Original arcs from the template net
  subnet_spec (NetSpec, required) - Target subnet spec to bind references to

Returns:

  List[ArcSpec] - New list of ArcSpec with remapped references

Source code in src/mycorrhizal/hypha/core/builder.py
def _remap_arcs_to_subnet(self, arcs: List[ArcSpec], subnet_spec: NetSpec) -> List[ArcSpec]:
    """Remap arc references to point into target subnet.

    For each PlaceRef/TransitionRef in the arcs, creates a new reference
    bound to the target subnet. Handles nested SubnetRefs by recursively
    copying their specs into the target subnet.

    Args:
        arcs: Original arcs from the template net
        subnet_spec: Target subnet spec to bind references to

    Returns:
        New list of ArcSpec with remapped references
    """
    new_arcs = []
    for arc in arcs:
        new_src = self._remap_ref_to_subnet(arc.source, subnet_spec, original_spec=None)
        new_tgt = self._remap_ref_to_subnet(arc.target, subnet_spec, original_spec=None)
        new_arcs.append(ArcSpec(new_src, new_tgt, arc.weight, arc.name))
    return new_arcs

_remap_ref_to_subnet

_remap_ref_to_subnet(ref: Any, target_subnet: NetSpec, original_spec: Optional[NetSpec] = None) -> Any

Remap a single reference to point into target subnet.

Handles:

  - PlaceRef: Create new PlaceRef bound to target_subnet
  - TransitionRef: Create new TransitionRef bound to target_subnet
  - SubnetRef: Recursively copy nested subnet spec into target_subnet

Parameters:

  ref (Any, required) - Reference to remap (PlaceRef, TransitionRef, SubnetRef, or other)
  target_subnet (NetSpec, required) - The subnet spec to bind the reference to
  original_spec (Optional[NetSpec], default: None) - The original template spec (for resolving nested subnets)

Returns:

  Any - The remapped reference, or original if unknown type

Source code in src/mycorrhizal/hypha/core/builder.py
def _remap_ref_to_subnet(self, ref: Any, target_subnet: NetSpec, original_spec: Optional[NetSpec] = None) -> Any:
    """Remap a single reference to point into target subnet.

    Handles:
    - PlaceRef: Create new PlaceRef bound to target_subnet
    - TransitionRef: Create new TransitionRef bound to target_subnet
    - SubnetRef: Recursively copy nested subnet spec into target_subnet

    Args:
        ref: Reference to remap (PlaceRef, TransitionRef, SubnetRef, or other)
        target_subnet: The subnet spec to bind the reference to
        original_spec: The original template spec (for resolving nested subnets)

    Returns:
        The remapped reference, or original if unknown type
    """
    from .specs import PlaceRef, TransitionRef, SubnetRef

    if isinstance(ref, PlaceRef):
        return PlaceRef(ref.local_name, target_subnet)
    if isinstance(ref, TransitionRef):
        return TransitionRef(ref.local_name, target_subnet)
    if isinstance(ref, SubnetRef):
        # Handle nested subnets recursively
        nested_name = ref.spec.name
        if nested_name not in target_subnet.subnets:
            # Copy the nested spec into this subnet (recursive)
            copied_nested = self._copy_spec_with_parent(ref.spec, target_subnet)
            target_subnet.subnets[nested_name] = copied_nested
            return SubnetRef(copied_nested)
        else:
            # Already copied, return reference to existing copy
            return SubnetRef(target_subnet.subnets[nested_name])

    # Unknown ref type: return as-is
    return ref

_ensure_nested_subnets_copied

_ensure_nested_subnets_copied(original_spec: NetSpec, subnet_spec: NetSpec)

Ensure all nested subnets are copied into the target subnet.

This handles nested subnets that weren't already processed during arc remapping. Each nested subnet is copied with its parent set to the target subnet.

Parameters:

  original_spec (NetSpec, required) - The template spec containing nested subnets
  subnet_spec (NetSpec, required) - The target subnet spec to copy nested subnets into

Source code in src/mycorrhizal/hypha/core/builder.py
def _ensure_nested_subnets_copied(self, original_spec: NetSpec, subnet_spec: NetSpec):
    """Ensure all nested subnets are copied into the target subnet.

    This handles nested subnets that weren't already processed during
    arc remapping. Each nested subnet is copied with its parent set
    to the target subnet.

    Args:
        original_spec: The template spec containing nested subnets
        subnet_spec: The target subnet spec to copy nested subnets into
    """
    for sub_name, sub_spec in original_spec.subnets.items():
        # Only copy if not already handled (e.g., by arc remapping)
        if sub_name not in subnet_spec.subnets:
            subnet_spec.subnets[sub_name] = self._copy_spec_with_parent(
                sub_spec, subnet_spec
            )

_copy_spec_with_parent

_copy_spec_with_parent(original: NetSpec, new_parent: NetSpec) -> NetSpec

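Copy a spec and attach the copy to a new parent. Nested subnets are copied recursively, but the places and transitions dictionaries are copied shallowly, so the PlaceSpec and TransitionSpec instances are shared with the original. The arcs list is copied as-is, without rebinding its references.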

Source code in src/mycorrhizal/hypha/core/builder.py
def _copy_spec_with_parent(self, original: NetSpec, new_parent: NetSpec) -> NetSpec:
    """Deep copy a spec and set new parent"""
    new_spec = NetSpec(original.name, parent=new_parent)
    new_spec.places = dict(original.places)
    new_spec.transitions = dict(original.transitions)
    new_spec.arcs = list(original.arcs)

    for sub_name, sub_spec in original.subnets.items():
        new_spec.subnets[sub_name] = self._copy_spec_with_parent(sub_spec, new_spec)

    return new_spec

forward

forward(input_place: Any, output_place: Any, name: Optional[str] = None)

Create a simple pass-through transition

Source code in src/mycorrhizal/hypha/core/builder.py
def forward(self, input_place: Any, output_place: Any, name: Optional[str] = None):
    """Create a simple pass-through transition"""
    trans_name = (
        name or f"forward_{input_place.local_name}_to_{output_place.local_name}"
    )

    def make_handler(out_place):
        async def forward_handler(consumed, bb, timebase):
            for token in consumed:
                yield {out_place: token}

        return forward_handler

    trans_spec = TransitionSpec(trans_name, make_handler(output_place))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    self.spec.arcs.append(ArcSpec(trans_ref, output_place))

fork

fork(input_place: Any, output_places: List[Any], name: Optional[str] = None)

Create a transition that broadcasts tokens to multiple outputs

Source code in src/mycorrhizal/hypha/core/builder.py
def fork(
    self, input_place: Any, output_places: List[Any], name: Optional[str] = None
):
    """Create a transition that broadcasts tokens to multiple outputs"""
    trans_name = name or f"fork_{input_place.local_name}"

    def make_handler(out_places):
        async def fork_handler(consumed, bb, timebase):
            for token in consumed:
                for output_place in out_places:
                    yield {output_place: token}

        return fork_handler

    trans_spec = TransitionSpec(trans_name, make_handler(output_places))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    for output_place in output_places:
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))
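
The broadcast behavior of the fork handler can be checked without a running net. In this sketch the output places are plain strings and collect is a hypothetical helper that simply drains the async generator.

```python
import asyncio

def make_fork_handler(out_places):
    async def fork_handler(consumed, bb, timebase):
        for token in consumed:
            for output_place in out_places:
                yield {output_place: token}  # one dict per (token, output) pair
    return fork_handler

async def collect(handler, consumed):
    """Hypothetical helper: gather everything the handler yields."""
    return [item async for item in handler(consumed, None, None)]

emitted = asyncio.run(collect(make_fork_handler(["audit", "store"]), ["t1", "t2"]))
print(emitted)
```

Every output place receives the same token object, so a mutable token is shared between branches rather than copied.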

join

join(input_places: List[Any], output_place: Any, name: Optional[str] = None)

Create a transition that waits for tokens from all inputs

Source code in src/mycorrhizal/hypha/core/builder.py
def join(
    self, input_places: List[Any], output_place: Any, name: Optional[str] = None
):
    """Create a transition that waits for tokens from all inputs"""
    trans_name = name or f"join_to_{output_place.local_name}"

    def make_handler(out_place):
        async def join_handler(consumed, bb, timebase):
            for token in consumed:
                yield {out_place: token}

        return join_handler

    trans_spec = TransitionSpec(trans_name, make_handler(output_place))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    for input_place in input_places:
        self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    self.spec.arcs.append(ArcSpec(trans_ref, output_place))

merge

merge(input_places: List[Any], output_place: Any, name: Optional[str] = None)

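Create one pass-through transition per input place, each forwarding tokens to the output place. If an explicit name is passed together with more than one input place, every transition is registered under that same name, so later ones overwrite earlier ones in spec.transitions.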

Source code in src/mycorrhizal/hypha/core/builder.py
def merge(
    self, input_places: List[Any], output_place: Any, name: Optional[str] = None
):
    """Create transitions from each input to output"""
    for i, input_place in enumerate(input_places):
        trans_name = (
            name or f"{input_place.local_name}_to_{output_place.local_name}"
        )
        if len(input_places) > 1 and not name:
            trans_name = (
                f"{input_place.local_name}_{i}_to_{output_place.local_name}"
            )

        def make_handler(out_place):
            async def merge_handler(consumed, bb, timebase):
                for token in consumed:
                    yield {out_place: token}

            return merge_handler

        trans_spec = TransitionSpec(trans_name, make_handler(output_place))
        self.spec.transitions[trans_name] = trans_spec
        trans_ref = TransitionRef(trans_name, self.spec)

        self.spec.arcs.append(ArcSpec(input_place, trans_ref))
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))

round_robin

round_robin(input_place: Any, output_places: List[Any], name: Optional[str] = None)

Create a transition that distributes tokens round-robin

Source code in src/mycorrhizal/hypha/core/builder.py
def round_robin(
    self, input_place: Any, output_places: List[Any], name: Optional[str] = None
):
    """Create a transition that distributes tokens round-robin"""
    trans_name = name or f"round_robin_{input_place.local_name}"

    def make_handler(out_places):
        async def round_robin_handler(consumed, bb, timebase, state):
            for token in consumed:
                output_place = out_places[state["index"]]
                state["index"] = (state["index"] + 1) % len(out_places)
                yield {output_place: token}

        return round_robin_handler

    trans_spec = TransitionSpec(
        trans_name, make_handler(output_places), state_factory=lambda: {"index": 0}
    )
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    for output_place in output_places:
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))
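
The rotation depends on the index surviving between firings. The sketch below assumes the runtime passes the same state object (the one produced by state_factory) to every invocation; that is an assumption of this example, and run_batches is a hypothetical driver.

```python
import asyncio

def make_round_robin_handler(out_places):
    async def round_robin_handler(consumed, bb, timebase, state):
        for token in consumed:
            output_place = out_places[state["index"]]
            state["index"] = (state["index"] + 1) % len(out_places)
            yield {output_place: token}
    return round_robin_handler

async def run_batches(handler, batches):
    """Hypothetical driver: fire once per batch, reusing one state dict."""
    state = {"index": 0}  # what state_factory=lambda: {"index": 0} would create
    routed = []
    for batch in batches:
        async for produced in handler(batch, None, None, state):
            routed.extend(produced.items())
    return routed

handler = make_round_robin_handler(["w0", "w1"])
routed = asyncio.run(run_batches(handler, [["a", "b", "c"], ["d"]]))
print(routed)
```

Under that assumption the second firing continues where the first one stopped instead of restarting at the first output.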

route

route(input_place: Any, type_map: Dict[Type, Any], name: Optional[str] = None)

Create a transition that routes by token type

Source code in src/mycorrhizal/hypha/core/builder.py
def route(
    self, input_place: Any, type_map: Dict[Type, Any], name: Optional[str] = None
):
    """Create a transition that routes by token type"""
    trans_name = name or f"route_{input_place.local_name}"

    def make_handler(t_map):
        async def route_handler(consumed, bb, timebase):
            for token in consumed:
                token_type = type(token)
                if token_type in t_map:
                    yield {t_map[token_type]: token}

        return route_handler

    trans_spec = TransitionSpec(trans_name, make_handler(type_map))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    for output_place in type_map.values():
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))
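
Because the handler looks up type(token) directly, matching is exact and subclasses are not routed to their base class entry. The sketch below demonstrates this with string place names and a hypothetical collect helper.

```python
import asyncio

def make_route_handler(t_map):
    async def route_handler(consumed, bb, timebase):
        for token in consumed:
            token_type = type(token)  # exact type, no isinstance check
            if token_type in t_map:
                yield {t_map[token_type]: token}
    return route_handler

async def collect(handler, consumed):
    """Hypothetical helper: gather everything the handler yields."""
    return [item async for item in handler(consumed, None, None)]

handler = make_route_handler({int: "numbers", str: "text"})
routed = asyncio.run(collect(handler, [1, "x", True, 2.5]))
print(routed)
```

Here True is a bool, which subclasses int but is not int itself, so the handler yields nothing for it, and the same happens for the float that has no entry in the map.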

ArcChain

ArcChain(builder: NetBuilder, last_ref: Any)

Fluent interface for chaining arc definitions

Source code in src/mycorrhizal/hypha/core/builder.py
def __init__(self, builder: "NetBuilder", last_ref: Any):
    self.builder = builder
    self.last_ref = last_ref
    self.last_type = type(last_ref)

arc

arc(target: Any, name: Optional[str] = None, weight: int = 1) -> ArcChain

Chain another arc from the last element to target

Source code in src/mycorrhizal/hypha/core/builder.py
def arc(
    self, target: Any, name: Optional[str] = None, weight: int = 1
) -> "ArcChain":
    """Chain another arc from the last element to target"""
    target_type = type(target)

    if self.last_type == target_type:
        raise ValueError(
            f"Cannot connect {self.last_type.__name__} to {target_type.__name__} directly. "
            f"Arcs must alternate between places and transitions."
        )

    arc_spec = ArcSpec(self.last_ref, target, weight, name)
    self.builder.spec.arcs.append(arc_spec)

    return ArcChain(self.builder, target)

Runner

Runner(net_func: Any, blackboard: Any)

High-level runner for executing a Petri net

Uses MatrixRuntime for high-performance matrix-based execution.

Source code in src/mycorrhizal/hypha/core/runtime.py
def __init__(self, net_func: Any, blackboard: Any):
    if not hasattr(net_func, '_spec'):
        raise ValueError(f"{net_func} is not a valid net")

    self.spec = net_func._spec
    self.blackboard = blackboard
    self.timebase = None
    self._matrix_runtime: Optional[MatrixRuntime] = None

runtime property

runtime

Get the active runtime (MatrixRuntime).

start async

start(timebase: Any)

Start the net with given timebase

Source code in src/mycorrhizal/hypha/core/runtime.py
async def start(self, timebase: Any):
    """Start the net with given timebase"""
    self.timebase = timebase

    self._matrix_runtime = MatrixRuntime(
        self.spec,
        self.blackboard,
        self.timebase
    )
    await self._matrix_runtime.start()

stop async

stop(timeout: float = 5.0)

Stop the net

Source code in src/mycorrhizal/hypha/core/runtime.py
async def stop(self, timeout: float = 5.0):
    """Stop the net"""
    if self._matrix_runtime:
        await self._matrix_runtime.stop(timeout)

add_place

add_place(fqn: str, state_factory: Optional[Callable] = None)

Add a place to the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_place(self, fqn: str,
              state_factory: Optional[Callable] = None):
    """Add a place to the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    return self.runtime.add_place(fqn, state_factory)

add_transition

add_transition(fqn: str, handler: Callable, guard: Optional[GuardSpec] = None, state_factory: Optional[Callable] = None, delay: float = 0.0)

Add a transition to the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_transition(self, fqn: str, handler: Callable,
                  guard: Optional[GuardSpec] = None,
                  state_factory: Optional[Callable] = None,
                  delay: float = 0.0):
    """Add a transition to the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    return self.runtime.add_transition(fqn, handler, guard, state_factory, delay)

add_arc

add_arc(source_fqn: str, target_fqn: str, weight: int = 1, name: Optional[str] = None)

Add an arc to the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_arc(self, source_fqn: str, target_fqn: str, weight: int = 1,
            name: Optional[str] = None):
    """Add an arc to the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    self.runtime.add_arc(source_fqn, target_fqn, weight, name)

remove_arc async

remove_arc(source_fqn: str, target_fqn: str)

Remove an arc from the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
async def remove_arc(self, source_fqn: str, target_fqn: str):
    """Remove an arc from the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    await self.runtime.remove_arc(source_fqn, target_fqn)

remove async

remove(fqn: str)

Remove a place or transition

Source code in src/mycorrhizal/hypha/core/runtime.py
async def remove(self, fqn: str):
    """Remove a place or transition"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    await self.runtime.remove(fqn)

run_sync

run_sync(timebase: Any, max_cycles: int = 100000) -> int

Run the net to completion in synchronous mode.

This is the high-performance execution path for pure computational nets. Only works with MatrixRuntime and requires no async features.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timebase | Any | Timebase for timing operations | required |
| max_cycles | int | Maximum number of cycles to execute | 100000 |

Returns:

| Type | Description |
| --- | --- |
| int | Number of cycles executed |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If net has async features |

Source code in src/mycorrhizal/hypha/core/runtime.py
def run_sync(self, timebase: Any, max_cycles: int = 100000) -> int:
    """Run the net to completion in synchronous mode.

    This is the high-performance execution path for pure computational nets.
    Only works with MatrixRuntime and requires no async features.

    Args:
        timebase: Timebase for timing operations
        max_cycles: Maximum number of cycles to execute

    Returns:
        Number of cycles executed

    Raises:
        RuntimeError: If net has async features
    """
    self.timebase = timebase

    self._matrix_runtime = MatrixRuntime(
        self.spec,
        self.blackboard,
        self.timebase
    )
    return self._matrix_runtime.run_sync(max_cycles)

Builder Module

mycorrhizal.hypha.core.builder

Hypha DSL - Builder Layer

NetBuilder provides the API for declaratively constructing Petri net specifications. Used within @pn.net decorated functions.

ArcChain

ArcChain(builder: NetBuilder, last_ref: Any)

Fluent interface for chaining arc definitions

Source code in src/mycorrhizal/hypha/core/builder.py
def __init__(self, builder: "NetBuilder", last_ref: Any):
    self.builder = builder
    self.last_ref = last_ref
    self.last_type = type(last_ref)

arc

arc(target: Any, name: Optional[str] = None, weight: int = 1) -> ArcChain

Chain another arc from the last element to target

Source code in src/mycorrhizal/hypha/core/builder.py
def arc(
    self, target: Any, name: Optional[str] = None, weight: int = 1
) -> "ArcChain":
    """Chain another arc from the last element to target"""
    target_type = type(target)

    if self.last_type == target_type:
        raise ValueError(
            f"Cannot connect {self.last_type.__name__} to {target_type.__name__} directly. "
            f"Arcs must alternate between places and transitions."
        )

    arc_spec = ArcSpec(self.last_ref, target, weight, name)
    self.builder.spec.arcs.append(arc_spec)

    return ArcChain(self.builder, target)
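
The alternation rule that arc enforces can be sketched in isolation. The snippet below is a minimal stand-in, not the library's code: PlaceRef, TransitionRef, and Chain here are hypothetical, simplified classes that model only the type check and the chaining.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass(frozen=True)
class PlaceRef:  # hypothetical stand-in, not the library class
    name: str


@dataclass(frozen=True)
class TransitionRef:  # hypothetical stand-in, not the library class
    name: str


class Chain:
    """Records arcs and rejects two consecutive nodes of the same type."""

    def __init__(self, arcs: List[Tuple[Any, Any]], last_ref: Any):
        self.arcs = arcs
        self.last_ref = last_ref

    def arc(self, target: Any) -> "Chain":
        if type(self.last_ref) is type(target):
            raise ValueError("Arcs must alternate between places and transitions.")
        self.arcs.append((self.last_ref, target))
        # Return a new chain whose "last element" is the target just added.
        return Chain(self.arcs, target)


arcs: List[Tuple[Any, Any]] = []
pending, done = PlaceRef("pending"), PlaceRef("done")
process = TransitionRef("process")

# place -> transition -> place is accepted and records two arcs.
Chain(arcs, pending).arc(process).arc(done)

# place -> place is rejected by the type check.
try:
    Chain(arcs, pending).arc(done)
    rejected = False
except ValueError:
    rejected = True
```

Because the check compares exact types, it says nothing about direction: both place-to-transition and transition-to-place arcs pass.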

NetBuilder

NetBuilder(name: str, parent: Optional[NetSpec] = None)

Builder for constructing Petri net specifications

Source code in src/mycorrhizal/hypha/core/builder.py
def __init__(self, name: str, parent: Optional[NetSpec] = None):
    self.spec = NetSpec(name, parent=parent)
    # Track name counters for generating unique names when there are collisions
    self._transition_name_counters: Dict[str, int] = {}
    self._place_name_counters: Dict[str, int] = {}

place

place(name_or_func: Union[str, Callable, None] = None, state_factory: Optional[Callable] = None) -> Union[PlaceRef, Callable]

Declare a regular place.

All places are multi-sets (bags) that support token storage with multiplicity. Tokens can be added and removed from places.

Can be used in three ways:

  1. As a method call (returns PlaceRef for use in arcs):

         place = builder.place("my_place")

  2. As a decorator without parens (infers name from function):

         @builder.place
         def my_place(bb):
             return bb.tokens

  3. As a decorator with custom name:

         @builder.place("custom_name")
         def my_func(bb):
             return bb.tokens

Source code in src/mycorrhizal/hypha/core/builder.py
def place(
    self,
    name_or_func: Union[str, Callable, None] = None,
    state_factory: Optional[Callable] = None,
) -> Union[PlaceRef, Callable]:
    """Declare a regular place.

    All places are multi-sets (bags) that support token storage with
    multiplicity. Tokens can be added and removed from places.

    Can be used in three ways:

    1. As a method call (returns PlaceRef for use in arcs):
       place = builder.place("my_place")

    2. As a decorator without parens (infers name from function):
       @builder.place
       def my_place(bb):
           return bb.tokens

    3. As a decorator with custom name:
       @builder.place("custom_name")
       def my_func(bb):
           return bb.tokens
    """
    # Case 1: Used as decorator without parens - first arg is the function
    if callable(name_or_func):
        func = name_or_func
        place_name = func.__name__
        place_spec = PlaceSpec(place_name, handler=func, state_factory=state_factory)
        self.spec.places[place_name] = place_spec
        return PlaceRef(place_name, self.spec)

    # Cases 2 & 3: Called with an explicit name (as a method call or as a
    # decorator with args)
    if isinstance(name_or_func, str):
        name = name_or_func
        place_spec = PlaceSpec(name, state_factory=state_factory)
        self.spec.places[name] = place_spec
        # Return PlaceRef - it's callable via __call__ for decorator use
        return PlaceRef(name, self.spec)

    # Fallback: Called without any args, e.g. @builder.place() with empty
    # parens. Not a documented form, but handled by returning a decorator.
    def decorator(func: Callable) -> PlaceRef:
        place_name = func.__name__
        place_spec = PlaceSpec(place_name, handler=func, state_factory=state_factory)
        self.spec.places[place_name] = place_spec
        return PlaceRef(place_name, self.spec)
    return decorator
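
The argument dispatch in place can be illustrated with a small stand-alone sketch. Everything here is hypothetical: registry stands in for spec.places, and the function returns plain names instead of PlaceRef objects. It therefore models the method-call form, the bare-decorator form, and the empty-parens fallback, but not the named-decorator form, which the listing's comment attributes to a callable PlaceRef.

```python
from typing import Callable, Dict, Optional, Union

# Hypothetical stand-in for spec.places: name -> handler (or None).
registry: Dict[str, Optional[Callable]] = {}


def place(name_or_func: Union[str, Callable, None] = None):
    # Bare decorator: the first argument is the decorated function itself.
    if callable(name_or_func):
        registry[name_or_func.__name__] = name_or_func
        return name_or_func.__name__

    # Explicit name: register a place without a handler.
    if isinstance(name_or_func, str):
        registry[name_or_func] = None
        return name_or_func

    # No arguments: return a decorator that registers under the function name.
    def decorator(func: Callable) -> str:
        registry[func.__name__] = func
        return func.__name__

    return decorator


inbox = place("inbox")  # method-call form


@place  # decorator without parens
def outbox(bb):
    return []


@place()  # empty parens fall through to the decorator branch
def audit(bb):
    return []
```

Note that after decoration the names outbox and audit are bound to whatever place returns (a reference in the real builder, a string in this sketch), not to the original functions.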

io_input_place

io_input_place()

Decorator for IOInputPlace with async generator

Source code in src/mycorrhizal/hypha/core/builder.py
def io_input_place(self):
    """Decorator for IOInputPlace with async generator"""

    def decorator(func: Callable) -> PlaceRef:
        name = func.__name__
        place_spec = PlaceSpec(
            name, handler=func, is_io_input=True
        )
        self.spec.places[name] = place_spec
        return PlaceRef(name, self.spec)

    return decorator

io_output_place

io_output_place()

Decorator for IOOutputPlace with async handler

Source code in src/mycorrhizal/hypha/core/builder.py
def io_output_place(self):
    """Decorator for IOOutputPlace with async handler"""

    def decorator(func: Callable) -> PlaceRef:
        name = func.__name__
        place_spec = PlaceSpec(
            name, handler=func, is_io_output=True
        )
        self.spec.places[name] = place_spec
        return PlaceRef(name, self.spec)

    return decorator

guard

guard(func: Callable) -> GuardSpec

Create a guard specification from a function

Source code in src/mycorrhizal/hypha/core/builder.py
def guard(self, func: Callable) -> GuardSpec:
    """Create a guard specification from a function"""
    return GuardSpec(func)

transition

transition(guard: Optional[GuardSpec] = None, state_factory: Optional[Callable] = None, delay: float = 0.0)

Decorator for transition function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| guard | Optional[GuardSpec] | Optional guard specification | None |
| state_factory | Optional[Callable] | Optional state factory for transition state | None |
| delay | float | Static delay in seconds before firing after becoming enabled (default: 0.0) | 0.0 |

Usage:

    @builder.transition()
    async def my_transition(consumed, bb, timebase):
        yield {output: token}

    @builder.transition(delay=0.1)  # 100ms delay
    async def delayed_transition(consumed, bb, timebase):
        yield {output: token}

Note: When creating multiple transitions in a loop with the same function name, the builder automatically generates unique names by appending a counter suffix.

Source code in src/mycorrhizal/hypha/core/builder.py
def transition(
    self,
    guard: Optional[GuardSpec] = None,
    state_factory: Optional[Callable] = None,
    delay: float = 0.0,
):
    """Decorator for transition function.

    Args:
        guard: Optional guard specification
        state_factory: Optional state factory for transition state
        delay: Static delay in seconds before firing after becoming enabled (default: 0.0)

    Usage:
        @builder.transition()
        async def my_transition(consumed, bb, timebase):
            yield {output: token}

        @builder.transition(delay=0.1)  # 100ms delay
        async def delayed_transition(consumed, bb, timebase):
            yield {output: token}

    Note: When creating multiple transitions in a loop with the same function name,
    the builder automatically generates unique names by appending a counter suffix.
    """

    def decorator(func: Callable) -> TransitionRef:
        base_name = func.__name__
        # Generate unique name to handle collisions (e.g., when creating transitions in loops)
        if base_name in self._transition_name_counters:
            self._transition_name_counters[base_name] += 1
            name = f"{base_name}_{self._transition_name_counters[base_name]}"
        else:
            self._transition_name_counters[base_name] = 0
            name = base_name

        trans_spec = TransitionSpec(name, func, guard, state_factory, delay)
        self.spec.transitions[name] = trans_spec
        return TransitionRef(name, self.spec)

    return decorator
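
The collision handling described in the note can be sketched as a free function. This mirrors the counter logic in the listing above; unique_name and the counters dict are illustrative names, not part of the library.

```python
from typing import Dict


def unique_name(base_name: str, counters: Dict[str, int]) -> str:
    """Return base_name on first use, then base_name_1, base_name_2, ..."""
    if base_name in counters:
        counters[base_name] += 1
        return f"{base_name}_{counters[base_name]}"
    counters[base_name] = 0
    return base_name


counters: Dict[str, int] = {}
# Simulates decorating three functions that are all called "worker",
# as happens when transitions are defined inside a loop.
names = [unique_name("worker", counters) for _ in range(3)]
print(names)  # ['worker', 'worker_1', 'worker_2']
```

The first registration keeps the bare function name, so the suffix only appears from the second collision onward.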

arc

arc(source: Any, target: Any, name: Optional[str] = None, weight: int = 1) -> ArcChain

Create an arc and return chainable ArcChain

Source code in src/mycorrhizal/hypha/core/builder.py
def arc(
    self, source: Any, target: Any, name: Optional[str] = None, weight: int = 1
) -> ArcChain:
    """Create an arc and return chainable ArcChain"""
    source_type = type(source)
    target_type = type(target)

    if source_type == target_type:
        raise ValueError(
            f"Cannot connect {source_type.__name__} to {target_type.__name__} directly. "
            f"Arcs must alternate between places and transitions."
        )

    arc_spec = ArcSpec(source, target, weight, name)
    self.spec.arcs.append(arc_spec)

    return ArcChain(self, target)

subnet

subnet(net_func: Callable, instance_name: str) -> SubnetRef

Instantiate a subnet with given instance name

Source code in src/mycorrhizal/hypha/core/builder.py
def subnet(self, net_func: Callable, instance_name: str) -> SubnetRef:
    """Instantiate a subnet with given instance name"""
    # 1. Validation
    if not hasattr(net_func, "_spec"):
        raise ValueError(
            f"{net_func.__name__} is not a valid net. "
            f"Did you forget to decorate it with @pn.net?"
        )

    original_spec = net_func._spec

    # 2. Create subnet spec and copy places/transitions
    subnet_spec = NetSpec(instance_name, parent=self.spec)

    # Copy places and transitions (PlaceSpec/TransitionSpec are plain
    # dataclasses holding callables/state factories; keeping the same
    # instances is acceptable since they are stateless descriptors).
    subnet_spec.places = dict(original_spec.places)
    subnet_spec.transitions = dict(original_spec.transitions)

    # 3. Remap arcs to point into the new subnet
    subnet_spec.arcs = self._remap_arcs_to_subnet(original_spec.arcs, subnet_spec)

    # 4. Ensure nested subnets are copied
    self._ensure_nested_subnets_copied(original_spec, subnet_spec)

    # 5. Register and return
    self.spec.subnets[instance_name] = subnet_spec
    return SubnetRef(subnet_spec)

_remap_arcs_to_subnet

_remap_arcs_to_subnet(arcs: List[ArcSpec], subnet_spec: NetSpec) -> List[ArcSpec]

Remap arc references to point into target subnet.

For each PlaceRef/TransitionRef in the arcs, creates a new reference bound to the target subnet. Handles nested SubnetRefs by recursively copying their specs into the target subnet.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| arcs | List[ArcSpec] | Original arcs from the template net | required |
| subnet_spec | NetSpec | Target subnet spec to bind references to | required |

Returns:

| Type | Description |
| --- | --- |
| List[ArcSpec] | New list of ArcSpec with remapped references |

Source code in src/mycorrhizal/hypha/core/builder.py
def _remap_arcs_to_subnet(self, arcs: List[ArcSpec], subnet_spec: NetSpec) -> List[ArcSpec]:
    """Remap arc references to point into target subnet.

    For each PlaceRef/TransitionRef in the arcs, creates a new reference
    bound to the target subnet. Handles nested SubnetRefs by recursively
    copying their specs into the target subnet.

    Args:
        arcs: Original arcs from the template net
        subnet_spec: Target subnet spec to bind references to

    Returns:
        New list of ArcSpec with remapped references
    """
    new_arcs = []
    for arc in arcs:
        new_src = self._remap_ref_to_subnet(arc.source, subnet_spec, original_spec=None)
        new_tgt = self._remap_ref_to_subnet(arc.target, subnet_spec, original_spec=None)
        new_arcs.append(ArcSpec(new_src, new_tgt, arc.weight, arc.name))
    return new_arcs

_remap_ref_to_subnet

_remap_ref_to_subnet(ref: Any, target_subnet: NetSpec, original_spec: Optional[NetSpec] = None) -> Any

Remap a single reference to point into target subnet.

Handles:

  - PlaceRef: Create new PlaceRef bound to target_subnet
  - TransitionRef: Create new TransitionRef bound to target_subnet
  - SubnetRef: Recursively copy nested subnet spec into target_subnet

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ref | Any | Reference to remap (PlaceRef, TransitionRef, SubnetRef, or other) | required |
| target_subnet | NetSpec | The subnet spec to bind the reference to | required |
| original_spec | Optional[NetSpec] | The original template spec (for resolving nested subnets) | None |

Returns:

| Type | Description |
| --- | --- |
| Any | The remapped reference, or original if unknown type |

Source code in src/mycorrhizal/hypha/core/builder.py
def _remap_ref_to_subnet(self, ref: Any, target_subnet: NetSpec, original_spec: Optional[NetSpec] = None) -> Any:
    """Remap a single reference to point into target subnet.

    Handles:
    - PlaceRef: Create new PlaceRef bound to target_subnet
    - TransitionRef: Create new TransitionRef bound to target_subnet
    - SubnetRef: Recursively copy nested subnet spec into target_subnet

    Args:
        ref: Reference to remap (PlaceRef, TransitionRef, SubnetRef, or other)
        target_subnet: The subnet spec to bind the reference to
        original_spec: The original template spec (for resolving nested subnets)

    Returns:
        The remapped reference, or original if unknown type
    """
    from .specs import PlaceRef, TransitionRef, SubnetRef

    if isinstance(ref, PlaceRef):
        return PlaceRef(ref.local_name, target_subnet)
    if isinstance(ref, TransitionRef):
        return TransitionRef(ref.local_name, target_subnet)
    if isinstance(ref, SubnetRef):
        # Handle nested subnets recursively
        nested_name = ref.spec.name
        if nested_name not in target_subnet.subnets:
            # Copy the nested spec into this subnet (recursive)
            copied_nested = self._copy_spec_with_parent(ref.spec, target_subnet)
            target_subnet.subnets[nested_name] = copied_nested
            return SubnetRef(copied_nested)
        else:
            # Already copied, return reference to existing copy
            return SubnetRef(target_subnet.subnets[nested_name])

    # Unknown ref type: return as-is
    return ref

_ensure_nested_subnets_copied

_ensure_nested_subnets_copied(original_spec: NetSpec, subnet_spec: NetSpec)

Ensure all nested subnets are copied into the target subnet.

This handles nested subnets that weren't already processed during arc remapping. Each nested subnet is copied with its parent set to the target subnet.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| original_spec | NetSpec | The template spec containing nested subnets | required |
| subnet_spec | NetSpec | The target subnet spec to copy nested subnets into | required |

Source code in src/mycorrhizal/hypha/core/builder.py
def _ensure_nested_subnets_copied(self, original_spec: NetSpec, subnet_spec: NetSpec):
    """Ensure all nested subnets are copied into the target subnet.

    This handles nested subnets that weren't already processed during
    arc remapping. Each nested subnet is copied with its parent set
    to the target subnet.

    Args:
        original_spec: The template spec containing nested subnets
        subnet_spec: The target subnet spec to copy nested subnets into
    """
    for sub_name, sub_spec in original_spec.subnets.items():
        # Only copy if not already handled (e.g., by arc remapping)
        if sub_name not in subnet_spec.subnets:
            subnet_spec.subnets[sub_name] = self._copy_spec_with_parent(
                sub_spec, subnet_spec
            )

_copy_spec_with_parent

_copy_spec_with_parent(original: NetSpec, new_parent: NetSpec) -> NetSpec

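Recursively copy a spec and set a new parent. Places, transitions, and arcs get new containers that hold the same entries; nested subnets are copied recursively.

Source code in src/mycorrhizal/hypha/core/builder.py
def _copy_spec_with_parent(self, original: NetSpec, new_parent: NetSpec) -> NetSpec:
    """Recursively copy a spec and set a new parent.

    Places, transitions, and arcs get new containers that hold the same
    entries; nested subnets are copied recursively.
    """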
    new_spec = NetSpec(original.name, parent=new_parent)
    new_spec.places = dict(original.places)
    new_spec.transitions = dict(original.transitions)
    new_spec.arcs = list(original.arcs)

    for sub_name, sub_spec in original.subnets.items():
        new_spec.subnets[sub_name] = self._copy_spec_with_parent(sub_spec, new_spec)

    return new_spec
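
What this copy does and does not duplicate can be seen in a reduced sketch. Spec below is a hypothetical, simplified stand-in for NetSpec with only the fields needed for the illustration; the copy function follows the same pattern as the listing above.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(eq=False)  # identity comparison only; avoids recursing through parent links
class Spec:  # hypothetical stand-in for NetSpec
    name: str
    parent: Optional["Spec"] = None
    places: Dict[str, object] = field(default_factory=dict)
    subnets: Dict[str, "Spec"] = field(default_factory=dict)


def copy_with_parent(original: Spec, new_parent: Spec) -> Spec:
    new_spec = Spec(original.name, parent=new_parent)
    new_spec.places = dict(original.places)  # new dict, same value objects
    for sub_name, sub_spec in original.subnets.items():
        new_spec.subnets[sub_name] = copy_with_parent(sub_spec, new_spec)
    return new_spec


shared_place = object()
inner = Spec("inner", places={"p": shared_place})
outer = Spec("outer", subnets={"inner": inner})
inner.parent = outer

host = Spec("host")
copied = copy_with_parent(outer, host)
copied_inner = copied.subnets["inner"]
```

In the sketch, the spec objects and their dicts are new at every nesting level and the parent links point into the copied tree, while the place entries themselves are shared with the template.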

forward

forward(input_place: Any, output_place: Any, name: Optional[str] = None)

Create a simple pass-through transition

Source code in src/mycorrhizal/hypha/core/builder.py
def forward(self, input_place: Any, output_place: Any, name: Optional[str] = None):
    """Create a simple pass-through transition"""
    trans_name = (
        name or f"forward_{input_place.local_name}_to_{output_place.local_name}"
    )

    def make_handler(out_place):
        async def forward_handler(consumed, bb, timebase):
            for token in consumed:
                yield {out_place: token}

        return forward_handler

    trans_spec = TransitionSpec(trans_name, make_handler(output_place))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    self.spec.arcs.append(ArcSpec(trans_ref, output_place))

fork

fork(input_place: Any, output_places: List[Any], name: Optional[str] = None)

Create a transition that broadcasts tokens to multiple outputs

Source code in src/mycorrhizal/hypha/core/builder.py
def fork(
    self, input_place: Any, output_places: List[Any], name: Optional[str] = None
):
    """Create a transition that broadcasts tokens to multiple outputs"""
    trans_name = name or f"fork_{input_place.local_name}"

    def make_handler(out_places):
        async def fork_handler(consumed, bb, timebase):
            for token in consumed:
                for output_place in out_places:
                    yield {output_place: token}

        return fork_handler

    trans_spec = TransitionSpec(trans_name, make_handler(output_places))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    for output_place in output_places:
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))

join

join(input_places: List[Any], output_place: Any, name: Optional[str] = None)

Create a transition that waits for tokens from all inputs

Source code in src/mycorrhizal/hypha/core/builder.py
def join(
    self, input_places: List[Any], output_place: Any, name: Optional[str] = None
):
    """Create a transition that waits for tokens from all inputs"""
    trans_name = name or f"join_to_{output_place.local_name}"

    def make_handler(out_place):
        async def join_handler(consumed, bb, timebase):
            for token in consumed:
                yield {out_place: token}

        return join_handler

    trans_spec = TransitionSpec(trans_name, make_handler(output_place))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    for input_place in input_places:
        self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    self.spec.arcs.append(ArcSpec(trans_ref, output_place))

merge

merge(input_places: List[Any], output_place: Any, name: Optional[str] = None)

Create transitions from each input to output

Source code in src/mycorrhizal/hypha/core/builder.py
def merge(
    self, input_places: List[Any], output_place: Any, name: Optional[str] = None
):
    """Create transitions from each input to output"""
    for i, input_place in enumerate(input_places):
        trans_name = (
            name or f"{input_place.local_name}_to_{output_place.local_name}"
        )
        if len(input_places) > 1 and not name:
            trans_name = (
                f"{input_place.local_name}_{i}_to_{output_place.local_name}"
            )

        def make_handler(out_place):
            async def merge_handler(consumed, bb, timebase):
                for token in consumed:
                    yield {out_place: token}

            return merge_handler

        trans_spec = TransitionSpec(trans_name, make_handler(output_place))
        self.spec.transitions[trans_name] = trans_spec
        trans_ref = TransitionRef(trans_name, self.spec)

        self.spec.arcs.append(ArcSpec(input_place, trans_ref))
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))
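
The naming scheme used by merge is easier to see when pulled out of the loop. The helper below is a hypothetical function that reproduces only the name computation from the listing above, with plain strings in place of place references.

```python
from typing import List, Optional


def merge_transition_names(
    input_names: List[str], output_name: str, name: Optional[str] = None
) -> List[str]:
    """Reproduce the per-input transition names computed in merge()."""
    names = []
    for i, input_name in enumerate(input_names):
        trans_name = name or f"{input_name}_to_{output_name}"
        if len(input_names) > 1 and not name:
            trans_name = f"{input_name}_{i}_to_{output_name}"
        names.append(trans_name)
    return names


single = merge_transition_names(["a"], "out")
default_names = merge_transition_names(["a", "b"], "out")
explicit_names = merge_transition_names(["a", "b"], "out", name="m")

print(single)          # ['a_to_out']
print(default_names)   # ['a_0_to_out', 'b_1_to_out']
print(explicit_names)  # ['m', 'm']
```

With an explicit name and more than one input, every iteration computes the same name. Assuming spec.transitions is a dict keyed by name (as the dict(...) copies elsewhere in this module suggest), each registration would replace the previous entry, so it seems safer to omit name when merging several inputs.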

round_robin

round_robin(input_place: Any, output_places: List[Any], name: Optional[str] = None)

Create a transition that distributes tokens round-robin

Source code in src/mycorrhizal/hypha/core/builder.py
def round_robin(
    self, input_place: Any, output_places: List[Any], name: Optional[str] = None
):
    """Create a transition that distributes tokens round-robin"""
    trans_name = name or f"round_robin_{input_place.local_name}"

    def make_handler(out_places):
        async def round_robin_handler(consumed, bb, timebase, state):
            for token in consumed:
                output_place = out_places[state["index"]]
                state["index"] = (state["index"] + 1) % len(out_places)
                yield {output_place: token}

        return round_robin_handler

    trans_spec = TransitionSpec(
        trans_name, make_handler(output_places), state_factory=lambda: {"index": 0}
    )
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    for output_place in output_places:
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))
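
The round-robin handler keeps its position in per-transition state, so the rotation continues across firings. The sketch below isolates that behavior; it is not the library's runtime. The handler takes only (consumed, state), output places are plain strings, and collect is a hypothetical helper that drains the async generator.

```python
import asyncio
from typing import Any, Dict, List


def make_round_robin(out_places: List[str]):
    async def handler(consumed: List[Any], state: Dict[str, int]):
        for token in consumed:
            place = out_places[state["index"]]
            # Advance and wrap so the next token goes to the next place.
            state["index"] = (state["index"] + 1) % len(out_places)
            yield {place: token}

    return handler


async def collect(handler, consumed, state):
    # Drain the async generator into a list of {place: token} dicts.
    return [item async for item in handler(consumed, state)]


state = {"index": 0}  # what state_factory would produce once per transition
handler = make_round_robin(["a", "b", "c"])

first = asyncio.run(collect(handler, [1, 2], state))
second = asyncio.run(collect(handler, [3, 4], state))

print(first)   # [{'a': 1}, {'b': 2}]
print(second)  # [{'c': 3}, {'a': 4}]
```

Because the index lives in the state dict rather than in a local variable, the second firing resumes at place c instead of starting over at a.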

route

route(input_place: Any, type_map: Dict[Type, Any], name: Optional[str] = None)

Create a transition that routes by token type

Source code in src/mycorrhizal/hypha/core/builder.py
def route(
    self, input_place: Any, type_map: Dict[Type, Any], name: Optional[str] = None
):
    """Create a transition that routes by token type"""
    trans_name = name or f"route_{input_place.local_name}"

    def make_handler(t_map):
        async def route_handler(consumed, bb, timebase):
            for token in consumed:
                token_type = type(token)
                if token_type in t_map:
                    yield {t_map[token_type]: token}

        return route_handler

    trans_spec = TransitionSpec(trans_name, make_handler(type_map))
    self.spec.transitions[trans_name] = trans_spec
    trans_ref = TransitionRef(trans_name, self.spec)

    self.spec.arcs.append(ArcSpec(input_place, trans_ref))
    for output_place in type_map.values():
        self.spec.arcs.append(ArcSpec(trans_ref, output_place))
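
The route handler looks up type(token) directly in the map, so matching is by exact type rather than by isinstance. The sketch below shows the consequence with a synchronous, hypothetical route_tokens helper and string place names; what the real runtime does with a consumed token that yields no output is not shown in this reference.

```python
from typing import Any, Dict, List, Tuple, Type


def route_tokens(
    tokens: List[Any], type_map: Dict[Type, str]
) -> List[Tuple[str, Any]]:
    routed = []
    for token in tokens:
        token_type = type(token)
        # Exact-type lookup: subclasses of a mapped type do not match.
        if token_type in type_map:
            routed.append((type_map[token_type], token))
    return routed


type_map = {int: "numbers", str: "text"}
routed = route_tokens([1, "x", True, 2.5], type_map)
print(routed)  # [('numbers', 1), ('text', 'x')]
```

True is an instance of int but its type is bool, so it is not routed to "numbers"; the float has no entry at all. In this sketch both are simply skipped.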

PetriNetDSL

Module-level API for Petri net definition

net staticmethod

net(func: Callable) -> Callable

Decorator for defining a Petri net. The decorated function receives a NetBuilder as its parameter.

Source code in src/mycorrhizal/hypha/core/builder.py
@staticmethod
def net(func: Callable) -> Callable:
    """
    Decorator for defining a Petri net.
    The decorated function receives a NetBuilder as its parameter.
    """
    builder = NetBuilder(func.__name__, parent=None)
    func(builder)

    func._spec = builder.spec
    func.to_mermaid = lambda: builder.spec.to_mermaid()

    return func
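
One property of this decorator worth noting is that the decorated function runs once, at decoration time, and its result is stored on the function object. A minimal sketch of that pattern follows; MiniBuilder is a hypothetical stand-in for NetBuilder, and the builder itself is stored as _spec for brevity.

```python
from typing import Callable, List


class MiniBuilder:  # hypothetical stand-in for NetBuilder
    def __init__(self, name: str):
        self.name = name
        self.places: List[str] = []

    def place(self, name: str) -> str:
        self.places.append(name)
        return name


def net(func: Callable) -> Callable:
    builder = MiniBuilder(func.__name__)
    func(builder)         # the net body executes here, exactly once
    func._spec = builder  # the result travels with the function object
    return func


calls: List[str] = []


@net
def Pipeline(builder):
    calls.append("built")
    builder.place("pending")
    builder.place("done")


print(calls)                  # ['built']
print(Pipeline._spec.places)  # ['pending', 'done']
```

Since the body has already run by the time the module finishes importing, side effects inside a net definition happen at import time rather than when a runner starts.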

Runtime Module

mycorrhizal.hypha.core.runtime

Hypha DSL - Runtime Layer

Runtime objects that execute the Petri net specification. Manages token flow, transition firing, and asyncio task coordination.

PlaceWrapper

PlaceWrapper(matrix_runtime: MatrixRuntime, place_idx: int)

Wrapper for places in MatrixRuntime to provide NetRuntime-like API.

Source code in src/mycorrhizal/hypha/core/runtime.py
def __init__(self, matrix_runtime: 'MatrixRuntime', place_idx: int):
    self._runtime = matrix_runtime
    self._place_idx = place_idx
    # Compatibility: provide token_added_event like NetRuntime
    self.token_added_event = asyncio.Event()

tokens property

tokens: List[Any]

Get tokens at this place.

add_token

add_token(token: Any)

Add a token to this place.

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_token(self, token: Any):
    """Add a token to this place."""
    self._runtime.add_token(self._place_idx, token)
    # Set the event to notify that a token was added
    self.token_added_event.set()
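
The token_added_event lets a coroutine sleep until a token arrives. The sketch below shows one way such an event could be consumed; Place is a hypothetical stand-in that stores tokens in a list, and clearing the event in the waiter is an assumption, since the listing above only shows the event being set.

```python
import asyncio
from typing import Any, List


class Place:  # hypothetical stand-in for PlaceWrapper
    def __init__(self):
        self.tokens: List[Any] = []
        self.token_added_event = asyncio.Event()

    def add_token(self, token: Any):
        self.tokens.append(token)
        self.token_added_event.set()


async def demo() -> List[Any]:
    place = Place()

    async def consumer() -> List[Any]:
        await place.token_added_event.wait()
        # Assumption: the waiter resets the event so the next wait blocks again.
        place.token_added_event.clear()
        return list(place.tokens)

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)  # let the consumer start waiting
    place.add_token("job-1")
    return await task


seen = asyncio.run(demo())
print(seen)  # ['job-1']
```

If nothing clears the event, every later wait() returns immediately, so a consumer written this way needs some reset step of its own.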

IncidenceMatrix dataclass

IncidenceMatrix(num_places: int, num_transitions: int, matrix: Dict[int, Dict[int, int]] = dict(), place_to_idx: Dict[Tuple[str, ...], int] = dict(), idx_to_place: Dict[int, Tuple[str, ...]] = dict(), transition_to_idx: Dict[Tuple[str, ...], int] = dict(), idx_to_transition: Dict[int, Tuple[str, ...]] = dict(), input_requirements: Dict[int, Dict[int, int]] = dict(), output_destinations: Dict[int, Dict[int, int]] = dict())

Incidence matrix representation of Petri net structure.

The incidence matrix A represents the net structure:

  - Rows: places (P rows)
  - Columns: transitions (T columns)
  - Entry A[i,j]: net token change at place i when transition j fires
      - Negative value: tokens consumed from place i
      - Positive value: tokens produced to place i
      - Zero: no connection, or input and output weights that cancel out

For a transition with input arcs (consuming tokens) and output arcs (producing tokens), the column shows the net change.

get

get(place_idx: int, trans_idx: int) -> int

Get matrix entry at (place_idx, trans_idx)

Source code in src/mycorrhizal/hypha/core/runtime.py
def get(self, place_idx: int, trans_idx: int) -> int:
    """Get matrix entry at (place_idx, trans_idx)"""
    return self.matrix.get(place_idx, {}).get(trans_idx, 0)

set

set(place_idx: int, trans_idx: int, value: int)

Set matrix entry at (place_idx, trans_idx)

Source code in src/mycorrhizal/hypha/core/runtime.py
def set(self, place_idx: int, trans_idx: int, value: int):
    """Set matrix entry at (place_idx, trans_idx)"""
    if place_idx not in self.matrix:
        self.matrix[place_idx] = {}
    self.matrix[place_idx][trans_idx] = value

add_input

add_input(place_idx: int, trans_idx: int, weight: int)

Add input arc (consumes tokens, so negative entry)

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_input(self, place_idx: int, trans_idx: int, weight: int):
    """Add input arc (consumes tokens, so negative entry)"""
    current = self.get(place_idx, trans_idx)
    self.set(place_idx, trans_idx, current - weight)

    # Track input requirements (sum weights for multiple arcs)
    if trans_idx not in self.input_requirements:
        self.input_requirements[trans_idx] = {}
    if place_idx not in self.input_requirements[trans_idx]:
        self.input_requirements[trans_idx][place_idx] = 0
    self.input_requirements[trans_idx][place_idx] += weight

add_output

add_output(place_idx: int, trans_idx: int, weight: int)

Add output arc (produces tokens, so positive entry)

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_output(self, place_idx: int, trans_idx: int, weight: int):
    """Add output arc (produces tokens, so positive entry)"""
    current = self.get(place_idx, trans_idx)
    self.set(place_idx, trans_idx, current + weight)

    # Track output destinations (sum weights for multiple arcs)
    if trans_idx not in self.output_destinations:
        self.output_destinations[trans_idx] = {}
    if place_idx not in self.output_destinations[trans_idx]:
        self.output_destinations[trans_idx][place_idx] = 0
    self.output_destinations[trans_idx][place_idx] += weight
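
Because add_input and add_output both accumulate into the same matrix entry, a place that is both an input and an output of one transition can end up with a net entry of zero. The sketch below reproduces that bookkeeping with module-level dicts (hypothetical stand-ins for the dataclass fields) to show why the input requirements are tracked separately.

```python
from typing import Dict

matrix: Dict[int, Dict[int, int]] = {}
input_requirements: Dict[int, Dict[int, int]] = {}


def add_input(place_idx: int, trans_idx: int, weight: int) -> None:
    row = matrix.setdefault(place_idx, {})
    row[trans_idx] = row.get(trans_idx, 0) - weight
    reqs = input_requirements.setdefault(trans_idx, {})
    reqs[place_idx] = reqs.get(place_idx, 0) + weight


def add_output(place_idx: int, trans_idx: int, weight: int) -> None:
    row = matrix.setdefault(place_idx, {})
    row[trans_idx] = row.get(trans_idx, 0) + weight


# Place 0 is both an input and an output of transition 0 (a self-loop).
add_input(0, 0, 1)
add_output(0, 0, 1)

print(matrix)              # {0: {0: 0}}
print(input_requirements)  # {0: {0: 1}}
```

The matrix entry alone would suggest that the transition does not touch place 0, while input_requirements still records that one token must be present for it to fire.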

compute_state_change

compute_state_change(firing_vector: List[int]) -> Dict[int, int]

Compute state change: delta = A @ F

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| firing_vector | List[int] | List of length num_transitions with 0/1 values | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[int, int] | Dict mapping place_idx -> net_token_change |

Source code in src/mycorrhizal/hypha/core/runtime.py
def compute_state_change(self, firing_vector: List[int]) -> Dict[int, int]:
    """Compute state change: delta = A @ F

    Args:
        firing_vector: List of length num_transitions with 0/1 values

    Returns:
        Dict mapping place_idx -> net_token_change
    """
    state_change: Dict[int, int] = {}

    for place_idx, trans_entries in self.matrix.items():
        delta = 0
        for trans_idx, value in trans_entries.items():
            delta += value * firing_vector[trans_idx]

        if delta != 0:
            state_change[place_idx] = delta

    return state_change
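
A small worked example may help with the delta = A @ F computation. The sketch uses the same sparse dict-of-dicts layout as the listing above on a hypothetical three-place, two-transition net; the free function mirrors the method but is not the library's code.

```python
from typing import Dict, List

# Net: p0 --(weight 2)--> t0 --> p1 --> t1 --> p2
# matrix[place_idx][trans_idx] = net token change at that place.
matrix: Dict[int, Dict[int, int]] = {
    0: {0: -2},         # t0 consumes 2 tokens from p0
    1: {0: +1, 1: -1},  # t0 produces into p1, t1 consumes from p1
    2: {1: +1},         # t1 produces into p2
}


def compute_state_change(firing_vector: List[int]) -> Dict[int, int]:
    state_change: Dict[int, int] = {}
    for place_idx, trans_entries in matrix.items():
        delta = sum(
            value * firing_vector[trans_idx]
            for trans_idx, value in trans_entries.items()
        )
        if delta != 0:  # places with no net change are omitted
            state_change[place_idx] = delta
    return state_change


only_t0 = compute_state_change([1, 0])
both = compute_state_change([1, 1])

print(only_t0)  # {0: -2, 1: 1}
print(both)     # {0: -2, 2: 1}
```

When both transitions fire, the token produced into p1 by t0 and the one consumed by t1 cancel, so p1 does not appear in the result.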

is_enabled

is_enabled(trans_idx: int, marking: Dict[int, int]) -> bool

Check if transition is enabled (has sufficient input tokens).

IMPORTANT: When multiple arcs connect the same place to a transition, their weights are summed and the place must hold at least that many tokens. The old runtime allowed "bag semantics" where several arcs could consume the same token; this runtime requires a distinct token per arc.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| trans_idx | int | Transition index | required |
| marking | Dict[int, int] | Current marking (place_idx -> token_count) | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if transition has all required input tokens |

Source code in src/mycorrhizal/hypha/core/runtime.py
def is_enabled(self, trans_idx: int, marking: Dict[int, int]) -> bool:
    """Check if transition is enabled (has sufficient input tokens).

    IMPORTANT: When multiple arcs connect the same place to a transition,
    their weights are summed and the place must hold at least that many tokens.
    The old runtime allowed "bag semantics" where several arcs could consume
    the same token; this runtime requires a distinct token per arc.

    Args:
        trans_idx: Transition index
        marking: Current marking (place_idx -> token_count)

    Returns:
        True if transition has all required input tokens
    """
    if trans_idx not in self.input_requirements:
        # No input requirements (generator transition) - always enabled
        return True

    for place_idx, required in self.input_requirements[trans_idx].items():
        # Check if place has enough tokens to satisfy all arcs
        # Each arc from this place needs its own token. Pure bag semantics would
        # let several arcs share one token (so a single token would suffice), but
        # that can loop forever when a transition consumes more than it produces.
        if marking.get(place_idx, 0) < required:
            return False

    return True
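
A small sketch of the enabling check, assuming input_requirements maps trans_idx -> {place_idx: required_count} as the loop above implies. The module-level dictionary and the free function are hypothetical stand-ins for the method.

```python
from typing import Dict

# Hypothetical requirements: transition 0 needs two tokens from place 0
# (for example, two arcs from the same place) and one token from place 1.
input_requirements: Dict[int, Dict[int, int]] = {0: {0: 2, 1: 1}}


def is_enabled(trans_idx: int, marking: Dict[int, int]) -> bool:
    """Free-function mirror of the check documented above."""
    required_by_place = input_requirements.get(trans_idx)
    if required_by_place is None:
        return True  # generator transition: no inputs, always enabled
    return all(marking.get(p, 0) >= n for p, n in required_by_place.items())


one_token_short = is_enabled(0, {0: 1, 1: 1})  # place 0 holds 1, needs 2
enough = is_enabled(0, {0: 2, 1: 1})
generator = is_enabled(7, {})                   # no requirements registered
```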

get_token_slots

get_token_slots(trans_idx: int) -> List[Tuple[int, int]]

Get token consumption slots for a transition (cached for performance).

When multiple arcs connect the same place to a transition, we need to consume separate tokens (or the same token multiple times for bag semantics).

Returns:

Type Description
List[Tuple[int, int]]

List of (place_idx, slot_index) tuples representing token consumption slots

Source code in src/mycorrhizal/hypha/core/runtime.py
def get_token_slots(self, trans_idx: int) -> List[Tuple[int, int]]:
    """Get token consumption slots for a transition (cached for performance).

    When multiple arcs connect the same place to a transition,
    we need to consume separate tokens (or the same token multiple times for bag semantics).

    Returns:
        List of (place_idx, slot_index) tuples representing token consumption slots
    """
    # Return cached slots if available
    if trans_idx in self._token_slots_cache:
        return self._token_slots_cache[trans_idx]

    if trans_idx not in self.input_requirements:
        return []

    slots = []
    for place_idx, count in self.input_requirements[trans_idx].items():
        for i in range(count):
            slots.append((place_idx, i))

    # Cache for future use
    self._token_slots_cache[trans_idx] = slots
    return slots
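
To make the slot expansion concrete, here is a minimal sketch without the cache. The helper token_slots is hypothetical; it takes the per-transition requirements dict directly.

```python
from typing import Dict, List, Tuple


def token_slots(requirements: Dict[int, int]) -> List[Tuple[int, int]]:
    """Expand {place_idx: count} into one (place_idx, slot_index) per token."""
    return [
        (place_idx, slot)
        for place_idx, count in requirements.items()
        for slot in range(count)
    ]


# Two tokens required from place 3, one from place 5.
slots = token_slots({3: 2, 5: 1})
```

The slot order follows the insertion order of the requirements dict.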

TokenEntry dataclass

TokenEntry(token_id: int, data: Any = None)

Represents a token with optional data payload.

TokenRegistry

TokenRegistry()

Registry for tracking token data.

Since incidence matrices track counts, we need a separate registry to map token identifiers to their actual data payloads.

Source code in src/mycorrhizal/hypha/core/runtime.py
def __init__(self):
    self._next_id: int = 0
    self._tokens: Dict[int, Any] = {}

register

register(data: Any) -> int

Register a token and return its ID.

Source code in src/mycorrhizal/hypha/core/runtime.py
def register(self, data: Any) -> int:
    """Register a token and return its ID."""
    token_id = self._next_id
    self._next_id += 1
    self._tokens[token_id] = data
    return token_id

get

get(token_id: int) -> Any

Get token data by ID.

Source code in src/mycorrhizal/hypha/core/runtime.py
def get(self, token_id: int) -> Any:
    """Get token data by ID."""
    return self._tokens.get(token_id)

remove

remove(token_id: int)

Remove a token from registry.

Source code in src/mycorrhizal/hypha/core/runtime.py
def remove(self, token_id: int):
    """Remove a token from registry."""
    if token_id in self._tokens:
        del self._tokens[token_id]
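
The registry life cycle can be sketched as follows. The class below is a condensed copy of the three methods shown above, repeated only so the example runs on its own.

```python
from typing import Any, Dict


class TokenRegistry:
    """Condensed copy of the documented class, for illustration only."""

    def __init__(self) -> None:
        self._next_id: int = 0
        self._tokens: Dict[int, Any] = {}

    def register(self, data: Any) -> int:
        token_id = self._next_id
        self._next_id += 1
        self._tokens[token_id] = data
        return token_id

    def get(self, token_id: int) -> Any:
        return self._tokens.get(token_id)

    def remove(self, token_id: int) -> None:
        self._tokens.pop(token_id, None)


registry = TokenRegistry()
first = registry.register({"job": "a"})
second = registry.register({"job": "b"})
payload = registry.get(first)
registry.remove(first)
missing = registry.get(first)  # None once the token is removed
```

Because get falls back to None, a removed or unknown ID cannot be told apart from a registered token whose payload is None.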

Marking dataclass

Marking(tokens: Dict[int, List[Any]] = dict())

State vector representation of Petri net marking.

For data-carrying tokens, stores token IDs. For simple count places, stores the count directly.

PERFORMANCE: Cached count dict is invalidated on token changes to avoid recomputing on every transition check.

_invalidate_cache

_invalidate_cache()

Invalidate the count cache.

Source code in src/mycorrhizal/hypha/core/runtime.py
def _invalidate_cache(self):
    """Invalidate the count cache."""
    self._cache_valid = False

get_count

get_count(place_idx: int) -> int

Get token count at a place.

Source code in src/mycorrhizal/hypha/core/runtime.py
def get_count(self, place_idx: int) -> int:
    """Get token count at a place."""
    return len(self.tokens.get(place_idx, []))

add_tokens

add_tokens(place_idx: int, tokens: List[Any])

Add tokens to a place.

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_tokens(self, place_idx: int, tokens: List[Any]):
    """Add tokens to a place."""
    if place_idx not in self.tokens:
        self.tokens[place_idx] = []
    self.tokens[place_idx].extend(tokens)
    self._invalidate_cache()

remove_tokens

remove_tokens(place_idx: int, count: int) -> List[Any]

Remove tokens from a place (returns removed tokens).

Source code in src/mycorrhizal/hypha/core/runtime.py
def remove_tokens(self, place_idx: int, count: int) -> List[Any]:
    """Remove tokens from a place (returns removed tokens)."""
    if place_idx not in self.tokens:
        return []

    place_tokens = self.tokens[place_idx]
    removed = place_tokens[:count]
    self.tokens[place_idx] = place_tokens[count:]

    if not self.tokens[place_idx]:
        del self.tokens[place_idx]

    self._invalidate_cache()
    return removed

remove_tokens_fast

remove_tokens_fast(place_idx: int, token_ids: List[Any])

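Remove specific tokens from a place, using an O(1) set lookup per token.

This is used when we know exactly which tokens to remove. A set-based filter makes the total cost O(n + m) for n stored tokens and m tokens to remove, instead of O(n*m). Every occurrence of a listed token is removed.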

Source code in src/mycorrhizal/hypha/core/runtime.py
def remove_tokens_fast(self, place_idx: int, token_ids: List[Any]):
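    """Remove specific tokens from a place, using an O(1) set lookup per token.

    This is used when we know exactly which tokens to remove.
    A set-based filter makes the total cost O(n + m) for n stored tokens and
    m tokens to remove, instead of O(n*m). Every occurrence of a listed token
    is removed.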
    """
    if place_idx not in self.tokens:
        return

    if not token_ids:
        return

    place_tokens = self.tokens[place_idx]

    # Build a set of tokens to remove for O(1) lookup
    to_remove = set(token_ids)

    # Filter out the removed tokens (single pass)
    self.tokens[place_idx] = [t for t in place_tokens if t not in to_remove]

    if not self.tokens[place_idx]:
        del self.tokens[place_idx]

    self._invalidate_cache()
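
The set-based filter can be tried in isolation. This sketch works on a plain dict rather than a Marking, and the helper name remove_specific is hypothetical.

```python
from typing import Any, Dict, List


def remove_specific(
    tokens: Dict[int, List[Any]], place_idx: int, token_ids: List[Any]
) -> None:
    """Drop every listed token from one place in a single pass."""
    if place_idx not in tokens or not token_ids:
        return
    to_remove = set(token_ids)  # O(1) membership tests
    tokens[place_idx] = [t for t in tokens[place_idx] if t not in to_remove]
    if not tokens[place_idx]:
        del tokens[place_idx]   # empty places are dropped from the dict


tokens = {0: [10, 11, 12, 11]}
remove_specific(tokens, 0, [11])
after_first = dict(tokens)
remove_specific(tokens, 0, [10, 12])
after_second = dict(tokens)
```

Both copies of 11 disappear in the first call, since the filter works by value. That only matters if the same token ID was added to a place more than once.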

peek_tokens

peek_tokens(place_idx: int, count: int) -> List[Any]

Peek at tokens without removing.

Source code in src/mycorrhizal/hypha/core/runtime.py
def peek_tokens(self, place_idx: int, count: int) -> List[Any]:
    """Peek at tokens without removing."""
    place_tokens = self.tokens.get(place_idx, [])
    return place_tokens[:count]

has_tokens

has_tokens(place_idx: int, count: int) -> bool

Check if place has at least count tokens.

Source code in src/mycorrhizal/hypha/core/runtime.py
def has_tokens(self, place_idx: int, count: int) -> bool:
    """Check if place has at least count tokens."""
    return self.get_count(place_idx) >= count

get_count_dict

get_count_dict() -> Dict[int, int]

Get dict of place_idx -> token_count (cached for performance).

Source code in src/mycorrhizal/hypha/core/runtime.py
def get_count_dict(self) -> Dict[int, int]:
    """Get dict of place_idx -> token_count (cached for performance)."""
    if not self._cache_valid:
        self._cached_count_dict = {idx: len(tokens) for idx, tokens in self.tokens.items()}
        self._cache_valid = True
    return self._cached_count_dict
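
The caching behaviour can be sketched with a small stand-in class. MiniMarking is hypothetical and reproduces only add_tokens and get_count_dict; the private field names follow the source above, but their declarations in the real dataclass are not shown here and are assumed.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class MiniMarking:
    """Hypothetical stand-in for Marking with the same cache logic."""

    tokens: Dict[int, List[Any]] = field(default_factory=dict)
    _cache_valid: bool = field(default=False, repr=False)
    _cached_count_dict: Dict[int, int] = field(default_factory=dict, repr=False)

    def add_tokens(self, place_idx: int, new_tokens: List[Any]) -> None:
        self.tokens.setdefault(place_idx, []).extend(new_tokens)
        self._cache_valid = False  # any token change invalidates the cache

    def get_count_dict(self) -> Dict[int, int]:
        if not self._cache_valid:
            self._cached_count_dict = {i: len(t) for i, t in self.tokens.items()}
            self._cache_valid = True
        return self._cached_count_dict


marking = MiniMarking()
marking.add_tokens(0, [1, 2])
first = marking.get_count_dict()
same_object = marking.get_count_dict() is first  # served from the cache
marking.add_tokens(1, [3])
rebuilt = marking.get_count_dict()               # recomputed after the change
```

The cached dict is returned by reference, so callers should treat it as read-only.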

GuardInfo dataclass

GuardInfo(has_guard: bool = False, guard_spec: Optional[GuardSpec] = None, requires_tokens: bool = False)

Information about a transition's guard.

GuardMatrix

GuardMatrix()

Guard type matrix and evaluation logic.

Tracks which transitions have guards and evaluates them to build the firing vector F.

Source code in src/mycorrhizal/hypha/core/runtime.py
def __init__(self):
    # trans_idx -> GuardInfo
    self.guards: Dict[int, GuardInfo] = {}

set_guard

set_guard(trans_idx: int, guard_spec: Optional[GuardSpec], requires_tokens: bool = False)

Set guard info for a transition.

Source code in src/mycorrhizal/hypha/core/runtime.py
def set_guard(self, trans_idx: int, guard_spec: Optional[GuardSpec], requires_tokens: bool = False):
    """Set guard info for a transition."""
    self.guards[trans_idx] = GuardInfo(
        has_guard=guard_spec is not None,
        guard_spec=guard_spec,
        requires_tokens=requires_tokens,
    )

has_guard

has_guard(trans_idx: int) -> bool

Check if transition has a guard.

Source code in src/mycorrhizal/hypha/core/runtime.py
def has_guard(self, trans_idx: int) -> bool:
    """Check if transition has a guard."""
    return self.guards.get(trans_idx, GuardInfo()).has_guard

evaluate

evaluate(trans_idx: int, marking: Marking, incidence_matrix: IncidenceMatrix, bb: Any, timebase: Any, token_registry: TokenRegistry) -> bool | Any

Evaluate guard for a transition.

Parameters:

Name Type Description Default
trans_idx int

Transition index

required
marking Marking

Current marking

required
incidence_matrix IncidenceMatrix

Incidence matrix for net structure

required
bb Any

Blackboard

required
timebase Any

Timebase

required
token_registry TokenRegistry

Token registry for data lookups

required

Returns:

Type Description
bool | Any

True if guard passes (or no guard), False otherwise. Can also return an async generator for async guards (handled by caller).

Source code in src/mycorrhizal/hypha/core/runtime.py
def evaluate(
    self,
    trans_idx: int,
    marking: Marking,
    incidence_matrix: IncidenceMatrix,
    bb: Any,
    timebase: Any,
    token_registry: TokenRegistry,
) -> bool | Any:
    """Evaluate guard for a transition.

    Args:
        trans_idx: Transition index
        marking: Current marking
        incidence_matrix: Incidence matrix for net structure
        bb: Blackboard
        timebase: Timebase
        token_registry: Token registry for data lookups

    Returns:
        True if guard passes (or no guard), False otherwise
        Can also return async generator for async guards (handled by caller)
    """
    guard_info = self.guards.get(trans_idx)
    if not guard_info or not guard_info.has_guard:
        return True

    guard_spec = guard_info.guard_spec
    if guard_spec is None:
        return True

    # Get input places for this transition
    input_places = incidence_matrix.input_requirements.get(trans_idx, {})

    if not input_places:
        # Generator transition - no tokens to evaluate
        token_combinations = [[]]
    else:
        # Generate token combinations
        arc_tokens = []
        for place_idx, required_count in input_places.items():
            tokens = marking.peek_tokens(place_idx, required_count)
            if len(tokens) < required_count:
                return False  # Not enough tokens

            if required_count == 1:
                arc_tokens.append([(t,) for t in tokens])
            else:
                # combinations is assumed to be itertools.combinations, imported
                # at module level. The result list below uses a different name,
                # because a local called "combinations" would shadow the import
                # and make this call raise UnboundLocalError.
                arc_tokens.append(list(combinations(tokens, required_count)))

        if not arc_tokens:
            return False

        token_combinations = list(product(*arc_tokens))

    if not token_combinations:
        return False

    # Evaluate guard
    guard_func = guard_spec.func
    guard_result = guard_func(token_combinations, bb, timebase)

    # Check if guard passed any combination
    if inspect.isgenerator(guard_result):
        for result in guard_result:
            if result is not None:
                return True
        return False
    elif inspect.isasyncgen(guard_result):
        # Async guard - return the async generator for the caller to iterate
        return guard_result
    else:
        return guard_result is not None
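
A guard is called with the list of token combinations, the blackboard, and the timebase. The sketch below shows how a synchronous generator guard interacts with the result check at the end of evaluate. The guard first_is_even and the helper guard_passes are hypothetical, and the integers stand in for whatever the marking stores.

```python
import inspect
from itertools import product


def first_is_even(combinations, bb, timebase):
    """Hypothetical guard: yield each combination whose first token is even."""
    for combo in combinations:
        (token,) = combo[0]  # one token taken from the first input place
        if token % 2 == 0:
            yield combo


def guard_passes(guard_result) -> bool:
    """Mirror of the synchronous branches of the result check above."""
    if inspect.isgenerator(guard_result):
        return any(result is not None for result in guard_result)
    return guard_result is not None


# One token from each of two input places, shaped as evaluate builds them.
odd_combos = list(product([(3,)], [(8,)]))
even_combos = list(product([(4,)], [(8,)]))

odd_first = guard_passes(first_is_even(odd_combos, None, None))
even_first = guard_passes(first_is_even(even_combos, None, None))
plain_false = guard_passes(False)
```

Note the last case: as the check is written, only None counts as failure for a non-generator result, so a guard that returns a plain False is treated as passing.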

MatrixRuntime

MatrixRuntime(spec: NetSpec, bb: Any, timebase: Any)

Matrix-based Petri net runtime.

Uses incidence matrix and state vector for execution, eliminating per-transition asyncio overhead.

Execution model:

1. Build firing vector F by evaluating enabled transitions
2. Fire via M_new = M + (A @ F)
3. Process token data flow for fired transitions
4. Repeat

Source code in src/mycorrhizal/hypha/core/runtime.py
def __init__(self, spec: NetSpec, bb: Any, timebase: Any):
    self.spec = spec
    self.bb = bb
    self.timebase = timebase
    self._interface_cache = InterfaceViewCache(maxsize=256)

    # Compatibility layer: places dict accessor
    self._places_wrapper: Dict[Tuple[str, ...], 'PlaceWrapper'] = {}

    # Build matrix representations
    self.incidence_matrix = build_incidence_matrix(spec)
    self.guard_matrix = build_guard_matrix(spec, self.incidence_matrix)

    # State
    self.marking = Marking()
    self.token_registry = TokenRegistry()
    self._stop_event = asyncio.Event()
    self._stop_event_sync = False  # Synchronous stop flag

    # Transition specs for execution
    self._transition_specs: Dict[int, TransitionSpec] = {}
    self._transition_states: Dict[int, Any] = {}

    # Delay tracking
    self._enabled_times: Dict[int, float] = {}

    # Place specs for IO handling (initialize early for async detection)
    self._place_specs: Dict[int, PlaceSpec] = {}

    # Collect transition specs and place specs first (needed for async detection)
    transitions: Dict[Tuple[str, ...], TransitionSpec] = {}
    places: Dict[Tuple[str, ...], PlaceSpec] = {}

    def collect_spec(net_spec: NetSpec):
        for trans_name, trans_spec in net_spec.transitions.items():
            transitions[tuple(net_spec.get_parts(trans_name))] = trans_spec
        for place_name, place_spec in net_spec.places.items():
            places[tuple(net_spec.get_parts(place_name))] = place_spec
        for subnet_spec in net_spec.subnets.values():
            collect_spec(subnet_spec)

    collect_spec(spec)

    # Populate transition specs
    for trans_fqn, trans_spec in transitions.items():
        trans_idx = self.incidence_matrix.transition_to_idx.get(trans_fqn)
        if trans_idx is not None:
            self._transition_specs[trans_idx] = trans_spec
            if trans_spec.state_factory:
                self._transition_states[trans_idx] = trans_spec.state_factory()

    # Populate place specs (done before async detection)
    for place_fqn, place_spec in places.items():
        place_idx = self.incidence_matrix.place_to_idx.get(place_fqn)
        if place_idx is not None:
            self._place_specs[place_idx] = place_spec

    # Detect if net has async features (IO places, async transitions/guards)
    self._has_async_features = self._detect_async_features()

    # Build places wrapper for compatibility
    for place_fqn in self.incidence_matrix.place_to_idx.keys():
        place_idx = self.incidence_matrix.place_to_idx[place_fqn]
        self._places_wrapper[place_fqn] = PlaceWrapper(self, place_idx)

    # IO tasks
    self._io_tasks: List[asyncio.Task] = []
    self._run_task: Optional[asyncio.Task] = None

places property

places: Dict[Tuple[str, ...], PlaceWrapper]

Get places dict (compatibility layer with NetRuntime).

transitions property

transitions: Dict[str, Any]

Get transitions dict (compatibility layer with NetRuntime).

Note: This returns a simplified view for compatibility with the mycelium bridge. The actual transition data is stored in _transition_specs.

_detect_async_features

_detect_async_features() -> bool

Detect if the net has async features that require async mode.

Returns:

Type Description
bool

True if net has IO places, async transitions, or async guards

Source code in src/mycorrhizal/hypha/core/runtime.py
def _detect_async_features(self) -> bool:
    """Detect if the net has async features that require async mode.

    Returns:
        True if net has IO places, async transitions, or async guards
    """
    # Check for IO places
    for place_spec in self._place_specs.values():
        if place_spec.is_io_input or place_spec.is_io_output:
            return True

    # Check for async transitions or guards
    for trans_spec in self._transition_specs.values():
        # Check if transition handler is async
        if inspect.iscoroutinefunction(trans_spec.handler):
            return True

        # Check if guard is async
        if trans_spec.guard and inspect.isasyncgenfunction(trans_spec.guard.func):
            return True

    return False
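
The detection above relies on two different inspect predicates, and they do not overlap. The sketch below uses two hypothetical handlers to show which predicate matches which kind of async def.

```python
import inspect


async def coroutine_handler(consumed, bb, timebase):
    """Hypothetical handler that returns a value (a coroutine function)."""
    return None


async def generator_handler(consumed, bb, timebase):
    """Hypothetical handler that yields (an async generator function)."""
    yield None


flags = {
    "coroutine": (
        inspect.iscoroutinefunction(coroutine_handler),
        inspect.isasyncgenfunction(coroutine_handler),
    ),
    "async_gen": (
        inspect.iscoroutinefunction(generator_handler),
        inspect.isasyncgenfunction(generator_handler),
    ),
}
```

An async def that contains yield is an async generator function, so iscoroutinefunction reports False for it. Only handlers written as plain coroutines are matched by the handler check above.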

get_cache_stats

get_cache_stats() -> Dict[str, Any]

Get cache statistics for monitoring and debugging.

Returns:

Type Description
Dict[str, Any]

Dict with current cache size, maxsize, and hit/miss counts

Source code in src/mycorrhizal/hypha/core/runtime.py
def get_cache_stats(self) -> Dict[str, Any]:
    """
    Get cache statistics for monitoring and debugging.

    Returns:
        Dict with current cache size, maxsize, and hit/miss counts
    """
    return self._interface_cache.get_stats()

_create_interface_view_if_needed

_create_interface_view_if_needed(bb: Any, handler: Callable) -> Any

Create constrained view if handler has interface type hint.

Source code in src/mycorrhizal/hypha/core/runtime.py
def _create_interface_view_if_needed(self, bb: Any, handler: Callable) -> Any:
    """Create constrained view if handler has interface type hint."""
    metadata = _get_compiled_metadata(handler)

    if metadata.has_interface and metadata.interface_type:
        readonly_fields = tuple(metadata.readonly_fields) if metadata.readonly_fields else None

        return self._interface_cache.get_or_create(
            bb_id=id(bb),
            interface_type=metadata.interface_type,
            readonly_fields=readonly_fields,
            creator_func=lambda: create_view_from_protocol(
                bb,
                metadata.interface_type,
                readonly_fields=metadata.readonly_fields
            )
        )

    return bb

_start_io_input async

_start_io_input()

Start IO input places.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _start_io_input(self):
    """Start IO input places."""
    for place_idx, place_spec in self._place_specs.items():
        if place_spec.is_io_input and place_spec.handler:
            task = asyncio.create_task(self._io_input_loop(place_idx, place_spec))
            self._io_tasks.append(task)

_io_input_loop async

_io_input_loop(place_idx: int, place_spec: PlaceSpec)

IO input loop for a single place.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _io_input_loop(self, place_idx: int, place_spec: PlaceSpec):
    """IO input loop for a single place."""
    try:
        handler = place_spec.handler
        if handler is None:
            return

        bb_to_pass = self._create_interface_view_if_needed(self.bb, handler)

        sig = inspect.signature(handler)
        if len(sig.parameters) == 2:
            gen = handler(bb_to_pass, self.timebase)
        else:
            gen = handler()  # type: ignore[call-arg]

        async for token in gen:
            token_id = self.token_registry.register(token)
            self.marking.add_tokens(place_idx, [token_id])
    except asyncio.CancelledError:
        pass
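
An IO input handler is an async generator whose yielded values become tokens. The following sketch assumes the two-parameter form (bb, timebase) and collects tokens into a list instead of a marking. The names sensor_input and drain are hypothetical.

```python
import asyncio
import inspect


async def sensor_input(bb, timebase):
    """Hypothetical IO input handler: yields three readings, then stops."""
    for reading in (1, 2, 3):
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield reading


async def drain(handler, bb, timebase):
    """Call the handler the way the loop above does and collect its tokens."""
    params = inspect.signature(handler).parameters
    gen = handler(bb, timebase) if len(params) == 2 else handler()
    return [token async for token in gen]


collected = asyncio.run(drain(sensor_input, bb=None, timebase=None))
```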

_handle_io_output async

_handle_io_output(place_idx: int, token: Any)

Handle IO output for a token.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _handle_io_output(self, place_idx: int, token: Any):
    """Handle IO output for a token."""
    place_spec = self._place_specs.get(place_idx)
    if place_spec and place_spec.is_io_output and place_spec.handler:
        bb_to_pass = self._create_interface_view_if_needed(self.bb, place_spec.handler)

        sig = inspect.signature(place_spec.handler)
        if len(sig.parameters) == 3:
            await place_spec.handler(token, bb_to_pass, self.timebase)
        else:
            await place_spec.handler(token)

add_token

add_token(place_idx: int, token: Any)

Add a token to a place.

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_token(self, place_idx: int, token: Any):
    """Add a token to a place."""
    if isinstance(token, int) and token in self.token_registry._tokens:
        # Already a token ID
        token_id = token
    else:
        token_id = self.token_registry.register(token)
    self.marking.add_tokens(place_idx, [token_id])

add_token_by_fqn

add_token_by_fqn(place_fqn: Tuple[str, ...], token: Any)

Add a token by place FQN.

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_token_by_fqn(self, place_fqn: Tuple[str, ...], token: Any):
    """Add a token by place FQN."""
    place_idx = self.incidence_matrix.place_to_idx.get(place_fqn)
    if place_idx is not None:
        self.add_token(place_idx, token)

get_token_count_by_fqn

get_token_count_by_fqn(place_fqn: Tuple[str, ...]) -> int

Get token count by place FQN.

Source code in src/mycorrhizal/hypha/core/runtime.py
def get_token_count_by_fqn(self, place_fqn: Tuple[str, ...]) -> int:
    """Get token count by place FQN."""
    place_idx = self.incidence_matrix.place_to_idx.get(place_fqn)
    if place_idx is not None:
        return self.marking.get_count(place_idx)
    return 0

_build_firing_vector async

_build_firing_vector() -> List[int]

Build firing vector F by evaluating enabled transitions.

Returns:

Type Description
List[int]

List of 0/1 values indicating which transitions fire

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _build_firing_vector(self) -> List[int]:
    """Build firing vector F by evaluating enabled transitions.

    Returns:
        List of 0/1 values indicating which transitions fire
    """
    firing_vector = [0] * self.incidence_matrix.num_transitions

    # Get count dict once per cycle (cached in Marking)
    count_dict = self.marking.get_count_dict()

    for trans_idx in range(self.incidence_matrix.num_transitions):
        # Check if enabled (has sufficient tokens)
        if not self.incidence_matrix.is_enabled(trans_idx, count_dict):
            self._enabled_times.pop(trans_idx, None)
            continue

        # Track enabled time for delay support
        trans_spec = self._transition_specs.get(trans_idx)
        if trans_spec and trans_spec.delay > 0:
            if trans_idx not in self._enabled_times:
                self._enabled_times[trans_idx] = self.timebase.now()

            # Check if delay has elapsed
            enabled_time = self._enabled_times.get(trans_idx)
            if enabled_time is None:
                continue

            elapsed = self.timebase.now() - enabled_time
            if elapsed < trans_spec.delay:
                continue

        # Evaluate guard
        guard_result = self.guard_matrix.evaluate(
            trans_idx,
            self.marking,
            self.incidence_matrix,
            self.bb,
            self.timebase,
            self.token_registry,
        )

        # Handle async guard
        if inspect.isasyncgen(guard_result):
            guard_passed = False
            async for result in guard_result:
                if result is not None:
                    guard_passed = True
                    break
            if not guard_passed:
                continue
        elif not guard_result:
            continue

        # Transition fires!
        firing_vector[trans_idx] = 1

    return firing_vector
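
The delay handling can be isolated into a small function. This sketch assumes only that the timebase exposes now(), as used above. FakeTimebase and delay_elapsed are hypothetical.

```python
from typing import Dict


class FakeTimebase:
    """Hypothetical manually advanced clock."""

    def __init__(self) -> None:
        self.current = 0.0

    def now(self) -> float:
        return self.current


def delay_elapsed(
    trans_idx: int, delay: float, enabled_times: Dict[int, float], timebase
) -> bool:
    """Record when a transition first became enabled; True once delay has passed."""
    if delay <= 0:
        return True
    if trans_idx not in enabled_times:
        enabled_times[trans_idx] = timebase.now()
    return timebase.now() - enabled_times[trans_idx] >= delay


timebase = FakeTimebase()
enabled_times: Dict[int, float] = {}

at_start = delay_elapsed(0, 1.0, enabled_times, timebase)
timebase.current = 0.5
halfway = delay_elapsed(0, 1.0, enabled_times, timebase)
timebase.current = 1.0
at_deadline = delay_elapsed(0, 1.0, enabled_times, timebase)
```

In the runtime, the recorded time is discarded whenever the transition stops being enabled, so the delay restarts the next time it becomes enabled.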

_execute_transition async

_execute_transition(trans_idx: int, input_tokens: List[Any])

Execute a transition with input tokens.

Parameters:

Name Type Description Default
trans_idx int

Transition index

required
input_tokens List[Any]

List of consumed token data

required

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _execute_transition(self, trans_idx: int, input_tokens: List[Any]):
    """Execute a transition with input tokens.

    Args:
        trans_idx: Transition index
        input_tokens: List of consumed token data
    """
    trans_spec = self._transition_specs.get(trans_idx)
    if not trans_spec:
        return

    # Create interface view if needed
    bb_to_pass = self._create_interface_view_if_needed(self.bb, trans_spec.handler)

    # Execute transition
    trans_state = self._transition_states.get(trans_idx)

    logger.debug("[matrix_fire] trans=%s consumed=%s",
                 self.incidence_matrix.idx_to_transition[trans_idx], input_tokens)
    if trans_state is not None:
        results = trans_spec.handler(input_tokens, bb_to_pass, self.timebase, trans_state)
    else:
        results = trans_spec.handler(input_tokens, bb_to_pass, self.timebase)

    # Process results
    if inspect.isasyncgen(results):
        async for yielded in results:
            await self._process_yield(trans_idx, yielded)
    elif inspect.iscoroutine(results):
        result = await results
        if result is not None:
            await self._process_yield(trans_idx, result)
    else:
        await self._process_yield(trans_idx, results)

_process_yield async

_process_yield(trans_idx: int, yielded)

Process yielded output from transition.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _process_yield(self, trans_idx: int, yielded):
    """Process yielded output from transition."""
    # Get output destinations for this transition
    output_dests = self.incidence_matrix.output_destinations.get(trans_idx, {})

    if isinstance(yielded, dict):
        await self._process_dict_yield(trans_idx, yielded, output_dests)
    else:
        await self._process_single_yield(trans_idx, yielded, output_dests)

_process_dict_yield async

_process_dict_yield(trans_idx: int, yielded: dict, output_dests: Dict[int, int])

Process dictionary yield.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _process_dict_yield(self, trans_idx: int, yielded: dict, output_dests: Dict[int, int]):
    """Process dictionary yield."""
    wildcard_token = yielded.get('*')
    explicit_targets = set()

    for key, token in yielded.items():
        if key == '*':
            continue

        # Resolve place reference
        place_idx = self._resolve_place_ref(key, output_dests, trans_idx)
        if place_idx is not None:
            explicit_targets.add(place_idx)
            await self._add_token_to_place(trans_idx, place_idx, token)

    if wildcard_token is not None:
        await self._expand_wildcard_to_outputs(trans_idx, wildcard_token, explicit_targets, output_dests)

_process_single_yield async

_process_single_yield(trans_idx: int, yielded, output_dests: Dict[int, int])

Process single (place_ref, token) yield.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _process_single_yield(self, trans_idx: int, yielded, output_dests: Dict[int, int]):
    """Process single (place_ref, token) yield."""
    if isinstance(yielded, tuple) and len(yielded) == 2:
        place_ref, token = yielded
        place_idx = self._resolve_place_ref(place_ref, output_dests, trans_idx)
        if place_idx is not None:
            await self._add_token_to_place(trans_idx, place_idx, token)

_resolve_place_ref

_resolve_place_ref(place_ref, output_dests: Dict[int, int], trans_idx: int) -> Optional[int]

Resolve a place reference to a place index.

Uses parent-relative mapping for subnet instances, similar to NetRuntime.

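Resolution strategies, tried in order:

1. Exact match using PlaceRef.get_parts()
2. Parent-relative mapping using transition's FQN context
3. Dotted string split into FQN parts
4. Direct place index, if it is one of the transition's output destinations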

Source code in src/mycorrhizal/hypha/core/runtime.py
def _resolve_place_ref(self, place_ref, output_dests: Dict[int, int], trans_idx: int) -> Optional[int]:
    """Resolve a place reference to a place index.

    Uses parent-relative mapping for subnet instances, similar to NetRuntime.

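    Resolution strategies, tried in order:
    1. Exact match using PlaceRef.get_parts()
    2. Parent-relative mapping using transition's FQN context
    3. Dotted string split into FQN parts
    4. Direct place index, if it is one of the transition's output destinations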
    """
    # Extract local_name from place_ref
    if hasattr(place_ref, 'local_name'):
        local_name = place_ref.local_name
    elif isinstance(place_ref, str):
        local_name = place_ref.split('.')[-1]
    else:
        local_name = str(place_ref)

    # Strategy 1: Exact match
    if hasattr(place_ref, 'get_parts'):
        parts = tuple(place_ref.get_parts())
        place_idx = self.incidence_matrix.place_to_idx.get(parts)
        if place_idx is not None:
            return place_idx

    # Strategy 2: Parent-relative mapping (for subnet instances)
    # Get the transition's FQN and extract parent prefix
    trans_fqn = self.incidence_matrix.idx_to_transition.get(trans_idx)
    if trans_fqn and len(trans_fqn) > 1:
        parent_prefix = trans_fqn[:-1]  # Everything except the transition name
        candidate = tuple(list(parent_prefix) + [local_name])
        place_idx = self.incidence_matrix.place_to_idx.get(candidate)
        if place_idx is not None:
            return place_idx

    # Strategy 3: Try as direct string
    if isinstance(place_ref, str):
        parts = tuple(place_ref.split('.'))
        place_idx = self.incidence_matrix.place_to_idx.get(parts)
        if place_idx is not None:
            return place_idx

    # Strategy 4: Handle direct place indices
    if isinstance(place_ref, int) and place_ref in output_dests:
        return place_ref

    return None
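
Parent-relative resolution (strategy 2) is what lets a transition inside a subnet refer to a sibling place by its local name. A minimal sketch, assuming FQNs are tuples of name parts as in place_to_idx above; the net layout and the helper resolve_relative are hypothetical.

```python
from typing import Dict, Optional, Tuple

# Hypothetical index: a top-level "inbox" and one inside the "worker" subnet.
place_to_idx: Dict[Tuple[str, ...], int] = {
    ("Main", "inbox"): 0,
    ("Main", "worker", "inbox"): 1,
}


def resolve_relative(local_name: str, trans_fqn: Tuple[str, ...]) -> Optional[int]:
    """Look up local_name next to the transition, using its parent prefix."""
    if len(trans_fqn) > 1:
        candidate = trans_fqn[:-1] + (local_name,)
        return place_to_idx.get(candidate)
    return None


top = resolve_relative("inbox", ("Main", "dispatch"))
nested = resolve_relative("inbox", ("Main", "worker", "process"))
unknown = resolve_relative("outbox", ("Main", "dispatch"))
```

The same local name resolves to different places depending on which net the firing transition belongs to.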

_add_token_to_place async

_add_token_to_place(trans_idx: int, place_idx: int, token: Any)

Add a token to a place, handling IO output.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _add_token_to_place(self, trans_idx: int, place_idx: int, token: Any):
    """Add a token to a place, handling IO output."""
    place_spec = self._place_specs.get(place_idx)

    if place_spec and place_spec.is_io_output:
        await self._handle_io_output(place_idx, token)
    else:
        self.add_token(place_idx, token)

_expand_wildcard_to_outputs async

_expand_wildcard_to_outputs(trans_idx: int, wildcard_token: Any, explicit_targets: set, output_dests: Dict[int, int])

Expand wildcard token to all output places.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _expand_wildcard_to_outputs(
    self, trans_idx: int, wildcard_token: Any, explicit_targets: set, output_dests: Dict[int, int]
):
    """Expand wildcard token to all output places."""
    for place_idx in output_dests.keys():
        if place_idx not in explicit_targets:
            await self._add_token_to_place(trans_idx, place_idx, wildcard_token)
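
The interplay between explicit keys and the '*' wildcard can be sketched as a pure function. Here the dict keys are already place indices, so no reference resolution is needed, and deliveries are collected in a list. The function route is hypothetical.

```python
from typing import Any, Dict, List, Tuple


def route(yielded: Dict[Any, Any], output_dests: Dict[int, int]) -> List[Tuple[int, Any]]:
    """Return (place_idx, token) deliveries for one yielded dict."""
    deliveries: List[Tuple[int, Any]] = []
    explicit = set()
    for key, token in yielded.items():
        if key == "*":
            continue
        explicit.add(key)
        deliveries.append((key, token))

    wildcard = yielded.get("*")
    if wildcard is not None:
        # The wildcard fills every output place not named explicitly.
        for place_idx in output_dests:
            if place_idx not in explicit:
                deliveries.append((place_idx, wildcard))
    return deliveries


deliveries = route({1: "special", "*": "default"}, output_dests={1: 1, 2: 1, 3: 1})
no_wildcard = route({2: "only"}, output_dests={1: 1, 2: 1})
```

A wildcard value of None is ignored, so None cannot be broadcast as a token this way.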

_fire_transitions async

_fire_transitions(firing_vector: List[int])

Fire transitions using matrix multiplication and process data flow.

The state change delta = A @ F is computed from the incidence matrix, but the marking itself is updated by the token data flow as each transition fires.

When multiple transitions compete for the same input place, they are fired sequentially (each consumes its tokens before the next fires) so that each token is consumed by exactly one transition per cycle.

Parameters:

Name Type Description Default
firing_vector List[int]

Binary vector indicating which transitions fire

required

Source code in src/mycorrhizal/hypha/core/runtime.py
async def _fire_transitions(self, firing_vector: List[int]):
    """Fire transitions using matrix multiplication and process data flow.

    The state change delta = A @ F is computed from the incidence matrix, but
    the marking itself is updated by the token data flow as each transition fires.

    When multiple transitions compete for the same input place, they are fired
    sequentially (each consumes its tokens before the next fires) so that each
    token is consumed by exactly one transition per cycle.

    Args:
        firing_vector: Binary vector indicating which transitions fire
    """
    # Step 1: Compute state change via matrix multiplication
    # M_new = M + (A @ F)
    # Note: We compute this for theoretical completeness, but actual state
    # changes happen through token data flow in Step 2
    _state_change = self.incidence_matrix.compute_state_change(firing_vector)

    # Step 2: Fire transitions one at a time, removing tokens immediately
    # This prevents multiple transitions from consuming the same token
    for trans_idx in range(len(firing_vector)):
        if firing_vector[trans_idx] == 0:
            continue

        # Get token consumption slots (handles multiple arcs from same place)
        token_slots = self.incidence_matrix.get_token_slots(trans_idx)
        if not token_slots:
            # No input requirements (generator transition)
            consumed_data = []
            await self._execute_transition(trans_idx, consumed_data)
            self._enabled_times.pop(trans_idx, None)
            continue

        # Collect input tokens by peeking (not removing yet)
        # Group by place to avoid duplicates when multiple slots from same place
        consumed_by_place: Dict[int, List[int]] = {}
        for place_idx, slot_idx in token_slots:
            if place_idx not in consumed_by_place:
                # Peek at tokens from this place
                count = self.incidence_matrix.input_requirements[trans_idx][place_idx]
                tokens = self.marking.peek_tokens(place_idx, count)
                if len(tokens) < count:
                    # Not enough tokens - skip this transition
                    # (can happen when another transition in same cycle consumed them)
                    consumed_by_place = {}
                    break
                consumed_by_place[place_idx] = tokens

        # Check if we got all required tokens
        if not consumed_by_place:
            self._enabled_times.pop(trans_idx, None)
            continue

        # Flatten consumed tokens (including duplicates for multiple arcs from same place)
        consumed = []
        for place_idx, slot_idx in token_slots:
            if place_idx in consumed_by_place and consumed_by_place[place_idx]:
                # Take the next token from this place (repeating if needed for multiple slots)
                token_idx = slot_idx % len(consumed_by_place[place_idx])
                consumed.append(consumed_by_place[place_idx][token_idx])

        # Remove consumed tokens IMMEDIATELY before executing
        # This ensures subsequent transitions don't see these tokens
        # Use fast removal for each place
        for place_idx, count in self.incidence_matrix.input_requirements.get(trans_idx, {}).items():
            if place_idx in consumed_by_place:
                # Get the tokens from this place (consumed_by_place has the original order)
                place_tokens_to_remove = consumed_by_place[place_idx][:count]
                # Use optimized removal
                self.marking.remove_tokens_fast(place_idx, place_tokens_to_remove)

        # Convert token IDs to data
        consumed_data = [self.token_registry.get(tid) for tid in consumed]

        # Execute transition
        await self._execute_transition(trans_idx, consumed_data)

        # Clear enabled time after firing
        self._enabled_times.pop(trans_idx, None)
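
The sequential-firing rule can be illustrated with a minimal, self-contained sketch. It does not use Hypha's Marking or IncidenceMatrix classes; `fire_sequentially`, `marking`, and `requirements` are hypothetical names chosen for illustration, and places are plain lists of token ids.

```python
from typing import Dict, List


def fire_sequentially(
    marking: Dict[str, List[str]],
    requirements: Dict[str, Dict[str, int]],
    firing_order: List[str],
) -> Dict[str, List[str]]:
    """Fire transitions one at a time, removing tokens before the next one looks.

    marking:      place name -> token ids currently in the place
    requirements: transition name -> {place name: tokens needed}
    Returns transition name -> tokens it consumed (absent if it was skipped).
    """
    consumed: Dict[str, List[str]] = {}
    for trans in firing_order:
        needs = requirements[trans]
        # Peek first: every input place must still hold enough tokens.
        if any(len(marking[place]) < n for place, n in needs.items()):
            continue  # an earlier transition in this cycle took the tokens
        taken: List[str] = []
        for place, n in needs.items():
            taken.extend(marking[place][:n])
            # Remove immediately so later transitions cannot see these tokens.
            del marking[place][:n]
        consumed[trans] = taken
    return consumed


marking = {"pending": ["t1", "t2", "t3"]}
requirements = {"a": {"pending": 2}, "b": {"pending": 2}}
result = fire_sequentially(marking, requirements, ["a", "b"])
```

Both transitions were enabled when the cycle began, but only `a` fires: it takes `t1` and `t2`, leaving a single token, so `b` is skipped and `t3` stays in `pending`.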

run_cycle async

run_cycle()

Run a single execution cycle.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def run_cycle(self):
    """Run a single execution cycle."""
    # Build firing vector
    firing_vector = await self._build_firing_vector()

    # Fire transitions (matmul + data flow)
    await self._fire_transitions(firing_vector)

    # Return whether any transitions fired
    return any(firing_vector)

run async

run()

Main execution loop.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def run(self):
    """Main execution loop."""
    await self._start_io_input()

    try:
        cycle_timeout = 30.0  # Safety timeout for single cycle (seconds)

        # For pure computational nets (no async features), use minimal sleeps
        use_fast_loop = not self._has_async_features

        while not self._stop_event.is_set():
            # Record cycle start time BEFORE running the cycle
            cycle_start = time.perf_counter()

            fired = await self.run_cycle()

            # Check for hard loop (single cycle taking too long)
            cycle_elapsed = time.perf_counter() - cycle_start
            if cycle_elapsed > cycle_timeout:
                logger.warning(" Cycle timeout detected (%.2fs), stopping to prevent hard loop",
                             cycle_elapsed)
                break
            await asyncio.sleep(1e-10)

    except asyncio.CancelledError:
        logger.debug(" Execution cancelled")
    except Exception as e:
        logger.error(" Error in execution loop: %s", e)
        raise
    finally:
        # Cleanup IO tasks - cancel all and wait for completion
        logger.debug(" Cleaning up %d IO tasks", len(self._io_tasks))
        for task in self._io_tasks:
            if not task.done():
                task.cancel()

        # Wait for all tasks to complete (with return_exceptions to avoid CancelledError propagation)
        if self._io_tasks:
            await asyncio.gather(*self._io_tasks, return_exceptions=True)

        logger.debug(" Cleanup complete")
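
The cleanup pattern in the `finally` block (cancel every outstanding task, then gather with `return_exceptions=True`) can be tried in isolation. The sketch below is a stand-alone illustration using only `asyncio`; `io_worker`, `run_loop`, and `main` are hypothetical names and not part of Hypha.

```python
import asyncio
from typing import List


async def io_worker(log: List[str]) -> None:
    """Stand-in for a long-running IO task."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("worker cancelled")
        raise


async def run_loop(stop_event: asyncio.Event, log: List[str]) -> None:
    io_tasks = [asyncio.create_task(io_worker(log)) for _ in range(2)]
    try:
        while not stop_event.is_set():
            await asyncio.sleep(0)  # yield so other tasks can make progress
    finally:
        for task in io_tasks:
            if not task.done():
                task.cancel()
        # return_exceptions=True collects each CancelledError as a result
        # instead of re-raising the first one out of gather().
        results = await asyncio.gather(*io_tasks, return_exceptions=True)
        log.append(f"cleaned up {len(results)} tasks")


async def main() -> List[str]:
    log: List[str] = []
    stop_event = asyncio.Event()
    loop_task = asyncio.create_task(run_loop(stop_event, log))
    await asyncio.sleep(0.01)
    stop_event.set()
    await loop_task
    return log


log = asyncio.run(main())
```

Gathering after cancelling ensures the workers have actually finished unwinding before the loop reports that cleanup is complete.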

start async

start()

Start the runtime.

Source code in src/mycorrhizal/hypha/core/runtime.py
async def start(self):
    """Start the runtime."""
    self._stop_event.clear()
    # Create the run task
    task = asyncio.create_task(self.run())
    # Give the run loop a chance to start by yielding control
    await asyncio.sleep(0)
    # Store the task so we can cancel it later if needed
    self._run_task = task

stop async

stop(timeout: float = 5.0)

Stop the runtime.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for graceful shutdown (seconds)

5.0
Source code in src/mycorrhizal/hypha/core/runtime.py
async def stop(self, timeout: float = 5.0):
    """Stop the runtime.

    Args:
        timeout: Maximum time to wait for graceful shutdown (seconds)
    """
    logger.debug(" Stopping runtime (timeout=%.2fs)", timeout)

    # Set stop event to signal run loop to exit
    self._stop_event.set()

    # Wait for the run task to finish (with timeout)
    # The run() method handles IO task cleanup in its finally block
    if self._run_task and not self._run_task.done():
        try:
            await asyncio.wait_for(self._run_task, timeout=timeout)
            logger.debug(" Run task completed gracefully")
        except asyncio.TimeoutError:
            logger.warning(" Run task did not complete in %.2fs, cancelling", timeout)
            self._run_task.cancel()
            try:
                await self._run_task
                logger.debug(" Run task cancelled successfully")
            except asyncio.CancelledError:
                logger.debug(" Run task cancelled with CancelledError")
            except Exception as e:
                logger.error(" Error cancelling run task: %s", e)
        except Exception as e:
            logger.error(" Error waiting for run task: %s", e)

    logger.debug(" Stop complete")
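
The graceful-then-forced shutdown can be sketched without the runtime. The example below is an illustration under stated assumptions, not Hypha code: `cooperative_loop`, `stubborn_loop`, and the free function `stop` are hypothetical stand-ins. It relies on the documented behavior of `asyncio.wait_for`, which cancels the awaited task when the timeout expires.

```python
import asyncio
from typing import Tuple


async def cooperative_loop(stop_event: asyncio.Event) -> None:
    """Stand-in run loop that checks the stop event between steps."""
    while not stop_event.is_set():
        await asyncio.sleep(0.01)


async def stubborn_loop(stop_event: asyncio.Event) -> None:
    """Stand-in run loop that never checks the stop event."""
    while True:
        await asyncio.sleep(0.01)


async def stop(task: asyncio.Task, stop_event: asyncio.Event, timeout: float) -> str:
    stop_event.set()
    if task.done():
        return "already finished"
    try:
        await asyncio.wait_for(task, timeout=timeout)
        return "graceful"
    except asyncio.TimeoutError:
        # wait_for() has already cancelled the task at this point.
        return "cancelled" if task.cancelled() else "still running"


async def main() -> Tuple[str, str]:
    outcomes = []
    for loop_fn in (cooperative_loop, stubborn_loop):
        stop_event = asyncio.Event()
        task = asyncio.create_task(loop_fn(stop_event))
        await asyncio.sleep(0)  # let the loop start before stopping it
        outcomes.append(await stop(task, stop_event, timeout=0.05))
    return outcomes[0], outcomes[1]


outcomes = asyncio.run(main())
```

The cooperative loop exits on its own once the event is set; the stubborn one is cancelled when the timeout expires.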

_build_firing_vector_sync

_build_firing_vector_sync() -> List[int]

Build firing vector F by evaluating enabled transitions (synchronous).

Returns:

Type Description
List[int]

List of 0/1 values indicating which transitions fire

Source code in src/mycorrhizal/hypha/core/runtime.py
def _build_firing_vector_sync(self) -> List[int]:
    """Build firing vector F by evaluating enabled transitions (synchronous).

    Returns:
        List of 0/1 values indicating which transitions fire
    """
    firing_vector = [0] * self.incidence_matrix.num_transitions

    # Get count dict once per cycle (cached in Marking)
    count_dict = self.marking.get_count_dict()

    for trans_idx in range(self.incidence_matrix.num_transitions):
        # Check if enabled (has sufficient tokens)
        if not self.incidence_matrix.is_enabled(trans_idx, count_dict):
            self._enabled_times.pop(trans_idx, None)
            continue

        # Track enabled time for delay support
        trans_spec = self._transition_specs.get(trans_idx)
        if trans_spec and trans_spec.delay > 0:
            if trans_idx not in self._enabled_times:
                self._enabled_times[trans_idx] = self.timebase.now()

            # Check if delay has elapsed
            enabled_time = self._enabled_times.get(trans_idx)
            if enabled_time is None:
                continue

            elapsed = self.timebase.now() - enabled_time
            if elapsed < trans_spec.delay:
                continue

        # Evaluate guard (synchronous only - async guards not supported in sync mode)
        guard_result = self.guard_matrix.evaluate(
            trans_idx,
            self.marking,
            self.incidence_matrix,
            self.bb,
            self.timebase,
            self.token_registry,
        )

        # Skip async guards in sync mode, and skip any falsy guard result
        if inspect.isasyncgen(guard_result) or not guard_result:
            continue

        # Transition fires!
        firing_vector[trans_idx] = 1

    return firing_vector
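
A simplified sketch of how a firing vector comes together is shown here. It covers only the token-count check and the guard; delay handling is left out. The names `build_firing_vector`, `counts`, `requirements`, and `guards` are hypothetical, and guards are assumed to be plain callables over the count dictionary, which is simpler than what the runtime's guard matrix receives.

```python
from typing import Callable, Dict, List, Optional

Guard = Optional[Callable[[Dict[str, int]], bool]]


def build_firing_vector(
    counts: Dict[str, int],
    requirements: List[Dict[str, int]],
    guards: List[Guard],
) -> List[int]:
    """Return a 0/1 entry per transition: 1 if enabled and its guard passes."""
    firing = [0] * len(requirements)
    for idx, needs in enumerate(requirements):
        # Enabled only if every input place holds enough tokens.
        if any(counts.get(place, 0) < n for place, n in needs.items()):
            continue
        guard = guards[idx]
        if guard is not None and not guard(counts):
            continue
        firing[idx] = 1
    return firing


counts = {"pending": 2, "approved": 0}
requirements = [{"pending": 1}, {"approved": 1}, {"pending": 2}]
guards = [None, None, lambda c: c["pending"] > 5]
firing_vector = build_firing_vector(counts, requirements, guards)
```

Transition 0 fires, transition 1 lacks tokens, and transition 2 has enough tokens but is blocked by its guard, giving `[1, 0, 0]`.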

_execute_transition_sync

_execute_transition_sync(trans_idx: int, input_tokens: List[Any])

Execute a transition with input tokens (synchronous).

Parameters:

Name Type Description Default
trans_idx int

Transition index

required
input_tokens List[Any]

List of consumed token data

required
Source code in src/mycorrhizal/hypha/core/runtime.py
def _execute_transition_sync(self, trans_idx: int, input_tokens: List[Any]):
    """Execute a transition with input tokens (synchronous).

    Args:
        trans_idx: Transition index
        input_tokens: List of consumed token data
    """
    trans_spec = self._transition_specs.get(trans_idx)
    if not trans_spec:
        return

    # Skip async transitions in sync mode
    if inspect.iscoroutinefunction(trans_spec.handler):
        return

    # Create interface view if needed
    bb_to_pass = self._create_interface_view_if_needed(self.bb, trans_spec.handler)

    # Execute transition
    trans_state = self._transition_states.get(trans_idx)

    logger.debug("[matrix_fire_sync] trans=%s consumed=%s",
                 self.incidence_matrix.idx_to_transition[trans_idx], input_tokens)
    # Stateful transitions receive their state object as a fourth argument
    if trans_state is not None:
        results = trans_spec.handler(input_tokens, bb_to_pass, self.timebase, trans_state)
    else:
        results = trans_spec.handler(input_tokens, bb_to_pass, self.timebase)

    # Process results (synchronous generators only)
    if inspect.isgenerator(results):
        for yielded in results:
            self._process_yield_sync(trans_idx, yielded)
    elif results is not None:
        self._process_yield_sync(trans_idx, results)
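
The result handling at the end of this method accepts either a generator or a plain return value. The stand-alone sketch below mirrors that dispatch with `inspect.isgenerator`. The handlers take only a token list here, which is an assumption made to keep the example short; `collect_outputs` is a hypothetical helper.

```python
import inspect
from typing import Any, Callable, List


def collect_outputs(handler: Callable[[List[Any]], Any], tokens: List[Any]) -> List[Any]:
    """Call a synchronous handler and normalize its output to a list of yields."""
    results = handler(tokens)
    if inspect.isgenerator(results):
        return list(results)  # drain every yielded item
    if results is not None:
        return [results]  # a plain return value is treated as one yield
    return []


def gen_handler(tokens):
    for token in tokens:
        yield {"processed": token * 2}


def return_handler(tokens):
    return ("processed", sum(tokens))


def silent_handler(tokens):
    return None


from_gen = collect_outputs(gen_handler, [1, 2])
from_return = collect_outputs(return_handler, [1, 2])
from_silent = collect_outputs(silent_handler, [1, 2])
```

A handler that returns `None` produces no output tokens, so a transition can consume its inputs without emitting anything.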

_process_yield_sync

_process_yield_sync(trans_idx: int, yielded)

Process yielded output from transition (synchronous).

Source code in src/mycorrhizal/hypha/core/runtime.py
def _process_yield_sync(self, trans_idx: int, yielded):
    """Process yielded output from transition (synchronous)."""
    # Get output destinations for this transition
    output_dests = self.incidence_matrix.output_destinations.get(trans_idx, {})

    if isinstance(yielded, dict):
        self._process_dict_yield_sync(trans_idx, yielded, output_dests)
    else:
        self._process_single_yield_sync(trans_idx, yielded, output_dests)

_process_dict_yield_sync

_process_dict_yield_sync(trans_idx: int, yielded: dict, output_dests: Dict[int, int])

Process dictionary yield (synchronous).

Source code in src/mycorrhizal/hypha/core/runtime.py
def _process_dict_yield_sync(self, trans_idx: int, yielded: dict, output_dests: Dict[int, int]):
    """Process dictionary yield (synchronous)."""
    wildcard_token = yielded.get('*')
    explicit_targets = set()

    for key, token in yielded.items():
        if key == '*':
            continue

        # Resolve place reference
        place_idx = self._resolve_place_ref(key, output_dests, trans_idx)
        if place_idx is not None:
            explicit_targets.add(place_idx)
            self._add_token_to_place_sync(trans_idx, place_idx, token)

    if wildcard_token is not None:
        self._expand_wildcard_to_outputs_sync(trans_idx, wildcard_token, explicit_targets, output_dests)

_process_single_yield_sync

_process_single_yield_sync(trans_idx: int, yielded, output_dests: Dict[int, int])

Process single (place_ref, token) yield (synchronous).

Source code in src/mycorrhizal/hypha/core/runtime.py
def _process_single_yield_sync(self, trans_idx: int, yielded, output_dests: Dict[int, int]):
    """Process single (place_ref, token) yield (synchronous)."""
    if isinstance(yielded, tuple) and len(yielded) == 2:
        place_ref, token = yielded
        place_idx = self._resolve_place_ref(place_ref, output_dests, trans_idx)
        if place_idx is not None:
            self._add_token_to_place_sync(trans_idx, place_idx, token)

_add_token_to_place_sync

_add_token_to_place_sync(trans_idx: int, place_idx: int, token: Any)

Add a token to a place (synchronous).

Note: IO output places are not supported in sync mode, so the token is added directly to the marking.

Source code in src/mycorrhizal/hypha/core/runtime.py
def _add_token_to_place_sync(self, trans_idx: int, place_idx: int, token: Any):
    """Add a token to a place (synchronous).

    Note: IO output places are not supported in sync mode, so the token is
    added directly to the marking.
    """
    # Skip IO output in sync mode (just add token to marking)
    self.add_token(place_idx, token)

_expand_wildcard_to_outputs_sync

_expand_wildcard_to_outputs_sync(trans_idx: int, wildcard_token: Any, explicit_targets: set, output_dests: Dict[int, int])

Expand wildcard token to all output places (synchronous).

Source code in src/mycorrhizal/hypha/core/runtime.py
def _expand_wildcard_to_outputs_sync(
    self, trans_idx: int, wildcard_token: Any, explicit_targets: set, output_dests: Dict[int, int]
):
    """Expand wildcard token to all output places (synchronous)."""
    for place_idx in output_dests.keys():
        if place_idx not in explicit_targets:
            self._add_token_to_place_sync(trans_idx, place_idx, wildcard_token)
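
The wildcard rule (a `'*'` entry goes to every output place that was not addressed explicitly) can be sketched as follows. Places are identified by plain strings here, whereas the runtime resolves place references to indices; `route_dict_yield` is a hypothetical helper written for this illustration.

```python
from typing import Any, Dict, List


def route_dict_yield(yielded: Dict[str, Any], output_places: List[str]) -> Dict[str, List[Any]]:
    """Route explicit keys to their places, then send '*' to the remaining outputs."""
    routed: Dict[str, List[Any]] = {place: [] for place in output_places}
    explicit = set()
    for key, token in yielded.items():
        if key == "*":
            continue
        if key in routed:  # unknown keys are ignored in this sketch
            routed[key].append(token)
            explicit.add(key)
    wildcard = yielded.get("*")
    if wildcard is not None:
        for place in output_places:
            if place not in explicit:
                routed[place].append(wildcard)
    return routed


routed = route_dict_yield({"done": "result", "*": "copy"}, ["done", "audit", "metrics"])
```

Because the wildcard is detected with an `is not None` check, a wildcard token of `None` is indistinguishable from no wildcard at all.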

_fire_transitions_sync

_fire_transitions_sync(firing_vector: List[int])

Fire transitions using matrix multiplication and process data flow (synchronous).

This is the synchronous version of _fire_transitions for pure computational nets.

When multiple transitions compete for the same input place, they are fired sequentially (each consumes its tokens before the next fires), so that each token is consumed by at most one transition per cycle.

Parameters:

Name Type Description Default
firing_vector List[int]

Binary vector indicating which transitions fire

required
Source code in src/mycorrhizal/hypha/core/runtime.py
def _fire_transitions_sync(self, firing_vector: List[int]):
    """Fire transitions using matrix multiplication and process data flow (synchronous).

    This is the synchronous version of _fire_transitions for pure computational nets.

    When multiple transitions compete for the same input place, they are fired
    sequentially (each consumes its tokens before the next fires), so that each
    token is consumed by at most one transition per cycle.

    Args:
        firing_vector: Binary vector indicating which transitions fire
    """
    # Step 1: Compute state change via matrix multiplication
    # M_new = M + (A @ F)
    _state_change = self.incidence_matrix.compute_state_change(firing_vector)

    # Step 2: Fire transitions one at a time, removing tokens immediately
    # This prevents multiple transitions from consuming the same token
    for trans_idx in range(len(firing_vector)):
        if firing_vector[trans_idx] == 0:
            continue

        # Get token consumption slots (handles multiple arcs from same place)
        token_slots = self.incidence_matrix.get_token_slots(trans_idx)
        if not token_slots:
            # No input requirements (generator transition)
            consumed_data = []
            self._execute_transition_sync(trans_idx, consumed_data)
            self._enabled_times.pop(trans_idx, None)
            continue

        # Collect input tokens by peeking (not removing yet)
        # Group by place to avoid duplicates when multiple slots from same place
        consumed_by_place: Dict[int, List[int]] = {}
        for place_idx, slot_idx in token_slots:
            if place_idx not in consumed_by_place:
                # Peek at tokens from this place
                count = self.incidence_matrix.input_requirements[trans_idx][place_idx]
                tokens = self.marking.peek_tokens(place_idx, count)
                if len(tokens) < count:
                    # Not enough tokens - skip this transition
                    # (can happen when another transition in same cycle consumed them)
                    consumed_by_place = {}
                    break
                consumed_by_place[place_idx] = tokens

        # Check if we got all required tokens
        if not consumed_by_place:
            self._enabled_times.pop(trans_idx, None)
            continue

        # Flatten consumed tokens (including duplicates for multiple arcs from same place)
        consumed = []
        for place_idx, slot_idx in token_slots:
            if place_idx in consumed_by_place and consumed_by_place[place_idx]:
                # Take the next token from this place (repeating if needed for multiple slots)
                token_idx = slot_idx % len(consumed_by_place[place_idx])
                consumed.append(consumed_by_place[place_idx][token_idx])

        # Remove consumed tokens IMMEDIATELY before executing
        # This ensures subsequent transitions don't see these tokens
        # Use fast removal for each place
        for place_idx, count in self.incidence_matrix.input_requirements.get(trans_idx, {}).items():
            if place_idx in consumed_by_place:
                # Get the tokens from this place (consumed_by_place has the original order)
                place_tokens_to_remove = consumed_by_place[place_idx][:count]
                # Use optimized removal
                self.marking.remove_tokens_fast(place_idx, place_tokens_to_remove)

        # Convert token IDs to data
        consumed_data = [self.token_registry.get(tid) for tid in consumed]

        # Execute transition
        self._execute_transition_sync(trans_idx, consumed_data)

        # Clear enabled time after firing
        self._enabled_times.pop(trans_idx, None)
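
The state equation in Step 1, M_new = M + (A @ F), can be worked through by hand on a tiny net. The sketch below uses plain lists rather than Hypha's IncidenceMatrix. The function `state_change` is a hypothetical stand-in, and the sign convention (negative entries consume, positive entries produce) is an assumption for the purpose of the example.

```python
from typing import List


def state_change(incidence: List[List[int]], firing: List[int]) -> List[int]:
    """Return A @ F for a places-by-transitions incidence matrix A."""
    return [sum(a * f for a, f in zip(row, firing)) for row in incidence]


# Two places (rows) and two transitions (columns).
# Column 0 moves one token from place 0 to place 1; column 1 moves it back.
incidence = [
    [-1, 1],
    [1, -1],
]
marking = [3, 0]
firing = [1, 0]  # only transition 0 fires

delta = state_change(incidence, firing)
new_marking = [m + d for m, d in zip(marking, delta)]
```

With only transition 0 firing, `delta` is `[-1, 1]` and the marking moves from `[3, 0]` to `[2, 1]`.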

run_cycle_sync

run_cycle_sync() -> bool

Run a single execution cycle (synchronous).

Returns:

Type Description
bool

True if any transitions fired, False otherwise

Source code in src/mycorrhizal/hypha/core/runtime.py
def run_cycle_sync(self) -> bool:
    """Run a single execution cycle (synchronous).

    Returns:
        True if any transitions fired, False otherwise
    """
    # Build firing vector
    firing_vector = self._build_firing_vector_sync()

    # Fire transitions (matmul + data flow)
    self._fire_transitions_sync(firing_vector)

    # Return whether any transitions fired
    return any(firing_vector)

run_sync

run_sync(max_cycles: int = 100000)

Run the net to completion or max cycles (synchronous execution).

This is the high-performance synchronous execution path for pure computational nets. It avoids all asyncio overhead.

Parameters:

Name Type Description Default
max_cycles int

Maximum number of cycles to execute (prevents infinite loops)

100000

Returns:

Type Description
int

Number of cycles executed

Raises:

Type Description
RuntimeError

If net has async features (use async mode instead)

Source code in src/mycorrhizal/hypha/core/runtime.py
def run_sync(self, max_cycles: int = 100000):
    """Run the net to completion or max cycles (synchronous execution).

    This is the high-performance synchronous execution path for pure
    computational nets. It avoids all asyncio overhead.

    Args:
        max_cycles: Maximum number of cycles to execute (prevents infinite loops)

    Returns:
        Number of cycles executed

    Raises:
        RuntimeError: If net has async features (use async mode instead)
    """
    if self._has_async_features:
        raise RuntimeError(
            "Net has async features (IO places, async transitions/guards). "
            "Use async mode (run/start) instead."
        )

    cycles = 0
    start_time = time.perf_counter()
    cycle_timeout = 10.0  # Safety timeout for single cycle (seconds)

    for cycles in range(max_cycles):
        # Check stop flag at start of each cycle
        if self._stop_event_sync:
            logger.debug("[run_sync] Stopped by stop flag after %d cycles", cycles)
            break

        # Record cycle start time BEFORE running the cycle
        cycle_start = time.perf_counter()

        # Run one cycle
        fired = self.run_cycle_sync()

        # Check if single cycle is taking too long (possible hard loop)
        cycle_elapsed = time.perf_counter() - cycle_start
        if cycle_elapsed > cycle_timeout:
            logger.warning("[run_sync] Cycle timeout after %d cycles (single cycle took %.2fs)",
                         cycles, cycle_elapsed)
            break

        # If no transitions fired, we're done
        if not fired:
            break

        # Check stop flag again after firing (in case a transition set it)
        if self._stop_event_sync:
            logger.debug("[run_sync] Stopped by stop flag after firing in cycle %d", cycles)
            break

    elapsed = time.perf_counter() - start_time
    logger.debug("[run_sync] Completed %d cycles in %.4fms (%.0f cycles/sec)",
                 cycles, elapsed * 1000, cycles / elapsed if elapsed > 0 else 0)

    return cycles
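
The run-to-quiescence idea (keep cycling until nothing fires or a cap is reached) is sketched below in isolation. `run_until_quiescent` and `step` are hypothetical names. The counting convention in this sketch, which counts only cycles that did work, is an assumption of the example and may not match the value returned by run_sync.

```python
from typing import Callable


def run_until_quiescent(step: Callable[[], bool], max_cycles: int = 100_000) -> int:
    """Call step() until it reports no activity or max_cycles is reached.

    Returns the number of cycles in which step() reported activity.
    """
    productive = 0
    for _ in range(max_cycles):
        if not step():
            break
        productive += 1
    return productive


tokens = [1, 2, 3]


def step() -> bool:
    """Consume one token per cycle; report whether anything happened."""
    if not tokens:
        return False
    tokens.pop()
    return True


cycles = run_until_quiescent(step)
capped = run_until_quiescent(lambda: True, max_cycles=5)
```

The first call stops after three productive cycles because the token list is empty; the second never goes quiet and is stopped by the cap.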

stop_sync

stop_sync()

Stop the synchronous execution loop.

Source code in src/mycorrhizal/hypha/core/runtime.py
def stop_sync(self):
    """Stop the synchronous execution loop."""
    self._stop_event_sync = True

Runner

Runner(net_func: Any, blackboard: Any)

High-level runner for executing a Petri net

Uses MatrixRuntime for high-performance matrix-based execution.

Source code in src/mycorrhizal/hypha/core/runtime.py
def __init__(self, net_func: Any, blackboard: Any):
    if not hasattr(net_func, '_spec'):
        raise ValueError(f"{net_func} is not a valid net")

    self.spec = net_func._spec
    self.blackboard = blackboard
    self.timebase = None
    self._matrix_runtime: Optional[MatrixRuntime] = None

runtime property

runtime

Get the active runtime (MatrixRuntime).

start async

start(timebase: Any)

Start the net with the given timebase

Source code in src/mycorrhizal/hypha/core/runtime.py
async def start(self, timebase: Any):
    """Start the net with the given timebase"""
    self.timebase = timebase

    self._matrix_runtime = MatrixRuntime(
        self.spec,
        self.blackboard,
        self.timebase
    )
    await self._matrix_runtime.start()

stop async

stop(timeout: float = 5.0)

Stop the net

Source code in src/mycorrhizal/hypha/core/runtime.py
async def stop(self, timeout: float = 5.0):
    """Stop the net"""
    if self._matrix_runtime:
        await self._matrix_runtime.stop(timeout)

add_place

add_place(fqn: str, state_factory: Optional[Callable] = None)

Add a place to the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_place(self, fqn: str,
              state_factory: Optional[Callable] = None):
    """Add a place to the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    return self.runtime.add_place(fqn, state_factory)

add_transition

add_transition(fqn: str, handler: Callable, guard: Optional[GuardSpec] = None, state_factory: Optional[Callable] = None, delay: float = 0.0)

Add a transition to the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_transition(self, fqn: str, handler: Callable,
                  guard: Optional[GuardSpec] = None,
                  state_factory: Optional[Callable] = None,
                  delay: float = 0.0):
    """Add a transition to the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    return self.runtime.add_transition(fqn, handler, guard, state_factory, delay)

add_arc

add_arc(source_fqn: str, target_fqn: str, weight: int = 1, name: Optional[str] = None)

Add an arc to the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
def add_arc(self, source_fqn: str, target_fqn: str, weight: int = 1,
            name: Optional[str] = None):
    """Add an arc to the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    self.runtime.add_arc(source_fqn, target_fqn, weight, name)

remove_arc async

remove_arc(source_fqn: str, target_fqn: str)

Remove an arc from the running net

Source code in src/mycorrhizal/hypha/core/runtime.py
async def remove_arc(self, source_fqn: str, target_fqn: str):
    """Remove an arc from the running net"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    await self.runtime.remove_arc(source_fqn, target_fqn)

remove async

remove(fqn: str)

Remove a place or transition

Source code in src/mycorrhizal/hypha/core/runtime.py
async def remove(self, fqn: str):
    """Remove a place or transition"""
    if not self.runtime:
        raise RuntimeError("Net is not running. Call start() first.")
    await self.runtime.remove(fqn)

run_sync

run_sync(timebase: Any, max_cycles: int = 100000) -> int

Run the net to completion in synchronous mode.

This is the high-performance execution path for pure computational nets. Only works with MatrixRuntime and requires no async features.

Parameters:

Name Type Description Default
timebase Any

Timebase for timing operations

required
max_cycles int

Maximum number of cycles to execute

100000

Returns:

Type Description
int

Number of cycles executed

Raises:

Type Description
RuntimeError

If net has async features

Source code in src/mycorrhizal/hypha/core/runtime.py
def run_sync(self, timebase: Any, max_cycles: int = 100000) -> int:
    """Run the net to completion in synchronous mode.

    This is the high-performance execution path for pure computational nets.
    Only works with MatrixRuntime and requires no async features.

    Args:
        timebase: Timebase for timing operations
        max_cycles: Maximum number of cycles to execute

    Returns:
        Number of cycles executed

    Raises:
        RuntimeError: If net has async features
    """
    self.timebase = timebase

    self._matrix_runtime = MatrixRuntime(
        self.spec,
        self.blackboard,
        self.timebase
    )
    return self._matrix_runtime.run_sync(max_cycles)

build_incidence_matrix

build_incidence_matrix(spec: NetSpec) -> IncidenceMatrix

Build incidence matrix from net specification.

Parameters:

Name Type Description Default
spec NetSpec

Net specification

required

Returns:

Type Description
IncidenceMatrix

IncidenceMatrix representation

Source code in src/mycorrhizal/hypha/core/runtime.py
def build_incidence_matrix(spec: NetSpec) -> IncidenceMatrix:
    """Build incidence matrix from net specification.

    Args:
        spec: Net specification

    Returns:
        IncidenceMatrix representation
    """
    # Collect all places, transitions, and arcs
    places: Dict[Tuple[str, ...], PlaceSpec] = {}
    transitions: Dict[Tuple[str, ...], TransitionSpec] = {}
    arcs: List[ArcSpec] = []

    def collect_spec(net_spec: NetSpec):
        for place_name, place_spec in net_spec.places.items():
            places[tuple(net_spec.get_parts(place_name))] = place_spec

        for trans_name, trans_spec in net_spec.transitions.items():
            transitions[tuple(net_spec.get_parts(trans_name))] = trans_spec

        arcs.extend(net_spec.arcs)

        for subnet_spec in net_spec.subnets.values():
            collect_spec(subnet_spec)

    collect_spec(spec)

    # Create index mappings
    place_to_idx = {fqn: idx for idx, fqn in enumerate(sorted(places.keys()))}
    idx_to_place = {idx: fqn for fqn, idx in place_to_idx.items()}

    transition_to_idx = {fqn: idx for idx, fqn in enumerate(sorted(transitions.keys()))}
    idx_to_transition = {idx: fqn for fqn, idx in transition_to_idx.items()}

    # Build incidence matrix
    matrix = IncidenceMatrix(
        num_places=len(places),
        num_transitions=len(transitions),
        place_to_idx=place_to_idx,
        idx_to_place=idx_to_place,
        transition_to_idx=transition_to_idx,
        idx_to_transition=idx_to_transition,
    )

    # Process arcs
    for arc in arcs:
        source_parts = tuple(arc.source_parts)
        target_parts = tuple(arc.target_parts)

        # Place -> Transition (input arc)
        if source_parts in place_to_idx and target_parts in transition_to_idx:
            place_idx = place_to_idx[source_parts]
            trans_idx = transition_to_idx[target_parts]
            matrix.add_input(place_idx, trans_idx, arc.weight)

        # Transition -> Place (output arc)
        elif source_parts in transition_to_idx and target_parts in place_to_idx:
            trans_idx = transition_to_idx[source_parts]
            place_idx = place_to_idx[target_parts]
            matrix.add_output(place_idx, trans_idx, arc.weight)

    return matrix
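
The arc-processing loop can be illustrated with a reduced sketch that produces a single signed matrix from a list of arcs. It is not Hypha's IncidenceMatrix: `build_matrix` and the `Arc` tuple are hypothetical, names are plain strings rather than tuples of path parts, and inputs and outputs are folded into one matrix for brevity.

```python
from typing import Dict, List, Tuple

Arc = Tuple[str, str, int]  # (source, target, weight)


def build_matrix(places: List[str], transitions: List[str], arcs: List[Arc]) -> List[List[int]]:
    """Return a places-by-transitions matrix of net token change per firing."""
    # Sorting gives a deterministic index for every place and transition.
    place_idx: Dict[str, int] = {name: i for i, name in enumerate(sorted(places))}
    trans_idx: Dict[str, int] = {name: i for i, name in enumerate(sorted(transitions))}
    matrix = [[0] * len(trans_idx) for _ in place_idx]
    for source, target, weight in arcs:
        if source in place_idx and target in trans_idx:
            # Input arc: firing the transition removes tokens from the place.
            matrix[place_idx[source]][trans_idx[target]] -= weight
        elif source in trans_idx and target in place_idx:
            # Output arc: firing the transition adds tokens to the place.
            matrix[place_idx[target]][trans_idx[source]] += weight
        # Arcs matching neither pattern are skipped.
    return matrix


matrix = build_matrix(
    places=["pending", "done"],
    transitions=["process"],
    arcs=[("pending", "process", 2), ("process", "done", 1)],
)
```

Sorting puts `done` at row 0 and `pending` at row 1, so the result is `[[1], [-2]]`. As in the function above, an arc that connects two places or two transitions is silently ignored.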

build_guard_matrix

build_guard_matrix(spec: NetSpec, incidence_matrix: IncidenceMatrix) -> GuardMatrix

Build guard matrix from net specification.

Parameters:

Name Type Description Default
spec NetSpec

Net specification

required
incidence_matrix IncidenceMatrix

Incidence matrix for index mapping

required

Returns:

Type Description
GuardMatrix

GuardMatrix with guard information

Source code in src/mycorrhizal/hypha/core/runtime.py
def build_guard_matrix(spec: NetSpec, incidence_matrix: IncidenceMatrix) -> GuardMatrix:
    """Build guard matrix from net specification.

    Args:
        spec: Net specification
        incidence_matrix: Incidence matrix for index mapping

    Returns:
        GuardMatrix with guard information
    """
    guard_matrix = GuardMatrix()

    transitions: Dict[Tuple[str, ...], TransitionSpec] = {}

    def collect_spec(net_spec: NetSpec):
        for trans_name, trans_spec in net_spec.transitions.items():
            transitions[tuple(net_spec.get_parts(trans_name))] = trans_spec
        for subnet_spec in net_spec.subnets.values():
            collect_spec(subnet_spec)

    collect_spec(spec)

    for trans_fqn, trans_spec in transitions.items():
        trans_idx = incidence_matrix.transition_to_idx.get(trans_fqn)
        if trans_idx is not None:
            guard_matrix.set_guard(trans_idx, trans_spec.guard)

    return guard_matrix

Specifications Module

mycorrhizal.hypha.core.specs

Hypha DSL - Specification Layer

Core data structures representing the declarative specification of a Petri net. These are built by the decorator API and used to instantiate runtime objects.

PlaceSpec dataclass

PlaceSpec(name: str, handler: Optional[Callable] = None, state_factory: Optional[Callable] = None, is_io_input: bool = False, is_io_output: bool = False)

Specification for a place in the Petri net.

All places are multi-sets (bags) that support token storage with multiplicity. Tokens can be added and removed from places.

GuardSpec dataclass

GuardSpec(func: Callable)

Specification for a guard function

TransitionSpec dataclass

TransitionSpec(name: str, handler: Callable, guard: Optional[GuardSpec] = None, state_factory: Optional[Callable] = None, delay: float = 0.0)

Specification for a transition in the Petri net

ArcSpec dataclass

ArcSpec(source: PlaceRef | TransitionRef, target: PlaceRef | TransitionRef, weight: int = 1, name: Optional[str] = None)

Specification for an arc connecting places and transitions

NetSpec dataclass

NetSpec(name: str, parent: Optional[NetSpec] = None, places: Dict[str, PlaceSpec] = dict(), transitions: Dict[str, TransitionSpec] = dict(), arcs: List[ArcSpec] = list(), subnets: Dict[str, NetSpec] = dict())

Complete specification of a Petri net

get_fqn

get_fqn(local_name: Optional[str] = None) -> str

Compute fully qualified name by walking parent chain

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self, local_name: Optional[str] = None) -> str:
    """Compute fully qualified name by walking parent chain"""
    return '.'.join(self.get_parts(local_name))

get_parts

get_parts(local_name: Optional[str] = None) -> List[str]

Return the fully qualified name as a list of path components.

If local_name is provided, returns the parts for that member under this spec (e.g. ['Parent', 'Spec', 'local']). Otherwise returns the parts for this spec itself.

Source code in src/mycorrhizal/hypha/core/specs.py
def get_parts(self, local_name: Optional[str] = None) -> List[str]:
    """Return the fully qualified name as a list of path components.

    If local_name is provided, returns the parts for that member
    under this spec (e.g. ['Parent', 'Spec', 'local']). Otherwise
    returns the parts for this spec itself.
    """
    parts: List[str] = []
    if self.parent:
        parts.extend(self.parent.get_parts())

    parts.append(self.name)

    if local_name is not None:
        parts.append(local_name)

    return parts
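
The relationship between `get_parts` and `get_fqn` is easiest to see on a small parent chain. This sketch copies the two method bodies shown above onto a hypothetical `Node` class that has only `name` and `parent`, so it runs without the library.

```python
from typing import List, Optional


class Node:
    """Hypothetical stand-in holding only the fields get_parts reads."""

    def __init__(self, name: str, parent: Optional["Node"] = None):
        self.name = name
        self.parent = parent

    def get_parts(self, local_name: Optional[str] = None) -> List[str]:
        parts: List[str] = []
        if self.parent:
            parts.extend(self.parent.get_parts())  # ancestors first
        parts.append(self.name)
        if local_name is not None:
            parts.append(local_name)
        return parts

    def get_fqn(self, local_name: Optional[str] = None) -> str:
        return ".".join(self.get_parts(local_name))


root = Node("Pipeline")
stage = Node("Stage", parent=root)

print(stage.get_parts())           # ['Pipeline', 'Stage']
print(stage.get_parts("pending"))  # ['Pipeline', 'Stage', 'pending']
print(stage.get_fqn("pending"))    # Pipeline.Stage.pending
print(root.get_fqn())              # Pipeline
```

Because the chain is walked on every call, the result always reflects the current `parent` link.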

to_mermaid

to_mermaid() -> str

Generate Mermaid diagram of the net

Source code in src/mycorrhizal/hypha/core/specs.py
def to_mermaid(self) -> str:
    """Generate Mermaid diagram of the net"""
    lines = ["graph TD"]

    def add_subnet(spec: NetSpec, indent: str = "    "):
        if spec.subnets:
            for subnet_name, subnet_spec in spec.subnets.items():
                subnet_fqn = subnet_spec.get_fqn()
                lines.append(f"{indent}subgraph {subnet_fqn}")
                add_subnet(subnet_spec, indent + "    ")
                lines.append(f"{indent}end")

        for place_name, place_spec in spec.places.items():
            place_fqn = spec.get_fqn(place_name)
            shape = "((" 
            close = "))" 
            prefix = "[INPUT]</br>" if place_spec.is_io_input else "[OUTPUT]</br>" if place_spec.is_io_output else "" 
            lines.append(f"{indent}{place_fqn}{shape}\"{prefix}{place_fqn}\"{close}")

        for trans_name in spec.transitions.keys():
            trans_fqn = spec.get_fqn(trans_name)
            lines.append(f"{indent}{trans_fqn}[{trans_fqn}]")

        for arc in spec.arcs:
            source_fqn = arc.source.get_fqn()
            target_fqn = arc.target.get_fqn()
            weight_label = f"|weight={arc.weight}|" if arc.weight > 1 else ""
            lines.append(f"{indent}{source_fqn} -->{weight_label} {target_fqn}")

    add_subnet(self)

    return "\n".join(lines)
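
To show what the generated text looks like, the sketch below applies the same line templates as `to_mermaid` to a flat net with no subnets. `render_flat_net` and its argument shapes are hypothetical and exist only for this illustration; the `</br>` prefix is reproduced exactly as it appears in the source above.

```python
from typing import List, Tuple


def render_flat_net(
    net: str,
    places: List[Tuple[str, str]],     # (local name, "input" | "output" | "")
    transitions: List[str],            # local names
    arcs: List[Tuple[str, str, int]],  # (source FQN, target FQN, weight)
    indent: str = "    ",
) -> str:
    """Render one net level using the templates from NetSpec.to_mermaid."""
    lines = ["graph TD"]
    for name, io in places:
        fqn = f"{net}.{name}"
        prefix = (
            "[INPUT]</br>" if io == "input"
            else "[OUTPUT]</br>" if io == "output"
            else ""
        )
        # Places are drawn as circles: id(("label"))
        lines.append(f'{indent}{fqn}(("{prefix}{fqn}"))')
    for name in transitions:
        fqn = f"{net}.{name}"
        # Transitions are drawn as rectangles: id[label]
        lines.append(f"{indent}{fqn}[{fqn}]")
    for source, target, weight in arcs:
        # Only weights above 1 get an edge label.
        label = f"|weight={weight}|" if weight > 1 else ""
        lines.append(f"{indent}{source} -->{label} {target}")
    return "\n".join(lines)


diagram = render_flat_net(
    "Net",
    places=[("pending", "input"), ("done", "")],
    transitions=["process"],
    arcs=[("Net.pending", "Net.process", 2), ("Net.process", "Net.done", 1)],
)
print(diagram)
```

In the real method, each subnet is additionally wrapped in a `subgraph <fqn>` / `end` pair and rendered recursively with deeper indentation.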

PlaceRef

PlaceRef(local_name: str, parent_spec: NetSpec)

Reference to a place for use in arc definitions

Source code in src/mycorrhizal/hypha/core/specs.py
def __init__(self, local_name: str, parent_spec: NetSpec):
    self.local_name = local_name
    self.parent_spec = parent_spec

get_fqn

get_fqn() -> str

Compute FQN dynamically

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self) -> str:
    """Compute FQN dynamically"""
    return '.'.join(self.get_parts())

get_parts

get_parts() -> List[str]

Return the FQN as list of parts for this place ref.

Source code in src/mycorrhizal/hypha/core/specs.py
def get_parts(self) -> List[str]:
    """Return the FQN as list of parts for this place ref."""
    return self.parent_spec.get_parts(self.local_name)

__call__

__call__(func: Callable) -> PlaceRef

Allow PlaceRef to be used as a decorator to register a handler.

Example

@builder.place("queue")
def queue_handler(bb):
    return bb.tokens

The PlaceRef is returned for use in arcs, and the handler is registered.

Source code in src/mycorrhizal/hypha/core/specs.py
def __call__(self, func: Callable) -> "PlaceRef":
    """Allow PlaceRef to be used as a decorator to register a handler.

    Example:
        @builder.place("queue")
        def queue_handler(bb):
            return bb.tokens

    The PlaceRef is returned for use in arcs, and the handler is registered.
    """
    # Get the place spec and register the handler
    place_spec = self.parent_spec.places.get(self.local_name)
    if place_spec is not None:
        place_spec.handler = func
    return self
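
One consequence of `__call__` returning `self` is that the decorated name ends up bound to the reference, not to the function. The sketch below shows this with hypothetical stand-ins (`Spec`, `Ref`, and a module-level `place` helper in place of `builder.place`); the blackboard is assumed to be a plain dict here.

```python
from typing import Callable, Dict, Optional


class Spec:
    """Hypothetical stand-in for PlaceSpec: a name and an optional handler."""

    def __init__(self, name: str):
        self.name = name
        self.handler: Optional[Callable] = None


class Ref:
    """Hypothetical stand-in for PlaceRef with the same __call__ logic."""

    def __init__(self, local_name: str, places: Dict[str, Spec]):
        self.local_name = local_name
        self.places = places

    def __call__(self, func: Callable) -> "Ref":
        spec = self.places.get(self.local_name)
        if spec is not None:
            spec.handler = func  # register the handler on the spec
        return self              # the decorator result is the ref itself


places: Dict[str, Spec] = {}


def place(name: str) -> Ref:
    """Create the spec and hand back a ref, as a builder would."""
    places[name] = Spec(name)
    return Ref(name, places)


@place("queue")
def queue_handler(bb):
    return bb["tokens"]


print(type(queue_handler).__name__)                    # Ref
print(places["queue"].handler({"tokens": [1, 2, 3]}))  # [1, 2, 3]
```

Note that in the source above, a ref whose name is missing from `parent_spec.places` skips registration silently and still returns itself.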

TransitionRef

TransitionRef(local_name: str, parent_spec: NetSpec)

Reference to a transition for use in arc definitions

Source code in src/mycorrhizal/hypha/core/specs.py
def __init__(self, local_name: str, parent_spec: NetSpec):
    self.local_name = local_name
    self.parent_spec = parent_spec

get_fqn

get_fqn() -> str

Compute FQN dynamically

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self) -> str:
    """Compute FQN dynamically"""
    return '.'.join(self.get_parts())

SubnetRef

SubnetRef(spec: NetSpec)

Reference to a subnet instance for accessing its places/transitions

Source code in src/mycorrhizal/hypha/core/specs.py
def __init__(self, spec: NetSpec):
    self.spec = spec

get_fqn

get_fqn() -> str

Compute FQN dynamically

Source code in src/mycorrhizal/hypha/core/specs.py
def get_fqn(self) -> str:
    """Compute FQN dynamically"""
    return '.'.join(self.get_parts())

Utilities

mycorrhizal.hypha.util

Hypha Petri Net Utilities

Utility functions for Petri net operations like Mermaid diagram generation.

to_mermaid

to_mermaid(spec: NetSpec) -> str

Generate Mermaid diagram from a NetSpec.

This is a utility wrapper around NetSpec.to_mermaid() for convenience. Use this when you have a spec object and want to generate a diagram.

Parameters:

Name | Type | Description | Default
spec | NetSpec | The NetSpec to generate a diagram for | required

Returns:

Type | Description
str | Mermaid diagram as a string

Example
from mycorrhizal.hypha.core import pn
from mycorrhizal.hypha.util import to_mermaid

@pn.net
def MyNet(builder):
    # ... define net ...
    pass

# Generate diagram
mermaid_diagram = to_mermaid(MyNet._spec)
print(mermaid_diagram)

Source code in src/mycorrhizal/hypha/util.py
def to_mermaid(spec: NetSpec) -> str:
    """
    Generate Mermaid diagram from a NetSpec.

    This is a utility wrapper around NetSpec.to_mermaid() for convenience.
    Use this when you have a spec object and want to generate a diagram.

    Args:
        spec: The NetSpec to generate a diagram for

    Returns:
        Mermaid diagram as a string

    Example:
        ```python
        from mycorrhizal.hypha.core import pn
        from mycorrhizal.hypha.util import to_mermaid

        @pn.net
        def MyNet(builder):
            # ... define net ...
            pass

        # Generate diagram
        mermaid_diagram = to_mermaid(MyNet._spec)
        print(mermaid_diagram)
        ```
    """
    return spec.to_mermaid()