Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3067: Draft of mapping multiple headers with same key #3101

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.assertj.core.util.Streams;

import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
Expand All @@ -48,12 +51,14 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
* @author Grzegorz Poznachowski
*
* @since 1.3
*
*/
public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {

private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s";

private static final String JAVA_LANG_STRING = "java.lang.String";

private static final Set<String> TRUSTED_ARRAY_TYPES = Set.of(
Expand Down Expand Up @@ -96,6 +101,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, consider to not reformat Javadocs: no empty lines for methods.

* @see #DefaultKafkaHeaderMapper(ObjectMapper)
*/
public DefaultKafkaHeaderMapper() {
Expand All @@ -110,6 +116,7 @@ public DefaultKafkaHeaderMapper() {
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
*
* @param objectMapper the object mapper.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
Expand All @@ -128,6 +135,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
* generally should not map the {@code "id" and "timestamp"} headers. Note:
* most of the headers in {@link KafkaHeaders} are ever mapped as headers since they
* represent data in consumer/producer records.
*
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
Expand All @@ -143,8 +151,9 @@ public DefaultKafkaHeaderMapper(String... patterns) {
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
* represent data in consumer/producer records.
*
* @param objectMapper the object mapper.
* @param patterns the patterns.
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
Expand All @@ -160,6 +169,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St

/**
* Create an instance for inbound mapping only with pattern matching.
*
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
Expand All @@ -170,8 +180,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt

/**
* Create an instance for inbound mapping only with pattern matching.
*
* @param objectMapper the object mapper.
* @param patterns the patterns to match.
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
*/
Expand All @@ -181,6 +192,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o

/**
* Return the object mapper.
*
* @return the mapper.
*/
protected ObjectMapper getObjectMapper() {
Expand All @@ -189,6 +201,7 @@ protected ObjectMapper getObjectMapper() {

/**
* Provide direct access to the trusted packages set for subclasses.
*
* @return the trusted packages.
* @since 2.2
*/
Expand All @@ -198,6 +211,7 @@ protected Set<String> getTrustedPackages() {

/**
* Provide direct access to the toString() classes by subclasses.
*
* @return the toString() classes.
* @since 2.2
*/
Expand All @@ -214,6 +228,7 @@ protected boolean isEncodeStrings() {
* raw String value is converted to a byte array using the configured charset. Set to
* true if a consumer of the outbound record is using Spring for Apache Kafka version
* less than 2.3
*
* @param encodeStrings true to encode (default false).
* @since 2.3
*/
Expand All @@ -234,6 +249,7 @@ public void setEncodeStrings(boolean encodeStrings) {
* If any of the supplied packages is {@code "*"}, all packages are trusted.
* If a class for a non-trusted package is encountered, the header is returned to the
* application with value of type {@link NonTrustedHeaderType}.
*
* @param packagesToTrust the packages to trust.
*/
public void addTrustedPackages(String... packagesToTrust) {
Expand All @@ -253,6 +269,7 @@ public void addTrustedPackages(String... packagesToTrust) {
/**
* Add class names that the outbound mapper should perform toString() operations on
* before mapping.
*
* @param classNames the class names.
* @since 2.2
*/
Expand All @@ -264,32 +281,15 @@ public void addToStringClasses(String... classNames) {
public void fromHeaders(MessageHeaders headers, Headers target) {
final Map<String, String> jsonHeaders = new HashMap<>();
final ObjectMapper headerObjectMapper = getObjectMapper();
headers.forEach((key, rawValue) -> {
if (matches(key, rawValue)) {
Object valueToAdd = headerValueToAddOut(key, rawValue);
if (valueToAdd instanceof byte[]) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
headers.forEach((key, value) -> {
if (matches(key, value)) {
if (value instanceof List<?> values) {
for (int i = 0; i < values.size(); i++) {
resolveHeader(key, values.get(i), target, jsonHeaders, i);
}
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
if (!encodeToJson && valueToAdd instanceof String) {
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
}
else {
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
}
jsonHeaders.put(key, className);
}
catch (Exception e) {
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
}
resolveHeader(key, value, target, jsonHeaders, null);
}
}
});
Expand All @@ -303,34 +303,84 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
}
}

@Override
public void toHeaders(Headers source, final Map<String, Object> headers) {
final Map<String, String> jsonTypes = decodeJsonTypes(source);
source.forEach(header -> {
String headerName = header.key();
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
headers.put(headerName, new String(header.value(), getCharset()));
}
else if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) {
headers.put(headerName, header);
}
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
if (jsonTypes.containsKey(headerName)) {
String requestedType = jsonTypes.get(headerName);
populateJsonValueHeader(header, requestedType, headers);
private void resolveHeader(String headerName, Object value, Headers target, Map<String, String> jsonHeaders, Integer headerIndex) {
Object valueToAdd = headerValueToAddOut(headerName, value);
if (valueToAdd instanceof byte[] byteArray) {
target.add(new RecordHeader(headerName, byteArray));
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
if (!encodeToJson && valueToAdd instanceof String stringValue) {
target.add(new RecordHeader(headerName, stringValue.getBytes(getCharset())));
}
else {
headers.put(headerName, headerValueToAddIn(header));
target.add(new RecordHeader(headerName, this.objectMapper.writeValueAsBytes(valueToAdd)));
}
jsonHeaders.put(headerIndex == null ?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you are doing here, but I think all the framework headers must be as a single value Kafka headers anyway. There are a lot of logic in the framework which relies on them.
We just must agree that we have to make a fix only for end-user headers which can be iterable.
Even if that JSON headers looks like a collection, it is really something what the framework rely on as as single header.
So, I don't expect too drastic changes in this area.
Thanks.

headerName :
ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, headerIndex), className);
}
});
catch (Exception e) {
logger.error(e, () -> "Could not map " + headerName + " with type " + value.getClass().getName());
}
}
}

@Override
public void toHeaders(Headers source, final Map<String, Object> target) {
final Map<String, String> jsonTypes = decodeJsonTypes(source);

Streams.stream(source)
.collect(Collectors.groupingBy(Header::key))
.forEach((headerName, headers) -> {
Header lastHeader = headers.get(headers.size() - 1);
if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) {
target.put(headerName, lastHeader);
}
else if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) {
target.put(headerName, ByteBuffer.wrap(lastHeader.value()).getInt());
}
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
target.put(headerName, new String(lastHeader.value(), getCharset()));
}
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
if (headers.size() == 1) {
if (jsonTypes.containsKey(headerName)) {
String requestedType = jsonTypes.get(headerName);
target.put(headerName, resolveJsonValueHeader(headers.get(0), requestedType));
}
else {
target.put(headerName, headerValueToAddIn(headers.get(0)));
}
}
else {
List<Object> valueList = new ArrayList<>();
for (int i = 0; i < headers.size(); i++) {
var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, i);
if (jsonTypes.containsKey(jsonTypeIterableHeader)) {
String requestedType = jsonTypes.get(jsonTypeIterableHeader);
valueList.add(resolveJsonValueHeader(headers.get(i), requestedType));
}
else {
valueList.add(headerValueToAddIn(headers.get(i)));
}
}
Collections.reverse(valueList);
target.put(headerName, valueList);
}
}
});
}

private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
private Object resolveJsonValueHeader(Header header, String requestedType) {
Class<?> type = Object.class;
boolean trusted = false;
try {
Expand All @@ -343,22 +393,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
logger.error(e, () -> "Could not load class for header: " + header.key());
}
if (String.class.equals(type) && (header.value().length == 0 || header.value()[0] != '"')) {
headers.put(header.key(), new String(header.value(), getCharset()));
return new String(header.value(), getCharset());
}
else {
if (trusted) {
try {
Object value = decodeValue(header, type);
headers.put(header.key(), value);
return decodeValue(header, type);
}
catch (IOException e) {
logger.error(e, () ->
"Could not decode json type: " + requestedType + " for key: " + header.key());
headers.put(header.key(), header.value());
return header.value();
}
}
else {
headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType));
return new NonTrustedHeaderType(header.value(), requestedType);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.kafka.support;

import java.nio.ByteBuffer;
import java.util.List;

import org.springframework.kafka.retrytopic.RetryTopicHeaders;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -64,10 +65,27 @@ public int getNonBlockingRetryDeliveryAttempt() {
}

private int fromBytes(String headerName) {
byte[] header = getHeader(headerName, byte[].class);
byte[] header = getFirstHeaderIfIterable(headerName, byte[].class);
return header == null ? 1 : ByteBuffer.wrap(header).getInt();
}

@SuppressWarnings("unchecked")
@Nullable
public <T> T getFirstHeaderIfIterable(String key, Class<T> type) {
Object value = getHeader(key);
if (value == null) {
return null;
}
if (value instanceof List<?> iterable) {
value = iterable.get(0);
}
if (!type.isAssignableFrom(value.getClass())) {
throw new IllegalArgumentException("Incorrect type specified for header '" + key + "'. Expected [" + type
+ "] but actual type is [" + value.getClass() + "]");
}
return (T) value;
}

/**
* Get a header value with a specific type.
* @param <T> the type.
Expand Down
Loading