# Source code for progress.pqnet.repository

"""
Some popular, ready-to-use, module behaviors for the DAG
"""
import netsquid as ns
from progress.pqnet.p_module import ProcessingModuleBehavior, SchedulingModuleBehavior
from progress.sockets import Token
import progress.progress_logging as log

__all__ = ["WaitForSwappingModuleBehavior", "EntanglementSwappingModuleBehavior", "FreeEverythingModuleBehavior",
           "RoundRobinSchedulingModuleBehavior", "ShortCircuitModuleBehavior", "DEJMPSModuleBehavior"]


class EntanglementSwappingModuleBehavior(ProcessingModuleBehavior):
    r"""
    Module behavior that carries out the entanglement swapping protocol.

    A token entangled with a device of one pool is paired with a token entangled with a device of the other
    pool. The pair is swapped and the outcome is sent to the configured module on both remote devices.
    """

    def __init__(self, dest_devices, dest_module_ids, name, node, qnic=None):
        r"""
        Initialize the behavior of the module.

        Parameters
        ----------
        dest_devices : tuple
            A tuple with the two lists of IDs identifying the device pools among which swapping must be carried
            out. The module swaps tokens entangled with one device from the first pool with tokens entangled
            with one device from the second pool.
        dest_module_ids : tuple
            A tuple with the two lists of IDs of the modules on the destination devices that the swapping
            outcome must be sent to. Each element of each list refers to the matching element of the device
            pool. They are supposed to be instances of
            :class:`~progress.pqnet.repository.WaitForSwappingModuleBehavior`.
        name : str
            The name of the module
        node : :class:`~progress.pqnet.p_module.Module`
            The module that this behavior is attached to
        qnic : int or None, optional
            See :class:`~progress.pqnet.p_module.ModuleBehavior`.
        """
        self.dest_devices = dest_devices
        self.dest_module_ids = dest_module_ids
        self._swapped_ends = dest_devices
        super().__init__(name=name, node=node, qnic=qnic)

    def handle_new_token(self, request):
        """
        Pair the incoming token with a stored token of the opposite pool, or store it if none is available.
        """
        incoming = request.token

        # A token entangled with the second pool needs a partner from the first pool;
        # every other token is matched against the second pool.
        partner_pool_index = 1
        if incoming.other_end.node in self.dest_devices[1]:
            partner_pool_index = 0
        partner_pool = self.dest_devices[partner_pool_index]

        table = self.node.token_table
        partner = table.get_target_token(partner_pool)
        if partner is None:
            # nothing to swap with yet: park the token until a partner shows up
            table.add_token(incoming)
            return

        table.pop_token(local_end=partner.socket)

        # The first argument is always the token facing the first pool, because
        # handle_response resolves token1 against pool 0 and token2 against pool 1.
        if partner_pool_index == 1:
            first, second = incoming, partner
        else:
            first, second = partner, incoming
        self.swap_tokens(first, second)

    @staticmethod
    def _get_state_after_swap(old_state_a, old_state_b, outcome):
        """
        Combine the two old Bell state indices and the measurement outcome into the new Bell state index.

        A value ``>= 2`` contributes to the high part of the result and an odd value contributes to the low
        part; each part is the parity of its contributions.
        """
        contributions = (old_state_a, old_state_b, outcome)
        z_corr = sum(1 for value in contributions if value >= 2) % 2
        x_corr = sum(1 for value in contributions if value % 2 != 0) % 2
        return z_corr * 2 + x_corr

    def handle_response(self, request):
        """
        Forward the swapping outcome to the destination module of both remote devices.

        The response is the outcome of the swapping request. The message carries the two remote ends, the new
        Bell state index and the smaller of the two tokens' ``pct`` values.
        """
        swap_request = request.request
        outcome = request.response
        # the INSTR_BELL_MEASURE outcome does not exactly match bell states indices: 2 and 3 are exchanged
        if outcome == 3:
            outcome = 2
        elif outcome == 2:
            outcome = 3

        token_a, token_b = swap_request.token1, swap_request.token2
        new_state = self._get_state_after_swap(token_a.current_state, token_b.current_state, outcome)
        new_pct = min(token_a.pct, token_b.pct)

        remote_a, remote_b = token_a.other_end, token_b.other_end
        message = ns.components.Message(items=[remote_a, remote_b, new_state, new_pct],
                                        header="swapping_outcome")

        # Resolve both destinations before sending anything, so that a failed lookup sends no message at all.
        destinations = []
        for pool_index, remote_end in enumerate((remote_a, remote_b)):
            device = remote_end.node
            module_id = self.dest_module_ids[pool_index][self.dest_devices[pool_index].index(device)]
            destinations.append((device, module_id))

        for device, module_id in destinations:
            self.send_message(message=message, dest_device=device, dest_module_id=module_id)

    def handle_message(self, request):
        """
        Raise an error when called. This module does not receive messages.
        """
        raise NotImplementedError("This module does not receive messages")
