Skip to content

Commit 20a38e4

Browse files
kobergjbutonic
andcommitted
Events (#2522)
* first draft for event system - includes example Signed-off-by: jkoberg <[email protected]> * add event middleware Signed-off-by: jkoberg <[email protected]> * events: distinguish grantee userid and groupid Signed-off-by: Jörn Friedrich Dreyer <[email protected]> * seperate consumer from publisher Signed-off-by: jkoberg <[email protected]> * code review suggestions Signed-off-by: jkoberg <[email protected]> * simplify example Signed-off-by: jkoberg <[email protected]> * add changelog Signed-off-by: jkoberg <[email protected]> * make nats server configurable Signed-off-by: jkoberg <[email protected]> * add license headers Signed-off-by: jkoberg <[email protected]> * cheat the linter Signed-off-by: jkoberg <[email protected]> Co-authored-by: Jörn Friedrich Dreyer <[email protected]>
1 parent 65bb12a commit 20a38e4

File tree

13 files changed

+995
-10
lines changed

13 files changed

+995
-10
lines changed

changelog/unreleased/events.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Enhancement: introduce events
2+
3+
This will introduce events into the system. Events are a simple way to bring information from
4+
one service to another. Read `pkg/events/example` and subfolders for more information
5+
6+
https://github.com/cs3org/reva/pull/2522
7+

go.mod

+6-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/Masterminds/sprig v2.22.0+incompatible
1010
github.com/ReneKroon/ttlcache/v2 v2.11.0
1111
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
12+
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75
1213
github.com/aws/aws-sdk-go v1.42.39
1314
github.com/beevik/etree v1.1.0
1415
github.com/bluele/gcache v0.0.2
@@ -38,7 +39,6 @@ require (
3839
github.com/hashicorp/go-hclog v1.1.0
3940
github.com/hashicorp/go-plugin v1.4.3
4041
github.com/huandu/xstrings v1.3.2 // indirect
41-
github.com/imdario/mergo v0.3.12 // indirect
4242
github.com/jedib0t/go-pretty v4.3.0+incompatible
4343
github.com/juliangruber/go-intersect v1.1.0
4444
github.com/mattn/go-sqlite3 v1.14.10
@@ -48,6 +48,8 @@ require (
4848
github.com/mitchellh/copystructure v1.2.0 // indirect
4949
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
5050
github.com/mitchellh/mapstructure v1.4.3
51+
github.com/nats-io/nats-server/v2 v2.7.2
52+
github.com/nats-io/nats-streaming-server v0.24.1
5153
github.com/onsi/ginkgo/v2 v2.0.0
5254
github.com/onsi/gomega v1.18.1
5355
github.com/pkg/errors v0.9.1
@@ -58,24 +60,24 @@ require (
5860
github.com/rs/zerolog v1.26.1
5961
github.com/sciencemesh/meshdirectory-web v1.0.4
6062
github.com/sethvargo/go-password v0.2.0
61-
github.com/stretchr/objx v0.3.0 // indirect
6263
github.com/stretchr/testify v1.7.0
6364
github.com/studio-b12/gowebdav v0.0.0-20211109083228-3f8721cd4b6f
6465
github.com/thanhpk/randstr v1.0.4
6566
github.com/tidwall/pretty v1.2.0 // indirect
6667
github.com/tus/tusd v1.8.0
6768
github.com/wk8/go-ordered-map v0.2.0
69+
go-micro.dev/v4 v4.3.1-0.20211108085239-0c2041e43908
6870
go.mongodb.org/mongo-driver v1.7.2 // indirect
6971
go.opencensus.io v0.23.0
7072
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0
7173
go.opentelemetry.io/otel v1.3.0
7274
go.opentelemetry.io/otel/exporters/jaeger v1.3.0
7375
go.opentelemetry.io/otel/sdk v1.3.0
7476
go.opentelemetry.io/otel/trace v1.3.0
75-
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e
77+
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
7678
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
7779
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
78-
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
80+
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9
7981
golang.org/x/term v0.0.0-20210916214954-140adaaadfaf
8082
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c
8183
google.golang.org/grpc v1.42.0

go.sum

+424-6
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2018-2021 CERN
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// In applying this license, CERN does not waive the privileges and immunities
16+
// granted to it by virtue of its status as an Intergovernmental Organization
17+
// or submit itself to any jurisdiction.
18+
19+
package eventsmiddleware
20+
21+
import (
22+
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
23+
"github.com/cs3org/reva/pkg/events"
24+
)
25+
26+
// ShareCreated converts response to event
27+
func ShareCreated(r *collaboration.CreateShareResponse) events.ShareCreated {
28+
e := events.ShareCreated{
29+
Sharer: r.Share.Creator,
30+
GranteeUserID: r.Share.GetGrantee().GetUserId(),
31+
GranteeGroupID: r.Share.GetGrantee().GetGroupId(),
32+
ItemID: r.Share.ResourceId,
33+
CTime: r.Share.Ctime,
34+
}
35+
36+
return e
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2018-2021 CERN
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// In applying this license, CERN does not waive the privileges and immunities
16+
// granted to it by virtue of its status as an Intergovernmental Organization
17+
// or submit itself to any jurisdiction.
18+
19+
package eventsmiddleware
20+
21+
import (
22+
"context"
23+
"fmt"
24+
25+
"go-micro.dev/v4/util/log"
26+
"google.golang.org/grpc"
27+
28+
"github.com/asim/go-micro/plugins/events/nats/v4"
29+
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
30+
"github.com/cs3org/reva/pkg/events"
31+
"github.com/cs3org/reva/pkg/events/server"
32+
"github.com/cs3org/reva/pkg/rgrpc"
33+
)
34+
35+
const (
36+
defaultPriority = 200
37+
)
38+
39+
func init() {
40+
rgrpc.RegisterUnaryInterceptor("eventsmiddleware", NewUnary)
41+
}
42+
43+
// NewUnary returns a new unary interceptor that emits events when needed
44+
// no lint because of the switch statement that should be extendable
45+
//nolint:gocritic
46+
func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error) {
47+
publisher, err := publisherFromConfig(m)
48+
if err != nil {
49+
return nil, 0, err
50+
}
51+
52+
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
53+
res, err := handler(ctx, req)
54+
if err != nil {
55+
return res, err
56+
}
57+
58+
var ev interface{}
59+
switch v := res.(type) {
60+
case *collaboration.CreateShareResponse:
61+
ev = ShareCreated(v)
62+
}
63+
64+
if ev != nil {
65+
if err := events.Publish(publisher, ev); err != nil {
66+
log.Error(err)
67+
}
68+
}
69+
70+
return res, nil
71+
}
72+
return interceptor, defaultPriority, nil
73+
}
74+
75+
// NewStream returns a new server stream interceptor
76+
// that creates the application context.
77+
func NewStream() grpc.StreamServerInterceptor {
78+
interceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
79+
// TODO: Use ss.RecvMsg() and ss.SendMsg() to send events from a stream
80+
return handler(srv, ss)
81+
}
82+
return interceptor
83+
}
84+
85+
func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) {
86+
typ := m["type"].(string)
87+
switch typ {
88+
default:
89+
return nil, fmt.Errorf("stream type '%s' not supported", typ)
90+
case "nats":
91+
address := m["address"].(string)
92+
cid := m["clusterID"].(string)
93+
return server.NewNatsStream(nats.Address(address), nats.ClusterID(cid))
94+
}
95+
}

internal/grpc/interceptors/loader/loader.go

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package loader
2020

2121
import (
2222
// Load core GRPC services
23+
_ "github.com/cs3org/reva/internal/grpc/interceptors/eventsmiddleware"
2324
_ "github.com/cs3org/reva/internal/grpc/interceptors/readonly"
2425
// Add your own service here
2526
)

pkg/events/events.go

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2018-2021 CERN
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// In applying this license, CERN does not waive the privileges and immunities
16+
// granted to it by virtue of its status as an Intergovernmental Organization
17+
// or submit itself to any jurisdiction.
18+
19+
package events
20+
21+
import (
22+
"log"
23+
"reflect"
24+
25+
"go-micro.dev/v4/events"
26+
)
27+
28+
var (
29+
// MainQueueName is the name of the main queue
30+
// All events will go through here as they are forwarded to the consumer via the
31+
// group name
32+
// TODO: "fan-out" so not all events go through the same queue? requires investigation
33+
MainQueueName = "main-queue"
34+
35+
// MetadatakeyEventType is the key used for the eventtype in the metadata map of the event
36+
MetadatakeyEventType = "eventtype"
37+
)
38+
39+
type (
40+
// Unmarshaller is the interface events need to fulfill
41+
Unmarshaller interface {
42+
Unmarshal([]byte) (interface{}, error)
43+
}
44+
45+
// Publisher is the interface publishers need to fulfill
46+
Publisher interface {
47+
Publish(string, interface{}, ...events.PublishOption) error
48+
}
49+
50+
// Consumer is the interface consumer need to fulfill
51+
Consumer interface {
52+
Consume(string, ...events.ConsumeOption) (<-chan events.Event, error)
53+
}
54+
55+
// Stream is the interface common to Publisher and Consumer
56+
Stream interface {
57+
Publish(string, interface{}, ...events.PublishOption) error
58+
Consume(string, ...events.ConsumeOption) (<-chan events.Event, error)
59+
}
60+
)
61+
62+
// Consume returns a channel that will get all events that match the given evs
63+
// group defines the service type: One group will get exactly one copy of a event that is emitted
64+
// NOTE: uses reflect on initialization
65+
func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{}, error) {
66+
c, err := s.Consume(MainQueueName, events.WithGroup(group))
67+
if err != nil {
68+
return nil, err
69+
}
70+
71+
registeredEvents := map[string]Unmarshaller{}
72+
for _, e := range evs {
73+
typ := reflect.TypeOf(e)
74+
registeredEvents[typ.String()] = e
75+
}
76+
77+
outchan := make(chan interface{})
78+
go func() {
79+
for {
80+
e := <-c
81+
et := e.Metadata[MetadatakeyEventType]
82+
ev, ok := registeredEvents[et]
83+
if !ok {
84+
log.Printf("not registered: %s", et)
85+
continue
86+
}
87+
88+
event, err := ev.Unmarshal(e.Payload)
89+
if err != nil {
90+
log.Printf("can't unmarshal event %v", err)
91+
continue
92+
}
93+
94+
outchan <- event
95+
}
96+
}()
97+
return outchan, nil
98+
}
99+
100+
// Publish publishes the ev to the MainQueue from where it is distributed to all subscribers
101+
// NOTE: needs to use reflect on runtime
102+
func Publish(s Publisher, ev interface{}) error {
103+
evName := reflect.TypeOf(ev).String()
104+
return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{
105+
MetadatakeyEventType: evName,
106+
}))
107+
}
+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright 2018-2021 CERN
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// In applying this license, CERN does not waive the privileges and immunities
16+
// granted to it by virtue of its status as an Intergovernmental Organization
17+
// or submit itself to any jurisdiction.
18+
19+
// Package consumer contains an example implementation of an event consumer
20+
package consumer
21+
22+
import (
23+
"fmt"
24+
"log"
25+
26+
"github.com/cs3org/reva/pkg/events"
27+
)
28+
29+
// Example consumes events from the queue
30+
func Example(c events.Consumer) {
31+
// Step 1 - which group does the consumer belong to?
32+
// each group will get each event that is emitted, but only one member of the group will get it.
33+
group := "test-consumer"
34+
35+
// Step 2 - which events does the consumer listen too?
36+
evs := []events.Unmarshaller{
37+
// for example created shares
38+
events.ShareCreated{},
39+
}
40+
41+
// Step 3 - create event channel
42+
evChan, err := events.Consume(c, group, evs...)
43+
if err != nil {
44+
log.Fatal(err)
45+
}
46+
47+
// Step 4 - listen to events
48+
for {
49+
event := <-evChan
50+
51+
// best to use type switch to differentiate events
52+
switch v := event.(type) {
53+
case events.ShareCreated:
54+
fmt.Printf("%s) Share created: %+v\n", group, v)
55+
default:
56+
fmt.Printf("%s) Unregistered event: %+v\n", group, v)
57+
}
58+
}
59+
60+
}

0 commit comments

Comments
 (0)