Source code for progress.pqnet.dag

import netsquid as ns
import networkx as nx

from progress.pqnet.p_module import Module
from progress.pqnet.messages import InterModuleMessage
import progress.progress_logging as log


[docs]class DAG: r""" This class is a representation of a Directed Acyclic Graph (DAG), where each node is assigned with an instance of the class :class:`~progress.pqnet.kernel.p_module.Module`. The DAG is used to represent the input/output relationship between modules, that determines the order in which the modules process the tokens. Parameters ---------- nodes : dict[str, :class:`~progress.pqnet.kernel.p_module.Module`] A dictionary that maps the name of each node to its corresponding module. edges : list[tuple[str, str]] A list of tuples, where each tuple represents an edge in the DAG. The order of the tuple is important, as it determines the direction of the edge. Attributes ---------- wrapping_node : :class:`netsquid.nodes.Node` A :class:`netsquid.nodes.Node` that wraps the DAG. This node is used to connect the DAG to the rest of the components of the quantum network device. Notes ----- The Ports of the wrapping node are: - "token_in_[0..roots]": The input port for tokens from the i-th QNIC. Tokens are delivered to the i-th root of the DAG. The i-th root is the i-th node in `nodes` that has no predecessors. - "messages": The input/output port for classical messages. - "tokens_ops_in": The input port for responses from the quantum hardware for operations on the tokens. """ def __init__(self, nodes, edges): self._graph = nx.DiGraph() self._graph.add_nodes_from(nodes.keys()) self._graph.add_edges_from(edges) self._nodes = nodes if not nx.is_directed_acyclic_graph(self._graph): raise ValueError("The graph is not a DAG.") # save the roots of the dag self._roots = [] for node in self._graph.nodes: if len(list(self._graph.predecessors(node))) == 0: self._roots.append(node) # add a wrapper node to the dag and add the ports self.wrapping_node = ns.nodes.Node(name="DAG") r""" A :class:`netsquid.nodes.Node` that wraps the DAG. This node is used to connect the DAG to the rest of the components of the device. Notes ----- The ports of this node are: 1. token_in_[0..roots]: The input port for tokens of the i-th root of the DAG. The i-th root is the i-th node in `nodes` that has no predecessors. 2. messages: The input/output port for classical messages. 3. tokens_ops_in: The input port for responses from the quantum hardware for operations on the tokens. """ self.wrapping_node.add_ports([f"token_in_{i}" for i in range(len(self._roots))] + ["messages", "tokens_ops_in"]) # connect the ports of the modules ports_used = {} for node in self._graph.nodes: ports_used[node] = (0, 0) self.wrapping_node.add_subcomponent(component=self._nodes[node], name=node) for edge in edges: source = edge[0] destination = edge[1] self._nodes[source].ports[f"out{ports_used[source][0]}"].connect( self._nodes[destination].ports[f"in{ports_used[destination][1]}"] ) ports_used[source] = (ports_used[source][0] + 1, ports_used[source][1]) ports_used[destination] = (ports_used[destination][0], ports_used[destination][1] + 1) # create a switch and connect it to the "messages" port of each module self._switch = DAGMessagesSwitch(dag=self) self.wrapping_node.add_subcomponent(component=self._switch, name="switch") for i, node in enumerate(self._nodes.keys()): self._nodes[node].ports["messages"].connect(self._switch.ports[f"module_{i}"]) # forward input and output ports to the switch self.wrapping_node.ports["messages"].forward_input(self._switch.ports["ext"]) self._switch.ports["ext"].forward_output(self.wrapping_node.ports["messages"]) self.wrapping_node.ports["tokens_ops_in"].forward_input(self._switch.ports["qhal_responses"]) # finally, forward input tokens to roots of the dag depending on their qnic attribute for root in self._roots: qnic = self._nodes[root].behavior.qnic if qnic is None: raise ValueError(f"Module {root} is a root of the DAG but has no qnic attribute.") self.wrapping_node.ports[f"token_in_{qnic}"].forward_input(self._nodes[root].ports["in0"])
[docs] def set_qhal(self, qhal): r""" Set the QHAL instance on all modules. Parameters ---------- qhal : :class:`~progress.pqnet.kernel.qhal.QHAL` """ for node in self._nodes.values(): node.qhal = qhal
[docs] def start(self): r""" Start all modules. """ for node in self._nodes.values(): node.start()
[docs] def get_nodes(self): r""" Get a list of the nodes of the DAG, where each element is the module identifier. Returns ------- list[str] """ return list(self._nodes.keys())
[docs] def get_dag_node(self): r""" Get the netsquid node that wraps the DAG. Returns ------- :class:`~netsquid.nodes.node.Node` """ return self.wrapping_node
[docs] def terminate(self): r""" Terminate all modules. """ for node in self._nodes.values(): node.stop() node.behavior.terminate()
[docs] def remove(self): r""" Remove all modules. """ for node in self._nodes.values(): self.wrapping_node.rem_subcomponent(node) for port in node.ports.values(): port.disconnect() self._switch.remove() self.wrapping_node.remove() for port in self.wrapping_node.ports.values(): port.disconnect()
[docs]class DAGFactory: r""" This class is a factory that creates a DAG from a list of edges and a dictionary of :class:`~progress.pqnet.kernel.p_module.ModuleBehavior` classes. Parameters ---------- edges : list[tuple[str, str]] A list of tuples, where each tuple represents an edge in the DAG. The order of the tuple is important, as it determines the direction of the edge. module_behaviors : dict[str, tuple(:class:`~progress.pqnet.kernel.p_module.ModuleBehavior.class`, dict)] A dictionary mapping the name of each module to a tuple of the module behavior class and a dictionary of the parameters to pass to the constructor of the module class. The dictionary must have keys that are numerical strings, so that they can be used to derive the module id. Roots of the DAG must have the optional integer parameter `"qnic"` to link them to the specific token queue from the QHAL. """ def __init__(self, edges, module_behaviors, device_id): self._edges = edges self._module_behaviors = module_behaviors self._module_params = {} # infer the number of input and output ports for each module for node in self._module_behaviors.keys(): num_input = 0 num_output = 0 if "qnic" in self._module_behaviors[node][1].keys(): num_input += 1 for edge in edges: if edge[1] == node: num_input += 1 if edge[0] == node: num_output += 1 self._module_params[node] = {} self._module_params[node]["num_input"] = num_input self._module_params[node]["num_output"] = num_output self._module_params[node]["device_id"] = device_id self._module_params[node]["name"] = "Module " + node self._module_params[node]["module_id"] = int(node)
[docs] def create_dag(self): r""" Create a DAG. Returns ------- :class:`~progress.pqnet.kernel.dag.DAG` """ modules = {} for node in self._module_behaviors.keys(): modules[node] = Module(**self._module_params[node]) self._module_behaviors[node][1]["node"] = modules[node] behavior = self._module_behaviors[node][0](**self._module_behaviors[node][1]) modules[node].behavior = behavior return DAG(modules.copy(), self._edges.copy())
[docs]class DAGMessagesSwitch(ns.components.Switch): r""" This class abstracts a virtual switch that routes incoming messages to the correct module in the DAG using `module_id` as routing key. It also acts as a multiplexer for all outgoing messages from bricks in the DAG. Parameters ---------- dag : :class:`~progress.pqnet.kernel.dag.DAG` The DAG that the switch is associated with. name : str or None, optional The name of the switch instance. If `None`, a default name is used. Defaults to `None`. """ def __init__(self, dag, name=None): r""" Initialize the switch. """ port_names = [f"module_{i}" for i in range(len(dag.get_nodes()))] + ["ext", "qhal_responses"] mux = {} if name is None: name = "DAGSwitch" # The multiplexing part is already initialized for i in range(len(dag.get_nodes())): mux[port_names[i]] = "ext" # Routing table: routing_table = {} for node in dag.get_nodes(): routing_table[int(node)] = port_names[int(node)] super().__init__(name=name, port_names=port_names, properties={"mux_table": mux, "routing_table": routing_table})
[docs] def routing_table(self, input_port, message) -> [(ns.components.Message, str)]: r""" See :class:`~netsquid.components.switch.Switch` for more information. """ if input_port == "ext": if isinstance(message, InterModuleMessage): if message.destination_id not in self.properties["routing_table"]: log.warning(f"No entry for module {message.destination_id} in the DAG") return [] return [(message, self.properties["routing_table"][message.destination_id])] elif input_port == "qhal_responses": # message is a response from the QHAL return [(message, self.properties["routing_table"][message.items[2].id])] else: return [(message, self.properties["mux_table"][input_port])]