class WaitForSwappingModuleBehavior(SchedulingModuleBehavior):
    """
    This class implements the behavior of a module that waits for the outcome of the entanglement swapping
    protocol.

    Attributes
    ----------
    NEW_TOKEN_SIGNAL : str
        Label used to signal that a token has been captured. Only used if ``collect_stats`` is ``True``.
    NEW_TOKEN_EVT_TYPE : :class:`pydynaa.core.EventType`
        Event type used to signal that a token has been captured. Only used if ``collect_stats`` is ``True``.
    """

    NEW_TOKEN_SIGNAL = "new_token"
    #: Event type used to signal that a token has been captured.
    NEW_TOKEN_EVT_TYPE = ns.pydynaa.EventType("new_token", "A token has been captured by this module")

    def __init__(self, name, node, output_map=None, qnic=None, collect_stats=False):
        r"""
        Initialize the behavior of the module.

        Parameters
        ----------
        name : str
            The name of the module
        node : :class:`~progress.pqnet.p_module.Module`
            The module that this behavior is attached to
        output_map : dict or None, optional
            A dictionary that maps each possible new remote entangled node to a different output port of the
            module. If None, all tokens are sent to the first output port.
        qnic : int or None, optional
            See :class:`~progress.pqnet.p_module.ModuleBehavior`.
        collect_stats : bool, optional
            Whether to collect statistics about the module's behavior.
        """
        super().__init__(name=name, node=node, qnic=qnic)
        self.output_map = output_map
        self.collect_stats = collect_stats
        # swapping outcomes that arrived before the token they refer to
        self.pending_requests = []
        if collect_stats:
            self.add_signal(self.NEW_TOKEN_SIGNAL, self.NEW_TOKEN_EVT_TYPE)

    def _split_ends(self, message):
        """
        Return ``(local_end, remote_end)`` for the two swapped ends carried by a swapping outcome message.

        The first item is taken as local when its node matches this device; otherwise the second item is.
        """
        end_a = message.items[0]
        end_b = message.items[1]
        if end_a.node == self.node.device_id:
            return end_a, end_b
        return end_b, end_a

    def _process_request(self, request):
        """
        Apply a swapping outcome to the locally stored token and promote the updated token.

        Returns
        -------
        bool
            ``True`` if the token was promoted, ``False`` if the token for the local end is not in the token
            table.
        """
        local_end, remote_end = self._split_ends(request.message)
        new_state = request.message.items[2]
        new_pct = request.message.items[3]

        token = self.node.token_table.get_token(local_end, raise_error=False)
        if token is None:
            return False

        # build the token describing the entanglement with the new remote end
        new_token = Token(socket=local_end, other_end=remote_end, pct=new_pct, purified=0,
                          current_state=new_state, additional_info=token.additional_info)
        if self.output_map is None:
            self.promote_token(new_token)
        else:
            self.promote_token(new_token, output_port=self.output_map[remote_end.node])
        return True

    def handle_message(self, request):
        """
        Handle a swapping outcome message; queue it if the token it refers to has not arrived yet.
        """
        # the message is the outcome of the entanglement swapping protocol
        assert request.message.meta["header"] == "swapping_outcome"
        if not self._process_request(request):
            self.pending_requests.append(request)

    def handle_new_token(self, request):
        """
        Store the new token and apply any pending swapping outcome that refers to it.
        """
        if self.collect_stats:
            sig_result = (ns.sim_time(), request.token.other_end.node, request.token.socket.created_at,
                          request.token)
            self.send_signal(self.NEW_TOKEN_SIGNAL, sig_result)
        self.node.token_table.add_token(request.token)

        # Iterate over a snapshot: removing from the list being iterated makes the loop
        # skip the element that follows each removed one.
        for pending_request in list(self.pending_requests):
            local_end, _ = self._split_ends(pending_request.message)
            if local_end != request.token.socket:
                continue
            if self._process_request(pending_request):
                self.pending_requests.remove(pending_request)

    def handle_response(self, request):
        """
        Raise an error when called. This module does not receive responses.
        """
        raise NotImplementedError("This module does not receive responses")
