Redesign test to better handle low CPU capacity CI machines #45127
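
This PR reworks the concurrency-ordering tests so they no longer depend on tight timing that low-CPU-capacity CI machines can miss: the `Thread.sleep(10)` ordering hint and `ForkJoinPool.commonPool()` are replaced with `SharedExecutorService`, the second operation is scheduled with an explicit delay, the `CountDownLatch.await()` calls are bounded so a hang fails fast, and completion times switch to wall-clock milliseconds for readable failure messages. Below is a minimal sketch of that pattern; the class, helper names, and timings are illustrative only and are not part of this PR.

```java
// Minimal sketch of the redesigned timing pattern; names and timings here are
// illustrative only (assumed helpers), not the PR's test code.
import com.azure.core.util.SharedExecutorService;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public final class ConcurrencyOrderingSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        AtomicLong firstCompletion = new AtomicLong();
        AtomicLong secondCompletion = new AtomicLong();

        // Start the long-running operation immediately on the shared executor
        // (the tests previously used ForkJoinPool.commonPool()).
        SharedExecutorService.getInstance().execute(() -> {
            try {
                simulateLongRunningWork(3000); // stands in for the flush held open by the mocked response
            } finally {
                firstCompletion.set(System.currentTimeMillis());
                latch.countDown();
            }
        });

        // Schedule the second operation with a fixed delay instead of sleeping on the
        // test thread, so a slow CI machine cannot reorder the two starts.
        SharedExecutorService.getInstance().schedule(() -> {
            try {
                simulateQuickNoOp(); // stands in for the flush that no-ops while another is in flight
            } finally {
                secondCompletion.set(System.currentTimeMillis());
                latch.countDown();
            }
        }, 500, TimeUnit.MILLISECONDS);

        // Bounded wait: a hang becomes an explicit, fast failure instead of stalling the CI job.
        if (!latch.await(10, TimeUnit.SECONDS)) {
            throw new AssertionError("Timed out waiting for both operations to complete.");
        }

        // Wall-clock millisecond timestamps keep the failure message easy to interpret.
        long differenceMillis = Math.abs(firstCompletion.get() - secondCompletion.get());
        System.out.println("First finished at " + firstCompletion.get() + ", second at "
            + secondCompletion.get() + ", difference " + differenceMillis + "ms");
    }

    private static void simulateLongRunningWork(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void simulateQuickNoOp() {
        // Intentionally does nothing.
    }
}
```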

Open · wants to merge 8 commits into base: main
@@ -12,6 +12,7 @@
import com.azure.core.test.http.MockHttpResponse;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.SharedExecutorService;
import com.azure.core.util.serializer.TypeReference;
import com.azure.json.JsonProviders;
import com.azure.json.JsonReader;
@@ -37,7 +38,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -55,6 +57,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@Execution(ExecutionMode.CONCURRENT)
public class SearchIndexingBufferedSenderUnitTests {
@@ -1010,7 +1013,7 @@ public void concurrentFlushesOnlyAllowsOneProcessor() throws InterruptedException
= getSearchClientBuilder().httpClient(wrapWithAsserting(request -> {
int count = callCount.getAndIncrement();
if (count == 0) {
sleep(2000);
sleep(3000);
return createMockBatchSplittingResponse(request, 0, 5);
} else if (count == 1) {
return createMockBatchSplittingResponse(request, 5, 5);
@@ -1028,31 +1031,40 @@ public void concurrentFlushesOnlyAllowsOneProcessor() throws InterruptedException
batchingClient.addUploadActions(readJsonFileToList(HOTELS_DATA_JSON));

AtomicLong firstFlushCompletionTime = new AtomicLong();
ForkJoinPool.commonPool().execute(() -> {
SharedExecutorService.getInstance().execute(() -> {
try {
batchingClient.flush();
} finally {
firstFlushCompletionTime.set(System.nanoTime());
firstFlushCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}
});

Thread.sleep(10); // Give the first operation a chance to start

// Delay the second flush by 500ms to ensure that it starts after the first flush.
// The mocked HttpRequest will delay the first response by 3 seconds, so if the second flush does finish first
// it will be a true indication of incorrect behavior.
AtomicLong secondFlushCompletionTime = new AtomicLong();
ForkJoinPool.commonPool().execute(() -> {
SharedExecutorService.getInstance().schedule(() -> {
try {
batchingClient.flush();
} finally {
secondFlushCompletionTime.set(System.nanoTime());
secondFlushCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}
});
}, 500, TimeUnit.MILLISECONDS);

if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
fail("Timed out waiting for flushes to complete.");
}

countDownLatch.await();
assertTrue(firstFlushCompletionTime.get() > secondFlushCompletionTime.get(),
() -> "Expected first flush to complete before the second flush but was " + firstFlushCompletionTime.get()
+ " and " + secondFlushCompletionTime.get() + ".");
long firstFlushTime = firstFlushCompletionTime.get();
long secondFlushTime = secondFlushCompletionTime.get();
long differenceMillis = Math.abs(firstFlushTime - secondFlushTime);
assertTrue(firstFlushTime >= secondFlushTime,
() -> "Expected second flush to completed before long-running first flush as multiple flushes can't happen "
+ "concurrently, and when a flush is in-flight all others no-op until the in-flight flush completes. "
+ "First flush finished at " + firstFlushTime + ", second flush finished at " + secondFlushTime
+ ", difference was " + differenceMillis + "ms");
}

@Test
@@ -1063,7 +1075,8 @@ public void concurrentFlushesOnlyAllowsOneProcessorAsync() throws InterruptedException
= getSearchClientBuilder().httpClient(wrapWithAsserting(request -> {
int count = callCount.getAndIncrement();
if (count == 0) {
return createMockBatchSplittingResponse(request, 0, 5).delayElement(Duration.ofSeconds(2));
sleep(3000);
return createMockBatchSplittingResponse(request, 0, 5);
} else if (count == 1) {
return createMockBatchSplittingResponse(request, 5, 5);
} else {
@@ -1080,26 +1093,38 @@ public void concurrentFlushesOnlyAllowsOneProcessorAsync() throws InterruptedException
batchingClient.addUploadActions(readJsonFileToList(HOTELS_DATA_JSON)).block();

AtomicLong firstFlushCompletionTime = new AtomicLong();
Mono.using(() -> 1, ignored -> batchingClient.flush(), ignored -> {
firstFlushCompletionTime.set(System.nanoTime());
countDownLatch.countDown();
}).subscribe();

Thread.sleep(10); // Give the first operation a chance to start
SharedExecutorService.getInstance().execute(() -> {
Mono.using(() -> 1, ignored -> batchingClient.flush(), ignored -> {
firstFlushCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}).block();
});

// Delay the second flush by 500ms to ensure that it starts after the first flush.
// The mocked HttpRequest will delay the first response by 3 seconds, so if the second flush does finish first
// it will be a true indication of incorrect behavior.
AtomicLong secondFlushCompletionTime = new AtomicLong();
Mono.using(() -> 1, ignored -> batchingClient.flush(), ignored -> {
secondFlushCompletionTime.set(System.nanoTime());
countDownLatch.countDown();
}).subscribe();

countDownLatch.await();
assertTrue(firstFlushCompletionTime.get() > secondFlushCompletionTime.get(),
() -> "Expected first flush to complete before the second flush but was " + firstFlushCompletionTime.get()
+ " and " + secondFlushCompletionTime.get() + ".");
SharedExecutorService.getInstance().schedule(() -> {
Mono.using(() -> 1, ignored -> batchingClient.flush(), ignored -> {
secondFlushCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}).block();
}, 500, TimeUnit.MILLISECONDS);

if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
fail("Timed out waiting for flushes to complete.");
}

long firstFlushTime = firstFlushCompletionTime.get();
long secondFlushTime = secondFlushCompletionTime.get();
long differenceMillis = Math.abs(firstFlushTime - secondFlushTime);
assertTrue(firstFlushTime >= secondFlushTime,
() -> "Expected second flush to completed before long-running first flush as multiple flushes can't happen "
+ "concurrently, and when a flush is in-flight all others no-op until the in-flight flush completes. "
+ "First flush finished at " + firstFlushTime + ", second flush finished at " + secondFlushTime
+ ", difference was " + differenceMillis + "ms");
}

//@RepeatedTest(1000)
@Test
public void closeWillWaitForAnyCurrentFlushesToCompleteBeforeRunning() throws InterruptedException {
AtomicInteger callCount = new AtomicInteger();
@@ -1108,7 +1133,8 @@ public void closeWillWaitForAnyCurrentFlushesToCompleteBeforeRunning() throws InterruptedException
= getSearchClientBuilder().httpClient(wrapWithAsserting(request -> {
int count = callCount.getAndIncrement();
if (count == 0) {
return createMockBatchSplittingResponse(request, 0, 5).delayElement(Duration.ofSeconds(2));
sleep(2000);
return createMockBatchSplittingResponse(request, 0, 5);
} else if (count == 1) {
return createMockBatchSplittingResponse(request, 5, 5);
} else {
@@ -1124,30 +1150,46 @@ public void closeWillWaitForAnyCurrentFlushesToCompleteBeforeRunning() throws InterruptedException
CountDownLatch countDownLatch = new CountDownLatch(2);
batchingClient.addUploadActions(readJsonFileToList(HOTELS_DATA_JSON));

AtomicLong firstFlushCompletionTime = new AtomicLong();
ForkJoinPool.commonPool().execute(() -> {
AtomicLong flushCompletionTime = new AtomicLong();
Future<?> future1 = SharedExecutorService.getInstance().submit(() -> {
try {
batchingClient.flush();
} finally {
firstFlushCompletionTime.set(System.nanoTime());
flushCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}
});

Thread.sleep(10); // Give the first operation a chance to start.

AtomicLong secondFlushCompletionTime = new AtomicLong();
ForkJoinPool.commonPool().execute(() -> {
// Delay the close by 100ms to ensure that it starts after the flush.
// The mocked HttpRequest will delay the response by 2 seconds, so if the close does finish first it will be a
// true indication of incorrect behavior.
AtomicLong closeCompletionTime = new AtomicLong();
Future<?> future2 = SharedExecutorService.getInstance().schedule(() -> {
try {
batchingClient.close();
} finally {
secondFlushCompletionTime.set(System.nanoTime());
closeCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}
});
}, 100, TimeUnit.MILLISECONDS);

if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
fail("Timed out waiting for closes to complete.");
}

countDownLatch.await();
assertTrue(firstFlushCompletionTime.get() <= secondFlushCompletionTime.get());
assertDoesNotThrow(() -> future1.get());
assertDoesNotThrow(() -> future2.get());

long flushTime = flushCompletionTime.get();
long closeTime = closeCompletionTime.get();
long differenceMillis = Math.abs(flushTime - closeTime);

// Add a little wiggle room for close completion time.
// If close did run before flush, then an exception would have been thrown in one of the futures.
assertTrue(flushCompletionTime.get() <= (closeCompletionTime.get() + 10),
() -> "Expected flush to complete before close as close will wait for any in-flight flush requests to "
+ "finish before closing buffered sender. Flush finished at " + flushTime + ", close finished at "
+ closeTime + ", difference was " + differenceMillis + "ms");
}

@Test
@@ -1175,22 +1217,42 @@ public void closeWillWaitForAnyCurrentFlushesToCompleteBeforeRunningAsync() throws InterruptedException
CountDownLatch countDownLatch = new CountDownLatch(2);
batchingClient.addUploadActions(readJsonFileToList(HOTELS_DATA_JSON)).block();

AtomicLong firstFlushCompletionTime = new AtomicLong();
Mono.using(() -> 1, ignored -> batchingClient.flush(), ignored -> {
firstFlushCompletionTime.set(System.nanoTime());
countDownLatch.countDown();
}).subscribe();
AtomicLong flushCompletionTime = new AtomicLong();
Future<?> future1 = SharedExecutorService.getInstance().submit(() -> {
Mono.using(() -> 1, ignored -> batchingClient.flush(), ignored -> {
flushCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}).block();
});

AtomicLong secondFlushCompletionTime = new AtomicLong();
Mono.using(() -> 1, ignored -> batchingClient.close(), ignored -> {
secondFlushCompletionTime.set(System.nanoTime());
countDownLatch.countDown();
}).subscribe();

countDownLatch.await();
assertTrue(firstFlushCompletionTime.get() <= secondFlushCompletionTime.get(),
() -> "Expected first flush attempt to complete before second flush attempt. First flush finished at "
+ firstFlushCompletionTime.get() + ", second flush finished at " + secondFlushCompletionTime.get());
// Delay the close by 100ms to ensure that it starts after the flush.
// The mocked HttpRequest will delay the response by 2 seconds, so if the close does finish first it will be a
// true indication of incorrect behavior.
AtomicLong closeCompletionTime = new AtomicLong();
Future<?> future2 = SharedExecutorService.getInstance().schedule(() -> {
Mono.using(() -> 1, ignored -> batchingClient.close(), ignored -> {
closeCompletionTime.set(System.currentTimeMillis());
countDownLatch.countDown();
}).block();
}, 100, TimeUnit.MILLISECONDS);

if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
fail("Timed out waiting for closes to complete.");
}

assertDoesNotThrow(() -> future1.get());
assertDoesNotThrow(() -> future2.get());

long flushTime = flushCompletionTime.get();
long closeTime = closeCompletionTime.get();
long differenceMillis = Math.abs(flushTime - closeTime);

// Add a little wiggle room for close completion time.
// If close did run before flush, then an exception would have been thrown in one of the futures.
assertTrue(flushCompletionTime.get() <= (closeCompletionTime.get() + 10),
() -> "Expected flush to complete before close as close will wait for any in-flight flush requests to "
+ "finish before closing buffered sender. Flush finished at " + flushTime + ", close finished at "
+ closeTime + ", difference was " + differenceMillis + "ms");
}

@Test