Skip to content

Add descriptions to JetStream streams and consumers. #2377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ConsumerInfo struct {

type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Description string `json:"description,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
Expand Down Expand Up @@ -273,6 +274,10 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, ApiErrors[JSConsumerConfigRequiredErr]
}

if len(config.Description) > JSMaxDescriptionLen {
return nil, ApiErrors[JSConsumerDescriptionTooLongErrF].NewT("{max}", JSMaxDescriptionLen)
}

var err error
// For now expect a literal subject if its not empty. Empty means work queue mode (pull mode).
if config.DeliverSubject != _EMPTY_ {
Expand Down Expand Up @@ -498,6 +503,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if isDurableConsumer(config) {
if len(config.Durable) > JSMaxNameLen {
mset.mu.Unlock()
o.deleteWithoutAdvisory()
return nil, ApiErrors[JSConsumerNameTooLongErrF].NewT("{max}", JSMaxNameLen)
}
o.name = config.Durable
Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1038,5 +1038,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerDescriptionTooLongErrF",
"code": 400,
"error_code": 10107,
"description": "consumer description is too long, maximum allowed is {max}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
3 changes: 3 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ const (
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
)

// JSMaxDescription is the maximum description length for streams and consumers.
const JSMaxDescriptionLen = 4 * 1024

// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
const JSMaxNameLen = 256

Expand Down
4 changes: 4 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
// JSConsumerDeliverToWildcardsErr consumer deliver subject has wildcards
JSConsumerDeliverToWildcardsErr ErrorIdentifier = 10079

// JSConsumerDescriptionTooLongErrF consumer description is too long, maximum allowed is {max}
JSConsumerDescriptionTooLongErrF ErrorIdentifier = 10107

// JSConsumerDirectRequiresEphemeralErr consumer direct requires an ephemeral consumer
JSConsumerDirectRequiresEphemeralErr ErrorIdentifier = 10091

Expand Down Expand Up @@ -336,6 +339,7 @@ var (
JSConsumerCreateErrF: {Code: 500, ErrCode: 10012, Description: "{err}"},
JSConsumerDeliverCycleErr: {Code: 400, ErrCode: 10081, Description: "consumer deliver subject forms a cycle"},
JSConsumerDeliverToWildcardsErr: {Code: 400, ErrCode: 10079, Description: "consumer deliver subject has wildcards"},
JSConsumerDescriptionTooLongErrF: {Code: 400, ErrCode: 10107, Description: "consumer description is too long, maximum allowed is {max}"},
JSConsumerDirectRequiresEphemeralErr: {Code: 400, ErrCode: 10091, Description: "consumer direct requires an ephemeral consumer"},
JSConsumerDirectRequiresPushErr: {Code: 400, ErrCode: 10090, Description: "consumer direct requires a push based consumer"},
JSConsumerDurableNameNotInSubjectErr: {Code: 400, ErrCode: 10016, Description: "consumer expected to be durable but no durable name set in subject"},
Expand Down
55 changes: 54 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -360,6 +361,58 @@ func TestJetStreamAutoTuneFSConfig(t *testing.T) {
testBlkSize("foo_bar_baz", -1, 32*1024*1024*1024*1024, FileStoreMaxBlkSize)
}

func TestJetStreamConsumerAndStreamDescriptions(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

descr := "foo asset"
acc := s.GlobalAccount()

// Check stream's first.
mset, err := acc.addStream(&StreamConfig{Name: "foo", Description: descr})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
if cfg := mset.config(); cfg.Description != descr {
t.Fatalf("Expected a description of %q, got %q", descr, cfg.Description)
}

// Now consumer
edescr := "analytics"
o, err := mset.addConsumer(&ConsumerConfig{
Description: edescr,
DeliverSubject: "to",
AckPolicy: AckNone})
if err != nil {
t.Fatalf("Unexpected error adding consumer: %v", err)
}
if cfg := o.config(); cfg.Description != edescr {
t.Fatalf("Expected a description of %q, got %q", edescr, cfg.Description)
}

// Test max.
data := make([]byte, JSMaxDescriptionLen+1)
rand.Read(data)
bigDescr := base64.StdEncoding.EncodeToString(data)

_, err = acc.addStream(&StreamConfig{Name: "bar", Description: bigDescr})
if err == nil || !strings.Contains(err.Error(), "description is too long") {
t.Fatalf("Expected an error but got none")
}

_, err = mset.addConsumer(&ConsumerConfig{
Description: bigDescr,
DeliverSubject: "to",
AckPolicy: AckNone})
if err == nil || !strings.Contains(err.Error(), "description is too long") {
t.Fatalf("Expected an error but got none")
}
}

func TestJetStreamPubAck(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down Expand Up @@ -3699,7 +3752,7 @@ func TestJetStreamEphemeralConsumerRecoveryAfterServerRestart(t *testing.T) {
t.Fatalf("Error looking up consumer %q", oname)
}
// Make sure config does not have durable.
if cfg := o.config(); cfg.Durable != "" {
if cfg := o.config(); cfg.Durable != _EMPTY_ {
t.Fatalf("Expected no durable to be set")
}
// Wait for it to become active
Expand Down
5 changes: 5 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
Expand Down Expand Up @@ -753,6 +754,10 @@ func checkStreamCfg(config *StreamConfig) (StreamConfig, error) {
if len(config.Name) > JSMaxNameLen {
return StreamConfig{}, fmt.Errorf("stream name is too long, maximum allowed is %d", JSMaxNameLen)
}
if len(config.Description) > JSMaxDescriptionLen {
return StreamConfig{}, fmt.Errorf("stream description is too long, maximum allowed is %d", JSMaxDescriptionLen)
}

cfg := *config

// Make file the default.
Expand Down