Skip to content

Commit 46a191c

Browse files
authored
Merge branch 'main' into dapr-state-store-clickhouse
2 parents 8547dd7 + 5f17025 commit 46a191c

23 files changed

+527
-118
lines changed

bindings/aws/s3/s3.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ const (
4949
metadataFilePath = "filePath"
5050
metadataPresignTTL = "presignTTL"
5151
metadataStorageClass = "storageClass"
52+
metadataTags = "tags"
5253

5354
metatadataContentType = "Content-Type"
5455
metadataKey = "key"
@@ -191,6 +192,15 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi
191192
if contentTypeStr != "" {
192193
contentType = &contentTypeStr
193194
}
195+
196+
var tagging *string
197+
if rawTags, ok := req.Metadata[metadataTags]; ok {
198+
tagging, err = s.parseS3Tags(rawTags)
199+
if err != nil {
200+
return nil, fmt.Errorf("s3 binding error: parsing tags falied error: %w", err)
201+
}
202+
}
203+
194204
var r io.Reader
195205
if metadata.FilePath != "" {
196206
r, err = os.Open(metadata.FilePath)
@@ -209,12 +219,14 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi
209219
if metadata.StorageClass != "" {
210220
storageClass = aws.String(metadata.StorageClass)
211221
}
222+
212223
resultUpload, err := s.authProvider.S3().Uploader.UploadWithContext(ctx, &s3manager.UploadInput{
213224
Bucket: ptr.Of(metadata.Bucket),
214225
Key: ptr.Of(key),
215226
Body: r,
216227
ContentType: contentType,
217228
StorageClass: storageClass,
229+
Tagging: tagging,
218230
})
219231
if err != nil {
220232
return nil, fmt.Errorf("s3 binding error: uploading failed: %w", err)
@@ -418,6 +430,26 @@ func (s *AWSS3) parseMetadata(md bindings.Metadata) (*s3Metadata, error) {
418430
return &m, nil
419431
}
420432

433+
// Helper for parsing s3 tags metadata
434+
func (s *AWSS3) parseS3Tags(raw string) (*string, error) {
435+
tagEntries := strings.Split(raw, ",")
436+
pairs := make([]string, 0, len(tagEntries))
437+
for _, tagEntry := range tagEntries {
438+
kv := strings.SplitN(strings.TrimSpace(tagEntry), "=", 2)
439+
isInvalidTag := len(kv) != 2 || strings.TrimSpace(kv[0]) == "" || strings.TrimSpace(kv[1]) == ""
440+
if isInvalidTag {
441+
return nil, fmt.Errorf("invalid tag format: '%s' (expected key=value)", tagEntry)
442+
}
443+
pairs = append(pairs, fmt.Sprintf("%s=%s", strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])))
444+
}
445+
446+
if len(pairs) == 0 {
447+
return nil, nil
448+
}
449+
450+
return aws.String(strings.Join(pairs, "&")), nil
451+
}
452+
421453
// Helper to merge config and request metadata.
422454
func (metadata s3Metadata) mergeWithRequestMetadata(req *bindings.InvokeRequest) (s3Metadata, error) {
423455
merged := metadata

bindings/aws/s3/s3_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,24 @@ func TestParseMetadata(t *testing.T) {
5353
})
5454
}
5555

56+
func TestParseS3Tags(t *testing.T) {
57+
t.Run("Has parsed s3 tags", func(t *testing.T) {
58+
request := bindings.InvokeRequest{}
59+
request.Metadata = map[string]string{
60+
"decodeBase64": "yes",
61+
"encodeBase64": "false",
62+
"filePath": "/usr/vader.darth",
63+
"storageClass": "STANDARD_IA",
64+
"tags": "project=myproject,year=2024",
65+
}
66+
s3 := AWSS3{}
67+
parsedTags, err := s3.parseS3Tags(request.Metadata["tags"])
68+
69+
require.NoError(t, err)
70+
assert.Equal(t, "project=myproject&year=2024", *parsedTags)
71+
})
72+
}
73+
5674
func TestMergeWithRequestMetadata(t *testing.T) {
5775
t.Run("Has merged metadata", func(t *testing.T) {
5876
m := bindings.Metadata{}

common/component/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type RedisClient interface {
8282
ConfigurationSubscribe(ctx context.Context, args *ConfigurationSubscribeArgs)
8383
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error)
8484
EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error)
85-
XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error)
85+
XAdd(ctx context.Context, stream string, maxLenApprox int64, streamTTL string, values map[string]interface{}) (string, error)
8686
XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error
8787
XAck(ctx context.Context, stream string, group string, messageID string) error
8888
XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error)

