-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add inventory support #1026
base: main
Are you sure you want to change the base?
Add inventory support #1026
Conversation
6de446c
to
c2a85a1
Compare
cmd/main.go
Outdated
@@ -281,6 +288,8 @@ func startServiceController(mgr manager.Manager, nsxClient *nsx.Client) { | |||
networkpolicycontroller.StartNetworkPolicyController(mgr, commonService, vpcService) | |||
service.StartServiceLbController(mgr, commonService) | |||
subnetbindingcontroller.StartSubnetBindingController(mgr, subnetService, subnetBindingService) | |||
inventoryControll := inventory.NewInventoryController(mgr.GetClient(), inventoryService, cf) | |||
inventoryControll.SetupWithManager(mgr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better comply with StartXXController?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
log.Error(err, "Create pod Informer error") | ||
} | ||
|
||
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unhandled error
package inventory | ||
|
||
import ( | ||
"context" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imports are not sorted
return nil | ||
} | ||
|
||
func (c *InventoryController) handlePod(obj interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some receivers are c, and others are r, could you unify them?
case cache.DeletedFinalStateUnknown: | ||
pod, ok = obj1.Obj.(*v1.Pod) | ||
if !ok { | ||
log.Error(errors.New("Unknown obj"), "DeletedFinalStateUnknown Obj is not *v1.Pod") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use fmt.Errorof as others modules.
} | ||
|
||
const ( | ||
operation_create = "CREATE" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use camel case instead of snake case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
operation_delete = "DELETE" | ||
operation_none = "NONE" | ||
|
||
INVENTORY_STATUS_UP = "UP" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
This implementation is quite similar to the existing controllers. We can continue using Kubebuilder's Reconcile pattern if we convert the Kubernetes objects into inventory request queue items, which should not lead to any significant issues.
|
Consider replacing the inventoryObjectQueue with a channel? I think this scenario is highly suited for the producer-consumer pattern. |
log.Info("Send update to NSX clusterId ", "ContainerInventoryData", s.requestBuffer) | ||
// TODO, check the context.TODO() be replaced by InventoryClient related todo | ||
resp, err := s.NSXClient.InventoryClient.ContainerInventoryApi.AddContainerInventoryUpdateUpdates(context.TODO(), s.NSXConfig.Cluster, containerinventory.ContainerInventoryData{ContainerInventoryObjects: s.requestBuffer}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder AddContainerInventoryUpdateUpdates if the batch operation request is asynchronous,
- How do you know if the process is successful or not?
- If failed, after updateInventoryStore, how do you guarantee the consistency of the local store and NSX side?
return | ||
} | ||
|
||
func (s *InventoryService) compareAndMergeUpdate(pre interface{}, cur interface{}) (string, map[string]interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is too large, I think we can also comply with builder and compare pattern.
Splitting it into small files is good for scaling.
return nil | ||
} | ||
|
||
func (r *InventoryController) handlePodEvent(event watch.Event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plan where to use it?
switch obj1 := obj.(type) { | ||
case *v1.Pod: | ||
pod = obj1 | ||
case cache.DeletedFinalStateUnknown: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does DeletedFinalStateUnknown mean, why other controllers don't have it?
} | ||
} | ||
log.V(1).Info("Inventory processing Pod", "namespace", pod.Namespace, "name", pod.Name) | ||
key, _ := keyFunc(pod) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here keyFunc is cache.DeletionHandlingMetaNamespaceKeyFunc, why?
} | ||
|
||
func (c *InventoryController) processNextInventoryWorkItem() bool { | ||
key, quit := c.inventoryObjectQueue.Get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If use chan here, we don't need lock any more.
|
||
case ContainerApplicationInstance: | ||
pod := &corev1.Pod{} | ||
err := s.Client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, pod) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you keep a inventoryObjectQueue which contains only keys, any by client.get to check if we delete or add.
Why not construct a key and operation(delete/update/create) object into queue at handlePod, then we don't need to Get here again.
Client client.Client | ||
service *inventory.InventoryService | ||
inventoryObjectQueue workqueue.TypedRateLimitingInterface[any] | ||
keyBuffer inventory.KeySet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think keyBuffer is redundant, inventoryObjectQueue is enough, if you use channel, you don't need to get obj from key again at all, The pseudocode is as follows
func (b *BatchProcessor) Run(stopCh <-chan struct{}) {
batch := make([]NSXOperation, 0, batchSize)
ticker := time.NewTicker(flushInterval)
for {
select {
case op := <-b.operations:
batch = append(batch, op)
if len(batch) >= batchSize {
b.processBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
b.processBatch(batch)
batch = batch[:0]
}
case <-stopCh:
return
}
}
}
To summarize, I think we can maintain two channels, one is of inventoryObjectQueue, another is of ContainerInventoryObject, We can try to postpone process batching as much as possible.
This method has several benefits:
|
1a8213c
to
4b6586e
Compare
/e2e |
4 similar comments
/e2e |
/e2e |
/e2e |
/e2e |
/e2e |
1 similar comment
/e2e |
/e2e |
Add inventory controller, service. Add ApplicationInstanceStore, ClusteStore Test Done 1. inventory cluster could be created 2. pod inventory could be reported
No description provided.