Source code for progress.hardware.ep_source

"""
This module implements a source of entangled bell pairs which generates pairs according to the stochastic generation
process of Midpoint Source Protocol.
"""
import math

from netsquid.components.qsource import QSource, SourceStatus
from netsquid.components.models.delaymodels import DelayModel
import netsquid.qubits.ketstates as ks
from netsquid.nodes import Node
from netsquid.util.simtools import get_random_state
from netsquid.qubits import StateSampler
import netsquid as ns

__all__ = ["MPSSourceNode"]

from progress.hardware.llps.mps import MPSSourceProtocol
from progress import progress_logging as log


class MPSSource(QSource):
    r"""This source implements the Midpoint source (MS) protocols stochastic generation of entanglement. This class
    should be used only inside :class:`~qi_simulation.mps.components.ep_source.MPSSourceNode`.

    Parameters
    ----------
    name : str
        The name of this component.
    source : :class:`~qi_simulation.mps.components.ep_source.MPSSourceNode`
        The mid-point source that this instance is serving.
    p_left : float
        The probability of successfully latching the emitted qubit on the left with the components on that side.
        It should keep into account the loss probability on the link and the probability of failure at the components,
        which is due to frequency conversion and partial BSA in the case of standard MS,
        or due to imperfect nDPD and absorption in the case of AFC-enhanced MS.
    p_right : float, optional
        The probability of successfully latching the emitted qubit on the right with the components on that side.
        If ``None``, it is set equal to ``p_left``. Defaults to ``None``.
    p_mid : float, optional
        The probability that the midpoint entangled pair source successfully emits a pair at a given clock cycle.
        Defaults to 1.
    num_positions : int, optional
        The number of modes of the repeaters' quantum memories attached to the link. Defaults to 1.
    t_clock : int or float, optional
        The clock period of the MS protocols. Defaults to 1. [ns]
    t_link : int or float
        The total transmission time of the link between the two repeaters.
    """

    class MPSDelayModel(DelayModel):
        r"""
        This stateful delay model determines the time between the generation of two consecutive entangled pairs
        in MPS protocol. It should only be used inside its parent class
        :class:`~qi_simulation.mps.components.ep_source.MPSSource`.
        """

        def __init__(self, source,  p_left, p_right, num_positions, p_mid, t_clock, t_link, rng=None, **kwargs):
            self._p_left = p_left
            self._p_right = p_right
            self._num_positions = num_positions
            self._p_mid = p_mid
            self._t_clock = t_clock
            self._length = (t_link/1e9)*200000
            self._K = math.ceil(3/(min(p_left, p_right)*p_mid))
            self._t_link = t_link
            self._t_round = self._t_link + num_positions * self._K * t_clock
            self._last_bin_and_trial = (-1, -1)
            self._source = source
            super().__init__(rng=rng, **kwargs)
            self._successful_pairs = self._get_successful_pairs()
            self._first_time = True

        def generate_delay(self, **kwargs):

            if len(self._successful_pairs) > 0:
                bin_idx, trial_idx = self._successful_pairs.pop(0)
                last_bin, last_trial = self._last_bin_and_trial
                self._last_bin_and_trial = (bin_idx, trial_idx)

                # this is ugly but efficient to inform the node about the position of the generated entanglement
                self._source.subcomponents["qsource"].output_meta["position"] = bin_idx
                # this keeps the t_link updated on receiving nodes
                self._source.subcomponents["qsource"].output_meta["time_sent"] = ns.sim_time()

                # update status of qubit
                self._source.pos_status_list[bin_idx] = "busy"

                return self._get_inter_time(
                    old_bin=last_bin, old_trial=last_trial, new_bin=bin_idx, new_trial=trial_idx
                )

            else:
                self._successful_pairs = self._get_successful_pairs()
                if len(self._successful_pairs) == 0:

                    # used to inform that there was no successful trial in this round
                    self._source.subcomponents["qsource"].output_meta["position"] = -1
                    last_bin, last_trial = self._last_bin_and_trial
                    self._last_bin_and_trial = (-1, -1)

                    # used to notify that no trial was successful in the last round
                    self._source.subcomponents["qsource"].output_meta["position"] = -1

                    if last_bin == -1 and last_trial == -1:
                        return self._get_round_time()

                    return self._get_time_left_in_round(last_bin, last_trial) + self._get_round_time()
                else:
                    bin_idx, trial_idx = self._successful_pairs.pop(0)
                    last_bin, last_trial = self._last_bin_and_trial
                    self._last_bin_and_trial = (bin_idx, trial_idx)

                    # this is ugly but efficient to inform the node about the position of the generated entanglement
                    self._source.subcomponents["qsource"].output_meta["position"] = bin_idx

                    # update status of qubit
                    self._source.pos_status_list[bin_idx] = "busy"

                    delay = self._get_time_left_in_round(last_bin, last_trial) + self._get_inter_time(
                        old_bin=-1, old_trial=-1, new_bin=bin_idx, new_trial=trial_idx
                    )

                    return delay

        def _get_time_left_in_round(self, bin_idx, trial_idx):
            if bin_idx == -1 and trial_idx == -1:
                return 0
            full_bins_left = self._num_positions - bin_idx - 1
            trials_in_bin_left = self._K - trial_idx - 1
            return (trials_in_bin_left + self._K*full_bins_left)*self._t_clock + self._t_link

        def _get_round_time(self):
            return self._K * self._num_positions * self._t_clock + self._t_link

        def _get_inter_time(self, old_bin, old_trial, new_bin, new_trial):
            trials_in_bin_left = self._K - old_trial - 1
            full_bins_left = new_bin - old_bin - 1

            # no successes in last round, we behave as if we are at the beginning of the round
            if old_bin == -1 and old_trial == -1:
                trials_in_bin_left = 0

            ret = (trials_in_bin_left + full_bins_left * self._K + new_trial + 1) * self._t_clock
            if self._first_time:
                # because there is t_link/2 latency given by the qchannel and another t_link/2 must be added "manually"
                ret += self._t_link/2
                self._first_time = False
            return ret

        def _get_successful_pairs(self):
            """
            counter = 0
            for pairs in self._source.pos_status_list:
                if pairs == "free":
                    counter += 1
            if self._source.ID == 13:
                qilog.debug("{} free pairs found. Status are {}".format(counter, self._source.pos_status_list))
            """

            successful_pairs = []
            rng = self.rng
            if rng is None:
                rng = get_random_state()
            for i in range(self._num_positions):
                if self._source.pos_status_list[i] == "free":
                    trials_left = rng.geometric(self._p_left)
                    trials_right = rng.geometric(self._p_right)
                    if trials_left == trials_right:
                        gen_failures = rng.negative_binomial(trials_right, self._p_mid)
                        gen_trials = gen_failures + trials_right
                        if gen_trials <= self._K:
                            successful_pairs.append((i, gen_trials-1))  # bin_index and trial_index
            return successful_pairs

        def reset(self):
            self._last_bin_and_trial = (-1, -1)
            self._source.subcomponents["qsource"].output_meta["position"] = None
            for i, _ in enumerate(self._source.pos_status_list):
                self._source.pos_status_list[i] = "free"
            self._successful_pairs = self._get_successful_pairs()

    def __init__(self, name, source, t_link, p_left, p_right=None, p_mid=1., num_positions=1, t_clock=1, **kwargs):
        if p_right is None:
            p_right = p_left

        state_sampler = StateSampler([ks.b00], [1.])
        model_params = {"source": source, "p_left": p_left, "p_right": p_right, "num_positions": num_positions,
                        "p_mid": p_mid, "t_clock": t_clock, "t_link": t_link}
        timing = self.MPSDelayModel(**model_params)
        super().__init__(name, state_sampler=state_sampler, timing_model=timing, num_ports=2,
                         status=SourceStatus.OFF, **kwargs)

    def reset(self):
        self.subcomponents["internal_clock"].models["timing_model"].reset()