class DEJMPSModuleBehavior(ProcessingModuleBehavior):
    r"""
    This class implements the behavior of a module that performs the DEJMPS protocol.

    Parameters
    ----------
    module_id : int
        The ID of the module
    dest_device : int
        The ID of the device that the distillation outcome must be sent to
    dest_module_id : int
        The ID of the module on the destination device that the distillation outcome must be sent to
    is_solicitor : bool
        Whether the module is the solicitor or not
    name : str
        The name of the module
    node : :class:`~progress.pqnet.p_module.Module`
        The module that this behavior is attached to
    qnic : int or None, optional
        See :class:`~progress.pqnet.p_module.ModuleBehavior`.
    """

    def __init__(self, module_id, dest_device, dest_module_id, is_solicitor, name, node, qnic=None):
        self.module_id = module_id
        self.dest_device = dest_device
        self.dest_module_id = dest_module_id
        self.is_solicitor = is_solicitor
        # outcomes kept until the matching local/remote result is available, keyed by the pair of remote ends
        self._stored_outcomes = {}
        super().__init__(name=name, node=node, qnic=qnic)

    def handle_new_token(self, request):
        r"""
        Store the token and, on the solicitor side, start a distillation if a partner token is available.

        A non-solicitor only stores the token and waits for a solicitation message. A solicitor looks for a
        stored token with the same remote node that is not already involved in a distillation.

        See Also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_new_token`
        """
        token = request.token
        table = self.node.token_table

        if not self.is_solicitor:
            table.add_token(token)
            return

        # the lookup must happen before the new token is added to the table
        target_token = table.get_target_token(token.other_end.node,
                                              additional_info_filters={"dejmps_pending": False})
        token.additional_info["dejmps_pending"] = False
        table.add_token(token)
        if target_token is None:
            return

        # pop the target token from the token table (it will be used as ancilla anyway)
        table.pop_token(local_end=target_token.socket)
        token.additional_info["dejmps_pending"] = True
        target_token.additional_info["dejmps_pending"] = True
        self.dejmps_tokens(token, target_token, role='A')

    def handle_response(self, request):
        r"""
        Handle the local outcome of a DEJMPS request.

        The solicitor sends its outcome to the solicited module. The non-solicitor compares its outcome with
        the one received in the solicitation, reports success or failure to the solicitor, and then promotes
        or frees its own token accordingly.

        See Also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_response`
        """
        outcome = request.response
        dejmps_request = request.request
        other_end_a = dejmps_request.token1.other_end
        other_end_b = dejmps_request.token2.other_end
        local_end_a = dejmps_request.token1.socket
        pair_key = (other_end_a, other_end_b)

        if self.is_solicitor:
            message = ns.components.Message(items=[other_end_a, other_end_b, outcome],
                                            header="dejmps_solicitation")
            self.send_message(message=message, dest_device=self.dest_device,
                              dest_module_id=self.dest_module_id)
            # NOTE(review): nothing visible in this class reads or deletes this entry on the
            # solicitor side - confirm whether it is needed.
            self._stored_outcomes[pair_key] = outcome
            return

        # non-solicitor: the distillation succeeded if both sides measured the same outcome
        stored_outcome = self._stored_outcomes.pop(pair_key)
        success = (outcome == stored_outcome)
        message = ns.components.Message(items=[other_end_a, success], header="dejmps_outcome")
        self.send_message(message=message, dest_device=self.dest_device, dest_module_id=self.dest_module_id)

        table = self.node.token_table
        if success:
            # in case of success we promote the distilled token
            token = table.get_token(local_end=local_end_a)
            self.promote_token(token)
        else:
            # in case of failure we must free the token
            token = table.pop_token(local_end=local_end_a)
            self.free_token(token)

    def handle_message(self, request):
        r"""
        Handle a message from the peer DEJMPS module.

        The solicitor reacts to ``"dejmps_outcome"`` messages and the non-solicitor to
        ``"dejmps_solicitation"`` messages. Messages with any other header are ignored.

        See Also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_message`
        """
        message = request.message
        header = message.meta["header"]
        table = self.node.token_table

        if self.is_solicitor:
            if header != "dejmps_outcome":
                return
            success = message.items[1]
            if success:
                # in case of success we promote the distilled token
                token = table.get_token(local_end=message.items[0])
                del token.additional_info["dejmps_pending"]
                self.promote_token(token)
            else:
                # in case of failure we must free the token
                token = table.pop_token(local_end=message.items[0])
                self.free_token(token)
            return

        if header != "dejmps_solicitation":
            return

        token_a = table.get_token(local_end=message.items[0], raise_error=False)
        token_b = table.get_token(local_end=message.items[1], raise_error=False)
        if token_a is None or token_b is None:
            # If the tokens are not in the token table, there might have been an inconsistency due to the
            # fact that the controller did not reprogram all devices at the same time. In this case, we
            # simply ignore the message and free whichever token is still there.
            # NOTE(review): the token is freed without being popped from the token table, unlike the
            # failure paths elsewhere in this class - confirm this is intended.
            for leftover in (token_a, token_b):
                if leftover is not None:
                    self.free_token(leftover)
            return

        table.pop_token(local_end=token_b.socket)
        # remember the solicitor's outcome so handle_response can compare it with ours
        self._stored_outcomes[(token_a.other_end, token_b.other_end)] = message.items[2]
        self.dejmps_tokens(token_a, token_b, role='B')
