Skip to content

Graph Engine

The core workflow engine for Maticlib.

MaticGraph

maticlib.graph.graph.MaticGraph

A fast, pure-Python graph workflow engine with optional state management. Supports dict, TypedDict, dataclass, and Pydantic BaseModel states.

Parameters:

Name Type Description Default
stateful bool

If True, maintains and merges state across nodes.

True
state_schema Optional[type]

Optional Pydantic BaseModel, dataclass, or TypedDict class

None
max_workers int

Maximum number of parallel workers (default: 4)

4
Source code in maticlib/graph/graph.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
class MaticGraph:
    """
    A fast, pure-Python graph workflow engine with optional state management.
    Supports dict, TypedDict, dataclass, and Pydantic BaseModel states.

    Args:
        stateful: If True, maintains and merges state across nodes.
        state_schema: Optional Pydantic BaseModel, dataclass, or TypedDict class
        max_workers: Maximum number of parallel workers (default: 4)
    """

    def __init__(
        self,
        stateful: bool = True,
        state_schema: Optional[type] = None,
        max_workers: int = 4
    ):
        """
        Initializes the MaticGraph engine.

        Args:
            stateful (bool): If True, state is preserved and merged between nodes.
            state_schema (type, optional): A Pydantic model, dataclass, or
                TypedDict class to use as the state container.
            max_workers (int): Maximum number of parallel threads for `parallel_group`
                execution. Default is 4.
        """
        self.stateful = stateful
        self.state_schema = state_schema
        self.max_workers = max_workers
        self.nodes: Dict[str, Node] = {}
        self.entry_node: Optional[str] = None
        self.exit_nodes: List[str] = []
        self._execution_log: List[Dict[str, Any]] = []
        # Maps trigger node -> {"nodes": [...], "join_node": ..., "condition": ...}.
        # Annotation fixed: parallel_group() stores a config dict per trigger,
        # not a bare list of node names.
        self._parallel_groups: Dict[str, Dict[str, Any]] = {}

        # Detect if state_schema is a Pydantic model (only when pydantic is installed)
        self._is_pydantic = False
        if state_schema is not None and PYDANTIC_AVAILABLE:
            self._is_pydantic = (
                isinstance(state_schema, type) and 
                issubclass(state_schema, BaseModel)
            )

    def add_node(self, name: str, function: Callable) -> 'MaticGraph':
        """
        Adds a processing node to the graph.

        Args:
            name (str): Unique name for the node.
            function (Callable): Function to execute. Should accept the current 
                state and return a dict or model update.

        Returns:
            MaticGraph: The graph instance (for chaining).

        Raises:
            ValueError: If a node with the same name already exists.
        """
        if name in self.nodes:
            raise ValueError(f"Node '{name}' already exists")

        self.nodes[name] = Node(name=name, function=function)
        return self

    def add_edge(self, from_node: str, to_node: str) -> 'MaticGraph':
        """
        Adds a directed edge between two nodes.

        Args:
            from_node (str): The name of the starting node.
            to_node (str): The name of the target node, or 'END' to exit.

        Returns:
            MaticGraph: The graph instance (for chaining).

        Raises:
            ValueError: If either node name is not found in the graph.
        """
        if from_node not in self.nodes:
            raise ValueError(f"Source node '{from_node}' not found")
        if to_node not in self.nodes and to_node != "END":
            raise ValueError(f"Destination node '{to_node}' not found")

        self.nodes[from_node].next_nodes.append(to_node)
        return self

    def parallel_group(
        self,
        from_node: str,
        parallel_nodes: List[str],
        join_node: Optional[str] = None,
        condition: Optional[Callable[[Any], bool]] = None
    ) -> 'MaticGraph':
        """
        Execute multiple nodes in parallel after a specific node.

        Args:
            from_node: Node that triggers parallel execution
            parallel_nodes: List of nodes to execute in parallel
            join_node: Optional node to execute after all parallel nodes complete
            condition: Optional function to decide whether to parallelize
                      If returns False, executes first parallel_node only

        Example:
            # Always parallel
            graph.parallel_group(
                "analyze",
                ["sentiment", "entities", "summary"],
                join_node="combine_results"
            )

            # Conditional parallel
            graph.parallel_group(
                "check_size",
                ["process_large_a", "process_large_b"],
                join_node="merge",
                condition=lambda state: state.get("size") > 1000
            )
        """
        # Validate nodes exist
        if from_node not in self.nodes:
            raise ValueError(f"Trigger node '{from_node}' not found")

        for node in parallel_nodes:
            if node not in self.nodes:
                raise ValueError(f"Parallel node '{node}' not found")

        if join_node and join_node not in self.nodes:
            raise ValueError(f"Join node '{join_node}' not found")

        # Store parallel group configuration
        self._parallel_groups[from_node] = {
            "nodes": parallel_nodes,
            "join_node": join_node,
            "condition": condition
        }

        # Mark the trigger node as having parallel execution
        self.nodes[from_node].parallel_group = parallel_nodes
        self.nodes[from_node].parallel_join = join_node
        self.nodes[from_node].parallel_condition = condition

        return self

    def add_conditional_edge(
        self,
        from_node: str,
        condition: Callable[[Any], str],
        routes: Dict[str, str],
        readable_names: Optional[Dict[str, str]] = None
    ) -> 'MaticGraph':
        """
        Adds a conditional edge that routes execution based on a condition function.

        Args:
            from_node (str): The node from which to route.
            condition (Callable): A function that takes the current state and 
                returns a string key matching one of the routes.
            routes (Dict[str, str]): A mapping of condition keys to target node names.
            readable_names (dict, optional): Mapping of keys to human-friendly 
                names for documentation/visualization.

        Returns:
            MaticGraph: The graph instance (for chaining).

        Raises:
            ValueError: If from_node or route targets are not found.
        """
        if from_node not in self.nodes:
            raise ValueError(f"Node '{from_node}' not found")

        # Only the targets need validating here; keys are checked at routing time.
        for target in routes.values():
            if target not in self.nodes and target != "END":
                raise ValueError(f"Route target '{target}' not found")

        node = self.nodes[from_node]
        node.condition_func = condition
        node.condition_map = routes
        node.readable_names = readable_names or {}

        return self

    def when(self, from_node: str, **routes: str) -> 'MaticGraph':
        """
        Simplified conditional routing using state['next'] or state.next
        Works with both dict and Pydantic models.
        """
        def route_by_next(state: Any) -> str:
            # Handle Pydantic models
            if self._is_pydantic and isinstance(state, BaseModel):
                next_key = getattr(state, 'next', None)
            # Handle dicts and TypedDict
            elif isinstance(state, dict):
                next_key = state.get("next")
            else:
                # Handle dataclasses
                next_key = getattr(state, 'next', None)

            if next_key not in routes:
                available = ", ".join(routes.keys())
                raise ValueError(
                    f"Invalid route '{next_key}'. Available routes: {available}"
                )
            return next_key

        return self.add_conditional_edge(
            from_node,
            route_by_next,
            routes,
            readable_names={k: k.replace("_", " ").title() for k in routes.keys()}
        )

    def set_entry(self, node_name: str) -> 'MaticGraph':
        """
        Sets the starting node for the graph execution.

        Args:
            node_name (str): The name of the entry node.

        Returns:
            MaticGraph: The graph instance (for chaining).

        Raises:
            ValueError: If the node_name does not exist in the graph.
        """
        if node_name not in self.nodes:
            raise ValueError(f"Node '{node_name}' not found")
        self.entry_node = node_name
        return self

    def set_exit(self, node_name: str) -> 'MaticGraph':
        """
        Marks a node as an explicit exit point for the workflow.

        Args:
            node_name (str): The name of the node.

        Returns:
            MaticGraph: The graph instance (for chaining).

        Raises:
            ValueError: If the node_name does not exist in the graph.
        """
        if node_name not in self.nodes:
            raise ValueError(f"Node '{node_name}' not found")
        if node_name not in self.exit_nodes:
            self.exit_nodes.append(node_name)
        return self

    def _merge_state(self, current: Any, update: Any) -> Any:
        """
        Merge state updates intelligently.
        Supports dict, Pydantic models, and dataclasses.

        NOTE(review): the dict branch mutates `current` in place (lists are
        extended, nested dicts updated). During parallel execution all workers
        share the same state object, so concurrent in-place merging is not
        thread-safe — confirm whether node functions are expected to return
        fresh dicts rather than mutate state.
        """
        if not self.stateful:
            return update if update else current

        # Handle Pydantic models
        if self._is_pydantic:
            if isinstance(current, BaseModel):
                if isinstance(update, dict):
                    update_data = current.model_dump()
                    update_data.update(update)
                    return self.state_schema(**update_data)
                elif isinstance(update, BaseModel):
                    update_data = current.model_dump()
                    update_data.update(update.model_dump(exclude_unset=True))
                    return self.state_schema(**update_data)
            else:
                if isinstance(update, dict):
                    return self.state_schema(**update)
                return update

        # Handle dict-based state: lists append, dicts deep-merge one level,
        # scalars overwrite.
        if isinstance(current, dict):
            for key, value in (update.items() if isinstance(update, dict) else vars(update).items()):
                if key in current:
                    if isinstance(current[key], list) and isinstance(value, list):
                        current[key].extend(value)
                    elif isinstance(current[key], dict) and isinstance(value, dict):
                        current[key].update(value)
                    else:
                        current[key] = value
                else:
                    current[key] = value
            return current

        # Handle dataclasses: rebuild a new instance from merged fields
        if hasattr(current, '__dataclass_fields__'):
            update_dict = vars(current).copy()
            if isinstance(update, dict):
                update_dict.update(update)
            else:
                update_dict.update(vars(update))
            return type(current)(**update_dict)

        return update if update else current

    def _execute_node(self, node_name: str, state: Any) -> Any:
        """
        Execute a single node, merge its result into state (when stateful),
        and record timing/outcome in the execution log.

        Raises:
            RuntimeError: wrapping any exception raised by the node function.
        """
        node = self.nodes[node_name]
        start_time = time.time()

        try:
            result = node.function(state)

            # A node returning None means "no state change"
            if result is None:
                result = {}

            updated_state = result if not self.stateful else self._merge_state(state, result)

            execution_time = time.time() - start_time

            # Log execution. Access model_fields via the class: instance access
            # is deprecated in Pydantic 2.11+.
            if self._is_pydantic and isinstance(updated_state, BaseModel):
                state_keys = list(type(updated_state).model_fields.keys())
            elif isinstance(updated_state, dict):
                state_keys = list(updated_state.keys())
            else:
                state_keys = list(vars(updated_state).keys()) if hasattr(updated_state, '__dict__') else []

            self._execution_log.append({
                "node": node_name,
                "status": "success",
                "execution_time": execution_time,
                "state_keys": state_keys
            })

            return updated_state

        except Exception as e:
            execution_time = time.time() - start_time
            self._execution_log.append({
                "node": node_name,
                "status": "error",
                "execution_time": execution_time,
                "error": str(e)
            })
            raise RuntimeError(f"Error in node '{node_name}': {str(e)}") from e

    def _execute_parallel_group(
        self,
        parallel_nodes: List[str],
        state: Any,
        verbose: bool = False
    ) -> Any:
        """
        Execute a group of nodes in parallel and merge their results.

        Results are merged in declaration order (not completion order) so the
        outcome is deterministic; in stateless mode the last node's result wins.
        """
        if verbose:
            print(f"\n  [PARALLEL] Executing {len(parallel_nodes)} nodes in parallel: {parallel_nodes}")

        with ThreadPoolExecutor(max_workers=min(self.max_workers, len(parallel_nodes))) as executor:
            # Submit all parallel nodes
            future_to_node = {
                executor.submit(self._execute_node, node_name, state): node_name
                for node_name in parallel_nodes
            }

            # Collect results
            results = {}
            for future in as_completed(future_to_node):
                node_name = future_to_node[future]
                try:
                    node_result = future.result()
                    results[node_name] = node_result
                    if verbose:
                        print(f"    [OK] {node_name} completed")
                except Exception as e:
                    raise RuntimeError(f"Parallel node '{node_name}' failed: {e}") from e

            # Merge all results into state
            merged_state = state
            for node_name in parallel_nodes:
                if self.stateful:
                    merged_state = self._merge_state(merged_state, results[node_name])
                else:
                    merged_state = results[node_name]  # Last result wins in stateless

            return merged_state

    def _get_next_node(self, current_node: str, state: Any) -> Optional[Union[str, tuple]]:
        """
        Determine the next node(s) based on edges and conditions.

        Returns:
            None when the workflow should stop, a node name for sequential
            execution, or a ("PARALLEL", nodes, join_node) tuple for parallel
            execution. (Annotation fixed: a tuple is returned, never a list.)
        """
        node = self.nodes[current_node]

        if current_node in self.exit_nodes:
            return None

        # Check for parallel group execution
        if hasattr(node, 'parallel_group') and node.parallel_group:
            # Check condition if exists
            if hasattr(node, 'parallel_condition') and node.parallel_condition:
                should_parallelize = node.parallel_condition(state)
                if not should_parallelize:
                    # Execute first node only (sequential fallback)
                    return node.parallel_group[0]

            # Return special marker for parallel execution
            return ("PARALLEL", node.parallel_group, node.parallel_join)

        # Check conditional routing
        if node.condition_func:
            try:
                route_key = node.condition_func(state)
                next_node = node.condition_map.get(route_key)

                if next_node is None:
                    raise ValueError(
                        f"Condition returned invalid key '{route_key}'. "
                        f"Valid keys: {list(node.condition_map.keys())}"
                    )

                readable = node.readable_names.get(route_key, route_key)
                self._execution_log.append({
                    "type": "routing",
                    "from": current_node,
                    "to": next_node,
                    "route": readable
                })

                return None if next_node == "END" else next_node

            except Exception as e:
                raise RuntimeError(
                    f"Condition function failed in node '{current_node}': {str(e)}"
                ) from e

        # Regular edge: only the first outgoing edge is followed
        if node.next_nodes:
            next_node = node.next_nodes[0]
            return None if next_node == "END" else next_node

        return None

    def run(
        self,
        initial_state: Optional[Union[Dict[str, Any], BaseModel]] = None,
        max_iterations: int = 1000,
        verbose: bool = False
    ) -> Union[Dict[str, Any], BaseModel]:
        """
        Executes the graph workflow dynamically.

        Args:
            initial_state (dict | BaseModel, optional): Starting data for the workflow.
            max_iterations (int): Safety limit on total node executions to prevent 
                infinite loops. Default is 1000.
            verbose (bool): If True, prints execution trace to stdout.

        Returns:
            Any: The final accumulated state of the workflow.

        Raises:
            RuntimeError: If no entry node is set, or if an execution error 
                occurs in a node.
        """
        if self.entry_node is None:
            raise RuntimeError("No entry node set. Call set_entry() first.")

        self._execution_log = []

        # Initialize state based on schema (a Pydantic schema and any other
        # schema class are both constructed with a no-arg call)
        if initial_state is None:
            state = self.state_schema() if self.state_schema else {}
        else:
            if self._is_pydantic and isinstance(initial_state, dict):
                state = self.state_schema(**initial_state)
            else:
                state = initial_state

        current_node = self.entry_node
        iteration = 0

        if verbose:
            print(f"Starting execution at node: {current_node}")
            print(f"Initial state: {state}\n")

        while current_node is not None and iteration < max_iterations:
            iteration += 1

            if verbose:
                print(f"[{iteration}] Executing node: {current_node}")

            # Execute current node
            state = self._execute_node(current_node, state)

            if verbose:
                print(f"    State after: {state}")

            # Get next node(s)
            next_node = self._get_next_node(current_node, state)

            # Check if next is a parallel group
            if isinstance(next_node, tuple) and next_node[0] == "PARALLEL":
                _, parallel_nodes, join_node = next_node

                # Execute parallel nodes
                state = self._execute_parallel_group(parallel_nodes, state, verbose)

                # Continue to join node if specified
                if join_node:
                    current_node = join_node
                    if verbose:
                        print(f"  [JOIN] Parallel execution complete, continuing to: {join_node}\n")
                else:
                    # No join node, end execution
                    current_node = None
            else:
                # Regular sequential execution
                if verbose and next_node:
                    print(f"    Next: {next_node}\n")
                elif verbose:
                    print(f"    Workflow complete\n")

                current_node = next_node

        # Bug fix: only raise when the loop stopped because the budget ran out.
        # Previously a workflow that finished on exactly the last allowed
        # iteration (current_node is None) was misreported as an infinite loop.
        if current_node is not None:
            raise RuntimeError(
                f"Maximum iterations ({max_iterations}) reached. "
                "Possible infinite loop in graph."
            )

        return state

    def get_execution_log(self) -> List[Dict[str, Any]]:
        """Get detailed execution log from last run."""
        return self._execution_log

    def visualize(self) -> str:
        """Generate a text-based visualization of the graph."""
        lines = ["Graph Structure:", "=" * 50]

        if self.state_schema:
            schema_name = getattr(self.state_schema, '__name__', str(self.state_schema))
            state_type = "Pydantic Model" if self._is_pydantic else "Schema"
            lines.append(f"State {state_type}: {schema_name}")

        lines.append(f"Max Workers: {self.max_workers}")
        lines.append("")

        for node_name, node in self.nodes.items():
            marker = ">" if node_name == self.entry_node else "•"
            exit_marker = " [EXIT]" if node_name in self.exit_nodes else ""

            # Check if node has parallel group
            parallel_marker = ""
            if hasattr(node, 'parallel_group') and node.parallel_group:
                parallel_marker = f" [PARALLEL->{len(node.parallel_group)} nodes]"

            lines.append(f"{marker} {node_name}{exit_marker}{parallel_marker}")

            # Show parallel group details
            if hasattr(node, 'parallel_group') and node.parallel_group:
                lines.append(f"  |- Parallel nodes:")
                for pnode in node.parallel_group:
                    lines.append(f"  |  L- {pnode}")
                if hasattr(node, 'parallel_join') and node.parallel_join:
                    lines.append(f"  L- Join at: {node.parallel_join}")

            # Show conditional routing
            if node.condition_func:
                lines.append(f"  └─ Conditional routing:")
                for route_key, target in node.condition_map.items():
                    readable = node.readable_names.get(route_key, route_key)
                    lines.append(f"     [{route_key}] → {target} ({readable})")
            # Show regular edges. Consistency fix: check truthiness (as done
            # above), not mere attribute presence — a node whose
            # parallel_group attribute exists but is empty/None still has
            # ordinary edges worth showing.
            elif not getattr(node, 'parallel_group', None):
                for next_node in node.next_nodes:
                    lines.append(f"  └─→ {next_node}")

        return "\n".join(lines)

