Skip to content

Add ContractIdleWiresInControlFlow optimisation pass #13779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions qiskit/transpiler/passes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
OptimizeAnnotated
Split2QUnitaries
RemoveIdentityEquivalent
ContractIdleWiresInControlFlow

Calibration
=============
Expand Down Expand Up @@ -233,6 +234,7 @@
from .optimization import OptimizeAnnotated
from .optimization import RemoveIdentityEquivalent
from .optimization import Split2QUnitaries
from .optimization import ContractIdleWiresInControlFlow

# circuit analysis
from .analysis import ResourceEstimation
Expand Down
1 change: 1 addition & 0 deletions qiskit/transpiler/passes/optimization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@
from .remove_identity_equiv import RemoveIdentityEquivalent
from .split_2q_unitaries import Split2QUnitaries
from .collect_and_collapse import CollectAndCollapse
from .contract_idle_wires_in_control_flow import ContractIdleWiresInControlFlow
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2025
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Contract control-flow operations that contain idle wires."""

from qiskit.circuit import Qubit, Clbit, QuantumCircuit
from qiskit.dagcircuit import DAGCircuit, DAGOpNode
from qiskit.transpiler.basepasses import TransformationPass


class ContractIdleWiresInControlFlow(TransformationPass):
"""Remove idle qubits from control-flow operations of a :class:`.DAGCircuit`."""

def run(self, dag):
# `control_flow_op_nodes` is eager and doesn't borrow; we're mutating the DAG in the loop.
for node in dag.control_flow_op_nodes() or []:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll probably remember, but adding a note here to remove the or [] once it is no longer needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to highlight thanks, because it's now been long enough that I probably would have forgotten to do it haha.

inst = node._to_circuit_instruction()
new_inst = _contract_control_flow(inst)
if new_inst is inst:
# No top-level contraction; nothing to do.
continue
replacement = DAGCircuit()
# Dictionaries to retain insertion order for reproducibility, and because we can
# then re-use them as mapping dictionaries.
qubits, clbits, vars_ = {}, {}, {}
for _, _, wire in dag.edges(node):
if isinstance(wire, Qubit):
qubits[wire] = wire
elif isinstance(wire, Clbit):
clbits[wire] = wire
else:
vars_[wire] = wire
replacement.add_qubits(list(qubits))
replacement.add_clbits(list(clbits))
for var in vars_:
replacement.add_captured_var(var)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add local variables too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the replacement they're all "capturing" from the circuit they're about to be inlined into - the outer circuit defines the true "type" of them.

replacement._apply_op_node_back(DAGOpNode.from_instruction(new_inst))
# The replacement DAG is defined over all the same qubits, but with the correct
# qubits now explicitly marked as idle, so everything gets linked up correctly.
dag.substitute_node_with_dag(node, replacement, wires=qubits | clbits | vars_)
return dag


def _contract_control_flow(inst):
"""Contract a `CircuitInstruction` containing a control-flow operation.

