This repository was archived by the owner on Apr 17, 2019. It is now read-only.

Allocate and remember cluster uid #680

Merged: 3 commits, May 27, 2016
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
@@ -96,3 +96,4 @@ vrrp-password
watch-namespace
weak-stable-jobs
whitelist-override-label
config-file-path
2 changes: 1 addition & 1 deletion ingress/controllers/gce/Makefile
@@ -1,7 +1,7 @@
all: push

# 0.0 shouldn't clobber any released builds
TAG = 0.6.2
TAG = 0.6.3
PREFIX = gcr.io/google_containers/glbc

server:
4 changes: 2 additions & 2 deletions ingress/controllers/gce/backends/backends.go
@@ -38,11 +38,11 @@ type Backends struct {
nodePool instances.NodePool
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
namer utils.Namer
// ignoredPorts are a set of ports excluded from GC, even
// after the Ingress has been deleted. Note that invoking
// a Delete() on these ports will still delete the backend.
ignoredPorts sets.String
namer *utils.Namer
}

func portKey(port int64) string {
@@ -60,7 +60,7 @@ func NewBackendPool(
cloud BackendServices,
healthChecker healthchecks.HealthChecker,
nodePool instances.NodePool,
namer utils.Namer,
namer *utils.Namer,
ignorePorts []int64,
resyncWithCloud bool) *Backends {

2 changes: 1 addition & 1 deletion ingress/controllers/gce/backends/backends_test.go
@@ -28,7 +28,7 @@ import (
)

func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
namer := utils.Namer{}
namer := &utils.Namer{}
return NewBackendPool(
f,
healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
32 changes: 27 additions & 5 deletions ingress/controllers/gce/controller/cluster_manager.go
@@ -18,7 +18,9 @@ package controller

import (
"fmt"
"io"
"net/http"
"os"
"time"

"k8s.io/contrib/ingress/controllers/gce/backends"
@@ -66,7 +68,7 @@ const (

// ClusterManager manages cluster resource pools.
type ClusterManager struct {
ClusterNamer utils.Namer
ClusterNamer *utils.Namer
defaultBackendNodePort int64
instancePool instances.NodePool
backendPool backends.BackendPool
@@ -184,13 +186,13 @@ func defaultInstanceGroupName(clusterName string) string {
return fmt.Sprintf("%v-%v", instanceGroupPrefix, clusterName)
}

func getGCEClient() *gce.GCECloud {
func getGCEClient(config io.Reader) *gce.GCECloud {
// Creating the cloud interface involves resolving the metadata server to get
// an oauth token. If this fails, the token provider assumes it's not on GCE.
// No errors are thrown. So we need to keep retrying till it works because
// we know we're on GCE.
for {
cloudInterface, err := cloudprovider.GetCloudProvider("gce", nil)
cloudInterface, err := cloudprovider.GetCloudProvider("gce", config)
if err == nil {
cloud := cloudInterface.(*gce.GCECloud)

@@ -217,22 +219,40 @@ func getGCEClient() *gce.GCECloud {
// the kubernetes Service that serves the 404 page if no urls match.
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz"
func NewClusterManager(
configFilePath string,
name string,
defaultBackendNodePort int64,
defaultHealthCheckPath string) (*ClusterManager, error) {

var config *os.File
var err error
if configFilePath != "" {
glog.Infof("Reading config from path %v", configFilePath)
config, err = os.Open(configFilePath)
if err != nil {
return nil, err
}
defer config.Close()
}

// TODO: Make this more resilient. Currently we create the cloud client
// and pass it through to all the pools. This makes unittesting easier.
// However if the cloud client suddenly fails, we should try to re-create it
// and continue.
cloud := getGCEClient()
cloud := getGCEClient(config)
glog.Infof("Successfully loaded cloudprovider using config %q", configFilePath)

cluster := ClusterManager{ClusterNamer: utils.Namer{name}}
// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
cluster := ClusterManager{ClusterNamer: &utils.Namer{name}}
zone, err := cloud.GetZone()
if err != nil {
return nil, err
}

// NodePool stores GCE vms that are in this Kubernetes cluster.
cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)

// BackendPool creates GCE BackendServices and associated health checks.
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)

// TODO: This needs to change to a consolidated management of the default backend.
@@ -242,6 +262,8 @@
defaultBackendPool := backends.NewBackendPool(
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
cluster.defaultBackendNodePort = defaultBackendNodePort

// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer)
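Note on the config plumbing above: getGCEClient now takes an io.Reader instead of opening a file itself, with NewClusterManager owning the file handling. A minimal sketch of building such a reader in memory (for example in a test); the [global] section and the key names are assumptions about the gcfg format the GCE cloud provider parses, not something this PR defines:

package controllersketch

import "strings"

// sampleGCEConfig is illustrative only; the authoritative key set is the
// GCE cloud provider's gcfg config struct.
const sampleGCEConfig = `[global]
project-id = my-gcp-project
network-name = my-network
node-tags = my-node-tag
`

// newConfigReader returns an io.Reader that could stand in for the opened
// --config-file-path file when exercising getGCEClient.
func newConfigReader() *strings.Reader {
	return strings.NewReader(sampleGCEConfig)
}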
2 changes: 1 addition & 1 deletion ingress/controllers/gce/controller/controller.go
@@ -301,7 +301,7 @@ func (lbc *LoadBalancerController) sync(key string) {
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if lbc.updateIngressStatus(l7, ing); err != nil {
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
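Aside on the controller.go fix above: in the old line, lbc.updateIngressStatus(l7, ing) runs only as the init statement of the else-if, and the condition then re-tests the err declared by the earlier UpdateUrlMap branch, which is necessarily nil on that path, so a failed status update was never reported. A small self-contained illustration of the pitfall and of the corrected form used in this PR:

package main

import (
	"errors"
	"fmt"
)

func update() error    { return nil }                         // earlier step succeeds
func setStatus() error { return errors.New("status update") } // later step fails

func main() {
	// Buggy form: setStatus() executes, but the condition re-tests update()'s nil err.
	if err := update(); err != nil {
		fmt.Println("update failed:", err)
	} else if setStatus(); err != nil {
		fmt.Println("status failed:", err) // never reached
	}

	// Fixed form: bind a fresh err to the second call.
	if err := update(); err != nil {
		fmt.Println("update failed:", err)
	} else if err := setStatus(); err != nil {
		fmt.Println("status failed:", err) // prints: status failed: status update
	}
}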
7 changes: 4 additions & 3 deletions ingress/controllers/gce/controller/fakes.go
@@ -17,14 +17,15 @@ limitations under the License.
package controller

import (
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"

"k8s.io/contrib/ingress/controllers/gce/backends"
"k8s.io/contrib/ingress/controllers/gce/firewalls"
"k8s.io/contrib/ingress/controllers/gce/healthchecks"
"k8s.io/contrib/ingress/controllers/gce/instances"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
)

const (
@@ -48,7 +49,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
fakeBackends := backends.NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
fakeHCs := healthchecks.NewFakeHealthChecks()
namer := utils.Namer{clusterName}
namer := &utils.Namer{clusterName}
nodePool := instances.NewNodePool(fakeIGs, defaultZone)
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
backendPool := backends.NewBackendPool(
4 changes: 2 additions & 2 deletions ingress/controllers/gce/firewalls/firewalls.go
@@ -30,14 +30,14 @@ const l7SrcRange = "130.211.0.0/22"
// FirewallRules manages firewall rules.
type FirewallRules struct {
cloud Firewall
namer utils.Namer
namer *utils.Namer
srcRange netset.IPNet
}

// NewFirewallPool creates a new firewall rule manager.
// cloud: the cloud object implementing Firewall.
// namer: cluster namer.
func NewFirewallPool(cloud Firewall, namer utils.Namer) SingleFirewallPool {
func NewFirewallPool(cloud Firewall, namer *utils.Namer) SingleFirewallPool {
srcNetSet, err := netset.ParseIPNets(l7SrcRange)
if err != nil {
glog.Fatalf("Could not parse L7 src range %v for firewall rule: %v", l7SrcRange, err)
4 changes: 2 additions & 2 deletions ingress/controllers/gce/healthchecks/healthchecks.go
@@ -28,13 +28,13 @@ import (
type HealthChecks struct {
cloud SingleHealthCheck
defaultPath string
namer utils.Namer
namer *utils.Namer
}

// NewHealthChecker creates a new health checker.
// cloud: the cloud object implementing SingleHealthCheck.
// defaultHealthCheckPath: is the HTTP path to use for health checks.
func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer utils.Namer) HealthChecker {
func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker {
return &HealthChecks{cloud, defaultHealthCheckPath, namer}
}

6 changes: 3 additions & 3 deletions ingress/controllers/gce/loadbalancers/loadbalancers.go
@@ -67,7 +67,7 @@ type L7s struct {
glbcDefaultBackend *compute.BackendService
defaultBackendPool backends.BackendPool
defaultBackendNodePort int64
namer utils.Namer
namer *utils.Namer
}

// NewLoadBalancerPool returns a new loadbalancer pool.
@@ -80,7 +80,7 @@ type L7s struct {
func NewLoadBalancerPool(
cloud LoadBalancers,
defaultBackendPool backends.BackendPool,
defaultBackendNodePort int64, namer utils.Namer) LoadBalancerPool {
defaultBackendNodePort int64, namer *utils.Namer) LoadBalancerPool {
return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer}
}

@@ -284,7 +284,7 @@ type L7 struct {
// TODO: Expose this to users.
glbcDefaultBackend *compute.BackendService
// namer is used to compute names of the various sub-components of an L7.
namer utils.Namer
namer *utils.Namer
}

func (l *L7) checkUrlMap(backend *compute.BackendService) (err error) {
@@ -36,7 +36,7 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
fakeBackends := backends.NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
fakeHCs := healthchecks.NewFakeHealthChecks()
namer := utils.Namer{}
namer := &utils.Namer{}
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
backendPool := backends.NewBackendPool(
fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false)
79 changes: 74 additions & 5 deletions ingress/controllers/gce/main.go
@@ -28,9 +28,11 @@ import (

flag "github.com/spf13/pflag"
"k8s.io/contrib/ingress/controllers/gce/controller"
"k8s.io/contrib/ingress/controllers/gce/storage"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubectl_util "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
@@ -56,7 +58,10 @@
alphaNumericChar = "0"

// Current docker image version. Only used in debug logging.
imageVersion = "glbc:0.6.2"
imageVersion = "glbc:0.6.3"

// Key used to persist UIDs to configmaps.
uidConfigMapName = "ingress-uid"
)

var (
@@ -105,6 +110,13 @@

verbose = flags.Bool("verbose", false,
`If true, logs are displayed at V(4), otherwise V(2).`)

configFilePath = flags.String("config-file-path", "",
`Path to a file containing the gce config. If left unspecified this
controller only works with default zones.`)

healthzPort = flags.Int("healthz-port", lbApiPort,
`Port to run healthz server. Must match the health check port in yaml.`)
)

func registerHandlers(lbc *controller.LoadBalancerController) {
@@ -122,7 +134,7 @@ func registerHandlers(lbc *controller.LoadBalancerController) {
lbc.Stop(true)
})

glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", lbApiPort), nil))
glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *healthzPort), nil))
}

func handleSigterm(lbc *controller.LoadBalancerController, deleteAll bool) {
Expand Down Expand Up @@ -156,7 +168,7 @@ func main() {
go_flag.Lookup("logtostderr").Value.Set("true")
go_flag.Set("v", "4")
}
glog.Infof("Starting GLBC image: %v", imageVersion)
glog.Infof("Starting GLBC image: %v, cluster name %v", imageVersion, *clusterName)
if *defaultSvc == "" {
glog.Fatalf("Please specify --default-backend")
}
@@ -187,8 +199,11 @@

if *inCluster || *useRealCloud {
// Create cluster manager
clusterManager, err = controller.NewClusterManager(
*clusterName, defaultBackendNodePort, *healthCheckPath)
name, err := getClusterUID(kubeClient, *clusterName)
if err != nil {
glog.Fatalf("%v", err)
}
clusterManager, err = controller.NewClusterManager(*configFilePath, name, defaultBackendNodePort, *healthCheckPath)
if err != nil {
glog.Fatalf("%v", err)
}
@@ -215,6 +230,60 @@
}
}

// getClusterUID returns the cluster UID. Rules for UID generation:
// If the user specifies a --cluster-uid param it overwrites everything
// else, check UID config map for a previously recorded uid
// else, check if there are any working Ingresses
// - remember that "" is the cluster uid
// else, allocate a new uid
func getClusterUID(kubeClient *client.Client, name string) (string, error) {
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
if name != "" {
glog.Infof("Using user provided cluster uid %v", name)
// Don't save the uid in the vault, so users can rollback through
// --cluster-uid=""
return name, nil
}

existingUID, found, err := cfgVault.Get()
if found {
glog.Infof("Using saved cluster uid %q", name)
return existingUID, nil
} else if err != nil {
// This can fail because of:
// 1. No such config map - found=false, err=nil
// 2. No such key in config map - found=false, err=nil
// 3. Apiserver flake - found=false, err!=nil
// It is not safe to proceed in 3.
return "", fmt.Errorf("Failed to retrieve current uid: %v, using %q as name", err, name)
}

// Check if the cluster has an Ingress with ip
ings, err := kubeClient.Extensions().Ingress(api.NamespaceAll).List(api.ListOptions{LabelSelector: labels.Everything()})
if err != nil {
return "", err
}
for _, ing := range ings.Items {
if len(ing.Status.LoadBalancer.Ingress) != 0 {
glog.Infof("Found a working Ingress, assuming uid is empty string")
return "", cfgVault.Put("")
}
}

// Allocate new uid
f, err := os.Open("/dev/urandom")
if err != nil {
return "", err
}
defer f.Close()
b := make([]byte, 8)
if _, err := f.Read(b); err != nil {
return "", err
}
uid := fmt.Sprintf("%x", b)
return uid, cfgVault.Put(uid)
}

// getNodePort waits for the Service, and returns its first node port.
func getNodePort(client *client.Client, ns, name string) (nodePort int64, err error) {
var svc *api.Service
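The uid persistence in getClusterUID relies on the storage package's ConfigMapVault, newly imported above. A sketch of the contract the function assumes, inferred only from the calls in this diff (NewConfigMapVault, Get, Put); the concrete implementation lives in ingress/controllers/gce/storage:

package uidsketch

// uidVault captures the ConfigMapVault behaviour getClusterUID depends on.
type uidVault interface {
	// Get returns a previously recorded uid, whether one was found, and any
	// apiserver error; found=false with err=nil means nothing is recorded yet.
	Get() (uid string, found bool, err error)
	// Put records the uid (possibly "") so later controller restarts reuse it.
	Put(uid string) error
}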