__init__

__init__(stateful=True, state_schema=None, max_workers=4)

Initializes the MaticGraph engine.

Parameters:

Name Type Description Default
stateful bool

If True, state is preserved and merged between nodes.

True
state_schema type

A Pydantic model, dataclass, or TypedDict class to use as the state container.

None
max_workers int

Maximum number of parallel threads for parallel_group execution. Default is 4.

4
Source code in maticlib/graph/graph.py
def __init__(
    self,
    stateful: bool = True,
    state_schema: Optional[type] = None,
    max_workers: int = 4
):
    """
    Initializes the MaticGraph engine.

    Args:
        stateful (bool): If True, state is preserved and merged between nodes.
        state_schema (type, optional): A Pydantic model, dataclass, or 
            TypedDict class to use as the state container.
        max_workers (int): Maximum number of parallel threads for `parallel_group` 
            execution. Default is 4.
    """
    self.stateful = stateful
    self.state_schema = state_schema
    self.max_workers = max_workers
    self.nodes: Dict[str, Node] = {}
    self.entry_node: Optional[str] = None
    self.exit_nodes: List[str] = []
    self._execution_log: List[Dict[str, Any]] = []
    # Maps trigger node -> {"nodes": [...], "join_node": ..., "condition": ...}.
    # Annotation fixed: parallel_group() stores a config dict, not a plain list.
    self._parallel_groups: Dict[str, Dict[str, Any]] = {}

    # Detect if state_schema is a Pydantic model (skipped when pydantic is absent)
    self._is_pydantic = False
    if state_schema is not None and PYDANTIC_AVAILABLE:
        self._is_pydantic = (
            isinstance(state_schema, type) and 
            issubclass(state_schema, BaseModel)
        )

