Hypha Petri Nets¶
Hypha is a decorator-based Colored Petri Net DSL for modeling concurrent workflows.
Overview¶
Hypha Petri Nets provide:
- Decorator-based syntax - Define nets, places, and transitions with decorators
- Colored tokens - Rich data objects as tokens, not just markers
- Multi-set places - All places are bags (multi-sets) supporting token multiplicity
- Async execution - Full asyncio support for concurrent transitions
- Modular composition - Subnets for hierarchical design
Quick Example¶
from mycorrhizal.hypha.core import pn, Runner as PNRunner
from pydantic import BaseModel

class WorkItem(BaseModel):
    id: int
    data: str

@pn.net
def ProcessingNet(builder):
    input_place = builder.place("input")
    processed = builder.place("processed")

    @builder.transition()
    async def process_item(consumed, bb, timebase):
        for token in consumed:
            item = token.data
            print(f"Processing: {item.data}")
            result = WorkItem(id=item.id, data=item.data.upper())
            yield {processed: result}

    # Wire the places to the transition (see Arcs below)
    builder.arc(input_place, process_item)
    builder.arc(process_item, processed)

# Create and run. The await must sit inside an async function, and
# `timebase` is a timebase instance created elsewhere.
runner = PNRunner(ProcessingNet, blackboard=WorkItem(id=0, data=""))
await runner.start(timebase)
Key Concepts¶
Nets¶
A net is the container for places and transitions:
@pn.net
def MyNet(builder):
    my_place = builder.place("my_place")

    @builder.transition()
    async def my_transition(consumed, bb, timebase):
        yield {my_place: consumed[0]}
Places¶
Places hold tokens as multi-sets (bags):
@pn.net
def MyNet(builder):
    # All places are multi-sets (bags)
    work_items = builder.place("work_items")
    results = builder.place("results")
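To make "bag" concrete, here is a minimal sketch of multi-set behaviour using only the standard library. It is an illustration, not Hypha's internal representation: the `take` helper and the string tokens are assumptions made for this example, and real Hypha tokens are richer data objects.

```python
from collections import Counter

# Hypothetical stand-in for a place: a bag that counts copies of each token.
place = Counter()

# Adding the same token twice keeps both copies (multiplicity 2).
place.update(["job-a", "job-a", "job-b"])

def take(bag: Counter, token: str) -> bool:
    """Remove one copy of `token` from the bag; return False if it is absent."""
    if bag[token] == 0:
        return False
    bag[token] -= 1
    if bag[token] == 0:
        del bag[token]  # drop empty entries so the bag only lists held tokens
    return True

took = take(place, "job-a")
remaining = dict(place)  # one copy of "job-a" and one of "job-b" are left
```

A plain set would have collapsed the two "job-a" tokens into one, which is the distinction the multi-set wording above is pointing at.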
Transitions¶
Transitions consume and produce tokens:
@builder.transition()
async def my_transition(consumed, bb, timebase):
    """
    Process consumed tokens and yield output tokens.

    Args:
        consumed: List of consumed tokens (from input places)
        bb: Shared blackboard
        timebase: Time abstraction

    Yields:
        Dictionaries mapping place references to tokens
    """
    # Process consumed tokens
    for token in consumed:
        # Produce output tokens
        yield {output_place: processed_token}
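The yield-a-dictionary convention can be exercised without Hypha. The sketch below is a hedged illustration: `fire` is a hypothetical driver written for this example, not Hypha's runner, and it uses string keys where Hypha uses place references.

```python
import asyncio
from collections import defaultdict

async def double(consumed, bb, timebase):
    """Transition body in the documented shape: yield {place: token} dicts."""
    for token in consumed:
        yield {"doubled": token * 2}

async def fire(transition, consumed, bb=None, timebase=None):
    """Hypothetical driver: run one firing and group produced tokens by place."""
    produced = defaultdict(list)
    async for outputs in transition(consumed, bb, timebase):
        for place, token in outputs.items():
            produced[place].append(token)
    return dict(produced)

result = asyncio.run(fire(double, [1, 2, 3]))
```

Because the transition is an async generator, it can emit tokens one at a time and await other work between yields, while the driver decides where each token lands.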
Arcs¶
Arcs connect places to transitions and transitions to places:
# Connect an input place to a transition
builder.arc(input_place, my_transition)
# Connect the transition to its output place
builder.arc(my_transition, output_place)
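Arcs determine when a transition may fire. The sketch below applies the textbook Petri net rule (a transition is enabled when every input place holds at least one token) to plain data. It is an assumption-laden illustration: `is_enabled`, `fire`, and the token-count marking are hypothetical helpers, and Hypha's actual scheduling policy may differ.

```python
# Arcs as (source, target) pairs, mirroring the two builder.arc calls above.
ARCS = [("input_place", "my_transition"), ("my_transition", "output_place")]

def input_places(transition, arcs):
    """Places with an arc leading into the transition."""
    return [src for src, dst in arcs if dst == transition]

def output_places(transition, arcs):
    """Places with an arc leading out of the transition."""
    return [dst for src, dst in arcs if src == transition]

def is_enabled(transition, arcs, marking):
    """Textbook rule: every input place holds at least one token."""
    return all(marking.get(p, 0) > 0 for p in input_places(transition, arcs))

def fire(transition, arcs, marking):
    """Return the marking after taking one token per input and adding one per output."""
    if not is_enabled(transition, arcs, marking):
        raise ValueError(f"{transition} is not enabled")
    after = dict(marking)
    for place in input_places(transition, arcs):
        after[place] -= 1
    for place in output_places(transition, arcs):
        after[place] = after.get(place, 0) + 1
    return after

marking = {"input_place": 2, "output_place": 0}
after_one = fire("my_transition", ARCS, marking)
```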
Subnets¶
Compose nets hierarchically:
@pn.net
def Validator(builder):
    """Reusable validation subnet."""
    input_p = builder.place("input")
    output_p = builder.place("output")

    @builder.transition()
    async def validate(consumed, bb, timebase):
        # Validation logic
        yield {output_p: validated_token}

@pn.net
def MainNet(builder):
    """Main processing net using subnet."""
    input_p = builder.place("input")
    output_p = builder.place("output")
    validator = builder.subnet(Validator, "validator")
    builder.arc(input_p, validator.input)
    builder.arc(validator.output, output_p)
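Both nets above declare places called "input" and "output", so a composed net needs some way to tell them apart. The Mermaid export example in this document uses dotted names such as TaskProcessingSystem.TaskProc.input. The sketch below shows one way such names could be derived from a nested description. The `qualify` function and the dictionary layout are hypothetical and are not part of Hypha.

```python
def qualify(net_name, spec):
    """Flatten a nested net description into dotted, fully qualified place names."""
    names = [f"{net_name}.{place}" for place in spec.get("places", [])]
    for sub_name, sub_spec in spec.get("subnets", {}).items():
        # Recurse with the subnet's instance name appended to the prefix.
        names.extend(qualify(f"{net_name}.{sub_name}", sub_spec))
    return names

main_net = {
    "places": ["input", "output"],
    "subnets": {"validator": {"places": ["input", "output"]}},
}
qualified = qualify("MainNet", main_net)
```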
Blackboard Integration¶
Access and modify shared state:
from pydantic import BaseModel

class NetContext(BaseModel):
    processed_count: int = 0

@pn.net
def CountingNet(builder):
    @builder.transition()
    async def count_and_process(consumed, bb, timebase):
        # Access blackboard
        bb.processed_count += 1
        print(f"Processed {bb.processed_count} items")
        # Process tokens
        yield {output_place: processed_token}
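The shared-state pattern can be tried in isolation. The sketch below is a stand-alone illustration that swaps the pydantic model for a standard-library dataclass so it runs without extra dependencies. The simplified `count_and_process` signature and the `main` coroutine are assumptions for this example, not Hypha's transition interface.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class NetContext:
    processed_count: int = 0

async def count_and_process(token, bb):
    """Simplified stand-in for a transition body that updates shared state."""
    await asyncio.sleep(0)     # let other tasks interleave
    bb.processed_count += 1    # no await between read and write, so no lost updates
    return token.upper()

async def main():
    bb = NetContext()
    # Three concurrent tasks share the same blackboard instance.
    results = await asyncio.gather(
        *(count_and_process(t, bb) for t in ["a", "b", "c"])
    )
    return bb.processed_count, results

count, results = asyncio.run(main())
```

Since asyncio tasks only switch at await points, a plain increment is safe here. An update that awaits between reading and writing the blackboard would need a lock.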
Examples¶
- Petri Net Demo - Basic workflow
- Blended Demo - Petri net + behavior tree
Documentation¶
- API Reference - Complete API documentation
- Getting Started - Tutorial
- Programmatic Hypha - Building nets programmatically
- Composition - Subnet patterns
Mermaid Export¶
Visualize Before You Run¶
Hypha enables static verification of Petri net structure through Mermaid diagram export. Paste the exported diagram text into the Mermaid Live Editor to visualize your workflow.
Benefits:
- Verify token flow through places and transitions
- Identify potential deadlocks or unreachable places
- Check that all transitions can fire when tokens are available
- Validate workflow architecture before execution
- Document complex Petri nets automatically
Catch structural issues in your workflow before running any transitions!
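One of the checks listed above, finding unreachable places, can also be approximated in code. The sketch below runs a breadth-first search over arcs given as (source, target) pairs. The function name and the arc list are hypothetical, and graph reachability is only a necessary condition: a transition with several input places may still never fire.

```python
from collections import deque

def unreachable_nodes(arcs, sources):
    """Return places and transitions that no path from `sources` can reach."""
    graph = {}
    nodes = set(sources)
    for src, dst in arcs:
        graph.setdefault(src, []).append(dst)
        nodes.update((src, dst))

    seen = set(sources)
    queue = deque(sources)
    while queue:
        node = queue.popleft()
        for nxt in graph.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return nodes - seen

arcs = [
    ("source", "forward"),
    ("forward", "input"),
    ("input", "process"),
    ("process", "done"),
    ("orphan_transition", "orphan_place"),  # never fed by the source
]
missing = unreachable_nodes(arcs, ["source"])
```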
Example: Task Processing System¶
This Petri net processes tasks through multiple stages with error handling and notification routing:
graph TD
subgraph TaskProcessingSystem.TaskGen
TaskProcessingSystem.TaskGen.source(("[INPUT]</br>TaskProcessingSystem.TaskGen.source"))
end
subgraph TaskProcessingSystem.TaskProc
TaskProcessingSystem.TaskProc.input(("TaskProcessingSystem.TaskProc.input"))
TaskProcessingSystem.TaskProc.processing(("TaskProcessingSystem.TaskProc.processing"))
TaskProcessingSystem.TaskProc.completed(("TaskProcessingSystem.TaskProc.completed"))
TaskProcessingSystem.TaskProc.failed(("TaskProcessingSystem.TaskProc.failed"))
TaskProcessingSystem.TaskProc.take_to_processing[TaskProcessingSystem.TaskProc.take_to_processing]
TaskProcessingSystem.TaskProc.do_processing[TaskProcessingSystem.TaskProc.do_processing]
TaskProcessingSystem.TaskProc.input --> TaskProcessingSystem.TaskProc.take_to_processing
TaskProcessingSystem.TaskProc.take_to_processing --> TaskProcessingSystem.TaskProc.processing
TaskProcessingSystem.TaskProc.processing --> TaskProcessingSystem.TaskProc.do_processing
TaskProcessingSystem.TaskProc.do_processing --> TaskProcessingSystem.TaskProc.completed
TaskProcessingSystem.TaskProc.do_processing --> TaskProcessingSystem.TaskProc.failed
end
subgraph TaskProcessingSystem.Notify
TaskProcessingSystem.Notify.input(("TaskProcessingSystem.Notify.input"))
TaskProcessingSystem.Notify.email_sink(("[OUTPUT]</br>TaskProcessingSystem.Notify.email_sink"))
TaskProcessingSystem.Notify.sms_sink(("[OUTPUT]</br>TaskProcessingSystem.Notify.sms_sink"))
TaskProcessingSystem.Notify.log_sink(("[OUTPUT]</br>TaskProcessingSystem.Notify.log_sink"))
TaskProcessingSystem.Notify.NotificationFork[TaskProcessingSystem.Notify.NotificationFork]
TaskProcessingSystem.Notify.input --> TaskProcessingSystem.Notify.NotificationFork
TaskProcessingSystem.Notify.NotificationFork --> TaskProcessingSystem.Notify.email_sink
TaskProcessingSystem.Notify.NotificationFork --> TaskProcessingSystem.Notify.sms_sink
TaskProcessingSystem.Notify.NotificationFork --> TaskProcessingSystem.Notify.log_sink
end
subgraph TaskProcessingSystem.ErrorHandle
TaskProcessingSystem.ErrorHandle.input(("TaskProcessingSystem.ErrorHandle.input"))
TaskProcessingSystem.ErrorHandle.error_log(("[OUTPUT]</br>TaskProcessingSystem.ErrorHandle.error_log"))
TaskProcessingSystem.ErrorHandle.ErrorForward[TaskProcessingSystem.ErrorHandle.ErrorForward]
TaskProcessingSystem.ErrorHandle.input --> TaskProcessingSystem.ErrorHandle.ErrorForward
TaskProcessingSystem.ErrorHandle.ErrorForward --> TaskProcessingSystem.ErrorHandle.error_log
end
TaskProcessingSystem.completion_tracker(("[OUTPUT]</br>TaskProcessingSystem.completion_tracker"))
TaskProcessingSystem.forward_source_to_input[TaskProcessingSystem.forward_source_to_input]
TaskProcessingSystem.CompletionFork[TaskProcessingSystem.CompletionFork]
TaskProcessingSystem.FailureFork[TaskProcessingSystem.FailureFork]
TaskProcessingSystem.TaskGen.source --> TaskProcessingSystem.forward_source_to_input
TaskProcessingSystem.forward_source_to_input --> TaskProcessingSystem.TaskProc.input
TaskProcessingSystem.TaskProc.completed --> TaskProcessingSystem.CompletionFork
TaskProcessingSystem.CompletionFork --> TaskProcessingSystem.Notify.input
TaskProcessingSystem.CompletionFork --> TaskProcessingSystem.completion_tracker
TaskProcessingSystem.TaskProc.failed --> TaskProcessingSystem.FailureFork
TaskProcessingSystem.FailureFork --> TaskProcessingSystem.ErrorHandle.input
TaskProcessingSystem.FailureFork --> TaskProcessingSystem.completion_tracker
Key features shown:
- Task generation, processing, and completion tracking
- Error handling with separate logging
- Multi-channel notifications (email, SMS, log)
- Fork/join patterns for parallel flows
See the Hypha Demo for the complete executable example.
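The diagram above follows a simple convention: places are drawn as circles with `(("..."))`, transitions as boxes with `[...]`, and arcs as `-->`. The sketch below produces text in that shape from plain lists. `render_mermaid` is a hypothetical helper written for illustration, not Hypha's exporter, and it omits subgraphs and the [INPUT]/[OUTPUT] labels.

```python
def render_mermaid(places, transitions, arcs):
    """Build Mermaid flowchart text: circles for places, boxes for transitions."""
    lines = ["graph TD"]
    for place in places:
        lines.append(f'    {place}(("{place}"))')
    for transition in transitions:
        lines.append(f"    {transition}[{transition}]")
    for src, dst in arcs:
        lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)

diagram = render_mermaid(
    places=["input", "processed"],
    transitions=["process_item"],
    arcs=[("input", "process_item"), ("process_item", "processed")],
)
```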
Programmatic Net Building¶
In addition to the decorator-based DSL, Hypha supports programmatic net construction using the NetBuilder API. This is useful for:
- Building nets from configuration files (JSON, YAML)
- Creating dynamic workflows at runtime
- Generating nets from external definitions
- Building workflow engines and visual editors
See Programmatic Hypha Building for complete documentation.
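As a rough illustration of the configuration-driven use case, the sketch below loads a net description from JSON and checks that every arc joins a place to a transition or a transition to a place. The JSON layout and `load_net_spec` are assumptions made for this example. It deliberately does not call the NetBuilder API, whose signatures are covered in the linked guide rather than here.

```python
import json

# Hypothetical configuration format; Hypha does not prescribe this layout.
CONFIG = """
{
  "places": ["input", "processed"],
  "transitions": ["process_item"],
  "arcs": [["input", "process_item"], ["process_item", "processed"]]
}
"""

def load_net_spec(text):
    """Parse a JSON net description and reject arcs that are not place/transition pairs."""
    spec = json.loads(text)
    places = set(spec["places"])
    transitions = set(spec["transitions"])
    for src, dst in spec["arcs"]:
        place_to_transition = src in places and dst in transitions
        transition_to_place = src in transitions and dst in places
        if not (place_to_transition or transition_to_place):
            raise ValueError(f"invalid arc: {src} -> {dst}")
    return spec

spec = load_net_spec(CONFIG)
```

A validated description like this could then be walked to issue the corresponding builder calls.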
See Also¶
- Programmatic Building - NetBuilder API guide
- Rhizomorph - Behavior Trees for control logic
- Septum - State Machines for stateful behavior