Returns the input object by the same reference if there's no contraction to be done at the call
site, though nested control-flow ops may have been contracted in place."""
op = inst.operation
idle = set(inst.qubits)
for block in op.blocks:
qubit_map = dict(zip(block.qubits, inst.qubits))
for i, inner in enumerate(block.data):
if inner.is_control_flow():
# In `QuantumCircuit` it's easy to replace an instruction with a narrower one, so it
# doesn't matter much if this is replacing it with itself.
block.data[i] = inner = _contract_control_flow(inner)
for qubit in inner.qubits:
idle.discard(qubit_map[qubit])
# If a box, we still want the prior side-effect of contracting any internal control-flow
# operations (optimisations are still valid _within_ a box), but we don't want to contract the
# box itself. If there's no idle qubits, we're also done here.
if not idle or inst.name == "box":
return inst

def contract(block):
out = QuantumCircuit(
name=block.name,
global_phase=block.global_phase,
metadata=block.metadata,
captures=block.iter_captured_vars(),
)
out.add_bits(
[
block_qubit
for (block_qubit, inst_qubit) in zip(block.qubits, inst.qubits)
if inst_qubit not in idle
]
)
out.add_bits(block.clbits)
for creg in block.cregs:
out.add_register(creg)
# Control-flow ops can only have captures and locals, and we already added the captures.
for var in block.iter_declared_vars():
out.add_uninitialized_var(var)
for inner in block:
out._append(inner)
return out

return inst.replace(
operation=op.replace_blocks(contract(block) for block in op.blocks),
qubits=[qubit for qubit in inst.qubits if qubit not in idle],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
features_transpiler:
- |
A new transpiler pass, :class:`.ContractIdleWiresInControlFlow`, is available from
:mod:`qiskit.transpiler.passes`. This pass removes qubits from control-flow blocks if the
semantics allow this, and the qubit is idle throughout the control-flow operation. Previously,
the routing stage of the preset pass managers might have done this as an accidental side-effect
of how they worked, but the behavior is now more properly placed in an optimization pass.
183 changes: 183 additions & 0 deletions test/python/transpiler/test_contract_idle_wires_in_control_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# This code is part of Qiskit.
#
# (C) Copyright IBM 2025
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

# pylint: disable=missing-class-docstring,missing-module-docstring,missing-function-docstring

from qiskit.circuit import QuantumCircuit, ClassicalRegister, QuantumRegister
from qiskit.circuit.classical import expr, types
from qiskit.transpiler.passes import ContractIdleWiresInControlFlow

from test import QiskitTestCase # pylint: disable=wrong-import-order


class TestContractIdleWiresInControlFlow(QiskitTestCase):
def test_simple_body(self):
qc = QuantumCircuit(3, 1)
with qc.while_loop((qc.clbits[0], False)):
qc.cx(0, 1)
qc.noop(2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly beyond the scope of this PR, but I find it a bit strange that a noop doesn't add an operation (I did not see the introductory PR for it). I was surprised to see that there's no noop in the expected circuit of this test, and thought I may have missed something in this new pass where they get removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire point of noop is not to add an operation haha. id is the mathematical expression of "unitary identity", whereas noop is "mark this qubit as used, but do explicitly do not do anything with it".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, I suppose I'm thinking of it more like a noop instruction in classical computing, which does end up getting executed by the CPU and takes some number of cycles to do nothing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It kind of does here too, but qubit operations are all inherently super parallel. Putting a noop on a qubit to bring it into a box causes it to idle for the duration of the box (unless you apply other things to it).


expected = QuantumCircuit(3, 1)
with expected.while_loop((expected.clbits[0], False)):
expected.cx(0, 1)

self.assertEqual(ContractIdleWiresInControlFlow()(qc), expected)

def test_nothing_to_do(self):
qc = QuantumCircuit(3, 1)
with qc.for_loop(range(3)):
qc.h(0)
qc.cx(0, 1)
self.assertEqual(ContractIdleWiresInControlFlow()(qc), qc)

def test_disparate_if_else_left_alone(self):
qc = QuantumCircuit(3, 1)
# The true body only uses 0, the false body only uses (1, 2), but because they're part of
# the shared op, there is no valid contraction here.
with qc.if_test((qc.clbits[0], True)) as else_:
qc.h(0)
with else_:
qc.cx(1, 2)
self.assertEqual(ContractIdleWiresInControlFlow()(qc), qc)

def test_contract_if_else_both_bodies(self):
qc = QuantumCircuit(3, 1)
# Explicit idle in the true body only.
with qc.if_test((qc.clbits[0], True)) as else_:
qc.h(0)
qc.cx(0, 2)
qc.noop(1)
with else_:
qc.cz(0, 2)
# Explicit idle in the false body only.
with qc.if_test((qc.clbits[0], True)) as else_:
qc.h(0)
qc.cx(0, 1)
with else_:
qc.cz(0, 1)
qc.noop(2)
# Explicit idle in both bodies.
with qc.if_test((qc.clbits[0], True)) as else_:
qc.h(1)
qc.cx(1, 2)
qc.noop(0)
with else_:
qc.cz(1, 2)
qc.noop(0)

expected = QuantumCircuit(3, 1)
with expected.if_test((expected.clbits[0], True)) as else_:
expected.h(0)
expected.cx(0, 2)
with else_:
expected.cz(0, 2)
with expected.if_test((expected.clbits[0], True)) as else_:
expected.h(0)
expected.cx(0, 1)
with else_:
expected.cz(0, 1)
with expected.if_test((expected.clbits[0], True)) as else_:
expected.h(1)
expected.cx(1, 2)
with else_:
expected.cz(1, 2)

self.assertEqual(ContractIdleWiresInControlFlow()(qc), expected)

def test_recursively_contract(self):
qc = QuantumCircuit(3, 1)
with qc.if_test((qc.clbits[0], True)):
qc.h(0)
with qc.if_test((qc.clbits[0], True)):
qc.cx(0, 1)
qc.noop(2)
with qc.while_loop((qc.clbits[0], True)):
with qc.if_test((qc.clbits[0], True)) as else_:
qc.h(0)
qc.noop(1, 2)
with else_:
qc.cx(0, 1)
qc.noop(2)

expected = QuantumCircuit(3, 1)
with expected.if_test((expected.clbits[0], True)):
expected.h(0)
with expected.if_test((expected.clbits[0], True)):
expected.cx(0, 1)
with expected.while_loop((expected.clbits[0], True)):
with expected.if_test((expected.clbits[0], True)) as else_:
expected.h(0)
with else_:
expected.cx(0, 1)

actual = ContractIdleWiresInControlFlow()(qc)
self.assertNotEqual(qc, actual) # Smoke test.
self.assertEqual(actual, expected)

def test_handles_vars_in_contraction(self):
a = expr.Var.new("a", types.Bool())
b = expr.Var.new("b", types.Uint(8))
c = expr.Var.new("c", types.Bool())

qc = QuantumCircuit(3, inputs=[a])
qc.add_var(b, 5)
with qc.if_test(a):
qc.add_var(c, False)
with qc.if_test(c):
qc.x(0)
qc.noop(1, 2)
with qc.switch(b) as case:
with case(0):
qc.x(0)
with case(1):
qc.noop(0, 1)
with case(case.DEFAULT):
with qc.if_test(a):
qc.x(0)
qc.noop(1, 2)

expected = QuantumCircuit(3, inputs=[a])
expected.add_var(b, 5)
with expected.if_test(a):
expected.add_var(c, False)
with expected.if_test(c):
expected.x(0)
with expected.switch(b) as case:
with case(0):
expected.x(0)
with case(1):
pass
with case(case.DEFAULT):
with expected.if_test(a):
expected.x(0)

actual = ContractIdleWiresInControlFlow()(qc)
self.assertNotEqual(qc, actual) # Smoke test.
self.assertEqual(actual, expected)

def test_handles_registers_in_contraction(self):
qr = QuantumRegister(3, "q")
cr1 = ClassicalRegister(3, "cr1")
cr2 = ClassicalRegister(3, "cr2")

qc = QuantumCircuit(qr, cr1, cr2)
with qc.if_test((cr1, 3)):
with qc.if_test((cr2, 3)):
qc.noop(0, 1, 2)
expected = QuantumCircuit(qr, cr1, cr2)
with expected.if_test((cr1, 3)):
with expected.if_test((cr2, 3)):
pass

actual = ContractIdleWiresInControlFlow()(qc)
self.assertNotEqual(qc, actual) # Smoke test.
self.assertEqual(actual, expected)