add_conditional_edge

add_conditional_edge(
    from_node, condition, routes, readable_names=None
)

Adds a conditional edge that routes execution based on a condition function.

Parameters:

Name Type Description Default
from_node str

The node from which to route.

required
condition Callable

A function that takes the current state and returns a string key matching one of the routes.

required
routes Dict[str, str]

A mapping of condition keys to target node names.

required
readable_names dict

Mapping of keys to human-friendly names for documentation/visualization.

None

Returns:

Name Type Description
MaticGraph MaticGraph

The graph instance (for chaining).

Raises:

Type Description
ValueError

If from_node or route targets are not found.

Source code in maticlib/graph/graph.py
def add_conditional_edge(
    self,
    from_node: str,
    condition: Callable[[Any], str],
    routes: Dict[str, str],
    readable_names: Optional[Dict[str, str]] = None
) -> 'MaticGraph':
    """
    Adds a conditional edge that routes execution based on a condition function.

    Args:
        from_node (str): The node from which to route.
        condition (Callable): A function that takes the current state and 
            returns a string key matching one of the routes.
        routes (Dict[str, str]): A mapping of condition keys to target node names.
        readable_names (dict, optional): Mapping of keys to human-friendly 
            names for documentation/visualization.

    Returns:
        MaticGraph: The graph instance (for chaining).

    Raises:
        ValueError: If from_node or route targets are not found.
    """
    if from_node not in self.nodes:
        raise ValueError(f"Node '{from_node}' not found")

    # Only the targets need validating here; route keys are checked when the
    # condition fires at run time. (Fix: route_key was an unused loop variable.)
    for target in routes.values():
        if target not in self.nodes and target != "END":
            raise ValueError(f"Route target '{target}' not found")

    node = self.nodes[from_node]
    node.condition_func = condition
    node.condition_map = routes
    node.readable_names = readable_names or {}

    return self

