const maxRetries … const maxCapacity … const truncated … // NewEndpointController returns a new *Controller. func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller { … } type Controller … // Run will not return until stopCh is closed. workers determines how many // endpoints will be handled in parallel. func (e *Controller) Run(ctx context.Context, workers int) { … } // When a pod is added, figure out what services it will be a member of and // enqueue them. obj must have *v1.Pod type. func (e *Controller) addPod(obj interface{ … } func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) { … } // When a pod is updated, figure out what services it used to be a member of // and what services it will be a member of, and enqueue the union of these. // old and cur must be *v1.Pod types. func (e *Controller) updatePod(old, cur interface{ … } // When a pod is deleted, enqueue the services the pod used to be a member of. // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. func (e *Controller) deletePod(obj interface{ … } // onServiceUpdate updates the Service Selector in the cache and queues the Service for processing. func (e *Controller) onServiceUpdate(obj interface{ … } // onServiceDelete removes the Service Selector from the cache and queues the Service for processing. func (e *Controller) onServiceDelete(obj interface{ … } func (e *Controller) onEndpointsDelete(obj interface{ … } // worker runs a worker thread that just dequeues items, processes them, and // marks them done. You may run as many of these in parallel as you wish; the // workqueue guarantees that they will not end up processing the same service // at the same time. func (e *Controller) worker(ctx context.Context) { … } func (e *Controller) processNextWorkItem(ctx context.Context) bool { … } func (e *Controller) handleErr(logger klog.Logger, err error, key string) { … } func (e *Controller) syncService(ctx context.Context, key string) error { … } // checkLeftoverEndpoints lists all currently existing endpoints and adds their // service to the queue. This will detect endpoints that exist with no // corresponding service; these endpoints need to be deleted. We only need to // do this once on startup, because in steady-state these are detected (but // some stragglers could have been left behind if the endpoint controller // reboots). func (e *Controller) checkLeftoverEndpoints() { … } // addEndpointSubset add the endpoints addresses and ports to the EndpointSubset. // The addresses are added to the corresponding field, ready or not ready, depending // on the pod status and the Service PublishNotReadyAddresses field value. // The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints. func addEndpointSubset(logger klog.Logger, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) { … } func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort { … } // capacityAnnotationSetCorrectly returns false if number of endpoints is greater than maxCapacity or // returns true if underCapacity and the annotation is not set. func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool { … } // truncateEndpoints by best effort will distribute the endpoints over the subsets based on the proportion // of endpoints per subset and will prioritize Ready Endpoints over NotReady Endpoints. func truncateEndpoints(endpoints *v1.Endpoints) bool { … } // addressSubset takes a list of addresses and returns a subset if the length is greater // than the maxNum. If less than the maxNum, the entire list is returned. func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddress { … } var semanticIgnoreResourceVersion … // endpointSubsetsEqualIgnoreResourceVersion returns true if EndpointSubsets // have equal attributes but excludes ResourceVersion of Pod. func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool { … }