"""
This module implements the quantum hardware (the Physical layer) of a quantum network device.
"""
from collections import namedtuple, deque
import netsquid as ns
import math
from progress.hardware.llps.llp import LinkProtocol
import progress.progress_logging as log
__all__ = ['get_processor', 'QHardware', 'QuantumOperationsService']
INSTR_Rx = ns.components.IGate("Rx_gate", ns.qubits.operators.create_rotation_op(math.pi / 2, (1, 0, 0)))
"""
Pi/2 rotation around the x-axis
"""
INSTR_RxC = ns.components.IGate("RxC_gate", ns.qubits.operators.create_rotation_op(math.pi / 2, (1, 0, 0), conjugate=True))
"""
-Pi/2 rotation around the x-axis
"""
[docs]def get_processor(num_positions, coherence_time=None, one_qbit_noise=None, two_qbit_noise=None,
two_qbit_p_err=0.005, meas_p_err=0., instr_duration=0.):
r"""Get an operational quantum processor
Parameters
----------
num_positions : int
The number of qubits in the quantum memory of this processor.
coherence_time : int or None, optional
The coherence time of the quantum memory. It is the time after which the fidelity of the qubit state
drops more than 5%. [ns]
one_qbit_noise : :class:`netsquid.models.model.Model`, None, optional
The noise model of one qubit instructions of this processor.
two_qbit_noise : :class:`netsquid.models.model.Model`, None, optional
The noise model of two qubit instructions of this processor.
two_qbit_p_err : float, optional
The probability of error of two qubit instructions of this processor. The error means that the instruction
depolarizes the qubits.
If `two_qbit_noise` is not None, this parameter is ignored.
meas_p_err : float, optional
The probability of error of CBS measurement instruction of this processor. The error means that the instruction
depolarizes the qubit right before the measurement.
instr_duration : float, optional
The duration of each instruction of this processor. Defaults to 0. [ns]
Returns
-------
:class:`netsquid.components.qprocessor.QuantumProcessor`
The quantum processor, ready to use.
"""
if two_qbit_noise is None:
two_qbit_noise = ns.components.DepolarNoiseModel(depolar_rate=two_qbit_p_err, time_independent=True)
meas_noise = ns.components.DepolarNoiseModel(depolar_rate=meas_p_err, time_independent=True)
phys_instructions = [
ns.components.PhysicalInstruction(ns.components.INSTR_X, duration=instr_duration, parallel=True,
q_noise_model=one_qbit_noise),
ns.components.PhysicalInstruction(ns.components.INSTR_Z, duration=instr_duration, parallel=True,
q_noise_model=one_qbit_noise),
ns.components.PhysicalInstruction(ns.components.INSTR_CX, duration=instr_duration, parallel=True,
q_noise_model=two_qbit_noise),
ns.components.PhysicalInstruction(ns.components.INSTR_H, duration=instr_duration, parallel=True,
q_noise_model=one_qbit_noise),
ns.components.PhysicalInstruction(ns.components.INSTR_INIT, duration=instr_duration, parallel=True,
q_noise_model=one_qbit_noise),
ns.components.PhysicalInstruction(ns.components.INSTR_MEASURE_BELL, duration=instr_duration,parallel=True,
q_noise_model=two_qbit_noise, apply_q_noise_after=False),
ns.components.PhysicalInstruction(ns.components.INSTR_MEASURE, duration=instr_duration, parallel=True,
q_noise_model=meas_noise, apply_q_noise_after=False),
ns.components.PhysicalInstruction(ns.components.INSTR_ROT_Y, duration=instr_duration, parallel=True,
q_noise_model=one_qbit_noise),
ns.components.PhysicalInstruction(INSTR_Rx, duration=instr_duration, parallel=True,
q_noise_model=one_qbit_noise),
ns.components.PhysicalInstruction(INSTR_RxC, duration=instr_duration, parallel=True,
q_noise_model=one_qbit_noise),
]
if coherence_time is None:
qproc = ns.components.QuantumProcessor(name="QuantumProcessor",
num_positions=num_positions,
phys_instructions=phys_instructions)
else:
# Used to apply an initial imperfection to the qubits (F_0 =~ 0.98)
models = {'qin_noise_model': ns.components.DepolarNoiseModel(depolar_rate=0.03,
time_independent=True)}
mem_noise_model = ns.components.DephaseNoiseModel(dephase_rate=-math.log(0.98)*1e9 / coherence_time)
# mem_noise_model = DephaseNoiseModel(dephase_rate=10.0)
qproc = ns.components.QuantumProcessor(name="QuantumProcessor",
num_positions=num_positions,
mem_noise_models=[mem_noise_model] * num_positions,
phys_instructions=phys_instructions)
qproc.models['qin_noise_model'] = models['qin_noise_model']
return qproc
[docs]class QHardware(ns.nodes.Node):
r"""
This node contains the quantum hardware of a Quantum Internet node, including the quantum processor and the QNICS.
Parameters
----------
name : str
The name of this Repeater
num_qnics : int, optional
The number of QNICS of this device. Defaults to 2.
num_qbits_qnic : int, optional
The number of physical qubits assigned to each QNIC of this device. Defaults to 1.
qproc_params : dict[str, any] or None, optional
The parameters of the quantum processor of this device. See :func:`~progress.hardware.qhardware.get_processor`
for details. If `None`, a default processor is created. Defaults to `None`. The field `num_positions` can be
omitted, as it is set to `num_qnics * num_qbits_qnic`.
Attributes
----------
qproc_params : dict[str, any]
The parameters of the quantum processor of this device.
num_qnics : int
The number of QNICS of this device.
num_qbits_qnic : int
The number of physical qubits assigned to each QNIC of this device.
qmemory : :class:`netsquid.components.qprocessor.QuantumProcessor`
The quantum processor of this device.
Notes
-----
Ports:
- "qnic{0..[num_qnics-1]}" (in/out): The QNICS of this device.
These ports are forwarded to the "q{i}" ports of the quantum device.
- "q_ops" (output): Used to send out responses and outcomes for requested quantum operations.
- "new_entanglements" (output): Used to signal to the outside (typically to the QHAL)
that a new entangled qubit is available.
"""
def __init__(self, name, num_qnics=2, num_qbits_qnic=1, qproc_params=None):
ports = ["qnic{}".format(i) for i in range(num_qnics)] + ["q_ops", "new_entanglements"]
super().__init__(name=name, port_names=ports)
self.qproc_params = qproc_params
self.num_qnics = num_qnics
self.num_qbits_qnic = num_qbits_qnic
self.qproc_coherence_time = None
if self.qproc_params is not None:
self.qproc_coherence_time = self.qproc_params.get('coherence_time', None)
self._llp_subscriptions = [None for _ in range(num_qnics)]
if qproc_params is None:
qproc_params = {}
qproc_params['num_positions'] = num_qnics * num_qbits_qnic
self.qmemory = get_processor(**qproc_params)
self._qops_service = QuantumOperationsService(name="qops_service", node=self)
self._qops_service.start()
[docs] def put_qop(self, request):
"""
Submit a quantum operation request. The response is sent through the port "q_ops" of the node.
Parameters
----------
request: namedtuple
The request to be submitted. See :class:`~progress.hardware.qhardware.QuantumOperationRequest` for details.
"""
self._qops_service.put(request)
[docs] def get_subscribed_llp(self, qnic):
r"""
Returns the Link Layer protocol subscribed to the given QNIC.
Parameters
----------
qnic : int or str
The QNIC to get the subscribed link protocol from. Can be either the qnic's name or its index.
Returns
-------
:class:`~progress.hardware.llps.llp.LinkProtocol` or None
"""
# get the qnic as an integer
if not isinstance(qnic, int):
qnic = int(qnic[4:])
return self._llp_subscriptions[qnic]
[docs] def subscribe_llp(self, qnic, llp):
r"""
Subscribe a link layer protocol to the given QNIC.
Parameters
----------
qnic : int or str
The QNIC to subscribe the link protocol to. Can be either the qnic's name or its index.
llp : :class:`~progress.hardware.llps.llp.LinkProtocol`
The link layer protocol to subscribe.
"""
# get the qnic as an integer
if not isinstance(qnic, int):
qnic = int(qnic[4:])
self._llp_subscriptions[qnic] = llp
[docs] def map_info_to_qubit(self, qnic, idx):
"""
Maps the qnic and the index of the qubit to the position in the quantum processor.
"""
if isinstance(qnic, int):
return qnic * self.num_qbits_qnic + idx
elif isinstance(qnic, str):
return int(qnic[4:]) * self.num_qbits_qnic + idx
else:
raise ValueError("The qnic must be either an integer or a string.")
[docs]class QuantumOperationsService(ns.protocols.ServiceProtocol):
r"""
This protocol is used to request quantum operations to the quantum processor.
It sends the measurement outcomes (if present) out from the port `q_ops` of the quantum hardware.
Parameters
----------
node : :class:`~netsquid.node.Node`
The component to which the service is attached.
name : str or None, optional
The name of the service, for labelling purposes. Defaults to `None`.
"""
req_free = namedtuple("req_free", ["qnic", "idx"])
"""
Request to free a qubit from the quantum processor at a specified position.
Parameters:
qnic (int or str): The qnic to which the qubit is assigned.
idx (int): The index of the qubit relative to the qnic.
"""
req_swap = namedtuple("req_swap", ["id", "qnic1", "idx1", "qnic2", "idx2"])
"""
Request to perform entanglement swapping on two qubits in the quantum processor.
Parameters:
id (int): The id of the request.
qnic1 (int or str): The qnic to which the first qubit is assigned.
idx1 (int): The index of the first qubit relative to the qnic.
qnic2 (int or str): The qnic to which the second qubit is assigned.
idx2 (int): The index of the second qubit relative to the qnic.
"""
req_dejmps = namedtuple("req_purify", ["id", "qnic1", "idx1", "qnic2", "idx2", "role"])
"""
Request to perform DEJMPS distillation on two qubits in the quantum processor. The first one is the one distilled,
the second one is used as ancilla. The "role" field is used to determine which rotation to apply (pi/2 or -pi/2),
and can assume two values, either 'A' (pi/2) or 'B' (-pi/2).
Parameters:
id (int): The id of the request.
qnic1 (int or str): The qnic to which the first qubit is assigned.
idx1 (int): The index of the first qubit relative to the qnic.
qnic2 (int or str): The qnic to which the second qubit is assigned.
idx2 (int): The index of the second qubit relative to the qnic.
role (str): The role of this device in the distillation. Can be either 'A' or 'B'.
"""
req_correct = namedtuple("req_correct", ["id", "qnic1", "idx1", "cur_state"])
r"""
Request to correct a qubit in the quantum processor. The qubit is in the state specified by `cur_state`, which is
an integer between 0 and 3 indicating the Bell state. The qubit is corrected to the Bell state
:math:`\vert \beta_{00} \rangle`.
Parameters:
id (int): The id of the request.
qnic1 (int or str): The qnic to which the qubit is assigned.
idx1 (int): The index of the qubit relative to the qnic.
cur_state (int): The current state of the qubit. Can be either 0, 1, 2 or 3.
"""
req_qcirc = namedtuple("req_qcirc", ["id", "qubits_map", "qcirc"])
"""
Request to perform a generic quantum circuit on a set of qubits in the quantum processor.
When the circuit is executed, the measurement outcomes are sent out from the port `q_ops`.
Parameters:
id (int): The id of the request.
qubits_map (list of tuples): A list of tuples, each one containing the qnic and the index of the qubit
relative to the qnic.
qcirc (QuantumProgram): The quantum program to execute.
"""
[docs] def handle_request(self, request, identifier, start_time=None, **kwargs):
r"""Schedule the request on the queue.
Parameters
----------
request :
The object representing the request.
identifier : str
The identifier for this request.
start_time : float, optional
The time at which the request can be executed. Default current simulation time. [ns]
kwargs : dict, optional
Additional arguments which can be set by the service.
Returns
-------
dict
The dictionary with additional arguments.
Notes
-----
This method is called after
:meth:`netsquid.protocols.serviceprotocol.ServiceProtocol.put` which
does the type checking etc.
"""
if start_time is None:
start_time = ns.sim_time()
self.queue.append((start_time, (identifier, request, kwargs)))
self.send_signal(self._new_req_signal)
return kwargs
[docs] def run(self):
r"""Wait for a new request signal, then run the requests one by one.
Assumes request handlers are generators and not functions.
References
----------
See :meth:`~netsquid.protocols.Protocol.run`.
"""
while True:
yield self.await_signal(self, self._new_req_signal)
while len(self.queue) > 0:
start_time, (handler_id, request, kwargs) = self.queue.popleft()
if start_time > ns.sim_time():
yield self.await_timer(end_time=start_time)
func = self.request_handlers[handler_id]
args = request
gen = func(args, **kwargs)
if gen is not None:
yield from gen
[docs] class CorrectProgram(ns.components.QuantumProgram):
"""Quantum processor program that applies corrections to restore the |beta00> state."""
default_num_qubits = 1
curr_state = 0
def set_corrections(self, current_state):
self.curr_state = current_state
[docs] def program(self):
q1, = self.get_qubit_indices(1)
if self.curr_state == 1 or self.curr_state == 3:
self.apply(ns.components.instructions.INSTR_X, q1)
if self.curr_state == 2 or self.curr_state == 3:
self.apply(ns.components.instructions.INSTR_Z, q1)
yield self.run()
[docs] def free(self, qnic, idx):
r"""
Free the qubit on the given socket.
Parameters
----------
qnic : str
The qnic identifier.
idx : int
The index of the qubit on the qnic.
"""
"""
# DEBUG
log.info("Freeing qubit on qnic %s, index %d" % (qnic, idx),
repeater_id=self.node.supercomponent.device_id,
protocol="QHardware")
"""
link_protocol = self.node.get_subscribed_llp(qnic)
if link_protocol is None:
raise ValueError("No link protocol subscribed to the given qnic.")
request = LinkProtocol.req_free(idx=idx)
link_protocol.put(request)
"""
response = self.res_free('Done')
self.send_response(response)
"""
def _handle_free(self, request):
r"""
Handle a free request.
"""
self.free(request.qnic, request.idx)
def _handle_qcirc(self, request):
r"""
Handle a quantum circuit request.
"""
positions = [self.node.map_info_to_qubit(qnic, idx) for qnic, idx in request.qubits_map]
qcirc = request.qcirc
self.qproc.execute_program(program=qcirc, qubit_mapping=positions, error_on_fail=True)
yield self.await_program(processor=self.qproc)
out = qcirc.output
if out is None or len(out) == 0:
self.send_response('Done', name=request.id)
else:
self.send_response(out, name=request.id)
def _handle_correct(self, request):
cur_state = request.cur_state
positions = [self.node.map_info_to_qubit(request.qnic1, request.idx1)]
self.correct_program.set_corrections(cur_state)
self.qproc.execute_program(program=self.correct_program, qubit_mapping=positions, error_on_fail=True)
yield self.await_program(processor=self.qproc)
self.send_response('Done', name=request.id)
@staticmethod
def _setup_dejmps_program(conj_rotation):
"""
Set up the DEJMPS quantum program.
Parameters
----------
conj_rotation : bool
Whether to apply the conjugate of the rotation.
Returns
-------
dejmp_program : :class:`~netsquid.programs.Program`
"""
INSTR_ROT = INSTR_Rx if not conj_rotation else INSTR_RxC
prog = ns.components.QuantumProgram(num_qubits=2)
q1, q2 = prog.get_qubit_indices(2)
prog.apply(INSTR_ROT, [q1])
prog.apply(INSTR_ROT, [q2])
prog.apply(ns.components.instructions.INSTR_CX, [q1, q2])
prog.apply(ns.components.instructions.INSTR_MEASURE, q2, output_key="m", inplace=False)
return prog
def _handle_dejmps(self, request):
r"""
Handle a DEJMPS distillation request. It automatically frees measured qubit at the end.
"""
role = request.role
if role == 'A':
# pi/2 rotation
prog = self._setup_dejmps_program(conj_rotation=False)
elif role == 'B':
# -pi/2 rotation
prog = self._setup_dejmps_program(conj_rotation=True)
else:
raise ValueError("The role must be either 'A' or 'B'.")
positions = [self.node.map_info_to_qubit(qnic, idx)
for qnic, idx in [(request.qnic1, request.idx1), (request.qnic2, request.idx2)]]
self.qproc.execute_program(program=prog, qubit_mapping=positions, error_on_fail=True)
yield self.await_program(processor=self.qproc)
outcome = prog.output["m"][0]
# free the ancilla
self.free(request.qnic2, request.idx2)
self.send_response(outcome, name=request.id)
def _handle_swap(self, request):
positions = [self.node.map_info_to_qubit(qnic, idx)
for qnic, idx in [(request.qnic1, request.idx1), (request.qnic2, request.idx2)]]
self.qproc.execute_program(program=self._es_program, qubit_mapping=positions, error_on_fail=True)
yield self.await_program(processor=self.qproc)
bell_result, = self._es_program.output["m"]
# free the swapped qubits
self.free(request.qnic1, request.idx1)
self.free(request.qnic2, request.idx2)
self.send_response(bell_result, name=request.id)
[docs] def send_response(self, response, name=None):
r"""
Sends a response to the port `q_ops`.
"""
header = "Q OP RESP"
if name is not None:
header += " REQ " + str(name)
msg = ns.components.Message(header=header, items=[name, response])
self.node.ports['q_ops'].tx_output(msg)
def __init__(self, node, name=None):
if name is None:
name = "Quantum Operations Service"
super().__init__(node=node, name=name)
# We will use a queue for requests
self.queue = deque()
self._new_req_signal = "New request in queue"
self.add_signal(self._new_req_signal)
self._create_id = 0
self.qproc = self.node.qmemory
# save the entanglement swapping program
self._es_program = ns.components.QuantumProgram(num_qubits=2)
q1, q2 = self._es_program.get_qubit_indices(num_qubits=2)
self._es_program.apply(ns.components.instructions.INSTR_MEASURE_BELL, [q1, q2], output_key="m", inplace=False)
self.correct_program = self.CorrectProgram()
self.register_request(self.req_free, self._handle_free)
self.register_request(self.req_qcirc, self._handle_qcirc)
self.register_request(self.req_correct, self._handle_correct)
self.register_request(self.req_dejmps, self._handle_dejmps)
self.register_request(self.req_swap, self._handle_swap)