StepSynchronizationManager caches wrong StepExecution context in a multitenant environment #4774

Open
galovics opened this issue Feb 26, 2025 · 2 comments
Labels: status: waiting-for-triage (issues that we did not analyse yet), type: bug

Comments

@galovics

Bug description
In the Apache Fineract project (https://github.com/apache/fineract), we use Spring Batch to execute our jobs. Fineract is a multitenant application that uses a thread-bound context to identify which tenant a given job, API request, etc. should be handled for.

For database access, Fineract uses a routing DataSource that retrieves the right tenant-specific DataSource based on this thread-bound context.
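The routing idea can be sketched with plain JDK classes. This is only a model under stated assumptions: TenantRouting, CURRENT_TENANT, JDBC_URLS, and the JDBC URLs are hypothetical names for illustration, not Fineract's actual classes. In a Spring application the same decision is typically made by overriding determineCurrentLookupKey() in an AbstractRoutingDataSource subclass.

```java
import java.util.Map;

class TenantRouting {

    // Hypothetical thread-bound tenant holder (stands in for the thread-bound context).
    static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();

    // Hypothetical tenant -> JDBC URL table; a real router would hold DataSource objects.
    static final Map<String, String> JDBC_URLS = Map.of(
            "tenant1", "jdbc:postgresql://localhost/tenant1",
            "tenant2", "jdbc:postgresql://localhost/tenant2");

    // Picks the target for whichever tenant is bound to the calling thread.
    static String resolveUrl() {
        String tenant = CURRENT_TENANT.get();
        if (tenant == null) {
            throw new IllegalStateException("No tenant bound to the current thread");
        }
        String url = JDBC_URLS.get(tenant);
        if (url == null) {
            throw new IllegalStateException("Unknown tenant: " + tenant);
        }
        return url;
    }

    // Creates a worker thread that binds a tenant, resolves its target, and unbinds again.
    static Thread runAs(String tenant) {
        return new Thread(() -> {
            CURRENT_TENANT.set(tenant);
            try {
                System.out.println(tenant + " -> " + resolveUrl());
            } finally {
                CURRENT_TENANT.remove();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = runAs("tenant1");
        Thread b = runAs("tenant2");
        a.start();
        b.start();
        a.join();
        b.join();
    }
}
```

Each thread sees only its own tenant, which is why the DataSource routing itself is not the source of the problem described next.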

This has been configured for Spring Batch as well, and the job executions work perfectly fine. However, when it comes to managing Spring Batch's internal state, there are issues: not with the DataSource itself, but with the global caching mechanism that Spring Batch uses in StepSynchronizationManager and JobSynchronizationManager.

Consider the following:

  • There are 2 tenants in Fineract, each with its own database
  • The database structure is the same for both databases, so there are Spring Batch tables for both tenant1 and tenant2

In our case, the jobs are scheduled with Quartz via a cron expression and threads are spawned for each tenant, so essentially the jobs are started in parallel.

This brings us to the issue with the StepSynchronizationManager. When Spring Batch manages the state of the StepExecutions, it uses the StepSynchronizationManager to retrieve the right StepExecution, but they get mixed up. The underlying implementation in StepSynchronizationManager uses SynchronizationManagerSupport, which in turn uses a ConcurrentHashMap. The key of this map is the StepExecution itself, which relies on its ID field for the hashCode calculation and the equality check.

Based on this, it's possible for 2 threads running the same job in parallel to get each other's StepExecution, because their cron schedule is the same and the StepExecution IDs are also the same.
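To make the collision concrete, here is a minimal, self-contained model of it. It is not Spring Batch code: Execution, Context, and register are hypothetical stand-ins, and the only assumptions carried over from the description above are that equality is ID-based and that the registry is a single ConcurrentHashMap shared by all threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SharedContextCollision {

    // Simplified stand-in for a step execution: identity is the database ID only.
    static final class Execution {
        final long id;
        final String tenant; // carried for the demo, deliberately ignored by equals/hashCode

        Execution(long id, String tenant) {
            this.id = id;
            this.tenant = tenant;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Execution other && other.id == id;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(id);
        }
    }

    // Simplified stand-in for a step context: remembers which execution created it.
    record Context(Execution owner) {}

    // One JVM-wide registry shared by every tenant.
    static final Map<Execution, Context> CONTEXTS = new ConcurrentHashMap<>();

    static Context register(Execution execution) {
        // If an equal key is already present, the existing context is returned.
        return CONTEXTS.computeIfAbsent(execution, Context::new);
    }

    public static void main(String[] args) {
        Context first = register(new Execution(43, "tenant1"));
        Context second = register(new Execution(43, "tenant2"));
        System.out.println(second.owner().tenant); // prints "tenant1"
        System.out.println(first == second);       // prints "true"
    }
}
```

In this model the second registration never creates its own context, because both tenants' databases handed out the same ID and the key cannot tell them apart.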

This eventually turns into:

org.quartz.JobExecutionException: exitCode=FAILED;exitDescription=org.springframework.batch.core.step.FatalStepExecutionException: JobRepository failure forcing rollback
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:447)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:312)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:255)
	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:369)
	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:206)
	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:140)
	at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:240)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:229)
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:153)
	at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:418)
	at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:132)
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:317)
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:157)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:148)
	at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher.run(TaskExecutorJobLauncher.java:59)
	at org.apache.fineract.infrastructure.jobs.service.JobStarter.run(JobStarter.java:96)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:285)
	at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:269)
	at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:75)
	at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
	at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: org.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=43 with wrong version (2), where current version is 1
	at org.springframework.batch.core.repository.dao.JdbcStepExecutionDao.updateStepExecution(JdbcStepExecutionDao.java:291)
	at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:230)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:355)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:379)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223)
	at jdk.proxy2/jdk.proxy2.$Proxy292.update(Unknown Source)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:440)
	... 27 more

	at org.apache.fineract.infrastructure.jobs.service.JobStarter.run(JobStarter.java:98)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:285)
	at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:269)
	at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:75)
	at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
	at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)

Environment
Java 17, PostgreSQL, Spring Batch 5.1.2

Expected behavior
No concurrency issue is present.

Notes
After a long trial-and-error session, I was able to get around this issue by making the hashCode and equals calculation consider the tenant's ID in addition to the StepExecution ID.

Here's a sketch of it (I'll have a PR in the fineract repo soon and link it here):

Essentially, I copy-pasted the StepSynchronizationManager into a class with the same package and class name as in Spring Batch, so it gets picked up earlier during startup and replaces the Spring Batch implementation.

public class StepSynchronizationManager {

    private static final SynchronizationManagerSupport<StepExecution, StepContext> manager = new SynchronizationManagerSupport<>() {

        @Override
        protected StepContext createNewContext(StepExecution execution) {
            return new StepContext(execution);
        }

        @Override
        protected void close(StepContext context) {
            context.close();
        }
    };

    @Nullable
    public static StepContext getContext() {
        return manager.getContext();
    }

    public static StepContext register(StepExecution stepExecution) {
        ProxyFactory factory = new ProxyFactory(stepExecution);
        factory.setProxyTargetClass(true);
        factory.addAdvice(new TenantAwareEqualsHashCodeAdvice(stepExecution));
        return manager.register((StepExecution) factory.getProxy());
    }

    public static void close() {
        manager.close();
    }

    public static void release() {
        manager.release();
    }

}

And the advice code is:

public class TenantAwareEqualsHashCodeAdvice implements MethodInterceptor {
    private final Object target;
    private final String tenantIdentifier;

    public TenantAwareEqualsHashCodeAdvice(Object target) {
        this.target = target;
        FineractPlatformTenant tenant = ThreadLocalContextUtil.getTenant();
        this.tenantIdentifier = tenant != null ? tenant.getTenantIdentifier() : null;
    }

    @Nullable
    @Override
    public Object invoke(@NotNull MethodInvocation invocation) throws Throwable {
        String methodName = invocation.getMethod().getName();

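        if (methodName.equals("equals") && invocation.getArguments().length == 1) {
            Object other = invocation.getArguments()[0];
            // target is the wrapped StepExecution, not an advice, so the other side's tenant
            // has to be read from the other proxy's advice.
            // Advised: org.springframework.aop.framework.Advised, Advisor: org.springframework.aop.Advisor
            if (other instanceof Advised advised) {
                for (Advisor advisor : advised.getAdvisors()) {
                    if (advisor.getAdvice() instanceof TenantAwareEqualsHashCodeAdvice otherAdvice) {
                        return Objects.equals(tenantIdentifier, otherAdvice.tenantIdentifier) && target.equals(otherAdvice.target);
                    }
                }
            }
            return false;
        }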

        if (methodName.equals("hashCode") && invocation.getArguments().length == 0) {
            return Objects.hash(target.hashCode(), tenantIdentifier);
        }

        return invocation.proceed();
    }
}

galovics added the labels status: waiting-for-triage and type: bug on Feb 26, 2025
@galovics

Fineract PR is here: apache/fineract#4386

@galovics

Turns out I need to use a direct CGLib proxy instead of creating it with Spring AOP to intercept the equals/hashCode methods. Here's the update:

public class StepSynchronizationManager {

    private static final SynchronizationManagerSupport<StepExecution, StepContext> manager = new SynchronizationManagerSupport<>() {

        @Override
        protected StepContext createNewContext(StepExecution execution) {
            return new StepContext(execution);
        }

        @Override
        protected void close(StepContext context) {
            context.close();
        }
    };

    @Nullable
    public static StepContext getContext() {
        return manager.getContext();
    }

    public static StepContext register(StepExecution stepExecution) {
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(StepExecution.class);
        enhancer.setCallback(new TenantAwareEqualsHashCodeAdvice(stepExecution));
        return manager.register((StepExecution) enhancer.create(new Class[] { String.class, JobExecution.class },
                new Object[] { stepExecution.getStepName(), stepExecution.getJobExecution() }));
    }

    public static void close() {
        manager.close();
    }

    public static void release() {
        manager.release();
    }

}

public class TenantAwareEqualsHashCodeAdvice implements MethodInterceptor {

    private final Object target;
    private final String tenantIdentifier;

    public TenantAwareEqualsHashCodeAdvice(Object target) {
        this.target = target;
        FineractPlatformTenant tenant = ThreadLocalContextUtil.getTenant();
        this.tenantIdentifier = tenant != null ? tenant.getTenantIdentifier() : null;
    }

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
        String methodName = method.getName();

        if ("equals".equals(methodName) && args.length == 1) {
            Object other = args[0];

            if (other instanceof Factory) {

                TenantAwareEqualsHashCodeAdvice otherProxy = (TenantAwareEqualsHashCodeAdvice) ((Factory) other).getCallback(0);
                return Objects.equals(target, otherProxy.target) && Objects.equals(tenantIdentifier, otherProxy.tenantIdentifier);
            }
            return false;
        }

        if ("hashCode".equals(methodName) && args.length == 0) {
            return Objects.hash(target.hashCode(), tenantIdentifier);
        }

        return proxy.invoke(target, args);
    }
}
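
The equality rule that the workaround above aims for can also be illustrated without any proxying. The following is a minimal sketch under assumptions, not part of the workaround or of Spring Batch: TenantKey, CONTEXTS, and register are hypothetical names, and the context is reduced to a string so the example stays self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TenantAwareKeyDemo {

    // Hypothetical composite key: a record derives equals/hashCode from all of its components,
    // so two keys match only when both the tenant and the execution ID match.
    record TenantKey(String tenantId, long executionId) {}

    // Shared registry, keyed by tenant and execution ID together.
    static final Map<TenantKey, String> CONTEXTS = new ConcurrentHashMap<>();

    static String register(String tenantId, long executionId) {
        return CONTEXTS.computeIfAbsent(new TenantKey(tenantId, executionId),
                key -> "context-for-" + key.tenantId() + "-" + key.executionId());
    }

    public static void main(String[] args) {
        System.out.println(register("tenant1", 43)); // prints "context-for-tenant1-43"
        System.out.println(register("tenant2", 43)); // prints "context-for-tenant2-43"
        System.out.println(CONTEXTS.size());         // prints "2"
    }
}
```

With the tenant included in the key, the same execution ID from two tenant databases maps to two separate entries, which is the behavior the proxies are meant to produce for the real StepExecution keys.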
