Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flag to execute actions in sync way while accepting event #1021

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected StateMachine<S, E> buildStateMachineInternal(Collection<State<S, E>> s
extendedState, uuid);
machine.setId(machineId);
machine.setHistoryState(historyState);
machine.setTransitionConflightPolicy(stateMachineModel.getConfigurationData().getTransitionConflictPolicy());
machine.setTransitionConflictPolicy(stateMachineModel.getConfigurationData().getTransitionConflictPolicy());
if (contextEventsEnabled != null) {
machine.setContextEventsEnabled(contextEventsEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class StateMachineConfigurationBuilder<S, E>
private StateMachineMonitor<S, E> stateMachineMonitor;
private final List<StateMachineInterceptor<S, E>> interceptors = new ArrayList<StateMachineInterceptor<S, E>>();
private StateMachineRuntimePersister<S, E, ?> persister;
private boolean executeActionsInSyncEnabled;

/**
* Instantiates a new state machine configuration builder.
Expand Down Expand Up @@ -149,7 +150,9 @@ protected ConfigurationData<S, E> performBuild() throws Exception {
return new ConfigurationData<S, E>(beanFactory, autoStart, ensemble, listeners, securityEnabled,
transitionSecurityAccessDecisionManager, eventSecurityAccessDecisionManager, eventSecurityRule,
transitionSecurityRule, verifierEnabled, verifier, machineId, stateMachineMonitor, interceptorsCopy,
transitionConflictPolicy, stateDoActionPolicy, stateDoActionPolicyTimeout, regionExecutionPolicy);
transitionConflictPolicy, stateDoActionPolicy, stateDoActionPolicyTimeout, regionExecutionPolicy,
executeActionsInSyncEnabled);

}

/**
Expand Down Expand Up @@ -307,4 +310,12 @@ public void setStateDoActionPolicy(StateDoActionPolicy stateDoActionPolicy, Long
public void setRegionExecutionPolicy(RegionExecutionPolicy regionExecutionPolicy) {
this.regionExecutionPolicy = regionExecutionPolicy;
}

/**
* sets execute actions in sync flag
* @param executeActionsInSyncEnabled
*/
public void setExecuteActionsInSyncEnabled(boolean executeActionsInSyncEnabled) {
this.executeActionsInSyncEnabled = executeActionsInSyncEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,15 @@ public interface ConfigurationConfigurer<S, E> extends
* @return the configuration configurer
*/
ConfigurationConfigurer<S, E> regionExecutionPolicy(RegionExecutionPolicy regionExecutionPolicy);


/**
* Specify if state machine should execute all transition actions, and state entry, exit actions
* in sync while accepting event.
* any action execution failure can prevent acceptance of the event
*
* @param executeActionsInSyncEnabled the autoStartup flag
* @return configurer for chaining
*/
ConfigurationConfigurer<S, E> executeActionsInSyncEnabled(boolean executeActionsInSyncEnabled);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class DefaultConfigurationConfigurer<S, E>
private Long stateDoActionPolicyTimeout;
private RegionExecutionPolicy regionExecutionPolicy;
private final List<StateMachineListener<S, E>> listeners = new ArrayList<StateMachineListener<S, E>>();
private boolean executeActionsInSyncEnabled = false;

@Override
public void configure(StateMachineConfigurationBuilder<S, E> builder) throws Exception {
Expand All @@ -59,6 +60,7 @@ public void configure(StateMachineConfigurationBuilder<S, E> builder) throws Exc
builder.setTransitionConflictPolicy(transitionConflightPolicy);
builder.setStateDoActionPolicy(stateDoActionPolicy, stateDoActionPolicyTimeout);
builder.setRegionExecutionPolicy(regionExecutionPolicy);
builder.setExecuteActionsInSyncEnabled(executeActionsInSyncEnabled);
}

@Override
Expand Down Expand Up @@ -108,4 +110,11 @@ public ConfigurationConfigurer<S, E> regionExecutionPolicy(RegionExecutionPolicy
this.regionExecutionPolicy = regionExecutionPolicy;
return this;
}


@Override
public ConfigurationConfigurer<S, E> executeActionsInSyncEnabled(boolean executeActionsInSync) {
this.executeActionsInSyncEnabled = executeActionsInSync;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ConfigurationData<S, E> {
private final StateMachineMonitor<S, E> stateMachineMonitor;
private final List<StateMachineInterceptor<S, E>> interceptors;
private final RegionExecutionPolicy regionExecutionPolicy;
private final boolean executeActionsInSyncEnabled;

/**
* Instantiates a new state machine configuration config data.
Expand Down Expand Up @@ -96,7 +97,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin
List<StateMachineInterceptor<S, E>> interceptors) {
this(beanFactory, autoStart, ensemble, listeners, securityEnabled, transitionSecurityAccessDecisionManager,
eventSecurityAccessDecisionManager, eventSecurityRule, transitionSecurityRule, verifierEnabled,
verifier, machineId, stateMachineMonitor, interceptors, null, null, null, null);
verifier, machineId, stateMachineMonitor, interceptors, null, null, null, null, false);
}

/**
Expand All @@ -120,6 +121,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin
* @param stateDoActionPolicy the state do action policy
* @param stateDoActionPolicyTimeout the state do action policy timeout
* @param regionExecutionPolicy the region execution policy
* @param executeActionsInSyncEnabled the execute actions in sync enabled flag
*/
public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachineEnsemble<S, E> ensemble,
List<StateMachineListener<S, E>> listeners, boolean securityEnabled,
Expand All @@ -129,7 +131,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin
String machineId, StateMachineMonitor<S, E> stateMachineMonitor,
List<StateMachineInterceptor<S, E>> interceptors, TransitionConflictPolicy transitionConflightPolicy,
StateDoActionPolicy stateDoActionPolicy, Long stateDoActionPolicyTimeout,
RegionExecutionPolicy regionExecutionPolicy) {
RegionExecutionPolicy regionExecutionPolicy,boolean executeActionsInSyncEnabled) {
this.beanFactory = beanFactory;
this.autoStart = autoStart;
this.ensemble = ensemble;
Expand All @@ -148,6 +150,7 @@ public ConfigurationData(BeanFactory beanFactory, boolean autoStart, StateMachin
this.stateDoActionPolicy = stateDoActionPolicy;
this.stateDoActionPolicyTimeout = stateDoActionPolicyTimeout;
this.regionExecutionPolicy = regionExecutionPolicy;
this.executeActionsInSyncEnabled = executeActionsInSyncEnabled;
}

public String getMachineId() {
Expand Down Expand Up @@ -306,4 +309,14 @@ public Long getStateDoActionPolicyTimeout() {
public RegionExecutionPolicy getRegionExecutionPolicy() {
return regionExecutionPolicy;
}


/**
* Checks if execute actions in sync is enabled.
*
* @return true, if execute actions in sync is enabled
*/
public boolean isExecuteActionsInSyncEnabled() {
return executeActionsInSyncEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public abstract class AbstractStateMachine<S, E> extends StateMachineObjectSuppo

private StateMachine<S, E> parentMachine;

private boolean executeActionsInSyncEnabled;

/**
* Instantiates a new abstract state machine.
*
Expand Down Expand Up @@ -310,7 +312,7 @@ public Mono<Void> doOnComplete(StateContext<S, E> context) {
}

ReactiveStateMachineExecutor<S, E> executor = new ReactiveStateMachineExecutor<S, E>(this, getRelayStateMachine(), transitions,
triggerToTransitionMap, triggerlessTransitions, initialTransition, initialEvent, transitionConflictPolicy);
triggerToTransitionMap, triggerlessTransitions, initialTransition, initialEvent, transitionConflictPolicy, executeActionsInSyncEnabled);
if (getBeanFactory() != null) {
executor.setBeanFactory(getBeanFactory());
}
Expand Down Expand Up @@ -614,7 +616,7 @@ public void setForwardedInitialEvent(Message<E> message) {
*
* @param transitionConflictPolicy the new transition conflict policy
*/
public void setTransitionConflightPolicy(TransitionConflictPolicy transitionConflictPolicy) {
public void setTransitionConflictPolicy(TransitionConflictPolicy transitionConflictPolicy) {
this.transitionConflictPolicy = transitionConflictPolicy;
}

Expand Down Expand Up @@ -1484,4 +1486,14 @@ private static <S, E> boolean isDirectSubstate(State<S, E> left, State<S, E> rig
return false;
}
}

/**
* Sets execute actions in sync flag.
* if enabled statemachine will execute all actions reactive non-reactive in sync while accepting an event,
* If any action results in error statemachine will error out
* @param executeActionsInSyncEnabled
*/
public void setExecuteActionsInSync(boolean executeActionsInSyncEnabled) {
this.executeActionsInSyncEnabled = executeActionsInSyncEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
private Many<TriggerQueueItem> triggerSink;
private Flux<Void> triggerFlux;
private Disposable triggerDisposable;
private final boolean executeActionsInSyncEnabled;

public ReactiveStateMachineExecutor(StateMachine<S, E> stateMachine, StateMachine<S, E> relayStateMachine,
Collection<Transition<S, E>> transitions, Map<Trigger<S, E>, Transition<S, E>> triggerToTransitionMap,
List<Transition<S, E>> triggerlessTransitions, Transition<S, E> initialTransition, Message<E> initialEvent,
TransitionConflictPolicy transitionConflictPolicy) {
TransitionConflictPolicy transitionConflictPolicy, boolean executeActionsInSyncEnabled) {
this.stateMachine = stateMachine;
this.relayStateMachine = relayStateMachine;
this.triggerToTransitionMap = triggerToTransitionMap;
Expand All @@ -109,6 +110,7 @@ public ReactiveStateMachineExecutor(StateMachine<S, E> stateMachine, StateMachin
this.transitionConflictPolicy = transitionConflictPolicy;
// anonymous transitions are fixed, sort those now
this.triggerlessTransitions.sort(transitionComparator);
this.executeActionsInSyncEnabled = executeActionsInSyncEnabled;
registerTriggerListener();
}

Expand Down Expand Up @@ -203,11 +205,12 @@ public Mono<Void> queueEvent(Mono<Message<E>> message, StateMachineExecutorCallb

return messages
.flatMap(m -> handleEvent(m, callback, triggerCallback))
.flatMap(tqi -> Mono.fromRunnable(() -> {
triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST);
})
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(10))))
.then()
.flatMap(tqi -> executeActionsInSyncEnabled ? handleTrigger(tqi) : Mono.fromRunnable(() -> {
triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST);
})
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(10)))
)
.then()
.and(triggerCallbackSink);
}

Expand Down Expand Up @@ -457,12 +460,12 @@ public void triggered() {
log.debug("TimedTrigger triggered " + trigger);
}
Mono.just(new TriggerQueueItem(trigger, null, null, null))
.flatMap(tqi -> Mono.fromCallable(() -> {
triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST);
return null;
})
.retryWhen(Retry.fixedDelay(10, Duration.ofNanos(10))))
.subscribe();
.flatMap(tqi -> executeActionsInSyncEnabled ? handleTrigger(tqi) : Mono.fromRunnable(() -> {
triggerSink.emitNext(tqi, EmitFailureHandler.FAIL_FAST);
})
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(10)))
)
.subscribe();
}
});
}
Expand Down