In pydantic-graph, a graph is a collection of nodes that can be run in sequence. The nodes define
their outgoing edges — e.g. which nodes may be run next, and thereby the structure of the graph.
Here's a very simple example of a graph which increments a number by 1, but makes sure the number is never
42 at the end.
@dataclass(init=False)classGraph(Generic[StateT,DepsT,RunEndT]):"""Definition of a graph. In `pydantic-graph`, a graph is a collection of nodes that can be run in sequence. The nodes define their outgoing edges — e.g. which nodes may be run next, and thereby the structure of the graph. Here's a very simple example of a graph which increments a number by 1, but makes sure the number is never 42 at the end. ```py {title="never_42.py" noqa="I001" py="3.10"} from __future__ import annotations from dataclasses import dataclass from pydantic_graph import BaseNode, End, Graph, GraphRunContext @dataclass class MyState: number: int @dataclass class Increment(BaseNode[MyState]): async def run(self, ctx: GraphRunContext) -> Check42: ctx.state.number += 1 return Check42() @dataclass class Check42(BaseNode[MyState, None, int]): async def run(self, ctx: GraphRunContext) -> Increment | End[int]: if ctx.state.number == 42: return Increment() else: return End(ctx.state.number) never_42_graph = Graph(nodes=(Increment, Check42)) ``` _(This example is complete, it can be run "as is")_ See [`run`][pydantic_graph.graph.Graph.run] For an example of running graph, and [`mermaid_code`][pydantic_graph.graph.Graph.mermaid_code] for an example of generating a mermaid diagram from the graph. """name:str|Nonenode_defs:dict[str,NodeDef[StateT,DepsT,RunEndT]]snapshot_state:Callable[[StateT],StateT]_state_type:type[StateT]|_utils.Unset=field(repr=False)_run_end_type:type[RunEndT]|_utils.Unset=field(repr=False)_auto_instrument:bool=field(repr=False)def__init__(self,*,nodes:Sequence[type[BaseNode[StateT,DepsT,RunEndT]]],name:str|None=None,state_type:type[StateT]|_utils.Unset=_utils.UNSET,run_end_type:type[RunEndT]|_utils.Unset=_utils.UNSET,snapshot_state:Callable[[StateT],StateT]=deep_copy_state,auto_instrument:bool=True,):"""Create a graph from a sequence of nodes. Args: nodes: The nodes which make up the graph, nodes need to be unique and all be generic in the same state type. name: Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method. state_type: The type of the state for the graph, this can generally be inferred from `nodes`. run_end_type: The type of the result of running the graph, this can generally be inferred from `nodes`. snapshot_state: A function to snapshot the state of the graph, this is used in [`NodeStep`][pydantic_graph.state.NodeStep] and [`EndStep`][pydantic_graph.state.EndStep] to record the state before each step. auto_instrument: Whether to create a span for the graph run and the execution of each node's run method. """self.name=nameself._state_type=state_typeself._run_end_type=run_end_typeself._auto_instrument=auto_instrumentself.snapshot_state=snapshot_stateparent_namespace=_utils.get_parent_namespace(inspect.currentframe())self.node_defs:dict[str,NodeDef[StateT,DepsT,RunEndT]]={}fornodeinnodes:self._register_node(node,parent_namespace)self._validate_edges()asyncdefrun(self:Graph[StateT,DepsT,T],start_node:BaseNode[StateT,DepsT,T],*,state:StateT=None,deps:DepsT=None,infer_name:bool=True,span:LogfireSpan|None=None,)->GraphRunResult[StateT,T]:"""Run the graph from a starting node until it ends. Args: start_node: the first node to run, since the graph definition doesn't define the entry point in the graph, you need to provide the starting node. state: The initial state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. span: The span to use for the graph run. If not provided, a span will be created depending on the value of the `_auto_instrument` field. Returns: A `GraphRunResult` containing information about the run, including its final result. Here's an example of running the graph from [above][pydantic_graph.graph.Graph]: ```py {title="run_never_42.py" noqa="I001" py="3.10"} from never_42 import Increment, MyState, never_42_graph async def main(): state = MyState(1) graph_run_result = await never_42_graph.run(Increment(), state=state) print(state) #> MyState(number=2) print(len(graph_run_result.history)) #> 3 state = MyState(41) graph_run_result = await never_42_graph.run(Increment(), state=state) print(state) #> MyState(number=43) print(len(graph_run_result.history)) #> 5 ``` """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())asyncwithself.iter(start_node,state=state,deps=deps,infer_name=infer_name,span=span)asgraph_run:asyncfor_nodeingraph_run:passfinal_result=graph_run.resultassertfinal_resultisnotNone,'GraphRun should have a final result'returnfinal_result@asynccontextmanagerasyncdefiter(self:Graph[StateT,DepsT,T],start_node:BaseNode[StateT,DepsT,T],*,history:list[HistoryStep[StateT,T]]|None=None,state:StateT=None,deps:DepsT=None,infer_name:bool=True,span:AbstractContextManager[Any]|None=None,)->AsyncIterator[GraphRun[StateT,DepsT,T]]:"""A contextmanager which can be used to iterate over the graph's nodes as they are executed. This method returns a `GraphRun` object which can be used to async-iterate over the nodes of this `Graph` as they are executed. This is the API to use if you want to record or interact with the nodes as the graph execution unfolds. The `GraphRun` can also be used to manually drive the graph execution by calling [`GraphRun.next`][pydantic_graph.graph.GraphRun.next]. The `GraphRun` provides access to the full run history, state, deps, and the final result of the run once it has completed. For more details, see the API documentation of [`GraphRun`][pydantic_graph.graph.GraphRun]. Args: start_node: the first node to run. Since the graph definition doesn't define the entry point in the graph, you need to provide the starting node. history: The history of the graph run so far. If not provided, a new list will be created. state: The initial state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. span: The span to use for the graph run. If not provided, a new span will be created. Yields: A GraphRun that can be async iterated over to drive the graph to completion. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())ifself._auto_instrumentandspanisNone:span=logfire_api.span('run graph {graph.name}',graph=self)withExitStack()asstack:ifspanisnotNone:stack.enter_context(span)yieldGraphRun[StateT,DepsT,T](self,start_node,history=historyifhistoryisnotNoneelse[],state=state,deps=deps,auto_instrument=self._auto_instrument,)defrun_sync(self:Graph[StateT,DepsT,T],start_node:BaseNode[StateT,DepsT,T],*,state:StateT=None,deps:DepsT=None,infer_name:bool=True,)->GraphRunResult[StateT,T]:"""Synchronously run the graph. This is a convenience method that wraps [`self.run`][pydantic_graph.Graph.run] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. Args: start_node: the first node to run, since the graph definition doesn't define the entry point in the graph, you need to provide the starting node. state: The initial state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. Returns: The result type from ending the run and the history of the run. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())return_utils.get_event_loop().run_until_complete(self.run(start_node,state=state,deps=deps,infer_name=False))asyncdefnext(self:Graph[StateT,DepsT,T],node:BaseNode[StateT,DepsT,T],history:list[HistoryStep[StateT,T]],*,state:StateT=None,deps:DepsT=None,infer_name:bool=True,)->BaseNode[StateT,DepsT,Any]|End[T]:"""Run a node in the graph and return the next node to run. Args: node: The node to run. history: The history of the graph run so far. NOTE: this will be mutated to add the new step. state: The current state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. Returns: The next node to run or [`End`][pydantic_graph.nodes.End] if the graph has finished. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())ifisinstance(node,End):# While technically this is not compatible with the documented method signature, it's an easy mistake to# make, and we should eagerly provide a more helpful error message than you'd get otherwise.raiseexceptions.GraphRuntimeError(f'Cannot call `next` with an `End` node: {node!r}.')node_id=node.get_id()ifnode_idnotinself.node_defs:raiseexceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')withExitStack()asstack:ifself._auto_instrument:stack.enter_context(_logfire.span('run node {node_id}',node_id=node_id,node=node))ctx=GraphRunContext(state,deps)start_ts=_utils.now_utc()start=perf_counter()next_node=awaitnode.run(ctx)duration=perf_counter()-starthistory.append(NodeStep(state=state,node=node,start_ts=start_ts,duration=duration,snapshot_state=self.snapshot_state))ifisinstance(next_node,End):history.append(EndStep(result=next_node))elifnotisinstance(next_node,BaseNode):ifTYPE_CHECKING:typing_extensions.assert_never(next_node)else:raiseexceptions.GraphRuntimeError(f'Invalid node return type: `{type(next_node).__name__}`. Expected `BaseNode` or `End`.')returnnext_nodedefdump_history(self:Graph[StateT,DepsT,T],history:list[HistoryStep[StateT,T]],*,indent:int|None=None)->bytes:"""Dump the history of a graph run as JSON. Args: history: The history of the graph run. indent: The number of spaces to indent the JSON. Returns: The JSON representation of the history. """returnself.history_type_adapter.dump_json(history,indent=indent)defload_history(self,json_bytes:str|bytes|bytearray)->list[HistoryStep[StateT,RunEndT]]:"""Load the history of a graph run from JSON. Args: json_bytes: The JSON representation of the history. Returns: The history of the graph run. """returnself.history_type_adapter.validate_json(json_bytes)@cached_propertydefhistory_type_adapter(self)->pydantic.TypeAdapter[list[HistoryStep[StateT,RunEndT]]]:nodes=[node_def.nodefornode_definself.node_defs.values()]state_t=self._get_state_type()end_t=self._get_run_end_type()token=nodes_schema_var.set(nodes)try:ta=pydantic.TypeAdapter(list[Annotated[HistoryStep[state_t,end_t],pydantic.Discriminator('kind')]])finally:nodes_schema_var.reset(token)returntadefmermaid_code(self,*,start_node:Sequence[mermaid.NodeIdent]|mermaid.NodeIdent|None=None,title:str|None|typing_extensions.Literal[False]=None,edge_labels:bool=True,notes:bool=True,highlighted_nodes:Sequence[mermaid.NodeIdent]|mermaid.NodeIdent|None=None,highlight_css:str=mermaid.DEFAULT_HIGHLIGHT_CSS,infer_name:bool=True,direction:mermaid.StateDiagramDirection|None=None,)->str:"""Generate a diagram representing the graph as [mermaid](https://mermaid.js.org/) diagram. This method calls [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code]. Args: start_node: The node or nodes which can start the graph. title: The title of the diagram, use `False` to not include a title. edge_labels: Whether to include edge labels. notes: Whether to include notes on each node. highlighted_nodes: Optional node or nodes to highlight. highlight_css: The CSS to use for highlighting nodes. infer_name: Whether to infer the graph name from the calling frame. direction: The direction of flow. Returns: The mermaid code for the graph, which can then be rendered as a diagram. Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]: ```py {title="mermaid_never_42.py" py="3.10"} from never_42 import Increment, never_42_graph print(never_42_graph.mermaid_code(start_node=Increment)) ''' --- title: never_42_graph --- stateDiagram-v2 [*] --> Increment Increment --> Check42 Check42 --> Increment Check42 --> [*] ''' ``` The rendered diagram will look like this: ```mermaid --- title: never_42_graph --- stateDiagram-v2 [*] --> Increment Increment --> Check42 Check42 --> Increment Check42 --> [*] ``` """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())iftitleisNoneandself.name:title=self.namereturnmermaid.generate_code(self,start_node=start_node,highlighted_nodes=highlighted_nodes,highlight_css=highlight_css,title=titleorNone,edge_labels=edge_labels,notes=notes,direction=direction,)defmermaid_image(self,infer_name:bool=True,**kwargs:typing_extensions.Unpack[mermaid.MermaidConfig])->bytes:"""Generate a diagram representing the graph as an image. The format and diagram can be customized using `kwargs`, see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig]. !!! note "Uses external service" This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink` is a free service not affiliated with Pydantic. Args: infer_name: Whether to infer the graph name from the calling frame. **kwargs: Additional arguments to pass to `mermaid.request_image`. Returns: The image bytes. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())if'title'notinkwargsandself.name:kwargs['title']=self.namereturnmermaid.request_image(self,**kwargs)defmermaid_save(self,path:Path|str,/,*,infer_name:bool=True,**kwargs:typing_extensions.Unpack[mermaid.MermaidConfig])->None:"""Generate a diagram representing the graph and save it as an image. The format and diagram can be customized using `kwargs`, see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig]. !!! note "Uses external service" This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink` is a free service not affiliated with Pydantic. Args: path: The path to save the image to. infer_name: Whether to infer the graph name from the calling frame. **kwargs: Additional arguments to pass to `mermaid.save_image`. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())if'title'notinkwargsandself.name:kwargs['title']=self.namemermaid.save_image(path,self,**kwargs)def_get_state_type(self)->type[StateT]:if_utils.is_set(self._state_type):returnself._state_typefornode_definself.node_defs.values():forbaseintyping_extensions.get_original_bases(node_def.node):iftyping_extensions.get_origin(base)isBaseNode:args=typing_extensions.get_args(base)ifargs:returnargs[0]# break the inner (bases) loopbreak# state defaults to None, so use that if we can't infer itreturntype(None)# pyright: ignore[reportReturnType]def_get_run_end_type(self)->type[RunEndT]:if_utils.is_set(self._run_end_type):returnself._run_end_typefornode_definself.node_defs.values():forbaseintyping_extensions.get_original_bases(node_def.node):iftyping_extensions.get_origin(base)isBaseNode:args=typing_extensions.get_args(base)iflen(args)==3:t=args[2]ifnottyping_objects.is_never(t):returnt# break the inner (bases) loopbreakraiseexceptions.GraphSetupError('Could not infer run end type from nodes, please set `run_end_type`.')def_register_node(self:Graph[StateT,DepsT,T],node:type[BaseNode[StateT,DepsT,T]],parent_namespace:dict[str,Any]|None,)->None:node_id=node.get_id()ifexisting_node:=self.node_defs.get(node_id):raiseexceptions.GraphSetupError(f'Node ID `{node_id}` is not unique — found on {existing_node.node} and {node}')else:self.node_defs[node_id]=node.get_node_def(parent_namespace)def_validate_edges(self):known_node_ids=self.node_defs.keys()bad_edges:dict[str,list[str]]={}fornode_id,node_definself.node_defs.items():foredgeinnode_def.next_node_edges.keys():ifedgenotinknown_node_ids:bad_edges.setdefault(edge,[]).append(f'`{node_id}`')ifbad_edges:bad_edges_list=[f'`{k}` is referenced by {_utils.comma_and(v)}'fork,vinbad_edges.items()]iflen(bad_edges_list)==1:raiseexceptions.GraphSetupError(f'{bad_edges_list[0]} but not included in the graph.')else:b='\n'.join(f' {be}'forbeinbad_edges_list)raiseexceptions.GraphSetupError(f'Nodes are referenced in the graph but not included in the graph:\n{b}')def_infer_name(self,function_frame:types.FrameType|None)->None:"""Infer the agent name from the call frame. Usage should be `self._infer_name(inspect.currentframe())`. Copied from `Agent`. """assertself.nameisNone,'Name already set'iffunction_frameisnotNoneand(parent_frame:=function_frame.f_back):# pragma: no branchforname,iteminparent_frame.f_locals.items():ifitemisself:self.name=namereturnifparent_frame.f_locals!=parent_frame.f_globals:# if we couldn't find the agent in locals and globals are a different dict, try globalsforname,iteminparent_frame.f_globals.items():ifitemisself:self.name=namereturn
def__init__(self,*,nodes:Sequence[type[BaseNode[StateT,DepsT,RunEndT]]],name:str|None=None,state_type:type[StateT]|_utils.Unset=_utils.UNSET,run_end_type:type[RunEndT]|_utils.Unset=_utils.UNSET,snapshot_state:Callable[[StateT],StateT]=deep_copy_state,auto_instrument:bool=True,):"""Create a graph from a sequence of nodes. Args: nodes: The nodes which make up the graph, nodes need to be unique and all be generic in the same state type. name: Optional name for the graph, if not provided the name will be inferred from the calling frame on the first call to a graph method. state_type: The type of the state for the graph, this can generally be inferred from `nodes`. run_end_type: The type of the result of running the graph, this can generally be inferred from `nodes`. snapshot_state: A function to snapshot the state of the graph, this is used in [`NodeStep`][pydantic_graph.state.NodeStep] and [`EndStep`][pydantic_graph.state.EndStep] to record the state before each step. auto_instrument: Whether to create a span for the graph run and the execution of each node's run method. """self.name=nameself._state_type=state_typeself._run_end_type=run_end_typeself._auto_instrument=auto_instrumentself.snapshot_state=snapshot_stateparent_namespace=_utils.get_parent_namespace(inspect.currentframe())self.node_defs:dict[str,NodeDef[StateT,DepsT,RunEndT]]={}fornodeinnodes:self._register_node(node,parent_namespace)self._validate_edges()
asyncdefrun(self:Graph[StateT,DepsT,T],start_node:BaseNode[StateT,DepsT,T],*,state:StateT=None,deps:DepsT=None,infer_name:bool=True,span:LogfireSpan|None=None,)->GraphRunResult[StateT,T]:"""Run the graph from a starting node until it ends. Args: start_node: the first node to run, since the graph definition doesn't define the entry point in the graph, you need to provide the starting node. state: The initial state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. span: The span to use for the graph run. If not provided, a span will be created depending on the value of the `_auto_instrument` field. Returns: A `GraphRunResult` containing information about the run, including its final result. Here's an example of running the graph from [above][pydantic_graph.graph.Graph]: ```py {title="run_never_42.py" noqa="I001" py="3.10"} from never_42 import Increment, MyState, never_42_graph async def main(): state = MyState(1) graph_run_result = await never_42_graph.run(Increment(), state=state) print(state) #> MyState(number=2) print(len(graph_run_result.history)) #> 3 state = MyState(41) graph_run_result = await never_42_graph.run(Increment(), state=state) print(state) #> MyState(number=43) print(len(graph_run_result.history)) #> 5 ``` """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())asyncwithself.iter(start_node,state=state,deps=deps,infer_name=infer_name,span=span)asgraph_run:asyncfor_nodeingraph_run:passfinal_result=graph_run.resultassertfinal_resultisnotNone,'GraphRun should have a final result'returnfinal_result
A contextmanager which can be used to iterate over the graph's nodes as they are executed.
This method returns a GraphRun object which can be used to async-iterate over the nodes of this Graph as
they are executed. This is the API to use if you want to record or interact with the nodes as the graph
execution unfolds.
The GraphRun can also be used to manually drive the graph execution by calling
GraphRun.next.
The GraphRun provides access to the full run history, state, deps, and the final result of the run once
it has completed.
For more details, see the API documentation of GraphRun.
@asynccontextmanagerasyncdefiter(self:Graph[StateT,DepsT,T],start_node:BaseNode[StateT,DepsT,T],*,history:list[HistoryStep[StateT,T]]|None=None,state:StateT=None,deps:DepsT=None,infer_name:bool=True,span:AbstractContextManager[Any]|None=None,)->AsyncIterator[GraphRun[StateT,DepsT,T]]:"""A contextmanager which can be used to iterate over the graph's nodes as they are executed. This method returns a `GraphRun` object which can be used to async-iterate over the nodes of this `Graph` as they are executed. This is the API to use if you want to record or interact with the nodes as the graph execution unfolds. The `GraphRun` can also be used to manually drive the graph execution by calling [`GraphRun.next`][pydantic_graph.graph.GraphRun.next]. The `GraphRun` provides access to the full run history, state, deps, and the final result of the run once it has completed. For more details, see the API documentation of [`GraphRun`][pydantic_graph.graph.GraphRun]. Args: start_node: the first node to run. Since the graph definition doesn't define the entry point in the graph, you need to provide the starting node. history: The history of the graph run so far. If not provided, a new list will be created. state: The initial state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. span: The span to use for the graph run. If not provided, a new span will be created. Yields: A GraphRun that can be async iterated over to drive the graph to completion. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())ifself._auto_instrumentandspanisNone:span=logfire_api.span('run graph {graph.name}',graph=self)withExitStack()asstack:ifspanisnotNone:stack.enter_context(span)yieldGraphRun[StateT,DepsT,T](self,start_node,history=historyifhistoryisnotNoneelse[],state=state,deps=deps,auto_instrument=self._auto_instrument,)
This is a convenience method that wraps self.run with loop.run_until_complete(...).
You therefore can't use this method inside async code or if there's an active event loop.
defrun_sync(self:Graph[StateT,DepsT,T],start_node:BaseNode[StateT,DepsT,T],*,state:StateT=None,deps:DepsT=None,infer_name:bool=True,)->GraphRunResult[StateT,T]:"""Synchronously run the graph. This is a convenience method that wraps [`self.run`][pydantic_graph.Graph.run] with `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop. Args: start_node: the first node to run, since the graph definition doesn't define the entry point in the graph, you need to provide the starting node. state: The initial state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. Returns: The result type from ending the run and the history of the run. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())return_utils.get_event_loop().run_until_complete(self.run(start_node,state=state,deps=deps,infer_name=False))
asyncdefnext(self:Graph[StateT,DepsT,T],node:BaseNode[StateT,DepsT,T],history:list[HistoryStep[StateT,T]],*,state:StateT=None,deps:DepsT=None,infer_name:bool=True,)->BaseNode[StateT,DepsT,Any]|End[T]:"""Run a node in the graph and return the next node to run. Args: node: The node to run. history: The history of the graph run so far. NOTE: this will be mutated to add the new step. state: The current state of the graph. deps: The dependencies of the graph. infer_name: Whether to infer the graph name from the calling frame. Returns: The next node to run or [`End`][pydantic_graph.nodes.End] if the graph has finished. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())ifisinstance(node,End):# While technically this is not compatible with the documented method signature, it's an easy mistake to# make, and we should eagerly provide a more helpful error message than you'd get otherwise.raiseexceptions.GraphRuntimeError(f'Cannot call `next` with an `End` node: {node!r}.')node_id=node.get_id()ifnode_idnotinself.node_defs:raiseexceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')withExitStack()asstack:ifself._auto_instrument:stack.enter_context(_logfire.span('run node {node_id}',node_id=node_id,node=node))ctx=GraphRunContext(state,deps)start_ts=_utils.now_utc()start=perf_counter()next_node=awaitnode.run(ctx)duration=perf_counter()-starthistory.append(NodeStep(state=state,node=node,start_ts=start_ts,duration=duration,snapshot_state=self.snapshot_state))ifisinstance(next_node,End):history.append(EndStep(result=next_node))elifnotisinstance(next_node,BaseNode):ifTYPE_CHECKING:typing_extensions.assert_never(next_node)else:raiseexceptions.GraphRuntimeError(f'Invalid node return type: `{type(next_node).__name__}`. Expected `BaseNode` or `End`.')returnnext_node
Source code in pydantic_graph/pydantic_graph/graph.py
326327328329330331332333334335336337338
defdump_history(self:Graph[StateT,DepsT,T],history:list[HistoryStep[StateT,T]],*,indent:int|None=None)->bytes:"""Dump the history of a graph run as JSON. Args: history: The history of the graph run. indent: The number of spaces to indent the JSON. Returns: The JSON representation of the history. """returnself.history_type_adapter.dump_json(history,indent=indent)
Source code in pydantic_graph/pydantic_graph/graph.py
340341342343344345346347348349
defload_history(self,json_bytes:str|bytes|bytearray)->list[HistoryStep[StateT,RunEndT]]:"""Load the history of a graph run from JSON. Args: json_bytes: The JSON representation of the history. Returns: The history of the graph run. """returnself.history_type_adapter.validate_json(json_bytes)
defmermaid_code(self,*,start_node:Sequence[mermaid.NodeIdent]|mermaid.NodeIdent|None=None,title:str|None|typing_extensions.Literal[False]=None,edge_labels:bool=True,notes:bool=True,highlighted_nodes:Sequence[mermaid.NodeIdent]|mermaid.NodeIdent|None=None,highlight_css:str=mermaid.DEFAULT_HIGHLIGHT_CSS,infer_name:bool=True,direction:mermaid.StateDiagramDirection|None=None,)->str:"""Generate a diagram representing the graph as [mermaid](https://mermaid.js.org/) diagram. This method calls [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code]. Args: start_node: The node or nodes which can start the graph. title: The title of the diagram, use `False` to not include a title. edge_labels: Whether to include edge labels. notes: Whether to include notes on each node. highlighted_nodes: Optional node or nodes to highlight. highlight_css: The CSS to use for highlighting nodes. infer_name: Whether to infer the graph name from the calling frame. direction: The direction of flow. Returns: The mermaid code for the graph, which can then be rendered as a diagram. Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]: ```py {title="mermaid_never_42.py" py="3.10"} from never_42 import Increment, never_42_graph print(never_42_graph.mermaid_code(start_node=Increment)) ''' --- title: never_42_graph --- stateDiagram-v2 [*] --> Increment Increment --> Check42 Check42 --> Increment Check42 --> [*] ''' ``` The rendered diagram will look like this: ```mermaid --- title: never_42_graph --- stateDiagram-v2 [*] --> Increment Increment --> Check42 Check42 --> Increment Check42 --> [*] ``` """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())iftitleisNoneandself.name:title=self.namereturnmermaid.generate_code(self,start_node=start_node,highlighted_nodes=highlighted_nodes,highlight_css=highlight_css,title=titleorNone,edge_labels=edge_labels,notes=notes,direction=direction,)
defmermaid_image(self,infer_name:bool=True,**kwargs:typing_extensions.Unpack[mermaid.MermaidConfig])->bytes:"""Generate a diagram representing the graph as an image. The format and diagram can be customized using `kwargs`, see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig]. !!! note "Uses external service" This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink` is a free service not affiliated with Pydantic. Args: infer_name: Whether to infer the graph name from the calling frame. **kwargs: Additional arguments to pass to `mermaid.request_image`. Returns: The image bytes. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())if'title'notinkwargsandself.name:kwargs['title']=self.namereturnmermaid.request_image(self,**kwargs)
defmermaid_save(self,path:Path|str,/,*,infer_name:bool=True,**kwargs:typing_extensions.Unpack[mermaid.MermaidConfig])->None:"""Generate a diagram representing the graph and save it as an image. The format and diagram can be customized using `kwargs`, see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig]. !!! note "Uses external service" This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink` is a free service not affiliated with Pydantic. Args: path: The path to save the image to. infer_name: Whether to infer the graph name from the calling frame. **kwargs: Additional arguments to pass to `mermaid.save_image`. """ifinfer_nameandself.nameisNone:self._infer_name(inspect.currentframe())if'title'notinkwargsandself.name:kwargs['title']=self.namemermaid.save_image(path,self,**kwargs)
You typically get a GraphRun instance from calling
async with [my_graph.iter(...)][pydantic_graph.graph.Graph.iter] as graph_run:. That gives you the ability to iterate
through nodes as they run, either by async for iteration or by repeatedly calling .next(...).
Here's an example of iterating over the graph from above:
classGraphRun(Generic[StateT,DepsT,RunEndT]):"""A stateful, async-iterable run of a [`Graph`][pydantic_graph.graph.Graph]. You typically get a `GraphRun` instance from calling `async with [my_graph.iter(...)][pydantic_graph.graph.Graph.iter] as graph_run:`. That gives you the ability to iterate through nodes as they run, either by `async for` iteration or by repeatedly calling `.next(...)`. Here's an example of iterating over the graph from [above][pydantic_graph.graph.Graph]: ```py {title="iter_never_42.py" noqa="I001" py="3.10"} from copy import deepcopy from never_42 import Increment, MyState, never_42_graph async def main(): state = MyState(1) async with never_42_graph.iter(Increment(), state=state) as graph_run: node_states = [(graph_run.next_node, deepcopy(graph_run.state))] async for node in graph_run: node_states.append((node, deepcopy(graph_run.state))) print(node_states) ''' [ (Increment(), MyState(number=1)), (Check42(), MyState(number=2)), (End(data=2), MyState(number=2)), ] ''' state = MyState(41) async with never_42_graph.iter(Increment(), state=state) as graph_run: node_states = [(graph_run.next_node, deepcopy(graph_run.state))] async for node in graph_run: node_states.append((node, deepcopy(graph_run.state))) print(node_states) ''' [ (Increment(), MyState(number=41)), (Check42(), MyState(number=42)), (Increment(), MyState(number=42)), (Check42(), MyState(number=43)), (End(data=43), MyState(number=43)), ] ''' ``` See the [`GraphRun.next` documentation][pydantic_graph.graph.GraphRun.next] for an example of how to manually drive the graph run. """def__init__(self,graph:Graph[StateT,DepsT,RunEndT],start_node:BaseNode[StateT,DepsT,RunEndT],*,history:list[HistoryStep[StateT,RunEndT]],state:StateT,deps:DepsT,auto_instrument:bool,):"""Create a new run for a given graph, starting at the specified node. Typically, you'll use [`Graph.iter`][pydantic_graph.graph.Graph.iter] rather than calling this directly. Args: graph: The [`Graph`][pydantic_graph.graph.Graph] to run. start_node: The node where execution will begin. history: A list of [`HistoryStep`][pydantic_graph.state.HistoryStep] objects that describe each step of the run. Usually starts empty; can be populated if resuming. state: A shared state object or primitive (like a counter, dataclass, etc.) that is available to all nodes via `ctx.state`. deps: Optional dependencies that each node can access via `ctx.deps`, e.g. database connections, configuration, or logging clients. auto_instrument: Whether to automatically create instrumentation spans during the run. """self.graph=graphself.history=historyself.state=stateself.deps=depsself._auto_instrument=auto_instrumentself._next_node:BaseNode[StateT,DepsT,RunEndT]|End[RunEndT]=start_node@propertydefnext_node(self)->BaseNode[StateT,DepsT,RunEndT]|End[RunEndT]:"""The next node that will be run in the graph. This is the next node that will be used during async iteration, or if a node is not passed to `self.next(...)`. """returnself._next_node@propertydefresult(self)->GraphRunResult[StateT,RunEndT]|None:"""The final result of the graph run if the run is completed, otherwise `None`."""ifnotisinstance(self._next_node,End):returnNone# The GraphRun has not finished runningreturnGraphRunResult(self._next_node.data,state=self.state,history=self.history,)asyncdefnext(self:GraphRun[StateT,DepsT,T],node:BaseNode[StateT,DepsT,T]|None=None)->BaseNode[StateT,DepsT,T]|End[T]:"""Manually drive the graph run by passing in the node you want to run next. This lets you inspect or mutate the node before continuing execution, or skip certain nodes under dynamic conditions. The graph run should stop when you return an [`End`][pydantic_graph.nodes.End] node. Here's an example of using `next` to drive the graph from [above][pydantic_graph.graph.Graph]: ```py {title="next_never_42.py" noqa="I001" py="3.10"} from copy import deepcopy from pydantic_graph import End from never_42 import Increment, MyState, never_42_graph async def main(): state = MyState(48) async with never_42_graph.iter(Increment(), state=state) as graph_run: next_node = graph_run.next_node # start with the first node node_states = [(next_node, deepcopy(graph_run.state))] while not isinstance(next_node, End): if graph_run.state.number == 50: graph_run.state.number = 42 next_node = await graph_run.next(next_node) node_states.append((next_node, deepcopy(graph_run.state))) print(node_states) ''' [ (Increment(), MyState(number=48)), (Check42(), MyState(number=49)), (End(data=49), MyState(number=49)), ] ''' ``` Args: node: The node to run next in the graph. If not specified, uses `self.next_node`, which is initialized to the `start_node` of the run and updated each time a new node is returned. Returns: The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if the run has completed. """ifnodeisNone:ifisinstance(self._next_node,End):# Note: we could alternatively just return `self._next_node` here, but it's easier to start with an# error and relax the behavior later, than vice versa.raiseexceptions.GraphRuntimeError('This graph run has already ended.')node=self._next_nodehistory=self.historystate=self.statedeps=self.depsself._next_node=awaitself.graph.next(node,history,state=state,deps=deps,infer_name=False)returnself._next_nodedef__aiter__(self)->AsyncIterator[BaseNode[StateT,DepsT,RunEndT]|End[RunEndT]]:returnselfasyncdef__anext__(self)->BaseNode[StateT,DepsT,RunEndT]|End[RunEndT]:"""Use the last returned node as the input to `Graph.next`."""ifisinstance(self._next_node,End):raiseStopAsyncIterationreturnawaitself.next(self._next_node)def__repr__(self)->str:returnf'<GraphRun name={self.graph.nameor"<unnamed>"} step={len(self.history)+1}>'
def__init__(self,graph:Graph[StateT,DepsT,RunEndT],start_node:BaseNode[StateT,DepsT,RunEndT],*,history:list[HistoryStep[StateT,RunEndT]],state:StateT,deps:DepsT,auto_instrument:bool,):"""Create a new run for a given graph, starting at the specified node. Typically, you'll use [`Graph.iter`][pydantic_graph.graph.Graph.iter] rather than calling this directly. Args: graph: The [`Graph`][pydantic_graph.graph.Graph] to run. start_node: The node where execution will begin. history: A list of [`HistoryStep`][pydantic_graph.state.HistoryStep] objects that describe each step of the run. Usually starts empty; can be populated if resuming. state: A shared state object or primitive (like a counter, dataclass, etc.) that is available to all nodes via `ctx.state`. deps: Optional dependencies that each node can access via `ctx.deps`, e.g. database connections, configuration, or logging clients. auto_instrument: Whether to automatically create instrumentation spans during the run. """self.graph=graphself.history=historyself.state=stateself.deps=depsself._auto_instrument=auto_instrumentself._next_node:BaseNode[StateT,DepsT,RunEndT]|End[RunEndT]=start_node
Manually drive the graph run by passing in the node you want to run next.
This lets you inspect or mutate the node before continuing execution, or skip certain nodes
under dynamic conditions. The graph run should stop when you return an End node.
Here's an example of using next to drive the graph from above:
next_never_42.py
fromcopyimportdeepcopyfrompydantic_graphimportEndfromnever_42importIncrement,MyState,never_42_graphasyncdefmain():state=MyState(48)asyncwithnever_42_graph.iter(Increment(),state=state)asgraph_run:next_node=graph_run.next_node# start with the first nodenode_states=[(next_node,deepcopy(graph_run.state))]whilenotisinstance(next_node,End):ifgraph_run.state.number==50:graph_run.state.number=42next_node=awaitgraph_run.next(next_node)node_states.append((next_node,deepcopy(graph_run.state)))print(node_states)''' [ (Increment(), MyState(number=48)), (Check42(), MyState(number=49)), (End(data=49), MyState(number=49)), ] '''
The node to run next in the graph. If not specified, uses self.next_node, which is initialized to
the start_node of the run and updated each time a new node is returned.
asyncdefnext(self:GraphRun[StateT,DepsT,T],node:BaseNode[StateT,DepsT,T]|None=None)->BaseNode[StateT,DepsT,T]|End[T]:"""Manually drive the graph run by passing in the node you want to run next. This lets you inspect or mutate the node before continuing execution, or skip certain nodes under dynamic conditions. The graph run should stop when you return an [`End`][pydantic_graph.nodes.End] node. Here's an example of using `next` to drive the graph from [above][pydantic_graph.graph.Graph]: ```py {title="next_never_42.py" noqa="I001" py="3.10"} from copy import deepcopy from pydantic_graph import End from never_42 import Increment, MyState, never_42_graph async def main(): state = MyState(48) async with never_42_graph.iter(Increment(), state=state) as graph_run: next_node = graph_run.next_node # start with the first node node_states = [(next_node, deepcopy(graph_run.state))] while not isinstance(next_node, End): if graph_run.state.number == 50: graph_run.state.number = 42 next_node = await graph_run.next(next_node) node_states.append((next_node, deepcopy(graph_run.state))) print(node_states) ''' [ (Increment(), MyState(number=48)), (Check42(), MyState(number=49)), (End(data=49), MyState(number=49)), ] ''' ``` Args: node: The node to run next in the graph. If not specified, uses `self.next_node`, which is initialized to the `start_node` of the run and updated each time a new node is returned. Returns: The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if the run has completed. """ifnodeisNone:ifisinstance(self._next_node,End):# Note: we could alternatively just return `self._next_node` here, but it's easier to start with an# error and relax the behavior later, than vice versa.raiseexceptions.GraphRuntimeError('This graph run has already ended.')node=self._next_nodehistory=self.historystate=self.statedeps=self.depsself._next_node=awaitself.graph.next(node,history,state=state,deps=deps,infer_name=False)returnself._next_node
Use the last returned node as the input to Graph.next.
Source code in pydantic_graph/pydantic_graph/graph.py
732733734735736
asyncdef__anext__(self)->BaseNode[StateT,DepsT,RunEndT]|End[RunEndT]:"""Use the last returned node as the input to `Graph.next`."""ifisinstance(self._next_node,End):raiseStopAsyncIterationreturnawaitself.next(self._next_node)
Source code in pydantic_graph/pydantic_graph/graph.py
742743744745746747748
@dataclassclassGraphRunResult(Generic[StateT,RunEndT]):"""The final result of running a graph."""output:RunEndTstate:StateThistory:list[HistoryStep[StateT,RunEndT]]=field(repr=False)