Skip to content

Commit

Permalink
Improve step execution polling and retrieval
Browse files Browse the repository at this point in the history
Resolves #3790
  • Loading branch information
hpoettker authored and fmbenhassine committed Feb 22, 2023
1 parent 708f1c8 commit c68da18
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2019 the original author or authors.
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import javax.batch.operations.BatchRuntimeException;
import javax.batch.operations.JobExecutionAlreadyCompleteException;
import javax.batch.operations.JobExecutionIsRunningException;
Expand All @@ -47,6 +48,7 @@
import org.apache.commons.logging.LogFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Entity;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
Expand Down Expand Up @@ -412,9 +414,11 @@ public List<StepExecution> getStepExecutions(long executionId)
List<StepExecution> batchExecutions = new ArrayList<>();

if(executions != null) {
for (org.springframework.batch.core.StepExecution stepExecution : executions) {
if(!stepExecution.getStepName().contains(":partition")) {
batchExecutions.add(new JsrStepExecution(jobExplorer.getStepExecution(executionId, stepExecution.getId())));
Set<Long> stepExecutionIds = executions.stream().map(Entity::getId).collect(Collectors.toSet());
org.springframework.batch.core.JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
for (org.springframework.batch.core.StepExecution stepExecution : jobExecution.getStepExecutions()) {
if(!stepExecution.getStepName().contains(":partition") && stepExecutionIds.contains(stepExecution.getId())) {
batchExecutions.add(new JsrStepExecution(stepExecution));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2013 the original author or authors.
* Copyright 2006-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,9 +16,12 @@

package org.springframework.batch.core.partition.support;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.InitializingBean;
Expand Down Expand Up @@ -90,14 +93,16 @@ public void aggregate(StepExecution result, Collection<StepExecution> executions
if (executions == null) {
return;
}
Collection<StepExecution> updates = new ArrayList<>();
for (StepExecution stepExecution : executions) {
Set<Long> stepExecutionIds = executions.stream().map(stepExecution -> {
Long id = stepExecution.getId();
Assert.state(id != null, "StepExecution has null id. It must be saved first: " + stepExecution);
StepExecution update = jobExplorer.getStepExecution(stepExecution.getJobExecutionId(), id);
Assert.state(update != null, "Could not reload StepExecution from JobRepository: " + stepExecution);
updates.add(update);
}
return id;
}).collect(Collectors.toSet());
JobExecution jobExecution = jobExplorer.getJobExecution(result.getJobExecutionId());
Assert.state(jobExecution != null,
"Could not load JobExecution from JobRepository for id " + result.getJobExecutionId());
List<StepExecution> updates = jobExecution.getStepExecutions().stream()
.filter(stepExecution -> stepExecutionIds.contains(stepExecution.getId())).collect(Collectors.toList());
delegate.aggregate(result, updates);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2018 the original author or authors.
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -403,8 +403,6 @@ public void testGetStepExecutionsRoseyScenario() {
jobExecution.addStepExecutions(stepExecutions);

when(jobExplorer.getJobExecution(5L)).thenReturn(jobExecution);
when(jobExplorer.getStepExecution(5L, 1L)).thenReturn(new StepExecution("step1", jobExecution, 1L));
when(jobExplorer.getStepExecution(5L, 2L)).thenReturn(new StepExecution("step2", jobExecution, 2L));

List<javax.batch.runtime.StepExecution> results = jsrJobOperator.getStepExecutions(5L);

Expand All @@ -429,8 +427,6 @@ public void testGetStepExecutionsPartitionedStepScenario() {
jobExecution.addStepExecutions(stepExecutions);

when(jobExplorer.getJobExecution(5L)).thenReturn(jobExecution);
when(jobExplorer.getStepExecution(5L, 1L)).thenReturn(new StepExecution("step1", jobExecution, 1L));
when(jobExplorer.getStepExecution(5L, 2L)).thenReturn(new StepExecution("step2", jobExecution, 2L));

List<javax.batch.runtime.StepExecution> results = jsrJobOperator.getStepExecutions(5L);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
/*
* Copyright 2009-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.integration.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
Expand Down Expand Up @@ -242,19 +258,12 @@ private Collection<StepExecution> pollReplies(final StepExecution masterStepExec
Callable<Collection<StepExecution>> callback = new Callable<Collection<StepExecution>>() {
@Override
public Collection<StepExecution> call() throws Exception {

for(Iterator<StepExecution> stepExecutionIterator = split.iterator(); stepExecutionIterator.hasNext(); ) {
StepExecution curStepExecution = stepExecutionIterator.next();

if(!result.contains(curStepExecution)) {
StepExecution partitionStepExecution =
jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId());

if(!partitionStepExecution.getStatus().isRunning()) {
result.add(partitionStepExecution);
}
}
}
Set<Long> currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());
JobExecution jobExecution = jobExplorer.getJobExecution(masterStepExecution.getJobExecutionId());
jobExecution.getStepExecutions().stream()
.filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId()))
.filter(stepExecution -> !result.contains(stepExecution))
.filter(stepExecution -> !stepExecution.getStatus().isRunning()).forEach(result::add);

if(logger.isDebugEnabled()) {
logger.debug(String.format("Currently waiting on %s partitions to finish", split.size()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
/*
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.integration.partition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -154,7 +171,12 @@ public void testHandleWithJobRepositoryPolling() throws Exception {
stepExecutions.add(partition2);
stepExecutions.add(partition3);
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3, partition3, partition3, partition3, partition4);
JobExecution runningJobExecution = new JobExecution(5L, new JobParameters());
runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3));
JobExecution completedJobExecution = new JobExecution(5L, new JobParameters());
completedJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition4));
when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution, runningJobExecution, runningJobExecution,
completedJobExecution);

//set
messageChannelPartitionHandler.setMessagingOperations(operations);
Expand Down Expand Up @@ -198,7 +220,9 @@ public void testHandleWithJobRepositoryPollingTimeout() throws Exception {
stepExecutions.add(partition2);
stepExecutions.add(partition3);
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3);
JobExecution runningJobExecution = new JobExecution(5L, new JobParameters());
runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3));
when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution);

//set
messageChannelPartitionHandler.setMessagingOperations(operations);
Expand Down

0 comments on commit c68da18

Please sign in to comment.