add_edge

add_edge(from_node, to_node)

Adds a directed edge between two nodes.

Parameters:

Name Type Description Default
from_node str

The name of the starting node.

required
to_node str

The name of the target node, or 'END' to exit.

required

Returns:

Name Type Description
MaticGraph MaticGraph

The graph instance (for chaining).

Raises:

Type Description
ValueError

If either node name is not found in the graph.

Source code in maticlib/graph/graph.py
def add_edge(self, from_node: str, to_node: str) -> 'MaticGraph':
    """
    Connect two nodes with a directed edge.

    Args:
        from_node (str): Name of the edge's origin node.
        to_node (str): Name of the destination node; the sentinel 'END'
            terminates the workflow.

    Returns:
        MaticGraph: This graph, so calls can be chained.

    Raises:
        ValueError: When either endpoint is unknown to the graph.
    """
    registered = self.nodes

    if from_node not in registered:
        raise ValueError(f"Source node '{from_node}' not found")

    # 'END' is a virtual terminator, never a registered node.
    if to_node != "END" and to_node not in registered:
        raise ValueError(f"Destination node '{to_node}' not found")

    registered[from_node].next_nodes.append(to_node)
    return self

add_node

add_node(name, function)

Adds a processing node to the graph.

Parameters:

Name Type Description Default
name str

