Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poll the count of running step executions #3791

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package org.springframework.batch.core;

import java.util.Arrays;
import java.util.List;

/**
* Enumeration representing the status of an Execution.
*
Expand All @@ -39,6 +42,8 @@ public enum BatchStatus {
*/
COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN;

public static final List<BatchStatus> RUNNING_STATUSES = Arrays.asList(STARTING, STARTED);

public static BatchStatus max(BatchStatus status1, BatchStatus status2) {
return status1.isGreaterThan(status2) ? status1 : status2;
}
Expand All @@ -49,7 +54,7 @@ public static BatchStatus max(BatchStatus status1, BatchStatus status2) {
* @return true if the status is STARTING, STARTED
*/
public boolean isRunning() {
return this == STARTING || this == STARTED;
return RUNNING_STATUSES.contains(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
*/
package org.springframework.batch.core.explore;

import java.util.List;
import java.util.Set;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.lang.Nullable;

import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
* Entry point for browsing executions of running or historical jobs and steps.
* Since the data may be re-hydrated from persistent storage, it may not contain
Expand Down Expand Up @@ -89,6 +91,14 @@ default JobInstance getLastJobInstance(String jobName) {
@Nullable
StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable Long stepExecutionId);

/**
* Retrieve number of step executions that match the step execution ids and the batch statuses
* @param stepExecutionIds given step execution ids
* @param matchingBatchStatuses given batch statuses to match against
* @return number of {@link StepExecution} matching the criteria
*/
int getStepExecutionCount(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses);

/**
* @param instanceId {@link Long} id for the jobInstance to obtain.
* @return the {@link JobInstance} with this id, or null
Expand Down Expand Up @@ -164,4 +174,11 @@ default JobExecution getLastJobExecution(JobInstance jobInstance) {
*/
int getJobInstanceCount(@Nullable String jobName) throws NoSuchJobException;

/**
* Find step executions in bulk
* @param jobExecutionId given job execution id
* @param stepExecutionIds given step execution ids
* @return collection of {@link StepExecution}
*/
Collection<StepExecution> getStepExecutions(Long jobExecutionId, Collection<Long> stepExecutionIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.batch.core.explore.support;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
Expand All @@ -27,6 +28,7 @@
import org.springframework.batch.core.repository.dao.StepExecutionDao;
import org.springframework.lang.Nullable;

import java.util.Collection;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -165,6 +167,14 @@ public StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable L
return stepExecution;
}

@Override
public int getStepExecutionCount(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses) {
if (stepExecutionIds.isEmpty() || matchingBatchStatuses.isEmpty()) {
return 0;
}
return stepExecutionDao.countStepExecutions(stepExecutionIds, matchingBatchStatuses);
}

/*
* (non-Javadoc)
*
Expand Down Expand Up @@ -221,6 +231,19 @@ public int getJobInstanceCount(@Nullable String jobName) throws NoSuchJobExcepti
return jobInstanceDao.getJobInstanceCount(jobName);
}

@Override
@Nullable
public Collection<StepExecution> getStepExecutions(Long jobExecutionId, Collection<Long> stepExecutionIds) {
JobExecution jobExecution = jobExecutionDao.getJobExecution(jobExecutionId);
if (jobExecution == null) {
return null;
}
getJobExecutionDependencies(jobExecution);
Collection<StepExecution> stepExecutions = stepExecutionDao.getStepExecutions(jobExecution, stepExecutionIds);
stepExecutions.forEach(this::getStepExecutionDependencies);
return stepExecutions;
}

/*
* Find all dependencies for a JobExecution, including JobInstance (which
* requires JobParameters) plus StepExecutions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,9 @@

package org.springframework.batch.core.repository.dao;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
Expand All @@ -43,6 +27,11 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* JDBC implementation of {@link StepExecutionDao}.<br>
*
Expand Down Expand Up @@ -114,6 +103,15 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement
" and SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID " +
" and SE.STEP_NAME = ?";

// need to replace the %STEP_EXECUTION_IDS% with a known number of ?s
private static final String GET_STEP_EXECUTIONS_BY_IDS = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID IN (%STEP_EXECUTION_IDS%)";

// need to replace the %STEP_EXECUTION_IDS% and %STEP_STATUSES% with a known number of ?s
private static final String COUNT_STEP_EXECUTIONS_MATCHING_IDS_AND_STATUSES = "SELECT COUNT(*) " +
"from %PREFIX%STEP_EXECUTION SE " +
"where SE.STEP_EXECUTION_ID IN (%STEP_EXECUTION_IDS%) " +
"and SE.STATUS IN (%STEP_STATUSES%)";

private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;

private DataFieldMaxValueIncrementer stepExecutionIncrementer;
Expand Down Expand Up @@ -350,12 +348,31 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
}
}

@Override
public Collection<StepExecution> getStepExecutions(JobExecution jobExecution, Collection<Long> stepExecutionIds) {
String sql = createParameterizedQuery(GET_STEP_EXECUTIONS_BY_IDS, "%STEP_EXECUTION_IDS%", stepExecutionIds);
return getJdbcTemplate().query(getQuery(sql),
new StepExecutionRowMapper(jobExecution),
Stream.concat(Stream.of(jobExecution.getId()), stepExecutionIds.stream()).toArray());
}

@Override
public void addStepExecutions(JobExecution jobExecution) {
getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution),
jobExecution.getId());
}

@Override
public int countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses) {
String sql = createParameterizedQuery(COUNT_STEP_EXECUTIONS_MATCHING_IDS_AND_STATUSES, "%STEP_EXECUTION_IDS%", stepExecutionIds);
sql = createParameterizedQuery(sql, "%STEP_STATUSES%", matchingBatchStatuses);
Object[] args = Stream.concat(stepExecutionIds.stream(),
matchingBatchStatuses.stream().map(BatchStatus::name)).toArray();
return getJdbcTemplate().queryForObject(getQuery(sql),
Integer.class,
args);
}

@Override
public int countStepExecutions(JobInstance jobInstance, String stepName) {
return getJdbcTemplate().queryForObject(getQuery(COUNT_STEP_EXECUTIONS), new Object[] { jobInstance.getInstanceId(), stepName }, Integer.class);
Expand Down Expand Up @@ -391,4 +408,17 @@ public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException {

}

/**
* Replaces a given placeholder with a number of parameters (i.e. "?").
*
* @param sqlTemplate given sql template
* @param placeholder placeholder that is being used for parameters
* @param parameters collection of parameters with variable size
*
* @return sql query replaced with a number of parameters
*/
private static String createParameterizedQuery(String sqlTemplate, String placeholder, Collection<?> parameters) {
String params = parameters.stream().map(p -> "?").collect(Collectors.joining(", "));
return sqlTemplate.replace(placeholder, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,19 @@
*/
package org.springframework.batch.core.repository.dao;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.core.Entity;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.*;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.SerializationUtils;

import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
* In-memory implementation of {@link StepExecutionDao}.
*
Expand Down Expand Up @@ -189,4 +182,23 @@ public int countStepExecutions(JobInstance jobInstance, String stepName) {
}
return count;
}

@Override
public int countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses) {
int count = 0;

for (Long id: stepExecutionIds) {
if (executionsByStepExecutionId.containsKey(id) && matchingBatchStatuses.contains(executionsByStepExecutionId.get(id).getStatus())) {
count++;
}
}
return count;
}

@Override
public Collection<StepExecution> getStepExecutions(JobExecution jobExecution, Collection<Long> stepExecutionIds) {
return executionsByStepExecutionId.values().stream()
.filter(se -> stepExecutionIds.contains(se.getId()) && se.getJobExecutionId().equals(jobExecution.getId()))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package org.springframework.batch.core.repository.dao;

import java.util.Collection;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.lang.Nullable;

import java.util.Collection;

public interface StepExecutionDao {

/**
Expand Down Expand Up @@ -86,6 +87,22 @@ default StepExecution getLastStepExecution(JobInstance jobInstance, String stepN
*/
void addStepExecutions(JobExecution jobExecution);

/**
* Count {@link StepExecution} that match the ids and statuses of them - avoid loading them into memory
* @param stepExecutionIds given step execution ids
* @param matchingBatchStatuses
* @return
*/
int countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses);

/**
* Get a collection of {@link StepExecution} matching job execution and step execution ids.
* @param jobExecution the parent job execution
* @param stepExecutionIds the step execution ids
* @return collection of {@link StepExecution}
*/
@Nullable
Collection<StepExecution> getStepExecutions(JobExecution jobExecution, Collection<Long> stepExecutionIds);
/**
* Counts all the {@link StepExecution} for a given step name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,10 @@
*/
package org.springframework.batch.core.launch.support;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.*;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
Expand All @@ -49,6 +31,10 @@
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -538,6 +524,11 @@ public StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable L
throw new UnsupportedOperationException();
}

@Override
public int getStepExecutionCount(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses) {
throw new UnsupportedOperationException();
}

@Override
public List<String> getJobNames() {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -566,6 +557,10 @@ public int getJobInstanceCount(@Nullable String jobName)
}
}

@Override
public Collection<StepExecution> getStepExecutions(Long jobExecutionId, Collection<Long> stepExecutionIds) {
throw new UnsupportedOperationException();
}
}

public static class StubJobParametersConverter implements JobParametersConverter {
Expand Down
Loading