Skip to content
This repository was archived by the owner on Apr 17, 2019. It is now read-only.

Commit cfdd590

Browse files
authored
Merge pull request #1133 from bprashanth/ubernetes_healthchecks
Ubernetes multizone and custom healthchecks
2 parents 17fadfa + 06674d3 commit cfdd590

File tree

22 files changed

+752
-188
lines changed

22 files changed

+752
-188
lines changed

ingress/controllers/gce/backends/backends.go

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
107107
return be, nil
108108
}
109109

110-
func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
110+
func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
111111
// Create a new health check
112-
if err := b.healthChecker.Add(namedPort.Port, ""); err != nil {
112+
if err := b.healthChecker.Add(namedPort.Port); err != nil {
113113
return nil, err
114114
}
115115
hc, err := b.healthChecker.Get(namedPort.Port)
@@ -120,11 +120,7 @@ func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPor
120120
backend := &compute.BackendService{
121121
Name: name,
122122
Protocol: "HTTP",
123-
Backends: []*compute.Backend{
124-
{
125-
Group: ig.SelfLink,
126-
},
127-
},
123+
Backends: getBackendsForIGs(igs),
128124
// Api expects one, means little to kubernetes.
129125
HealthChecks: []string{hc.SelfLink},
130126
Port: namedPort.Port,
@@ -143,20 +139,24 @@ func (b *Backends) Add(port int64) error {
143139
be := &compute.BackendService{}
144140
defer func() { b.snapshotter.Add(portKey(port), be) }()
145141

146-
ig, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
142+
igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
147143
if err != nil {
148144
return err
149145
}
150146
be, _ = b.Get(port)
151147
if be == nil {
152-
glog.Infof("Creating backend for instance group %v port %v named port %v",
153-
ig.Name, port, namedPort)
154-
be, err = b.create(ig, namedPort, b.namer.BeName(port))
148+
glog.Infof("Creating backend for %d instance groups, port %v named port %v",
149+
len(igs), port, namedPort)
150+
be, err = b.create(igs, namedPort, b.namer.BeName(port))
155151
if err != nil {
156152
return err
157153
}
158154
}
159-
if err := b.edgeHop(be, ig); err != nil {
155+
// we won't find any igs till the node pool syncs nodes.
156+
if len(igs) == 0 {
157+
return nil
158+
}
159+
if err := b.edgeHop(be, igs); err != nil {
160160
return err
161161
}
162162
return err
@@ -201,18 +201,31 @@ func (b *Backends) List() ([]interface{}, error) {
201201
return interList, nil
202202
}
203203

204+
func getBackendsForIGs(igs []*compute.InstanceGroup) []*compute.Backend {
205+
backends := []*compute.Backend{}
206+
for _, ig := range igs {
207+
backends = append(backends, &compute.Backend{Group: ig.SelfLink})
208+
}
209+
return backends
210+
}
211+
204212
// edgeHop checks the links of the given backend by executing an edge hop.
205213
// It fixes broken links.
206-
func (b *Backends) edgeHop(be *compute.BackendService, ig *compute.InstanceGroup) error {
207-
if len(be.Backends) == 1 &&
208-
utils.CompareLinks(be.Backends[0].Group, ig.SelfLink) {
209-
return nil
214+
func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
215+
beIGs := sets.String{}
216+
for _, beToIG := range be.Backends {
217+
beIGs.Insert(beToIG.Group)
210218
}
211-
glog.Infof("Backend %v has a broken edge, adding link to %v",
212-
be.Name, ig.Name)
213-
be.Backends = []*compute.Backend{
214-
{Group: ig.SelfLink},
219+
igLinks := sets.String{}
220+
for _, igToBE := range igs {
221+
igLinks.Insert(igToBE.SelfLink)
222+
}
223+
if igLinks.Equal(beIGs) {
224+
return nil
215225
}
226+
glog.Infof("Backend %v has a broken edge, expected igs %+v, current igs %+v",
227+
be.Name, igLinks.List(), beIGs.List())
228+
be.Backends = getBackendsForIGs(igs)
216229
if err := b.cloud.UpdateBackendService(be); err != nil {
217230
return err
218231
}

ingress/controllers/gce/backends/backends_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@ import (
2727
"k8s.io/kubernetes/pkg/util/sets"
2828
)
2929

30+
const defaultZone = "zone-a"
31+
3032
func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
3133
namer := &utils.Namer{}
34+
nodePool := instances.NewNodePool(fakeIGs)
35+
nodePool.Init(&instances.FakeZoneLister{[]string{defaultZone}})
36+
healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer)
37+
healthChecks.Init(&healthchecks.FakeHealthCheckGetter{nil})
3238
return NewBackendPool(
33-
f,
34-
healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
35-
instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud)
39+
f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
3640
}
3741

3842
func TestBackendPoolAdd(t *testing.T) {
@@ -80,8 +84,14 @@ func TestBackendPoolAdd(t *testing.T) {
8084
t.Fatalf("Unexpected create for existing backend service")
8185
}
8286
}
83-
gotBackend, _ := f.GetBackendService(beName)
84-
gotGroup, _ := fakeIGs.GetInstanceGroup(namer.IGName(), "default-zone")
87+
gotBackend, err := f.GetBackendService(beName)
88+
if err != nil {
89+
t.Fatalf("Failed to find a backend with name %v: %v", beName, err)
90+
}
91+
gotGroup, err := fakeIGs.GetInstanceGroup(namer.IGName(), defaultZone)
92+
if err != nil {
93+
t.Fatalf("Failed to find instance group %v", namer.IGName())
94+
}
8595
if gotBackend.Backends[0].Group != gotGroup.SelfLink {
8696
t.Fatalf(
8797
"Broken instance group link: %v %v",

ingress/controllers/gce/backends/fakes.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -99,47 +99,3 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput
9999
return &compute.BackendServiceGroupHealth{
100100
HealthStatus: states}, nil
101101
}
102-
103-
// NewFakeHealthChecks returns a health check fake.
104-
func NewFakeHealthChecks() *FakeHealthChecks {
105-
return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}}
106-
}
107-
108-
// FakeHealthChecks fakes out health checks.
109-
type FakeHealthChecks struct {
110-
hc []*compute.HttpHealthCheck
111-
}
112-
113-
// CreateHttpHealthCheck fakes health check creation.
114-
func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
115-
f.hc = append(f.hc, hc)
116-
return nil
117-
}
118-
119-
// GetHttpHealthCheck fakes getting a http health check.
120-
func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
121-
for _, h := range f.hc {
122-
if h.Name == name {
123-
return h, nil
124-
}
125-
}
126-
return nil, fmt.Errorf("Health check %v not found.", name)
127-
}
128-
129-
// DeleteHttpHealthCheck fakes deleting a http health check.
130-
func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error {
131-
healthChecks := []*compute.HttpHealthCheck{}
132-
exists := false
133-
for _, h := range f.hc {
134-
if h.Name == name {
135-
exists = true
136-
continue
137-
}
138-
healthChecks = append(healthChecks, h)
139-
}
140-
if !exists {
141-
return fmt.Errorf("Failed to find health check %v", name)
142-
}
143-
f.hc = healthChecks
144-
return nil
145-
}

ingress/controllers/gce/backends/interfaces.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,3 @@ type BackendServices interface {
4242
ListBackendServices() (*compute.BackendServiceList, error)
4343
GetHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error)
4444
}
45-
46-
// SingleHealthCheck is an interface to manage a single GCE health check.
47-
type SingleHealthCheck interface {
48-
CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error
49-
DeleteHttpHealthCheck(name string) error
50-
GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
51-
}
52-
53-
// HealthChecker is an interface to manage cloud HTTPHealthChecks.
54-
type HealthChecker interface {
55-
Add(port int64, path string) error
56-
Delete(port int64) error
57-
Get(port int64) (*compute.HttpHealthCheck, error)
58-
}

ingress/controllers/gce/controller/cluster_manager.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,23 @@ type ClusterManager struct {
7474
backendPool backends.BackendPool
7575
l7Pool loadbalancers.LoadBalancerPool
7676
firewallPool firewalls.SingleFirewallPool
77+
78+
// TODO: Refactor so we simply init a health check pool.
79+
// Currently health checks are tied to backends because each backend needs
80+
// the link of the associated health, but both the backend pool and
81+
// loadbalancer pool manage backends, because the lifetime of the default
82+
// backend is tied to the last/first loadbalancer not the life of the
83+
// nodeport service or Ingress.
84+
healthCheckers []healthchecks.HealthChecker
85+
}
86+
87+
// Init initializes the cluster manager.
88+
func (c *ClusterManager) Init(tr *GCETranslator) {
89+
c.instancePool.Init(tr)
90+
for _, h := range c.healthCheckers {
91+
h.Init(tr)
92+
}
93+
// TODO: Initialize other members as needed.
7794
}
7895

7996
// IsHealthy returns an error if the cluster manager is unhealthy.
@@ -217,7 +234,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud {
217234
// string passed to glbc via --gce-cluster-name.
218235
// - defaultBackendNodePort: is the node port of glbc's default backend. This is
219236
// the kubernetes Service that serves the 404 page if no urls match.
220-
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz"
237+
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz".
221238
func NewClusterManager(
222239
configFilePath string,
223240
name string,
@@ -244,21 +261,20 @@ func NewClusterManager(
244261

245262
// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
246263
cluster := ClusterManager{ClusterNamer: &utils.Namer{name}}
247-
zone, err := cloud.GetZone()
248-
if err != nil {
249-
return nil, err
250-
}
251264

252265
// NodePool stores GCE vms that are in this Kubernetes cluster.
253-
cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
266+
cluster.instancePool = instances.NewNodePool(cloud)
254267

255268
// BackendPool creates GCE BackendServices and associated health checks.
256269
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
270+
// Loadbalancer pool manages the default backend and its health check.
271+
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
272+
273+
cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}
257274

258275
// TODO: This needs to change to a consolidated management of the default backend.
259276
cluster.backendPool = backends.NewBackendPool(
260277
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
261-
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
262278
defaultBackendPool := backends.NewBackendPool(
263279
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
264280
cluster.defaultBackendNodePort = defaultBackendNodePort

ingress/controllers/gce/controller/controller.go

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,25 @@ var (
4444
// DefaultClusterUID is the uid to use for clusters resources created by an
4545
// L7 controller created without specifying the --cluster-uid flag.
4646
DefaultClusterUID = ""
47+
48+
// Frequency to poll on local stores to sync.
49+
storeSyncPollPeriod = 5 * time.Second
4750
)
4851

4952
// LoadBalancerController watches the kubernetes api and adds/removes services
5053
// from the loadbalancer, via loadBalancerConfig.
5154
type LoadBalancerController struct {
52-
client *client.Client
53-
ingController *framework.Controller
54-
nodeController *framework.Controller
55-
svcController *framework.Controller
56-
ingLister StoreToIngressLister
57-
nodeLister cache.StoreToNodeLister
58-
svcLister cache.StoreToServiceLister
55+
client *client.Client
56+
ingController *framework.Controller
57+
nodeController *framework.Controller
58+
svcController *framework.Controller
59+
podController *framework.Controller
60+
ingLister StoreToIngressLister
61+
nodeLister cache.StoreToNodeLister
62+
svcLister cache.StoreToServiceLister
63+
// Health checks are the readiness probes of containers on pods.
64+
podLister cache.StoreToPodLister
65+
// TODO: Watch secrets
5966
CloudClusterManager *ClusterManager
6067
recorder record.EventRecorder
6168
nodeQueue *taskQueue
@@ -69,6 +76,9 @@ type LoadBalancerController struct {
6976
shutdown bool
7077
// tlsLoader loads secrets from the Kubernetes apiserver for Ingresses.
7178
tlsLoader tlsLoader
79+
// hasSynced returns true if all associated sub-controllers have synced.
80+
// Abstracted into a func for testing.
81+
hasSynced func() bool
7282
}
7383

7484
// NewLoadBalancerController creates a controller for gce loadbalancers.
@@ -90,6 +100,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
90100
}
91101
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
92102
lbc.ingQueue = NewTaskQueue(lbc.sync)
103+
lbc.hasSynced = lbc.storesSynced
93104

94105
// Ingress watch handlers
95106
pathHandlers := framework.ResourceEventHandlerFuncs{
@@ -130,12 +141,19 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
130141
lbc.client, "services", namespace, fields.Everything()),
131142
&api.Service{}, resyncPeriod, svcHandlers)
132143

144+
lbc.podLister.Indexer, lbc.podController = framework.NewIndexerInformer(
145+
cache.NewListWatchFromClient(lbc.client, "pods", namespace, fields.Everything()),
146+
&api.Pod{},
147+
resyncPeriod,
148+
framework.ResourceEventHandlerFuncs{},
149+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
150+
)
151+
133152
nodeHandlers := framework.ResourceEventHandlerFuncs{
134153
AddFunc: lbc.nodeQueue.enqueue,
135154
DeleteFunc: lbc.nodeQueue.enqueue,
136155
// Nodes are updated every 10s and we don't care, so no update handler.
137156
}
138-
139157
// Node watch handlers
140158
lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer(
141159
&cache.ListWatch{
@@ -194,6 +212,7 @@ func (lbc *LoadBalancerController) Run() {
194212
go lbc.ingController.Run(lbc.stopCh)
195213
go lbc.nodeController.Run(lbc.stopCh)
196214
go lbc.svcController.Run(lbc.stopCh)
215+
go lbc.podController.Run(lbc.stopCh)
197216
go lbc.ingQueue.run(time.Second, lbc.stopCh)
198217
go lbc.nodeQueue.run(time.Second, lbc.stopCh)
199218
<-lbc.stopCh
@@ -224,8 +243,29 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
224243
return nil
225244
}
226245

246+
// storesSynced returns true if all the sub-controllers have finished their
247+
// first sync with apiserver.
248+
func (lbc *LoadBalancerController) storesSynced() bool {
249+
return (
250+
// wait for pods to sync so we don't allocate a default health check when
251+
// an endpoint has a readiness probe.
252+
lbc.podController.HasSynced() &&
253+
// wait for services so we don't thrash on backend creation.
254+
lbc.svcController.HasSynced() &&
255+
// wait for nodes so we don't disconnect a backend from an instance
256+
// group just because we don't realize there are nodes in that zone.
257+
lbc.nodeController.HasSynced() &&
258+
// Wait for ingresses as a safety measure. We don't really need this.
259+
lbc.ingController.HasSynced())
260+
}
261+
227262
// sync manages Ingress create/updates/deletes.
228263
func (lbc *LoadBalancerController) sync(key string) {
264+
if !lbc.hasSynced() {
265+
time.Sleep(storeSyncPollPeriod)
266+
lbc.ingQueue.requeue(key, fmt.Errorf("Waiting for stores to sync"))
267+
return
268+
}
229269
glog.V(3).Infof("Syncing %v", key)
230270

231271
paths, err := lbc.ingLister.List()

ingress/controllers/gce/controller/controller_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl s
5555
if err != nil {
5656
t.Fatalf("%v", err)
5757
}
58+
lb.hasSynced = func() bool { return true }
5859
return lb
5960
}
6061

ingress/controllers/gce/controller/fakes.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030

3131
const (
3232
testDefaultBeNodePort = int64(3000)
33-
defaultZone = "default-zone"
3433
)
3534

3635
var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
@@ -50,8 +49,13 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
5049
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
5150
fakeHCs := healthchecks.NewFakeHealthChecks()
5251
namer := &utils.Namer{clusterName}
53-
nodePool := instances.NewNodePool(fakeIGs, defaultZone)
52+
53+
nodePool := instances.NewNodePool(fakeIGs)
54+
nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
55+
5456
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
57+
healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil})
58+
5559
backendPool := backends.NewBackendPool(
5660
fakeBackends,
5761
healthChecker, nodePool, namer, []int64{}, false)

0 commit comments

Comments
 (0)