common/component/redis/settings.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ type Settings struct {
102102
// The max len of stream
103103
MaxLenApprox int64 `mapstructure:"maxLenApprox" mdonly:"pubsub"`
104104

105+
// The TTL of stream entries
106+
StreamTTL time.Duration `mapstructure:"streamTTL" mdonly:"pubsub"`
107+
105108
// EntraID / AzureAD Authentication based on the shared code which essentially uses the DefaultAzureCredential
106109
// from the official Azure Identity SDK for Go
107110
UseEntraID bool `mapstructure:"useEntraID" mapstructurealiases:"useAzureAD"`
@@ -127,6 +130,15 @@ func (s *Settings) SetCertificate(fn func(cert *tls.Certificate)) error {
127130
return nil
128131
}
129132

133+
func (s *Settings) GetMinID(now time.Time) string {
134+
// If StreamTTL is not set, return empty string (no trimming)
135+
if s.StreamTTL == 0 {
136+
return ""
137+
}
138+
139+
return fmt.Sprintf("%d-1", now.Add(-s.StreamTTL).UnixMilli())
140+
}
141+
130142
type Duration time.Duration
131143

132144
func (r *Duration) DecodeString(value string) error {

common/component/redis/settings_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"crypto/tls"
55
"testing"
6+
"time"
67

78
"github.com/stretchr/testify/require"
89
)
@@ -39,4 +40,34 @@ func TestSettings(t *testing.T) {
3940
require.NoError(t, err)
4041
require.NotNil(t, c)
4142
})
43+
44+
t.Run("stream TTL", func(t *testing.T) {
45+
fixedTime := time.Date(2025, 3, 14, 0o1, 59, 26, 0, time.UTC)
46+
47+
tests := []struct {
48+
name string
49+
streamTTL time.Duration
50+
want string
51+
}{
52+
{
53+
name: "with one hour TTL",
54+
streamTTL: time.Hour,
55+
want: "1741913966000-1",
56+
},
57+
{
58+
name: "with zero TTL",
59+
streamTTL: 0,
60+
want: "",
61+
},
62+
}
63+
64+
for _, tt := range tests {
65+
t.Run(tt.name, func(t *testing.T) {
66+
settings := &Settings{
67+
StreamTTL: tt.streamTTL,
68+
}
69+
require.Equal(t, tt.want, settings.GetMinID(fixedTime))
70+
})
71+
}
72+
})
4273
}

common/component/redis/v8client.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (c v8Client) SetNX(ctx context.Context, key string, value interface{}, expi
161161
return &val, nx.Err()
162162
}
163163

164-
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
164+
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
165165
var writeCtx context.Context
166166
if c.writeTimeout > 0 {
167167
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
@@ -171,9 +171,11 @@ func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
171171
writeCtx = ctx
172172
}
173173
return c.client.XAdd(writeCtx, &v8.XAddArgs{
174-
Stream: stream,
175-
Values: values,
176-
MaxLenApprox: maxLenApprox,
174+
Stream: stream,
175+
Values: values,
176+
MaxLen: maxLenApprox,
177+
MinID: minIDApprox,
178+
Approx: true,
177179
}).Result()
178180
}
179181

common/component/redis/v9client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (c v9Client) SetNX(ctx context.Context, key string, value interface{}, expi
161161
return &val, nx.Err()
162162
}
163163

164-
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
164+
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
165165
var writeCtx context.Context
166166
if c.writeTimeout > 0 {
167167
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
@@ -174,6 +174,7 @@ func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
174174
Stream: stream,
175175
Values: values,
176176
MaxLen: maxLenApprox,
177+
MinID: minIDApprox,
177178
Approx: true,
178179
}).Result()
179180
}

