Skip to content

Commit c9f933a

Browse files
Merge pull request #2218 from nats-io/si_ext
Fix stream source lookup and add in optional External to StreamSource
2 parents dd91bc8 + 060f2ec commit c9f933a

File tree

2 files changed

+68
-9
lines changed

2 files changed

+68
-9
lines changed

server/jetstream_cluster_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -6367,6 +6367,51 @@ func TestJetStreamClusterDomainsAndSameNameSources(t *testing.T) {
63676367
if si.State.Msgs != 2 {
63686368
t.Fatalf("Expected 2 msgs, got %d", si.State.Msgs)
63696369
}
6370+
6371+
// Make sure we can see our external information.
6372+
// This not in the Go client yet so manual for now.
6373+
resp, err := nc.Request(fmt.Sprintf(JSApiStreamInfoT, "S"), nil, time.Second)
6374+
if err != nil {
6375+
t.Fatalf("Unexpected error: %v", err)
6376+
}
6377+
var ssi StreamInfo
6378+
if err = json.Unmarshal(resp.Data, &ssi); err != nil {
6379+
t.Fatalf("Unexpected error: %v", err)
6380+
}
6381+
if len(ssi.Sources) != 2 {
6382+
t.Fatalf("Expected 2 source streams, got %d", len(ssi.Sources))
6383+
}
6384+
if ssi.Sources[0].External == nil {
6385+
t.Fatalf("Expected a non-nil external designation")
6386+
}
6387+
if ssi.Sources[0].External.ApiPrefix != "$JS.SPOKE-1.API" {
6388+
t.Fatalf("Expected external api of %q, got %q", "$JS.SPOKE-1.API", ssi.Sources[0].External.ApiPrefix)
6389+
}
6390+
6391+
// Also create a mirror.
6392+
_, err = js.AddStream(&nats.StreamConfig{
6393+
Name: "M",
6394+
Mirror: &nats.StreamSource{
6395+
Name: "TEST",
6396+
External: &nats.ExternalStream{APIPrefix: "$JS.SPOKE-1.API"},
6397+
},
6398+
})
6399+
if err != nil {
6400+
t.Fatalf("Unexpected error: %v", err)
6401+
}
6402+
resp, err = nc.Request(fmt.Sprintf(JSApiStreamInfoT, "M"), nil, time.Second)
6403+
if err != nil {
6404+
t.Fatalf("Unexpected error: %v", err)
6405+
}
6406+
if err = json.Unmarshal(resp.Data, &ssi); err != nil {
6407+
t.Fatalf("Unexpected error: %v", err)
6408+
}
6409+
if ssi.Mirror == nil || ssi.Mirror.External == nil {
6410+
t.Fatalf("Expected a non-nil external designation for our mirror")
6411+
}
6412+
if ssi.Mirror.External.ApiPrefix != "$JS.SPOKE-1.API" {
6413+
t.Fatalf("Expected external api of %q, got %q", "$JS.SPOKE-1.API", ssi.Sources[0].External.ApiPrefix)
6414+
}
63706415
}
63716416

63726417
func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) {

server/stream.go

+23-9
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,11 @@ type PeerInfo struct {
103103

104104
// StreamSourceInfo shows information about an upstream stream source.
105105
type StreamSourceInfo struct {
106-
Name string `json:"name"`
107-
Lag uint64 `json:"lag"`
108-
Active time.Duration `json:"active"`
109-
Error *ApiError `json:"error,omitempty"`
106+
Name string `json:"name"`
107+
External *ExternalStream `json:"external,omitempty"`
108+
Lag uint64 `json:"lag"`
109+
Active time.Duration `json:"active"`
110+
Error *ApiError `json:"error,omitempty"`
110111
}
111112

112113
// StreamSource dictates how streams can source from other streams.
@@ -117,7 +118,7 @@ type StreamSource struct {
117118
FilterSubject string `json:"filter_subject,omitempty"`
118119
External *ExternalStream `json:"external,omitempty"`
119120

120-
// Internal
121+
// Internale
121122
iname string // For indexing when stream names are the same for multiple sources.
122123
}
123124

@@ -1151,7 +1152,20 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
11511152
if si == nil {
11521153
return nil
11531154
}
1154-
return &StreamSourceInfo{Name: si.name, Lag: si.lag, Active: time.Since(si.last), Error: si.err}
1155+
ssi := &StreamSourceInfo{Name: si.name, Lag: si.lag, Active: time.Since(si.last), Error: si.err}
1156+
var ext *ExternalStream
1157+
if mset.cfg.Mirror != nil {
1158+
ext = mset.cfg.Mirror.External
1159+
} else if ss := mset.streamSource(si.iname); ss != nil && ss.External != nil {
1160+
ext = ss.External
1161+
}
1162+
if ext != nil {
1163+
ssi.External = &ExternalStream{
1164+
ApiPrefix: ext.ApiPrefix,
1165+
DeliverPrefix: ext.DeliverPrefix,
1166+
}
1167+
}
1168+
return ssi
11551169
}
11561170

11571171
// Return our source info for our mirror.
@@ -1563,9 +1577,9 @@ func (mset *stream) setupMirrorConsumer() error {
15631577
return nil
15641578
}
15651579

1566-
func (mset *stream) streamSource(sname string) *StreamSource {
1580+
func (mset *stream) streamSource(iname string) *StreamSource {
15671581
for _, ssi := range mset.cfg.Sources {
1568-
if ssi.Name == sname {
1582+
if ssi.iname == iname {
15691583
return ssi
15701584
}
15711585
}
@@ -1626,7 +1640,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64) {
16261640

16271641
si.sseq, si.dseq = seq, 0
16281642
si.last = time.Now()
1629-
ssi := mset.streamSource(si.name)
1643+
ssi := mset.streamSource(iname)
16301644

16311645
// Determine subjects etc.
16321646
var deliverSubject string

0 commit comments

Comments
 (0)