From 09bf8c9216f32d1678bfdd1f219b0f151d90e62b Mon Sep 17 00:00:00 2001 From: Yogesh Pandey Date: Thu, 27 Jan 2022 21:52:09 +0530 Subject: [PATCH] Enable flag in State machine to execute action in sync way while accepting event --- .../config/ObjectStateMachineFactory.java | 2 +- .../StateMachineConfigurationBuilder.java | 13 ++++++++- .../configurers/ConfigurationConfigurer.java | 11 ++++++++ .../DefaultConfigurationConfigurer.java | 9 +++++++ .../config/model/ConfigurationData.java | 17 ++++++++++-- .../support/AbstractStateMachine.java | 16 +++++++++-- .../support/ReactiveStateMachineExecutor.java | 27 ++++++++++--------- 7 files changed, 77 insertions(+), 18 deletions(-) diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/ObjectStateMachineFactory.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/ObjectStateMachineFactory.java index 42ebe1996..c01fa2811 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/ObjectStateMachineFactory.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/ObjectStateMachineFactory.java @@ -79,7 +79,7 @@ protected StateMachine buildStateMachineInternal(Collection> s extendedState, uuid); machine.setId(machineId); machine.setHistoryState(historyState); - machine.setTransitionConflightPolicy(stateMachineModel.getConfigurationData().getTransitionConflictPolicy()); + machine.setTransitionConflictPolicy(stateMachineModel.getConfigurationData().getTransitionConflictPolicy()); if (contextEventsEnabled != null) { machine.setContextEventsEnabled(contextEventsEnabled); } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/builders/StateMachineConfigurationBuilder.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/builders/StateMachineConfigurationBuilder.java index bf07831e1..790a65961 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/builders/StateMachineConfigurationBuilder.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/builders/StateMachineConfigurationBuilder.java @@ -79,6 +79,7 @@ public class StateMachineConfigurationBuilder private StateMachineMonitor stateMachineMonitor; private final List> interceptors = new ArrayList>(); private StateMachineRuntimePersister persister; + private boolean executeActionsInSyncEnabled; /** * Instantiates a new state machine configuration builder. @@ -149,7 +150,9 @@ protected ConfigurationData performBuild() throws Exception { return new ConfigurationData(beanFactory, autoStart, ensemble, listeners, securityEnabled, transitionSecurityAccessDecisionManager, eventSecurityAccessDecisionManager, eventSecurityRule, transitionSecurityRule, verifierEnabled, verifier, machineId, stateMachineMonitor, interceptorsCopy, - transitionConflictPolicy, stateDoActionPolicy, stateDoActionPolicyTimeout, regionExecutionPolicy); + transitionConflictPolicy, stateDoActionPolicy, stateDoActionPolicyTimeout, regionExecutionPolicy, + executeActionsInSyncEnabled); + } /** @@ -307,4 +310,12 @@ public void setStateDoActionPolicy(StateDoActionPolicy stateDoActionPolicy, Long public void setRegionExecutionPolicy(RegionExecutionPolicy regionExecutionPolicy) { this.regionExecutionPolicy = regionExecutionPolicy; } + + /** + * sets execute actions in sync flag + * @param executeActionsInSyncEnabled + */ + public void setExecuteActionsInSyncEnabled(boolean executeActionsInSyncEnabled) { + this.executeActionsInSyncEnabled = executeActionsInSyncEnabled; + } } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/ConfigurationConfigurer.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/ConfigurationConfigurer.java index 15ac2c167..3789af281 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/ConfigurationConfigurer.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/ConfigurationConfigurer.java @@ -105,4 +105,15 @@ public interface ConfigurationConfigurer extends * @return the configuration configurer */ ConfigurationConfigurer regionExecutionPolicy(RegionExecutionPolicy regionExecutionPolicy); + + + /** + * Specify if state machine should execute all transition actions, and state entry, exit actions + * in sync while accepting event. + * any action execution failure can prevent acceptance of the event + * + * @param executeActionsInSyncEnabled the autoStartup flag + * @return configurer for chaining + */ + ConfigurationConfigurer executeActionsInSyncEnabled(boolean executeActionsInSyncEnabled); } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/DefaultConfigurationConfigurer.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/DefaultConfigurationConfigurer.java index 6bc510cf0..aae38c2c8 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/DefaultConfigurationConfigurer.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/configurers/DefaultConfigurationConfigurer.java @@ -49,6 +49,7 @@ public class DefaultConfigurationConfigurer private Long stateDoActionPolicyTimeout; private RegionExecutionPolicy regionExecutionPolicy; private final List> listeners = new ArrayList>(); + private boolean executeActionsInSyncEnabled = false; @Override public void configure(StateMachineConfigurationBuilder builder) throws Exception { @@ -59,6 +60,7 @@ public void configure(StateMachineConfigurationBuilder builder) throws Exc builder.setTransitionConflictPolicy(transitionConflightPolicy); builder.setStateDoActionPolicy(stateDoActionPolicy, stateDoActionPolicyTimeout); builder.setRegionExecutionPolicy(regionExecutionPolicy); + builder.setExecuteActionsInSyncEnabled(executeActionsInSyncEnabled); } @Override @@ -108,4 +110,11 @@ public ConfigurationConfigurer regionExecutionPolicy(RegionExecutionPolicy this.regionExecutionPolicy = regionExecutionPolicy; return this; } + + + @Override + public ConfigurationConfigurer executeActionsInSyncEnabled(boolean executeActionsInSync) { + this.executeActionsInSyncEnabled = executeActionsInSync; + return this; + } } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/model/ConfigurationData.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/model/ConfigurationData.java index 79431a77f..f784db88d 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/model/ConfigurationData.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/config/model/ConfigurationData.java @@ -60,6 +60,7 @@ public class ConfigurationData { private final StateMachineMonitor stateMachineMonitor; private final List> interceptors; private final RegionExecutionPolicy regionExecutionPolicy; + private final boolean executeActionsInSyncEnabled; /** * Instantiates a new state machine configuration config data. @@ -96,7 +97,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin List> interceptors) { this(beanFactory, autoStart, ensemble, listeners, securityEnabled, transitionSecurityAccessDecisionManager, eventSecurityAccessDecisionManager, eventSecurityRule, transitionSecurityRule, verifierEnabled, - verifier, machineId, stateMachineMonitor, interceptors, null, null, null, null); + verifier, machineId, stateMachineMonitor, interceptors, null, null, null, null, false); } /** @@ -120,6 +121,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin * @param stateDoActionPolicy the state do action policy * @param stateDoActionPolicyTimeout the state do action policy timeout * @param regionExecutionPolicy the region execution policy + * @param executeActionsInSyncEnabled the execute actions in sync enabled flag */ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachineEnsemble ensemble, List> listeners, boolean securityEnabled, @@ -129,7 +131,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin String machineId, StateMachineMonitor stateMachineMonitor, List> interceptors, TransitionConflictPolicy transitionConflightPolicy, StateDoActionPolicy stateDoActionPolicy, Long stateDoActionPolicyTimeout, - RegionExecutionPolicy regionExecutionPolicy) { + RegionExecutionPolicy regionExecutionPolicy,boolean executeActionsInSyncEnabled) { this.beanFactory = beanFactory; this.autoStart = autoStart; this.ensemble = ensemble; @@ -148,6 +150,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin this.stateDoActionPolicy = stateDoActionPolicy; this.stateDoActionPolicyTimeout = stateDoActionPolicyTimeout; this.regionExecutionPolicy = regionExecutionPolicy; + this.executeActionsInSyncEnabled = executeActionsInSyncEnabled; } public String getMachineId() { @@ -306,4 +309,14 @@ public Long getStateDoActionPolicyTimeout() { public RegionExecutionPolicy getRegionExecutionPolicy() { return regionExecutionPolicy; } + + + /** + * Checks if execute actions in sync is enabled. + * + * @return true, if execute actions in sync is enabled + */ + public boolean isExecuteActionsInSyncEnabled() { + return executeActionsInSyncEnabled; + } } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java index ec12d739b..351a3ed27 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/AbstractStateMachine.java @@ -128,6 +128,8 @@ public abstract class AbstractStateMachine extends StateMachineObjectSuppo private StateMachine parentMachine; + private boolean executeActionsInSyncEnabled; + /** * Instantiates a new abstract state machine. * @@ -310,7 +312,7 @@ public Mono doOnComplete(StateContext context) { } ReactiveStateMachineExecutor executor = new ReactiveStateMachineExecutor(this, getRelayStateMachine(), transitions, - triggerToTransitionMap, triggerlessTransitions, initialTransition, initialEvent, transitionConflictPolicy); + triggerToTransitionMap, triggerlessTransitions, initialTransition, initialEvent, transitionConflictPolicy, executeActionsInSyncEnabled); if (getBeanFactory() != null) { executor.setBeanFactory(getBeanFactory()); } @@ -614,7 +616,7 @@ public void setForwardedInitialEvent(Message message) { * * @param transitionConflictPolicy the new transition conflict policy */ - public void setTransitionConflightPolicy(TransitionConflictPolicy transitionConflictPolicy) { + public void setTransitionConflictPolicy(TransitionConflictPolicy transitionConflictPolicy) { this.transitionConflictPolicy = transitionConflictPolicy; } @@ -1484,4 +1486,14 @@ private static boolean isDirectSubstate(State left, State rig return false; } } + + /** + * Sets execute actions in sync flag. + * if enabled statemachine will execute all actions reactive non-reactive in sync while accepting an event, + * If any action results in error statemachine will error out + * @param executeActionsInSyncEnabled + */ + public void setExecuteActionsInSync(boolean executeActionsInSyncEnabled) { + this.executeActionsInSyncEnabled = executeActionsInSyncEnabled; + } } diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java index 26f063bf2..97685f64b 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java @@ -93,11 +93,12 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i private Many triggerSink; private Flux triggerFlux; private Disposable triggerDisposable; + private final boolean executeActionsInSyncEnabled; public ReactiveStateMachineExecutor(StateMachine stateMachine, StateMachine relayStateMachine, Collection> transitions, Map, Transition> triggerToTransitionMap, List> triggerlessTransitions, Transition initialTransition, Message initialEvent, - TransitionConflictPolicy transitionConflictPolicy) { + TransitionConflictPolicy transitionConflictPolicy, boolean executeActionsInSyncEnabled) { this.stateMachine = stateMachine; this.relayStateMachine = relayStateMachine; this.triggerToTransitionMap = triggerToTransitionMap; @@ -109,6 +110,7 @@ public ReactiveStateMachineExecutor(StateMachine stateMachine, StateMachin this.transitionConflictPolicy = transitionConflictPolicy; // anonymous transitions are fixed, sort those now this.triggerlessTransitions.sort(transitionComparator); + this.executeActionsInSyncEnabled = executeActionsInSyncEnabled; registerTriggerListener(); } @@ -203,11 +205,12 @@ public Mono queueEvent(Mono> message, StateMachineExecutorCallb return messages .flatMap(m -> handleEvent(m, callback, triggerCallback)) - .flatMap(tqi -> Mono.fromRunnable(() -> { - triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST); - }) - .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(10)))) - .then() + .flatMap(tqi -> executeActionsInSyncEnabled ? handleTrigger(tqi) : Mono.fromRunnable(() -> { + triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST); + }) + .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(10))) + ) + .then() .and(triggerCallbackSink); } @@ -457,12 +460,12 @@ public void triggered() { log.debug("TimedTrigger triggered " + trigger); } Mono.just(new TriggerQueueItem(trigger, null, null, null)) - .flatMap(tqi -> Mono.fromCallable(() -> { - triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST); - return null; - }) - .retryWhen(Retry.fixedDelay(10, Duration.ofNanos(10)))) - .subscribe(); + .flatMap(tqi -> executeActionsInSyncEnabled ? handleTrigger(tqi) : Mono.fromRunnable(() -> { + triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST); + }) + .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(10))) + ) + .subscribe(); } }); }