Skip to content

Programmatic Petri Net Construction

This guide explains how to build Petri nets programmatically using the NetBuilder API, as an alternative to the decorator-based DSL.

Overview

The Hypha DSL provides two ways to build Petri nets:

  1. Decorator DSL (@pn.net) - Declarative, decorator-based syntax
  2. Programmatic API (NetBuilder) - Imperative, runtime construction

Both approaches are fully capable and produce equivalent nets. The choice depends on your use case.

When to Use Programmatic Building

Use the programmatic API when: - Building nets from configuration files (JSON, YAML, etc.) - Constructing workflows dynamically at runtime - Need conditional net structures based on runtime data - Generating nets programmatically from external definitions - Building workflow engines or visual editors

Use the decorator DSL when: - Net structure is static and known at development time - You prefer declarative, readable syntax - Building application-specific workflows

NetBuilder API Reference

Basic Usage

from mycorrhizal.hypha.core.builder import NetBuilder
from mycorrhizal.hypha.core import Runner

# Create a builder
builder = NetBuilder("MyNet")

# Create places
input_place = builder.place("input")
output_place = builder.place("output")

# Create transition
async def my_transition(consumed, bb, timebase):
    for token in consumed:
        yield {output_place: f"processed_{token}"}

transition = builder.transition()(my_transition)

# Wire together
builder.arc(input_place, transition).arc(output_place)

# Create wrapper for Runner
def net_func():
    pass

net_func._spec = builder.spec
net_func.to_mermaid = lambda: builder.spec.to_mermaid()

# Use with Runner
runner = Runner(net_func, blackboard)

Key Methods

Creating Places

# BAG place (multiset, allows duplicates)
bag = builder.place("my_bag")

# QUEUE place (FIFO queue)
queue = builder.place("my_queue")

IO Input Places

Decorator for IO input places (token sources):

@builder.io_input_place()
async def source(bb, timebase):
    """Generator that yields tokens"""
    for i in range(10):
        yield f"token_{i}"

IO Output Places

Decorator for IO output places (token sinks):

@builder.io_output_place()
async def sink(bb, timebase, tokens):
    """Consumes tokens from the place"""
    for token in tokens:
        print(f"Output: {token}")

Creating Transitions

@builder.transition()
async def my_transition(consumed, bb, timebase):
    """Process tokens from input places"""
    for token in consumed:
        # Process token
        yield {output_place: processed_token}

Connecting with Arcs

The arc() method connects places and transitions. Returns ArcChain for chaining:

# Single arc
builder.arc(input_place, transition)

# Chained arcs
builder.arc(source, trans1).arc(place1).arc(trans2).arc(output)

Convenience Methods

Forward - Create a pass-through transition:

# Creates a transition that moves tokens from input to output
builder.forward(input_place, output_place, name="pass_through")

Fork - Split tokens to multiple output places:

builder.fork(input_place, [output1, output2, output3])

Join - Merge tokens from multiple input places:

builder.join([input1, input2, input3], output_place)

Subnets

Instantiate a subnet within this net:

from mycorrhizal.hypha.core import pn

@pn.net
def MySubnet(builder: NetBuilder):
    # Subnet definition
    ...

# Instantiate subnet in parent net
subnet_ref = builder.subnet(MySubnet, "instance1")

Examples

Example 1: Simple Linear Workflow

from mycorrhizal.hypha.core.builder import NetBuilder
from mycorrhizal.hypha.core import Runner

def build_linear_workflow():
    """Build a simple linear processing pipeline"""
    builder = NetBuilder("LinearWorkflow")

    # Create places
    input_p = builder.place("input")
    stage1 = builder.place("stage1")
    stage2 = builder.place("stage2")
    output_p = builder.place("output")

    # Create transitions
    @builder.transition()
    async def process_stage1(consumed, bb, timebase):
        for token in consumed:
            yield {stage1: f"stage1_{token}"}

    @builder.transition()
    async def process_stage2(consumed, bb, timebase):
        for token in consumed:
            yield {stage2: f"stage2_{token}"}

    @builder.transition()
    async def finalize(consumed, bb, timebase):
        for token in consumed:
            yield {output_p: f"final_{token}"}

    # Wire together
    builder.arc(input_p, process_stage1).arc(stage1)
    builder.arc(stage1, process_stage2).arc(stage2)
    builder.arc(stage2, finalize).arc(output_p)

    # Create wrapper
    def net_func():
        pass

    net_func._spec = builder.spec
    net_func.to_mermaid = lambda: builder.spec.to_mermaid()

    return net_func