class RoundRobinSchedulingModuleBehavior(SchedulingModuleBehavior):
    r"""
    Module behavior that distributes incoming tokens over the output ports in a simple, non-blocking,
    round-robin fashion.
    """

    def __init__(self, name, node, qnic=None):
        r"""
        Initialize the behavior of the module.

        Parameters
        ----------
        name : str
            The name of the module
        node : :class:`~progress.pqnet.p_module.Module`
            The module that this behavior is attached to
        qnic : int or None, optional
            See :class:`~progress.pqnet.p_module.ModuleBehavior`.
        """
        super().__init__(name=name, node=node, qnic=qnic)
        self._next_out_index = 0

    def _get_next_out_idx(self, token=None):
        """
        Return the output port index to use.

        With a token, the index is the token's hash modulo the number of outputs and the round-robin counter
        is left untouched. Without one, the current counter value is returned and the counter advances.
        """
        if token is not None:
            return hash(token) % self.node.num_output
        current = self._next_out_index
        self._next_out_index = (current + 1) % self.node.num_output
        return current

    def handle_new_token(self, request):
        """
        Promote the token to the next output port.

        See Also
        --------
        :meth:`progress.pqnet.p_module.ModuleBehavior.handle_new_token`
        """
        self.promote_token(request.token, output_port=self._get_next_out_idx())

    def handle_response(self, request):
        r"""
        Raise an error when called. This module does not receive responses.

        See Also
        --------
        :meth:`progress.pqnet.p_module.ModuleBehavior.handle_response`
        """
        raise NotImplementedError("This module does not receive responses")

    def handle_message(self, request):
        r"""
        Raise an error when called. This module does not receive messages.

        See Also
        --------
        :meth:`progress.pqnet.p_module.ModuleBehavior.handle_message`
        """
        raise NotImplementedError("This module does not receive messages")