Unique name for the node.

required
function Callable

Function to execute. Should accept the current state and return a dict or model update.

required

Returns:

Name Type Description
MaticGraph MaticGraph

The graph instance (for chaining).

Raises:

Type Description
ValueError

If a node with the same name already exists.

Source code in maticlib/graph/graph.py
def add_node(self, name: str, function: Callable) -> 'MaticGraph':
    """
    Register a processing node under a unique name.

    Args:
        name (str): Identifier for the node; must not already be registered.
        function (Callable): Callable invoked with the current state; it
            returns a dict or model update.

    Returns:
        MaticGraph: This graph, so calls can be chained.

    Raises:
        ValueError: When the name is already taken.
    """
    registry = self.nodes
    if name in registry:
        raise ValueError(f"Node '{name}' already exists")

    registry[name] = Node(name=name, function=function)
    return self

get_execution_log

get_execution_log()

Get detailed execution log from last run.

Source code in maticlib/graph/graph.py
def get_execution_log(self) -> List[Dict[str, Any]]:
    """Return the per-node log entries recorded by the most recent run()."""
    return self._execution_log

parallel_group

parallel_group(
    from_node,
    parallel_nodes,
    join_node=None,
    condition=None,
)

Execute multiple nodes in parallel after a specific node.

Parameters:

Name Type Description Default
from_node str

Node that triggers parallel execution

required
parallel_nodes List[str]

List of nodes to execute in parallel

required
join_node Optional[str]

Optional node to execute after all parallel nodes complete

None
condition Optional[Callable[[Any], bool]]

Optional function deciding whether to parallelize. If it returns False, only the first node in parallel_nodes executes (sequential fallback).

None
Example

Always parallel

graph.parallel_group( "analyze", ["sentiment", "entities", "summary"], join_node="combine_results" )

Conditional parallel

graph.parallel_group( "check_size", ["process_large_a", "process_large_b"], join_node="merge", condition=lambda state: state.get("size") > 1000 )

Source code in maticlib/graph/graph.py
def parallel_group(
    self,
    from_node: str,
    parallel_nodes: List[str],
    join_node: Optional[str] = None,
    condition: Optional[Callable[[Any], bool]] = None
) -> 'MaticGraph':
    """
    Declare that several nodes run concurrently once `from_node` finishes.

    Args:
        from_node: Node whose completion triggers the parallel fan-out.
        parallel_nodes: Names of the nodes executed concurrently.
        join_node: Node to run after every parallel branch completes, if any.
        condition: Predicate on the state deciding whether to fan out.
            When it returns False, only the first parallel node runs.

    Returns:
        MaticGraph: This graph, so calls can be chained.

    Raises:
        ValueError: If any referenced node has not been registered.

    Example:
        # Always parallel
        graph.parallel_group(
            "analyze",
            ["sentiment", "entities", "summary"],
            join_node="combine_results"
        )

        # Conditional parallel
        graph.parallel_group(
            "check_size",
            ["process_large_a", "process_large_b"],
            join_node="merge",
            condition=lambda state: state.get("size") > 1000
        )
    """
    # Every referenced node must already be registered on the graph.
    if from_node not in self.nodes:
        raise ValueError(f"Trigger node '{from_node}' not found")
    missing = [node for node in parallel_nodes if node not in self.nodes]
    if missing:
        raise ValueError(f"Parallel node '{missing[0]}' not found")
    if join_node and join_node not in self.nodes:
        raise ValueError(f"Join node '{join_node}' not found")

    # Record the fan-out in the group table and mirror it on the trigger
    # node so routing can see it during execution.
    self._parallel_groups[from_node] = {
        "nodes": parallel_nodes,
        "join_node": join_node,
        "condition": condition
    }
    trigger = self.nodes[from_node]
    trigger.parallel_group = parallel_nodes
    trigger.parallel_join = join_node
    trigger.parallel_condition = condition

    return self

run

run(initial_state=None, max_iterations=1000, verbose=False)

Executes the graph workflow dynamically.

Parameters:

Name Type Description Default
initial_state dict | BaseModel

Starting data for the workflow.

None
max_iterations int

Safety limit on total node executions to prevent infinite loops. Default is 1000.

1000
verbose bool

If True, prints execution trace to stdout.

False

Returns:

Name Type Description
Any Union[Dict[str, Any], BaseModel]

The final accumulated state of the workflow.

Raises:

Type Description
RuntimeError

If no entry node is set, or if an execution error occurs in a node.