[docs]class MPSSourceNode(Node): r"""This node simulates the whole Midpoint source (MPS) protocols stochastic generation of entanglement. It keeps into account all the loss probabilities and only generates a pair when MPS would succeed in generating one. It is a way to implement the MPS protocol in a lightweight fashion, while maintaining all of its stochastic properties. Parameters ---------- name : str The name of this node. p_left : float The probability of successfully latching the emitted qubit on the left with the components on that side. It should keep into account the loss probability on the link and the probability of failure at the components, which is due to frequency conversion and partial BSA in the case of standard MS, or due to imperfect nDPD and absorption in the case of AFC-enhanced MS. p_right : float, optional The probability of successfully latching the emitted qubit on the right with the components on that side. If ``None``, it is set equal to ``p_left``. Defaults to ``None``. num_positions : int, optional The number of modes of the repeaters' quantum memories attached to the link. Defaults to 1. p_mid : float, optional The probability that the midpoint entangled pair source successfully emits a pair at a given clock cycle. Defaults to 1. t_clock : int or float, optional The clock period of the MS protocols. Defaults to 1. [ns] rng : :class:`~numpy.random.RandomState`, optional The rng used in the stochastic generation of entangled pairs. """ def __init__(self, name, p_left, p_right, num_positions, p_mid, t_clock, rng=None): port_names = ["qout0", "qout1", "c0", "c1"] super().__init__(name=name, port_names=port_names) self.pos_status_list = ["free" for _ in range(num_positions)] self.mps_params = {"p_left": p_left, "p_right": p_right, "num_positions": num_positions, "p_mid": p_mid, "t_clock": t_clock, "rng": rng} protocol = MPSSourceProtocol(node=self) protocol.start() def init_source(self, t_link): qsource = MPSSource(f"{self.name}_inner_src", self, **self.mps_params, t_link=t_link) qsource.output_meta["position"] = None # must be initialized qsource.output_meta["time_sent"] = ns.sim_time() self.add_subcomponent(qsource, name="qsource") qsource.ports["qout0"].forward_output(self.ports["qout0"]) qsource.ports["qout1"].forward_output(self.ports["qout1"])
[docs] def start(self): """ Set the status of the inner source to INTERNAL so that it starts producing entanglement. """ self.subcomponents["qsource"].status = SourceStatus.INTERNAL
[docs] def stop(self): """ Set the status of the inner source to OFF so that it stops producing entanglement. """ self.subcomponents["qsource"].status = SourceStatus.OFF
@property def status(self): """ The status of the internal entangling quantum source. Returns ------- :class:`netsquid.components.qsource.SourceStatus` The status of the source """ return self.subcomponents["qsource"].status
[docs] def reset(self, and_restart=True): """ Reset the status of the inner source. Should be called while the inner source is OFF. Parameters ---------- and_restart : bool, optional If True, also restarts the inner source, i.e. it sets its status to INTERNAL. Defaults to True. """ qsource = self.subcomponents["qsource"] qsource.reset() if and_restart: qsource.status = SourceStatus.INTERNAL