Skip to content

Hypha Petri Nets

Hypha is a decorator-based Colored Petri Net DSL for modeling concurrent workflows.

Overview

Hypha Petri Nets provide:

  • Decorator-based syntax - Define nets, places, and transitions with decorators
  • Colored tokens - Rich data objects as tokens, not just markers
  • Multi-set places - All places are bags (multi-sets) supporting token multiplicity
  • Async execution - Full asyncio support for concurrent transitions
  • Modular composition - Subnets for hierarchical design

Quick Example

from mycorrhizal.hypha.core import pn, Runner as PNRunner
from pydantic import BaseModel

class WorkItem(BaseModel):
    id: int
    data: str

@pn.net
def ProcessingNet(builder):
    input_place = builder.place("input")
    processed = builder.place("processed")

    @builder.transition()
    async def process_item(consumed, bb, timebase):
        for token in consumed:
            item = token.data
            print(f"Processing: {item.data}")
            result = WorkItem(id=item.id, data=item.data.upper())
            yield {processed: result}

# Create and run
runner = PNRunner(ProcessingNet, blackboard=WorkItem(id=0, data=""))
await runner.start(timebase)

Key Concepts

Nets

A net is the container for places and transitions:

@pn.net
def MyNet(builder):
    my_place = builder.place("my_place")

    @builder.transition()
    async def my_transition(consumed, bb, timebase):
        yield {my_place: consumed[0]}

Places

Places hold tokens as multi-sets (bags):

@pn.net
def MyNet(builder):
    # All places are multi-sets (bags)
    work_items = builder.place("work_items")
    results = builder.place("results")

Transitions

Transitions consume and produce tokens:

@builder.transition()
async def my_transition(consumed, bb, timebase):
    """
    Process consumed tokens and yield output tokens.

    Args:
        consumed: List of consumed tokens (from input places)
        bb: Shared blackboard
        timebase: Time abstraction

    Yields:
        Dictionaries mapping place references to tokens
    """
    # Process consumed tokens
    for token in consumed:
        # Produce output tokens
        yield {output_place: processed_token}

Arcs

Arcs connect places to transitions:

# Connect place to transition
builder.arc(input_place, my_transition)
# Chain to connect transition to output
builder.arc(my_transition, output_place)

Subnets

Compose nets hierarchically:

@pn.net
def Validator(builder):
    """Reusable validation subnet."""
    input_p = builder.place("input")
    output_p = builder.place("output")

    @builder.transition()
    async def validate(consumed, bb, timebase):
        # Validation logic
        yield {output_p: validated_token}

@pn.net
def MainNet(builder):
    """Main processing net using subnet."""
    input_p = builder.place("input")
    output_p = builder.place("output")

    validator = builder.subnet(Validator, "validator")

    builder.arc(input_p, validator.input)
    builder.arc(validator.output, output_p)

Blackboard Integration

Access and modify shared state:

from pydantic import BaseModel

class NetContext(BaseModel):
    processed_count: int = 0

@pn.net
def CountingNet(builder):
    @builder.transition()
    async def count_and_process(consumed, bb, timebase):
        # Access blackboard
        bb.processed_count += 1
        print(f"Processed {bb.processed_count} items")

        # Process tokens
        yield {output_place: processed_token}

Examples

Documentation

Mermaid Export

Visualize Before You Run

Hypha enables static verification of Petri net structure through Mermaid diagram export:

net = MyNet()
mermaid = net.to_mermaid()
print(mermaid)

Paste into Mermaid Live Editor to visualize your workflow.

Benefits: - Verify token flow through places and transitions - Identify potential deadlocks or unreachable places - Check that all transitions can fire when tokens are available - Validate workflow architecture before execution - Document complex Petri nets automatically

Catch structural issues in your workflow before running any transitions!

Example: Task Processing System

This Petri net processes tasks through multiple stages with error handling and notification routing:

graph TD
    subgraph TaskProcessingSystem.TaskGen
        TaskProcessingSystem.TaskGen.source(("[INPUT]</br>TaskProcessingSystem.TaskGen.source"))
    end
    subgraph TaskProcessingSystem.TaskProc
        TaskProcessingSystem.TaskProc.input(("TaskProcessingSystem.TaskProc.input"))
        TaskProcessingSystem.TaskProc.processing(("TaskProcessingSystem.TaskProc.processing"))
        TaskProcessingSystem.TaskProc.completed(("TaskProcessingSystem.TaskProc.completed"))
        TaskProcessingSystem.TaskProc.failed(("TaskProcessingSystem.TaskProc.failed"))
        TaskProcessingSystem.TaskProc.take_to_processing[TaskProcessingSystem.TaskProc.take_to_processing]
        TaskProcessingSystem.TaskProc.do_processing[TaskProcessingSystem.TaskProc.do_processing]
        TaskProcessingSystem.TaskProc.input --> TaskProcessingSystem.TaskProc.take_to_processing
        TaskProcessingSystem.TaskProc.take_to_processing --> TaskProcessingSystem.TaskProc.processing
        TaskProcessingSystem.TaskProc.processing --> TaskProcessingSystem.TaskProc.do_processing
        TaskProcessingSystem.TaskProc.do_processing --> TaskProcessingSystem.TaskProc.completed
        TaskProcessingSystem.TaskProc.do_processing --> TaskProcessingSystem.TaskProc.failed
    end
    subgraph TaskProcessingSystem.Notify
        TaskProcessingSystem.Notify.input(("TaskProcessingSystem.Notify.input"))
        TaskProcessingSystem.Notify.email_sink(("[OUTPUT]</br>TaskProcessingSystem.Notify.email_sink"))
        TaskProcessingSystem.Notify.sms_sink(("[OUTPUT]</br>TaskProcessingSystem.Notify.sms_sink"))
        TaskProcessingSystem.Notify.log_sink(("[OUTPUT]</br>TaskProcessingSystem.Notify.log_sink"))
        TaskProcessingSystem.Notify.NotificationFork[TaskProcessingSystem.Notify.NotificationFork]
        TaskProcessingSystem.Notify.input --> TaskProcessingSystem.Notify.NotificationFork
        TaskProcessingSystem.Notify.NotificationFork --> TaskProcessingSystem.Notify.email_sink
        TaskProcessingSystem.Notify.NotificationFork --> TaskProcessingSystem.Notify.sms_sink
        TaskProcessingSystem.Notify.NotificationFork --> TaskProcessingSystem.Notify.log_sink
    end
    subgraph TaskProcessingSystem.ErrorHandle
        TaskProcessingSystem.ErrorHandle.input(("TaskProcessingSystem.ErrorHandle.input"))
        TaskProcessingSystem.ErrorHandle.error_log(("[OUTPUT]</br>TaskProcessingSystem.ErrorHandle.error_log"))
        TaskProcessingSystem.ErrorHandle.ErrorForward[TaskProcessingSystem.ErrorHandle.ErrorForward]
        TaskProcessingSystem.ErrorHandle.input --> TaskProcessingSystem.ErrorHandle.ErrorForward
        TaskProcessingSystem.ErrorHandle.ErrorForward --> TaskProcessingSystem.ErrorHandle.error_log
    end
    TaskProcessingSystem.completion_tracker(("[OUTPUT]</br>TaskProcessingSystem.completion_tracker"))
    TaskProcessingSystem.forward_source_to_input[TaskProcessingSystem.forward_source_to_input]
    TaskProcessingSystem.CompletionFork[TaskProcessingSystem.CompletionFork]
    TaskProcessingSystem.FailureFork[TaskProcessingSystem.FailureFork]
    TaskProcessingSystem.TaskGen.source --> TaskProcessingSystem.forward_source_to_input
    TaskProcessingSystem.forward_source_to_input --> TaskProcessingSystem.TaskProc.input
    TaskProcessingSystem.TaskProc.completed --> TaskProcessingSystem.CompletionFork
    TaskProcessingSystem.CompletionFork --> TaskProcessingSystem.Notify.input
    TaskProcessingSystem.CompletionFork --> TaskProcessingSystem.completion_tracker
    TaskProcessingSystem.TaskProc.failed --> TaskProcessingSystem.FailureFork
    TaskProcessingSystem.FailureFork --> TaskProcessingSystem.ErrorHandle.input
    TaskProcessingSystem.FailureFork --> TaskProcessingSystem.completion_tracker

Key features shown: - Task generation, processing, and completion tracking - Error handling with separate logging - Multi-channel notifications (email, SMS, log) - Fork/join patterns for parallel flows

See the Hypha Demo for the complete executable example.

Programmatic Net Building

In addition to the decorator-based DSL, Hypha supports programmatic net construction using the NetBuilder API. This is useful for:

  • Building nets from configuration files (JSON, YAML)
  • Creating dynamic workflows at runtime
  • Generating nets from external definitions
  • Building workflow engines and visual editors

See Programmatic Hypha Building for complete documentation.

See Also