Source code in maticlib/graph/graph.py
def run(
    self,
    initial_state: Optional[Union[Dict[str, Any], BaseModel]] = None,
    max_iterations: int = 1000,
    verbose: bool = False
) -> Union[Dict[str, Any], BaseModel]:
    """
    Executes the graph workflow dynamically.

    Args:
        initial_state (dict | BaseModel, optional): Starting data for the workflow.
        max_iterations (int): Safety limit on total node executions to prevent
            infinite loops. Default is 1000.
        verbose (bool): If True, prints execution trace to stdout.

    Returns:
        Any: The final accumulated state of the workflow.

    Raises:
        RuntimeError: If no entry node is set, if the iteration limit is
            reached before the workflow completes, or if an execution error
            occurs in a node.
    """
    if self.entry_node is None:
        raise RuntimeError("No entry node set. Call set_entry() first.")

    self._execution_log = []

    # Initialize state: instantiate the configured schema (Pydantic,
    # dataclass, or TypedDict) when no state is supplied; coerce a dict
    # into the Pydantic model when one is configured; otherwise use the
    # caller's state object as-is.
    if initial_state is None:
        # The original had two identical branches (_is_pydantic vs
        # state_schema) that both called self.state_schema() — collapsed.
        state = self.state_schema() if self.state_schema else {}
    elif self._is_pydantic and isinstance(initial_state, dict):
        state = self.state_schema(**initial_state)
    else:
        state = initial_state

    current_node = self.entry_node
    iteration = 0

    if verbose:
        print(f"Starting execution at node: {current_node}")
        print(f"Initial state: {state}\n")

    while current_node is not None and iteration < max_iterations:
        iteration += 1

        if verbose:
            print(f"[{iteration}] Executing node: {current_node}")

        # Execute current node
        state = self._execute_node(current_node, state)

        if verbose:
            print(f"    State after: {state}")

        # Get next node(s)
        next_node = self._get_next_node(current_node, state)

        # Check if next is a parallel group
        if isinstance(next_node, tuple) and next_node[0] == "PARALLEL":
            _, parallel_nodes, join_node = next_node

            # Execute parallel nodes
            state = self._execute_parallel_group(parallel_nodes, state, verbose)

            # Continue to join node if specified; otherwise the workflow ends.
            if join_node:
                current_node = join_node
                if verbose:
                    print(f"  [JOIN] Parallel execution complete, continuing to: {join_node}\n")
            else:
                current_node = None
        else:
            # Regular sequential execution
            if verbose and next_node:
                print(f"    Next: {next_node}\n")
            elif verbose:
                print(f"    Workflow complete\n")

            current_node = next_node

    # BUG FIX: only raise when the loop was cut short with work still
    # pending. The original raised whenever iteration >= max_iterations,
    # falsely flagging workflows that finished cleanly on exactly the
    # last allowed iteration.
    if current_node is not None and iteration >= max_iterations:
        raise RuntimeError(
            f"Maximum iterations ({max_iterations}) reached. "
            "Possible infinite loop in graph."
        )

    return state

set_entry

set_entry(node_name)

Sets the starting node for the graph execution.

Parameters:

Name Type Description Default
node_name str

The name of the entry node.

required

Returns:

Name Type Description
MaticGraph MaticGraph

The graph instance (for chaining).

Raises:

Type Description
ValueError

If the node_name does not exist in the graph.

Source code in maticlib/graph/graph.py
def set_entry(self, node_name: str) -> 'MaticGraph':
    """
    Selects the node from which graph execution begins.

    Args:
        node_name (str): Name of the registered node to start from.

    Returns:
        MaticGraph: This graph, so calls can be chained.

    Raises:
        ValueError: If no node named `node_name` is registered.
    """
    if node_name in self.nodes:
        self.entry_node = node_name
        return self
    raise ValueError(f"Node '{node_name}' not found")

set_exit

set_exit(node_name)

Marks a node as an explicit exit point for the workflow.

Parameters:

Name Type Description Default
node_name str

The name of the node.

required

Returns:

Name Type Description
MaticGraph MaticGraph

The graph instance (for chaining).

Raises:

Type Description
ValueError

If the node_name does not exist in the graph.

Source code in maticlib/graph/graph.py
def set_exit(self, node_name: str) -> 'MaticGraph':
    """
    Registers a node as an explicit terminal point of the workflow.

    Args:
        node_name (str): Name of the registered node to mark as an exit.

    Returns:
        MaticGraph: This graph, so calls can be chained.

    Raises:
        ValueError: If no node named `node_name` is registered.
    """
    if node_name not in self.nodes:
        raise ValueError(f"Node '{node_name}' not found")
    # Idempotent: marking the same node twice keeps a single entry.
    already_marked = node_name in self.exit_nodes
    if not already_marked:
        self.exit_nodes.append(node_name)
    return self

visualize

visualize()

Generate a text-based visualization of the graph.

