|
13 | 13 | # See the License for the specific language governing permissions and
|
14 | 14 | # limitations under the License.
|
15 | 15 |
|
| 16 | +from typing import Callable, Hashable, Tuple |
| 17 | + |
16 | 18 | from twisted.internet import defer, reactor
|
17 |
| -from twisted.internet.defer import CancelledError |
| 19 | +from twisted.internet.base import ReactorBase |
| 20 | +from twisted.internet.defer import CancelledError, Deferred |
18 | 21 |
|
19 | 22 | from synapse.logging.context import LoggingContext, current_context
|
20 |
| -from synapse.util import Clock |
21 | 23 | from synapse.util.async_helpers import Linearizer
|
22 | 24 |
|
23 | 25 | from tests import unittest
|
24 | 26 |
|
25 | 27 |
|
26 | 28 | class LinearizerTestCase(unittest.TestCase):
|
27 |
| - @defer.inlineCallbacks |
28 |
| - def test_linearizer(self): |
| 29 | + def _start_task( |
| 30 | + self, linearizer: Linearizer, key: Hashable |
| 31 | + ) -> Tuple["Deferred[None]", "Deferred[None]", Callable[[], None]]: |
| 32 | + """Starts a task which acquires the linearizer lock, blocks, then completes. |
| 33 | +
|
| 34 | + Args: |
| 35 | + linearizer: The `Linearizer`. |
| 36 | + key: The `Linearizer` key. |
| 37 | +
|
| 38 | + Returns: |
| 39 | + A tuple containing: |
| 40 | + * A cancellable `Deferred` for the entire task. |
| 41 | + * A `Deferred` that resolves once the task acquires the lock. |
| 42 | + * A function that unblocks the task. Must be called by the caller |
| 43 | + to allow the task to release the lock and complete. |
| 44 | + """ |
| 45 | + acquired_d: "Deferred[None]" = Deferred() |
| 46 | + unblock_d: "Deferred[None]" = Deferred() |
| 47 | + |
| 48 | + async def task() -> None: |
| 49 | + with await linearizer.queue(key): |
| 50 | + acquired_d.callback(None) |
| 51 | + await unblock_d |
| 52 | + |
| 53 | + d = defer.ensureDeferred(task()) |
| 54 | + |
| 55 | + def unblock() -> None: |
| 56 | + unblock_d.callback(None) |
| 57 | + # The next task, if it exists, will acquire the lock and require a kick of |
| 58 | + # the reactor to advance. |
| 59 | + self._pump() |
| 60 | + |
| 61 | + return d, acquired_d, unblock |
| 62 | + |
| 63 | + def _pump(self) -> None: |
| 64 | + """Pump the reactor to advance `Linearizer`s.""" |
| 65 | + assert isinstance(reactor, ReactorBase) |
| 66 | + while reactor.getDelayedCalls(): |
| 67 | + reactor.runUntilCurrent() |
| 68 | + |
| 69 | + def test_linearizer(self) -> None: |
| 70 | + """Tests that a task is queued up behind an earlier task.""" |
29 | 71 | linearizer = Linearizer()
|
30 | 72 |
|
31 | 73 | key = object()
|
32 | 74 |
|
33 |
| - d1 = linearizer.queue(key) |
34 |
| - cm1 = yield d1 |
| 75 | + _, acquired_d1, unblock1 = self._start_task(linearizer, key) |
| 76 | + self.assertTrue(acquired_d1.called) |
| 77 | + |
| 78 | + _, acquired_d2, unblock2 = self._start_task(linearizer, key) |
| 79 | + self.assertFalse(acquired_d2.called) |
35 | 80 |
|
36 |
| - d2 = linearizer.queue(key) |
37 |
| - self.assertFalse(d2.called) |
| 81 | + # Once the first task is done, the second task can continue. |
| 82 | + unblock1() |
| 83 | + self.assertTrue(acquired_d2.called) |
38 | 84 |
|
39 |
| - with cm1: |
40 |
| - self.assertFalse(d2.called) |
| 85 | + unblock2() |
41 | 86 |
|
42 |
| - with (yield d2): |
43 |
| - pass |
| 87 | + def test_linearizer_is_queued(self) -> None: |
| 88 | + """Tests `Linearizer.is_queued`. |
44 | 89 |
|
45 |
| - @defer.inlineCallbacks |
46 |
| - def test_linearizer_is_queued(self): |
| 90 | + Runs through the same scenario as `test_linearizer`. |
| 91 | + """ |
47 | 92 | linearizer = Linearizer()
|
48 | 93 |
|
49 | 94 | key = object()
|
50 | 95 |
|
51 |
| - d1 = linearizer.queue(key) |
52 |
| - cm1 = yield d1 |
| 96 | + _, acquired_d1, unblock1 = self._start_task(linearizer, key) |
| 97 | + self.assertTrue(acquired_d1.called) |
53 | 98 |
|
54 |
| - # Since d1 gets called immediately, "is_queued" should return false. |
| 99 | + # Since the first task acquires the lock immediately, "is_queued" should return |
| 100 | + # false. |
55 | 101 | self.assertFalse(linearizer.is_queued(key))
|
56 | 102 |
|
57 |
| - d2 = linearizer.queue(key) |
58 |
| - self.assertFalse(d2.called) |
| 103 | + _, acquired_d2, unblock2 = self._start_task(linearizer, key) |
| 104 | + self.assertFalse(acquired_d2.called) |
59 | 105 |
|
60 |
| - # Now d2 is queued up behind successful completion of cm1 |
| 106 | + # Now the second task is queued up behind the first. |
61 | 107 | self.assertTrue(linearizer.is_queued(key))
|
62 | 108 |
|
63 |
| - with cm1: |
64 |
| - self.assertFalse(d2.called) |
65 |
| - |
66 |
| - # cm1 still not done, so d2 still queued. |
67 |
| - self.assertTrue(linearizer.is_queued(key)) |
| 109 | + unblock1() |
68 | 110 |
|
69 |
| - # And now d2 is called and nothing is in the queue again |
| 111 | + # And now the second task acquires the lock and nothing is in the queue again. |
| 112 | + self.assertTrue(acquired_d2.called) |
70 | 113 | self.assertFalse(linearizer.is_queued(key))
|
71 | 114 |
|
72 |
| - with (yield d2): |
73 |
| - self.assertFalse(linearizer.is_queued(key)) |
74 |
| - |
| 115 | + unblock2() |
75 | 116 | self.assertFalse(linearizer.is_queued(key))
|
76 | 117 |
|
77 |
| - def test_lots_of_queued_things(self): |
78 |
| - # we have one slow thing, and lots of fast things queued up behind it. |
79 |
| - # it should *not* explode the stack. |
| 118 | + def test_lots_of_queued_things(self) -> None: |
| 119 | + """Tests lots of fast things queued up behind a slow thing. |
| 120 | +
|
| 121 | + The stack should *not* explode when the slow thing completes. |
| 122 | + """ |
80 | 123 | linearizer = Linearizer()
|
| 124 | + key = "" |
81 | 125 |
|
82 |
| - @defer.inlineCallbacks |
83 |
| - def func(i, sleep=False): |
| 126 | + async def func(i: int) -> None: |
84 | 127 | with LoggingContext("func(%s)" % i) as lc:
|
85 |
| - with (yield linearizer.queue("")): |
| 128 | + with (await linearizer.queue(key)): |
86 | 129 | self.assertEqual(current_context(), lc)
|
87 |
| - if sleep: |
88 |
| - yield Clock(reactor).sleep(0) |
89 | 130 |
|
90 | 131 | self.assertEqual(current_context(), lc)
|
91 | 132 |
|
92 |
| - func(0, sleep=True) |
| 133 | + _, _, unblock = self._start_task(linearizer, key) |
93 | 134 | for i in range(1, 100):
|
94 |
| - func(i) |
| 135 | + defer.ensureDeferred(func(i)) |
95 | 136 |
|
96 |
| - return func(1000) |
| 137 | + d = defer.ensureDeferred(func(1000)) |
| 138 | + unblock() |
| 139 | + self.successResultOf(d) |
97 | 140 |
|
98 |
| - @defer.inlineCallbacks |
99 |
| - def test_multiple_entries(self): |
| 141 | + def test_multiple_entries(self) -> None: |
| 142 | + """Tests a `Linearizer` with a concurrency above 1.""" |
100 | 143 | limiter = Linearizer(max_count=3)
|
101 | 144 |
|
102 | 145 | key = object()
|
103 | 146 |
|
104 |
| - d1 = limiter.queue(key) |
105 |
| - cm1 = yield d1 |
106 |
| - |
107 |
| - d2 = limiter.queue(key) |
108 |
| - cm2 = yield d2 |
109 |
| - |
110 |
| - d3 = limiter.queue(key) |
111 |
| - cm3 = yield d3 |
112 |
| - |
113 |
| - d4 = limiter.queue(key) |
114 |
| - self.assertFalse(d4.called) |
115 |
| - |
116 |
| - d5 = limiter.queue(key) |
117 |
| - self.assertFalse(d5.called) |
| 147 | + _, acquired_d1, unblock1 = self._start_task(limiter, key) |
| 148 | + self.assertTrue(acquired_d1.called) |
118 | 149 |
|
119 |
| - with cm1: |
120 |
| - self.assertFalse(d4.called) |
121 |
| - self.assertFalse(d5.called) |
| 150 | + _, acquired_d2, unblock2 = self._start_task(limiter, key) |
| 151 | + self.assertTrue(acquired_d2.called) |
122 | 152 |
|
123 |
| - cm4 = yield d4 |
124 |
| - self.assertFalse(d5.called) |
| 153 | + _, acquired_d3, unblock3 = self._start_task(limiter, key) |
| 154 | + self.assertTrue(acquired_d3.called) |
125 | 155 |
|
126 |
| - with cm3: |
127 |
| - self.assertFalse(d5.called) |
| 156 | + # These next two tasks have to wait. |
| 157 | + _, acquired_d4, unblock4 = self._start_task(limiter, key) |
| 158 | + self.assertFalse(acquired_d4.called) |
128 | 159 |
|
129 |
| - cm5 = yield d5 |
| 160 | + _, acquired_d5, unblock5 = self._start_task(limiter, key) |
| 161 | + self.assertFalse(acquired_d5.called) |
130 | 162 |
|
131 |
| - with cm2: |
132 |
| - pass |
| 163 | + # Once the first task completes, the fourth task can continue. |
| 164 | + unblock1() |
| 165 | + self.assertTrue(acquired_d4.called) |
| 166 | + self.assertFalse(acquired_d5.called) |
133 | 167 |
|
134 |
| - with cm4: |
135 |
| - pass |
| 168 | + # Once the third task completes, the fifth task can continue. |
| 169 | + unblock3() |
| 170 | + self.assertTrue(acquired_d5.called) |
136 | 171 |
|
137 |
| - with cm5: |
138 |
| - pass |
| 172 | + # Make all tasks finish. |
| 173 | + unblock2() |
| 174 | + unblock4() |
| 175 | + unblock5() |
139 | 176 |
|
140 |
| - d6 = limiter.queue(key) |
141 |
| - with (yield d6): |
142 |
| - pass |
| 177 | + # The next task shouldn't have to wait. |
| 178 | + _, acquired_d6, unblock6 = self._start_task(limiter, key) |
| 179 | + self.assertTrue(acquired_d6) |
| 180 | + unblock6() |
143 | 181 |
|
144 |
| - @defer.inlineCallbacks |
145 |
| - def test_cancellation(self): |
| 182 | + def test_cancellation(self) -> None: |
| 183 | + """Tests cancellation while waiting for a `Linearizer`.""" |
146 | 184 | linearizer = Linearizer()
|
147 | 185 |
|
148 | 186 | key = object()
|
149 | 187 |
|
150 |
| - d1 = linearizer.queue(key) |
151 |
| - cm1 = yield d1 |
| 188 | + d1, acquired_d1, unblock1 = self._start_task(linearizer, key) |
| 189 | + self.assertTrue(acquired_d1.called) |
152 | 190 |
|
153 |
| - d2 = linearizer.queue(key) |
154 |
| - self.assertFalse(d2.called) |
| 191 | + # Create a second task, waiting for the first task. |
| 192 | + d2, acquired_d2, _ = self._start_task(linearizer, key) |
| 193 | + self.assertFalse(acquired_d2.called) |
155 | 194 |
|
156 |
| - d3 = linearizer.queue(key) |
157 |
| - self.assertFalse(d3.called) |
| 195 | + # Create a third task, waiting for the second task. |
| 196 | + d3, acquired_d3, unblock3 = self._start_task(linearizer, key) |
| 197 | + self.assertFalse(acquired_d3.called) |
158 | 198 |
|
| 199 | + # Cancel the waiting second task. |
159 | 200 | d2.cancel()
|
160 | 201 |
|
161 |
| - with cm1: |
162 |
| - pass |
| 202 | + unblock1() |
| 203 | + self.successResultOf(d1) |
163 | 204 |
|
164 | 205 | self.assertTrue(d2.called)
|
165 |
| - try: |
166 |
| - yield d2 |
167 |
| - self.fail("Expected d2 to raise CancelledError") |
168 |
| - except CancelledError: |
169 |
| - pass |
170 |
| - |
171 |
| - with (yield d3): |
172 |
| - pass |
| 206 | + self.failureResultOf(d2, CancelledError) |
| 207 | + |
| 208 | + # The third task should continue running. |
| 209 | + self.assertTrue( |
| 210 | + acquired_d3.called, |
| 211 | + "Third task did not get the lock after the second task was cancelled", |
| 212 | + ) |
| 213 | + unblock3() |
| 214 | + self.successResultOf(d3) |
0 commit comments