Example 2: Dynamic Net from Configuration

def build_net_from_config(config):
    """Build a Petri net from a configuration dictionary"""

    builder = NetBuilder(config["name"])

    # Build places
    places = {}
    for place_config in config["places"]:
        places[place_config["name"]] = builder.place(place_config["name"])

    # Build transitions
    transitions = {}
    for trans_config in config["transitions"]:
        # Handler functions should be registered somehow
        handler = get_handler(trans_config["handler"])
        trans = builder.transition()(handler)
        transitions[trans_config["name"]] = trans

    # Build arcs
    for trans_config in config["transitions"]:
        trans = transitions[trans_config["name"]]

        for arc_config in trans_config["arcs"]:
            source_name, target_name = arc_config

            source = places.get(source_name) or transitions.get(source_name)
            target = places.get(target_name) or transitions.get(target_name)

            builder.arc(source, target)

    # Return wrapper
    def net_func():
        pass

    net_func._spec = builder.spec
    net_func.to_mermaid = lambda: builder.spec.to_mermaid()

    return net_func

# Usage:
config = {
    "name": "DynamicNet",
    "places": [
        {"name": "input", "type": "queue"},
        {"name": "output", "type": "bag"}
    ],
    "transitions": [
        {
            "name": "process",
            "handler": process_handler,
            "arcs": [("input", "process"), ("process", "output")]
        }
    ]
}

net = build_net_from_config(config)

Example 3: Conditional Net Construction

def build_adaptive_workflow(use_validation: bool):
    """Build different workflow structures based on runtime conditions"""

    builder = NetBuilder("AdaptiveWorkflow")

    input_p = builder.place("input")
    output_p = builder.place("output")

    if use_validation:
        # Path with validation step
        validation = builder.place("validation")

        @builder.transition()
        async def validate(consumed, bb, timebase):
            for token in consumed:
                if is_valid(token):  # Some validation logic
                    yield {validation: token}

        @builder.transition()
        async def process(consumed, bb, timebase):
            for token in consumed:
                result = process_token(token)
                yield {output_p: result}

        builder.arc(input_p, validate).arc(validation)
        builder.arc(validation, process).arc(output_p)
    else:
        # Direct path (no validation)
        @builder.transition()
        async def process_direct(consumed, bb, timebase):
            for token in consumed:
                result = process_token(token)
                yield {output_p: result}

        builder.arc(input_p, process_direct).arc(output_p)

    # Create wrapper
    def net_func():
        pass

    net_func._spec = builder.spec
    net_func.to_mermaid = lambda: builder.spec.to_mermaid()

    return net_func

Example 4: Using Convenience Methods

def build_parallel_processing():
    """Build a parallel processing network"""

    builder = NetBuilder("ParallelProcessing")

    # Create places
    input_p = builder.place("input")
    worker1_p = builder.place("worker1")
    worker2_p = builder.place("worker2")
    worker3_p = builder.place("worker3")
    output_p = builder.place("output")

    # Fork: distribute work to 3 workers
    builder.fork(input_p, [worker1_p, worker2_p, worker3_p])

    # Add processing transitions for each worker
    for i, worker_p in enumerate([worker1_p, worker2_p, worker3_p], 1):
        @builder.transition()
        async def work(consumed, bb, timebase):
            for token in consumed:
                result = process_worker(token, worker_id=i)
                yield {output_p: result}

        builder.arc(worker_p, work).arc(output_p)

    # Create wrapper
    def net_func():
        pass

    net_func._spec = builder.spec
    net_func.to_mermaid = lambda: builder.spec.to_mermaid()

    return net_func

Comparison: Decorator vs Programmatic

Decorator Approach