Source code in maticlib/graph/graph.py
def visualize(self) -> str:
    """
    Generate a text-based visualization of the graph.

    Returns:
        str: Multi-line description showing the state schema, worker
            count, each node (entry `>` / exit `[EXIT]` markers),
            parallel groups with their join node, conditional routes,
            and sequential edges.
    """
    lines = ["Graph Structure:", "=" * 50]

    if self.state_schema:
        schema_name = getattr(self.state_schema, '__name__', str(self.state_schema))
        state_type = "Pydantic Model" if self._is_pydantic else "Schema"
        lines.append(f"State {state_type}: {schema_name}")

    lines.append(f"Max Workers: {self.max_workers}")
    lines.append("")

    for node_name, node in self.nodes.items():
        marker = ">" if node_name == self.entry_node else "•"
        exit_marker = " [EXIT]" if node_name in self.exit_nodes else ""

        # Check if node has parallel group (value may be None)
        parallel_nodes = getattr(node, 'parallel_group', None)
        parallel_marker = ""
        if parallel_nodes:
            parallel_marker = f" [PARALLEL->{len(parallel_nodes)} nodes]"

        lines.append(f"{marker} {node_name}{exit_marker}{parallel_marker}")

        # Show parallel group details
        if parallel_nodes:
            lines.append(f"  |- Parallel nodes:")
            for pnode in parallel_nodes:
                lines.append(f"  |  L- {pnode}")
            join = getattr(node, 'parallel_join', None)
            if join:
                lines.append(f"  L- Join at: {join}")

        # Show conditional routing
        if node.condition_func:
            lines.append(f"  └─ Conditional routing:")
            for route_key, target in node.condition_map.items():
                readable = node.readable_names.get(route_key, route_key)
                lines.append(f"     [{route_key}] → {target} ({readable})")
        # Show regular edges.
        # BUG FIX: the original tested `not hasattr(node, 'parallel_group')`,
        # which is always False for Node dataclass instances (the field
        # exists with a None default), so sequential edges were never
        # rendered. Test the value instead of attribute existence.
        elif not parallel_nodes:
            for next_node in node.next_nodes:
                lines.append(f"  └─→ {next_node}")

    return "\n".join(lines)

when

when(from_node, **routes)

Simplified conditional routing using state['next'] or state.next. Works with both dict and Pydantic models.

Source code in maticlib/graph/graph.py
def when(self, from_node: str, **routes: str) -> 'MaticGraph':
    """
    Shorthand for conditional routing driven by state['next'] / state.next.
    Supports dict, TypedDict, Pydantic model, and dataclass states.
    """
    def route_by_next(state: Any) -> str:
        # Dicts (and TypedDicts) carry the routing key under "next";
        # Pydantic models and dataclasses expose it as an attribute.
        if isinstance(state, dict):
            next_key = state.get("next")
        else:
            next_key = getattr(state, 'next', None)

        if next_key in routes:
            return next_key
        available = ", ".join(routes.keys())
        raise ValueError(
            f"Invalid route '{next_key}'. Available routes: {available}"
        )

    # Auto-generate display names, e.g. "go_left" -> "Go Left".
    readable = {key: key.replace("_", " ").title() for key in routes}
    return self.add_conditional_edge(
        from_node,
        route_by_next,
        routes,
        readable_names=readable
    )

Node

maticlib.graph.node.Node dataclass

Represents a processing node within a MaticGraph workflow.

A node encapsulates a function to be executed and metadata for routing the workflow after execution, including support for conditional branching and parallel execution.

Attributes:

Name Type Description
name str

Unique identifier for the node.

function Callable

The Python function to execute. Receives the current state and returns an update (dict or model).

next_nodes List[str]

List of possible next nodes for sequential or parallel flow.

condition_func Optional[Callable]

A function that determines which route to take next based on the returned key.

condition_map Optional[Dict[str, str]]

Maps keys from condition_func to target node names.

readable_names Optional[Dict[str, str]]

Human-readable names for routes (useful for visualization).

parallel_group Optional[List[str]]

List of nodes to execute in parallel after this node.

parallel_join Optional[str]

A node where parallel execution groups re-converge.

parallel_condition Optional[Callable]

A condition to decide whether to trigger parallel execution.

Source code in maticlib/graph/node.py
@dataclass
class Node:
    """
    Represents a processing node within a MaticGraph workflow.

    A node encapsulates a function to be executed and metadata for
    routing the workflow after execution, including support for 
    conditional branching and parallel execution.

    Attributes:
        name (str): Unique identifier for the node.
        function (Callable): The Python function to execute. Receives the current 
            state and returns an update (dict or model).
        next_nodes (List[str]): List of possible next nodes for sequential or parallel flow.
        condition_func (Optional[Callable]): A function that determines which 
            route to take next based on the returned key.
        condition_map (Optional[Dict[str, str]]): Maps keys from `condition_func` 
            to target node names.
        readable_names (Optional[Dict[str, str]]): Human-readable names for routes 
            (useful for visualization).
        parallel_group (Optional[List[str]]): List of nodes to execute in 
            parallel after this node.
        parallel_join (Optional[str]): A node where parallel execution groups 
            re-converge.
        parallel_condition (Optional[Callable]): A condition to decide whether 
            to trigger parallel execution.
    """
    name: str                                       # unique node identifier
    function: Callable                              # state -> update (dict or model)
    next_nodes: List[str] = field(default_factory=list)  # sequential successors
    condition_func: Optional[Callable] = None       # returns a routing key from state
    condition_map: Optional[Dict[str, str]] = None  # routing key -> target node name
    readable_names: Optional[Dict[str, str]] = None  # routing key -> display label

    # Parallel execution support. NOTE: these fields always exist on an
    # instance (default None) — check their value, not hasattr().
    parallel_group: Optional[List[str]] = None      # nodes to fan out to after this one
    parallel_join: Optional[str] = None             # node where parallel branches re-converge
    parallel_condition: Optional[Callable] = None   # predicate gating the fan-out