-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathpubsub.go
141 lines (112 loc) · 4.17 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright (C) 2021 Cisco Systems Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"fmt"
log "github.com/sirupsen/logrus"
)
type CalicoVppEventType string
const (
ChanSize = 500
PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged"
FelixConfChanged CalicoVppEventType = "FelixConfChanged"
IpamConfChanged CalicoVppEventType = "IpamConfChanged"
BGPConfChanged CalicoVppEventType = "BGPConfChanged"
ConnectivityAdded CalicoVppEventType = "ConnectivityAdded"
ConnectivityDeleted CalicoVppEventType = "ConnectivityDeleted"
SRv6PolicyAdded CalicoVppEventType = "SRv6PolicyAdded"
SRv6PolicyDeleted CalicoVppEventType = "SRv6PolicyDeleted"
PodAdded CalicoVppEventType = "PodAdded"
PodDeleted CalicoVppEventType = "PodDeleted"
LocalPodAddressAdded CalicoVppEventType = "LocalPodAddressAdded"
LocalPodAddressDeleted CalicoVppEventType = "LocalPodAddressDeleted"
TunnelAdded CalicoVppEventType = "TunnelAdded"
TunnelDeleted CalicoVppEventType = "TunnelDeleted"
BGPPeerAdded CalicoVppEventType = "BGPPeerAdded"
BGPPeerDeleted CalicoVppEventType = "BGPPeerDeleted"
BGPPeerUpdated CalicoVppEventType = "BGPPeerUpdated"
BGPSecretChanged CalicoVppEventType = "BGPSecretChanged"
BGPFilterAddedOrUpdated CalicoVppEventType = "BGPFilterAddedOrUpdated"
BGPFilterDeleted CalicoVppEventType = "BGPFilterDeleted"
BGPDefinedSetAdded CalicoVppEventType = "BGPDefinedSetAdded"
BGPDefinedSetDeleted CalicoVppEventType = "BGPDefinedSetDeleted"
BGPPathAdded CalicoVppEventType = "BGPPathAdded"
BGPPathDeleted CalicoVppEventType = "BGPPathDeleted"
NetAddedOrUpdated CalicoVppEventType = "NetAddedOrUpdated"
NetDeleted CalicoVppEventType = "NetDeleted"
NetsSynced CalicoVppEventType = "NetsSynced"
WireguardPublicKeyChanged CalicoVppEventType = "WireguardPublicKeyChanged"
UplinksUpdated CalicoVppEventType = "UplinksUpdated"
)
var (
ThePubSub *PubSub
)
type CalicoVppEvent struct {
Type CalicoVppEventType
Old interface{}
New interface{}
}
type PubSubHandlerRegistration struct {
/* Name for the registration, for logging & debugging */
name string
/* Channel where to send events */
channel chan CalicoVppEvent
/* Receive only these events. If empty we'll receive all */
expectedEvents map[CalicoVppEventType]bool
/* Receive all events */
expectAllEvents bool
}
func (reg *PubSubHandlerRegistration) ExpectEvents(eventTypes ...CalicoVppEventType) {
for _, eventType := range eventTypes {
reg.expectedEvents[eventType] = true
}
reg.expectAllEvents = false
}
type PubSub struct {
log *log.Entry
pubSubHandlerRegistrations []*PubSubHandlerRegistration
}
func RegisterHandler(channel chan CalicoVppEvent, name string) *PubSubHandlerRegistration {
reg := &PubSubHandlerRegistration{
channel: channel,
name: name,
expectedEvents: make(map[CalicoVppEventType]bool),
expectAllEvents: true, /* By default receive everything, unless we ask for a filter */
}
ThePubSub.pubSubHandlerRegistrations = append(ThePubSub.pubSubHandlerRegistrations, reg)
return reg
}
func redactPassword(event CalicoVppEvent) string {
switch event.Type {
case BGPPeerAdded:
return string(event.Type)
default:
return fmt.Sprintf("%+v", event)
}
}
func SendEvent(event CalicoVppEvent) {
ThePubSub.log.Debugf("Broadcasting event %s", redactPassword(event))
for _, reg := range ThePubSub.pubSubHandlerRegistrations {
if reg.expectAllEvents || reg.expectedEvents[event.Type] {
reg.channel <- event
}
}
}
func NewPubSub(log *log.Entry) *PubSub {
return &PubSub{
log: log,
pubSubHandlerRegistrations: make([]*PubSubHandlerRegistration, 0),
}
}