@pn.net
def MyNet(builder: NetBuilder):
    @builder.io_input_place()
    async def source(bb, timebase):
        yield "token1"
        yield "token2"

    output = builder.place("output")

    @builder.transition()
    async def process(consumed, bb, timebase):
        for token in consumed:
            yield {output: token}

    builder.arc(source, process).arc(output)

Pros: - Clean, declarative syntax - Easier to read and maintain - Less boilerplate

Cons: - Structure fixed at definition time - Cannot be modified at runtime - Not suitable for dynamic workflows

Programmatic Approach

def build_my_net():
    builder = NetBuilder("MyNet")

    async def source(bb, timebase):
        yield "token1"
        yield "token2"

    input_p = builder.io_input_place()(source)
    output_p = builder.place("output")

    async def process(consumed, bb, timebase):
        for token in consumed:
            yield {output_p: token}

    trans = builder.transition()(process)

    builder.arc(input_p, trans).arc(output_p)

    def net_func():
        pass

    net_func._spec = builder.spec
    net_func.to_mermaid = lambda: builder.spec.to_mermaid()

    return net_func

Pros: - Full control over net structure - Can build from configuration/data - Runtime flexibility - Suitable for workflow engines

Cons: - More verbose - Slightly more complex - Need to create wrapper function

Advanced Patterns

Building Hierarchical Nets with Subnets

from mycorrhizal.hypha.core import pn

# Define a reusable subnet
@pn.net
def ProcessorSubnet(builder: NetBuilder):
    input_p = builder.place("input")
    output_p = builder.place("output")

    @builder.transition()
    async def process(consumed, bb, timebase):
        for token in consumed:
            yield {output_p: f"processed_{token}"}

    builder.arc(input_p, process).arc(output_p)

# Use subnet in parent net
def build_hierarchical_net():
    builder = NetBuilder("HierarchicalNet")

    main_input = builder.place("main_input")
    main_output = builder.place("main_output")

    # Instantiate subnet
    processor1 = builder.subnet(ProcessorSubnet, "processor1")
    processor2 = builder.subnet(ProcessorSubnet, "processor2")

    # Connect to subnets
    # Note: subnet places are accessed via SubnetRef
    # See the full example for details

    # Create wrapper
    def net_func():
        pass

    net_func._spec = builder.spec
    net_func.to_mermaid = lambda: builder.spec.to_mermaid()

    return net_func

Arc Chaining with ArcChain

The arc() method returns an ArcChain object for fluent chaining:

builder.arc(place1, trans1)  # Returns ArcChain pointing to trans1
    .arc(place2)              # Chain: trans1 -> place2
    .arc(trans2)              # Chain: place2 -> trans2
    .arc(place3);             # Chain: trans2 -> place3

This is equivalent to:

builder.arc(place1, trans1)
builder.arc(trans1, place2)
builder.arc(place2, trans2)
builder.arc(trans2, place3)

Best Practices

1. Name Your Places and Transitions Clearly

# Good
input_queue = builder.place("input_queue")
processed_data = builder.place("processed_data")

# Avoid
p1 = builder.place("p1")
p2 = builder.place("p2")

2. Use Consistent Conventions for Wrapper Functions

def create_wrapper(spec):
    def net_func():
        pass
    net_func._spec = spec
    net_func.to_mermaid = lambda: spec.to_mermaid()
    return net_func

3. Document Configuration-Based Builders

def build_from_yaml(yaml_path: str):
    """
    Build a Petri net from a YAML configuration file.

    YAML structure:
        name: MyNet
        places:
          - name: input
            type: queue
          - name: output
            type: bag
        transitions:
          - name: process
            handler: my_module.process_handler
            arcs:
            - [input, process]
            - [process, output]
    """
    # Implementation...

4. Validate Configurations Before Building

def validate_config(config):
    """Validate net configuration before building"""
    required_keys = {"name", "places", "transitions"}
    for key in required_keys:
        if key not in config:
            raise ValueError(f"Missing required key: {key}")

    # Validate place references
    place_names = {p["name"] for p in config["places"]}
    for trans in config["transitions"]:
        for arc in trans["arcs"]:
            if arc[0] not in place_names or arc[1] not in place_names:
                # Assuming transition names are also in places
                pass

See Also