Skip to content

Commit

Permalink
GH-6380: Support Map as input in BeanPropertySqlParameterSourceFactory
Browse files Browse the repository at this point in the history
Fixes: #6380

The `MapSqlParameterSource` is much faster, then reflection or SpEL,
so, it would be great to have such an interaction when we evaluate values for SQL queries

* Enhance `BeanPropertySqlParameterSourceFactory` to use `MapSqlParameterSource` if `input` is a `Map`
* Expose `JdbcMessageHandler.usePayloadAsParameterSource` for convenience with `SqlParameterSourceFactory`,
especially when the payload is a map
  • Loading branch information
artembilan committed Feb 27, 2025
1 parent 96ddc01 commit 3ad8828
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,17 +16,17 @@

package org.springframework.integration.jdbc;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.springframework.jdbc.core.namedparam.AbstractSqlParameterSource;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

/**
* A default implementation of {@link SqlParameterSourceFactory} which creates an {@link SqlParameterSource} to
* reference bean properties in its input.
* reference bean properties (or map keys) in its input.
*
* @author Dave Syer
* @author Gary Russell
Expand All @@ -36,16 +36,15 @@
*/
public class BeanPropertySqlParameterSourceFactory implements SqlParameterSourceFactory {

private Map<String, Object> staticParameters = Collections.unmodifiableMap(new HashMap<>());
private final Map<String, Object> staticParameters = new HashMap<>();

/**
* If the input is a List or a Map, the output is a map parameter source, and in that case some static parameters
* can be added (default is empty). If the input is not a List or a Map then this value is ignored.
*
* @param staticParameters the static parameters to set
*/
public void setStaticParameters(Map<String, Object> staticParameters) {
this.staticParameters = staticParameters;
this.staticParameters.putAll(staticParameters);
}

@Override
Expand All @@ -55,19 +54,25 @@ public SqlParameterSource createParameterSource(Object input) {

private static final class StaticBeanPropertySqlParameterSource extends AbstractSqlParameterSource {

private final BeanPropertySqlParameterSource input;
private final SqlParameterSource input;

private final Map<String, Object> staticParameters;

@SuppressWarnings("unchecked")
StaticBeanPropertySqlParameterSource(Object input, Map<String, Object> staticParameters) {
this.input = new BeanPropertySqlParameterSource(input);
this.input =
input instanceof Map
? new MapSqlParameterSource((Map<String, Object>) input)
: new BeanPropertySqlParameterSource(input);

this.staticParameters = staticParameters;
}

@Override
public Object getValue(String paramName) throws IllegalArgumentException {
return this.staticParameters.containsKey(paramName) ? this.staticParameters.get(paramName) : this.input
.getValue(paramName);
return this.staticParameters.containsKey(paramName)
? this.staticParameters.get(paramName)
: this.input.getValue(paramName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,7 +70,7 @@
* {@link JdbcOperations#batchUpdate(String, BatchPreparedStatementSetter)} function.
* <p>
* NOTE: The batch update is not supported when {@link #keysGenerated} is in use.
*
* <p>
* N.B. do not use quotes to escape the header keys. The default SQL parameter source (from Spring JDBC) can also handle
* headers with dotted names (e.g. <code>business.id</code>)
*
Expand All @@ -97,6 +97,8 @@ public class JdbcMessageHandler extends AbstractMessageHandler {

private MessagePreparedStatementSetter preparedStatementSetter;

private boolean usePayloadAsParameterSource;

/**
* Constructor taking {@link DataSource} from which the DB Connection can be obtained and the select query to
* execute to retrieve new rows.
Expand Down Expand Up @@ -137,6 +139,18 @@ public void setSqlParameterSourceFactory(SqlParameterSourceFactory sqlParameterS
this.sqlParameterSourceFactory = sqlParameterSourceFactory;
}

/**
* If set to 'true', the payload of the Message will be used as a source for
* providing parameters. If false the entire {@link Message} will be available
* as a source for parameters.
* Makes sense only if {@link #setPreparedStatementSetter(MessagePreparedStatementSetter)} is not provided.
* @param usePayloadAsParameterSource false for the entire {@link Message} as parameter source.
* @since 6.5
*/
public void setUsePayloadAsParameterSource(boolean usePayloadAsParameterSource) {
this.usePayloadAsParameterSource = usePayloadAsParameterSource;
}

/**
* Specify a {@link MessagePreparedStatementSetter} to populate parameters on the
* {@link PreparedStatement} with the {@link Message} context.
Expand Down Expand Up @@ -210,21 +224,23 @@ protected List<? extends Map<String, Object>> executeUpdateQuery(final Message<?
}
else {
KeyHolder keyHolder = new GeneratedKeyHolder();
Object parameterSource = this.usePayloadAsParameterSource ? message.getPayload() : message;
this.jdbcOperations.update(this.updateSql,
this.sqlParameterSourceFactory.createParameterSource(message), keyHolder);
this.sqlParameterSourceFactory.createParameterSource(parameterSource), keyHolder);
return keyHolder.getKeyList();
}
}
else {
if (message.getPayload() instanceof Iterable) {
Stream<? extends Message<?>> messageStream =
StreamSupport.stream(((Iterable<?>) message.getPayload()).spliterator(), false)
.map(payload -> payloadToMessage(payload, message.getHeaders()));
if (message.getPayload() instanceof Iterable<?> iterable) {
Stream<?> payloadStream = StreamSupport.stream(iterable.spliterator(), false);

int[] updates;

if (this.preparedStatementSetter != null) {
Message<?>[] messages = messageStream.toArray(Message<?>[]::new);
Message<?>[] messages =
payloadStream
.map(payload -> payloadToMessage(payload, message.getHeaders()))
.toArray(Message<?>[]::new);

updates = this.jdbcOperations.getJdbcOperations()
.batchUpdate(this.updateSql, new BatchPreparedStatementSetter() {
Expand All @@ -243,7 +259,12 @@ public int getBatchSize() {
}
else {
SqlParameterSource[] sqlParameterSources =
messageStream.map(this.sqlParameterSourceFactory::createParameterSource)
payloadStream
.map((payload) ->
this.usePayloadAsParameterSource
? payload :
payloadToMessage(payload, message.getHeaders()))
.map(this.sqlParameterSourceFactory::createParameterSource)
.toArray(SqlParameterSource[]::new);

updates = this.jdbcOperations.batchUpdate(this.updateSql, sqlParameterSources);
Expand All @@ -265,8 +286,9 @@ public int getBatchSize() {
.update(this.updateSql, ps -> this.preparedStatementSetter.setValues(ps, message));
}
else {
Object parameterSource = this.usePayloadAsParameterSource ? message.getPayload() : message;
updated = this.jdbcOperations.update(this.updateSql,
this.sqlParameterSourceFactory.createParameterSource(message));
this.sqlParameterSourceFactory.createParameterSource(parameterSource));
}

LinkedCaseInsensitiveMap<Object> map = new LinkedCaseInsensitiveMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,15 +30,12 @@
/**
* @author Dave Syer
* @author Artem Bilan
*
* @since 2.0
*
*/
public class JdbcMessageHandlerParser extends AbstractOutboundChannelAdapterParser {

protected boolean shouldGenerateId() {
return false;
}

protected boolean shouldGenerateIdAsFallback() {
return true;
}
Expand Down Expand Up @@ -72,6 +69,8 @@ protected AbstractBeanDefinition parseConsumer(Element element, ParserContext pa
builder.addConstructorArgReference(jdbcOperationsRef);
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "sql-parameter-source-factory");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "payload-as-parameter-source",
"usePayloadAsParameterSource");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "prepared-statement-setter");
builder.addConstructorArgValue(query);
return builder.getBeanDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="payload-as-parameter-source" default="false">
<xsd:annotation>
<xsd:documentation>
Whether to use only payload from request message as parameter source for 'sql-parameter-source-factory'.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string"/>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="prepared-statement-setter" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jdbc https://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">

Expand All @@ -12,48 +12,45 @@
<int:poller id="defaultPoller" default="true" fixed-rate="5000"/>

<int:gateway id="startGateway" default-request-channel="startChannel"
service-interface="org.springframework.integration.jdbc.storedproc.CreateUser" />
service-interface="org.springframework.integration.jdbc.storedproc.CreateUser"/>

<int:channel id="startChannel"/>

<int-jdbc:stored-proc-outbound-gateway request-channel="startChannel"
stored-procedure-name="CREATE_USER_RETURN_ALL" data-source="dataSource"
auto-startup="true"
id="gateway"
ignore-column-meta-data="false"
is-function="false"
expect-single-result="true"
reply-channel="outputChannel">
stored-procedure-name="CREATE_USER_RETURN_ALL" data-source="dataSource"
auto-startup="true"
id="gateway"
expect-single-result="true"
reply-channel="outputChannel">
<int-jdbc:parameter name="username" expression="payload.username"/>
<int-jdbc:parameter name="password" expression="payload.password"/>
<int-jdbc:parameter name="email" expression="@testService.quote(payload.email)"/>
<int-jdbc:returning-resultset name="out" row-mapper="org.springframework.integration.jdbc.storedproc.UserMapper" />
<int-jdbc:parameter name="email" expression="@testService.quote(payload.email)"/>
<int-jdbc:returning-resultset name="out"
row-mapper="org.springframework.integration.jdbc.storedproc.UserMapper"/>
</int-jdbc:stored-proc-outbound-gateway>


<int:channel id="outputChannel"/>

<int:service-activator id="consumerEndpoint" input-channel="outputChannel" ref="consumer" />
<bean id="consumer" class="org.springframework.integration.jdbc.StoredProcOutboundGatewayWithNamespaceIntegrationTests$Consumer"/>
<int:service-activator id="consumerEndpoint" input-channel="outputChannel" ref="consumer"/>
<bean id="consumer"
class="org.springframework.integration.jdbc.StoredProcOutboundGatewayWithNamespaceIntegrationTests$Consumer"/>

<int:logging-channel-adapter channel="errorChannel" log-full-message="true"/>

<int:chain input-channel="storedProcOutboundGatewayInsideChain" output-channel="replyChannel">
<int-jdbc:stored-proc-outbound-gateway stored-procedure-name="CREATE_USER_RETURN_ALL" data-source="dataSource"
ignore-column-meta-data="false"
is-function="false"
expect-single-result="true">
<int-jdbc:parameter name="username" expression="payload.username"/>
<int-jdbc:parameter name="password" expression="payload.password"/>
<int-jdbc:parameter name="email" expression="payload.email"/>
<int-jdbc:returning-resultset name="out" row-mapper="org.springframework.integration.jdbc.storedproc.UserMapper" />
<int-jdbc:returning-resultset name="out"
row-mapper="org.springframework.integration.jdbc.storedproc.UserMapper"/>
</int-jdbc:stored-proc-outbound-gateway>
</int:chain>

<int:channel id="replyChannel">
<int:queue/>
</int:channel>

<bean id="testService" class="org.springframework.integration.jdbc.StoredProcOutboundGatewayWithNamespaceIntegrationTests$TestService"/>
<bean id="testService"
class="org.springframework.integration.jdbc.StoredProcOutboundGatewayWithNamespaceIntegrationTests$TestService"/>

</beans>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -97,7 +98,11 @@ public void test() throws Exception {

@Test
public void testStoredProcOutboundGatewayInsideChain() {
Message<User> requestMessage = new GenericMessage<>(new User("myUsername", "myPassword", "myEmail"));
Message<Map<String, String>> requestMessage =
new GenericMessage<>(
Map.of("username", "myUsername",
"password", "myPassword",
"email", "myEmail"));

storedProcOutboundGatewayInsideChain.send(requestMessage);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.sql.DataSource;

Expand Down Expand Up @@ -85,10 +86,11 @@ public void testDollarHeaderOutboundChannelAdapter() {
public void testMapPayloadOutboundChannelAdapter() {
setUp("handlingMapPayloadJdbcOutboundChannelAdapterTest.xml", getClass());
assertThat(context.containsBean("jdbcAdapter")).isTrue();
Message<?> message = MessageBuilder.withPayload(Collections.singletonMap("foo", "bar")).build();
UUID testId = UUID.randomUUID();
Message<?> message = MessageBuilder.withPayload(Map.of("id", testId, "foo", "bar")).build();
channel.send(message);
Map<String, Object> map = this.jdbcTemplate.queryForMap("SELECT * from FOOS");
assertThat(map.get("ID")).as("Wrong id").isEqualTo(message.getHeaders().getId().toString());
assertThat(map.get("ID")).as("Wrong id").isEqualTo(testId.toString());
assertThat(map.get("name")).as("Wrong name").isEqualTo("bar");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration/jdbc"
xmlns:beans="http://www.springframework.org/schema/beans" xmlns:si="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jdbc https://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jdbc
https://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">

<outbound-channel-adapter id="jdbcAdapter" query="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
channel="target" data-source="dataSource"/>
<outbound-channel-adapter id="jdbcAdapter" query="insert into foos (id, status, name) values (:id, 0, :foo)"
payload-as-parameter-source="true"
channel="target" data-source="dataSource"/>

<beans:import resource="jdbcOutboundChannelAdapterCommonConfig.xml" />
<beans:import resource="jdbcOutboundChannelAdapterCommonConfig.xml"/>

</beans:beans>
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ You can inject a different `SqlParameterSourceFactory` to get different behavior

The outbound adapter requires a reference to either a `DataSource` or a `JdbcTemplate`.
You can also inject a `SqlParameterSourceFactory` to control the binding of each incoming message to a query.
To make use of `SqlParameterSourceFactory` (especially default `BeanPropertySqlParameterSourceFactory` with its `MapSqlParameterSource`) more smooth, starting with version 6.5, the `JdbcMessageHandler` exposes a `usePayloadAsParameterSource` flag to indicate whether the whole message should be passed as parameter source input.

If the input channel is a direct channel, the outbound adapter runs its query in the same thread and, therefore, the same transaction (if there is one) as the sender of the message.

Expand Down
Loading

0 comments on commit 3ad8828

Please sign in to comment.