class FreeEverythingModuleBehavior(ProcessingModuleBehavior):
    r"""
    This class implements the behavior of a dummy module that frees all tokens.

    Attributes
    ----------
    FREED_TOKEN_SIGNAL : str
        Label used to signal that a token has been freed. Only used if ``collect_stats`` is ``True``.
    FREED_TOKEN_EVT_TYPE : :class:`pydynaa.core.EventType`
        Event type used to signal that a token has been freed. Only used if ``collect_stats`` is ``True``.

    Parameters
    ----------
    name : str
        The name of the module
    node : :class:`~progress.pqnet.p_module.Module`
        The module that this behavior is attached to
    qnic : int or None, optional
        See :class:`~progress.pqnet.p_module.ModuleBehavior`.
    collect_stats : bool, optional
        Whether to collect statistics about the freed tokens.
    """

    FREED_TOKEN_SIGNAL = "freed_token"
    #: Event type used to signal that a token has been freed.
    FREED_TOKEN_EVT_TYPE = ns.pydynaa.EventType("freed_token", "A token has been freed by this module")

    def __init__(self, name, node, qnic=None, collect_stats=False):
        super().__init__(name=name, node=node, qnic=qnic)
        self.collect_stats = collect_stats
        if self.collect_stats:
            self.add_signal(self.FREED_TOKEN_SIGNAL, self.FREED_TOKEN_EVT_TYPE)

    def handle_new_token(self, request):
        """
        Free the token, first emitting a statistics signal if ``collect_stats`` is enabled.

        The signal carries ``(simulation time, token, fidelity value, current topology id)``.

        See Also
        --------
        :meth:`progress.pqnet.p_module.ProcessingModuleBehavior.handle_new_token`
        """
        token = request.token
        if self.collect_stats:
            # peek at the qubits inside the quantum memory (simulation cheat)
            device = self.node.supercomponent.supercomponent
            qhardware = device.qhardware
            position = qhardware.map_info_to_qubit(token.socket.qnic, token.socket.idx)
            qubits = qhardware.qmemory.peek(position)[0].qstate.qubits
            fid_sq = self._compute_fidelity(qubits, token.current_state)
            slot_id = device.current_topology_id
            self.send_signal(self.FREED_TOKEN_SIGNAL, (ns.sim_time(), token, fid_sq, slot_id))
        self.free_token(token)

    def handle_response(self, request):
        r"""
        Raises an error if called. This module does not receive responses.

        See Also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_response`
        """
        raise NotImplementedError("This module does not receive responses")

    def handle_message(self, request):
        r"""
        Raises an error if called. This module does not receive messages.

        See Also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_message`
        """
        raise NotImplementedError(f"[Device {self.node.device_id}] This module ({self.node.module_id}) does not receive messages")

    def _compute_fidelity(self, qubits, bell_state):
        """
        Compute the fidelity of the given qubits with the given bell state index reference.

        Parameters
        ----------
        qubits : list
            The qubits sharing the quantum state under inspection.
        bell_state : int
            Index (0 to 3) selecting the reference among ``b00``, ``b01``, ``b10`` and ``b11``.

        Returns
        -------
        float or int
            The value returned by ``ns.qubits.fidelity`` with ``squared=True``; ``0.`` if the state does not
            hold exactly two qubits; ``-1`` (after logging an error) if ``bell_state`` is not a known index.
        """
        if len(qubits) != 2:
            return 0.

        ketstates = ns.qubits.ketstates
        references = (ketstates.b00, ketstates.b01, ketstates.b10, ketstates.b11)
        for index, reference in enumerate(references):
            if bell_state == index:
                return ns.qubits.fidelity(qubits, reference_state=reference, squared=True)

        # The device id comes from the attached module; this class defines no ``end_nodes`` attribute.
        log.error('Unknown bell state {} delivered'.format(bell_state), repeater_id=self.node.device_id)
        return -1
class ShortCircuitModuleBehavior(SchedulingModuleBehavior):
    r"""
    Dummy module behavior that forwards every token to the first output port.

    Parameters
    ----------
    name : str
        The name of the module
    node : :class:`~progress.pqnet.p_module.Module`
        The module that this behavior is attached to
    qnic : int or None, optional
        See :class:`~progress.pqnet.p_module.ModuleBehavior`.
    """

    def __init__(self, name, node, qnic=None):
        super().__init__(name=name, node=node, qnic=qnic)

    def handle_new_token(self, request):
        r"""
        Promote the token to output port 0.

        See also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_new_token`
        """
        self.promote_token(request.token, output_port=0)

    def handle_response(self, request):
        r"""
        Raises an error if called. This module does not receive responses.

        See Also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_response`
        """
        raise NotImplementedError("This module does not receive responses")

    def handle_message(self, request):
        r"""
        Raises an error if called. This module does not receive messages.

        See Also
        --------
        :meth:`~progress.pqnet.p_module.ModuleBehavior.handle_message`
        """
        raise NotImplementedError("This module does not receive messages")