pubsub/pulsar/metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type pulsarMetadata struct {
3838
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`
3939
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
4040
SubscriptionType string `mapstructure:"subscribeType"`
41+
SubscriptionInitialPosition string `mapstructure:"subscribeInitialPosition"`
4142
Token string `mapstructure:"token"`
4243
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
4344
}

pubsub/pulsar/metadata.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,13 @@ metadata:
192192
example: '"exclusive"'
193193
url:
194194
title: "Pulsar Subscription Types"
195-
url: "https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#subscription-types"
195+
url: "https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#subscription-types"
196+
- name: subscribeInitialPosition
197+
type: string
198+
description: |
199+
Subscription position is the initial position which the cursor is set when start consuming: "latest", "earliest".
200+
default: '"latest"'
201+
example: '"earliest"'
202+
url:
203+
title: "Pulsar SubscriptionInitialPosition"
204+
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"

pubsub/pulsar/pulsar.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ const (
9494

9595
processModeAsync = "async"
9696
processModeSync = "sync"
97+
98+
subscribeInitialPosition = "subscribeInitialPosition"
99+
100+
subscribePositionEarliest = "earliest"
101+
subscribePositionLatest = "latest"
97102
)
98103

99104
type ProcessMode string
@@ -144,6 +149,11 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
144149
return nil, errors.New("invalid subscription type. Accepted values are `exclusive`, `shared`, `failover` and `key_shared`")
145150
}
146151

152+
m.SubscriptionInitialPosition, err = parseSubscriptionPosition(meta.Properties[subscribeInitialPosition])
153+
if err != nil {
154+
return nil, errors.New("invalid subscription initial position. Accepted values are `latest` and `earliest`")
155+
}
156+
147157
for k, v := range meta.Properties {
148158
switch {
149159
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
@@ -421,6 +431,30 @@ func getSubscribeType(subsTypeStr string) pulsar.SubscriptionType {
421431
return subsType
422432
}
423433

434+
func parseSubscriptionPosition(in string) (string, error) {
435+
subsPosition := strings.ToLower(in)
436+
switch subsPosition {
437+
case subscribePositionEarliest, subscribePositionLatest:
438+
return subsPosition, nil
439+
case "":
440+
return subscribePositionLatest, nil
441+
default:
442+
return "", fmt.Errorf("invalid subscription initial position: %s", subsPosition)
443+
}
444+
}
445+
446+
func getSubscribePosition(subsPositionStr string) pulsar.SubscriptionInitialPosition {
447+
var subsPosition pulsar.SubscriptionInitialPosition
448+
449+
switch subsPositionStr {
450+
case subscribePositionEarliest:
451+
subsPosition = pulsar.SubscriptionPositionEarliest
452+
case subscribePositionLatest:
453+
subsPosition = pulsar.SubscriptionPositionLatest
454+
}
455+
return subsPosition
456+
}
457+
424458
func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
425459
if p.closed.Load() {
426460
return errors.New("component is closed")
@@ -436,12 +470,13 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
436470
}
437471

438472
options := pulsar.ConsumerOptions{
439-
Topic: topic,
440-
SubscriptionName: p.metadata.ConsumerID,
441-
Type: getSubscribeType(subscribeType),
442-
MessageChannel: channel,
443-
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
444-
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
473+
Topic: topic,
474+
SubscriptionName: p.metadata.ConsumerID,
475+
Type: getSubscribeType(subscribeType),
476+
SubscriptionInitialPosition: getSubscribePosition(subscribeInitialPosition),
477+
MessageChannel: channel,
478+
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
479+
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
445480
}
446481

447482
// Handle KeySharedPolicy for key_shared subscription type

pubsub/pulsar/pulsar_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,59 @@ func TestParsePulsarMetadataSubscriptionType(t *testing.T) {
117117
}
118118
}
119119

120+
func TestParsePulsarMetadataSubscriptionInitialPosition(t *testing.T) {
121+
tt := []struct {
122+
name string
123+
subscribeInitialPosition string
124+
expected string
125+
err bool
126+
}{
127+
{
128+
name: "test valid subscribe initial position - earliest",
129+
subscribeInitialPosition: "earliest",
130+
expected: "earliest",
131+
err: false,
132+
},
133+
{
134+
name: "test valid subscribe initial position - latest",
135+
subscribeInitialPosition: "latest",
136+
expected: "latest",
137+
err: false,
138+
},
139+
{
140+
name: "test valid subscribe initial position - empty",
141+
subscribeInitialPosition: "",
142+
expected: "latest",
143+
err: false,
144+
},
145+
{
146+
name: "test invalid subscribe initial position",
147+
subscribeInitialPosition: "invalid",
148+
err: true,
149+
},
150+
}
151+
for _, tc := range tt {
152+
t.Run(tc.name, func(t *testing.T) {
153+
m := pubsub.Metadata{}
154+
155+
m.Properties = map[string]string{
156+
"host": "a",
157+
"subscribeInitialPosition": tc.subscribeInitialPosition,
158+
}
159+
meta, err := parsePulsarMetadata(m)
160+
161+
if tc.err {
162+
require.Error(t, err)
163+
assert.Nil(t, meta)
164+
return
165+
}
166+
167+
require.NoError(t, err)
168+
assert.Equal(t, tc.expected, meta.SubscriptionInitialPosition)
169+
})
170+
}
171+
}
172+
120173
func TestParsePulsarSchemaMetadata(t *testing.T) {
121174
t.Run("test json", func(t *testing.T) {
122175
m := pubsub.Metadata{}

pubsub/redis/metadata.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ metadata:
165165
- name: failover
166166
required: false
167167
description: |
168-
Property to enabled failover configuration. Needs sentinalMasterName to be set. Defaults to "false"
168+
Property to enabled failover configuration. Needs sentinelMasterName to be set. Defaults to "false"
169169
example: "false"
170170
type: bool
171171
- name: sentinelMasterName
@@ -175,9 +175,19 @@ metadata:
175175
type: string
176176
- name: maxLenApprox
177177
required: false
178-
description: Maximum number of items inside a stream.The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
178+
description: Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
179179
example: "10000"
180180
type: number
181+
- name: streamTTL
182+
required: false
183+
description: |
184+
TTL duration for stream entries. Entries older than this duration will be evicted.
185+
This is an approximate value, as it's implemented using Redis stream's MINID trimming with the '~' modifier.
186+
The actual retention may include slightly more entries than strictly defined by the TTL,
187+
as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries.
188+
example: "30d"
189+
type: duration
190+
181191
builtinAuthenticationProfiles:
182192
- name: "azuread"
183193
metadata:

0 commit comments

Comments
 (0)