type PodConfigNotificationMode … const PodConfigNotificationUnknown … const PodConfigNotificationSnapshot … const PodConfigNotificationSnapshotAndUpdates … const PodConfigNotificationIncremental … type podStartupSLIObserver … type PodConfig … // NewPodConfig creates an object that can merge many configuration sources into a stream // of normalized updates to a pod configuration. func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig { … } // Channel creates or returns a config source channel. The channel // only accepts PodUpdates func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{ … } // SeenAllSources returns true if seenSources contains all sources in the // config, and also this config has received a SET message from each source. func (c *PodConfig) SeenAllSources(seenSources sets.Set[string]) bool { … } // Updates returns a channel of updates to the configuration, properly denormalized. func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate { … } // Sync requests the full configuration be delivered to the update channel. func (c *PodConfig) Sync() { … } type podStorage … // TODO: PodConfigNotificationMode could be handled by a listener to the updates channel // in the future, especially with multiple listeners. // TODO: allow initialization of the current state of the store with snapshotted version. func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *podStorage { … } // Merge normalizes a set of incoming changes from different sources into a map of all Pods // and ensures that redundant changes are filtered out, and then pushes zero or more minimal // updates onto the update channel. Ensures that updates are delivered in order. func (s *podStorage) Merge(source string, change interface{ … } func (s *podStorage) merge(source string, change interface{ … } func (s *podStorage) markSourceSet(source string) { … } func (s *podStorage) seenSources(sources ...string) bool { … } func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecorder) (filtered []*v1.Pod) { … } var localAnnotations … func isLocalAnnotationKey(key string) bool { … } // isAnnotationMapEqual returns true if the existing annotation Map is equal to candidate except // for local annotations. func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool { … } // recordFirstSeenTime records the first seen time of this pod. func recordFirstSeenTime(pod *v1.Pod) { … } // updateAnnotations returns an Annotation map containing the api annotation map plus // locally managed annotations func updateAnnotations(existing, ref *v1.Pod) { … } func podsDifferSemantically(existing, ref *v1.Pod) bool { … } // checkAndUpdatePod updates existing, and: // - if ref makes a meaningful change, returns needUpdate=true // - if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true // - if ref makes no meaningful change, but changes the pod status, returns needReconcile=true // - else return all false // Now, needUpdate, needGracefulDelete and needReconcile should never be both true func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) { … } // sync sends a copy of the current state through the update channel. func (s *podStorage) sync() { … } func (s *podStorage) mergedState() interface{ … } func copyPods(sourcePods []*v1.Pod) []*v1.Pod { … }