const defaultExpectedTypeName … var defaultMinWatchTimeout … type Reflector … func (r *Reflector) Name() string { … } func (r *Reflector) TypeDescription() string { … } type ResourceVersionUpdater … type WatchErrorHandler … // DefaultWatchErrorHandler is the default implementation of WatchErrorHandler func DefaultWatchErrorHandler(r *Reflector, err error) { … } // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector // The indexer is configured to key on namespace func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{ … } // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack // that is outside this package. See NewReflectorWithOptions for further information. func NewReflector(lw ListerWatcher, expectedType interface{ … } // NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further // information. func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{ … } type ReflectorOptions … // NewReflectorWithOptions creates a new Reflector object which will keep the // given store up to date with the server's contents for the given // resource. Reflector promises to only put things in the store that // have the type of expectedType, unless expectedType is nil. If // resyncPeriod is non-zero, then the reflector will periodically // consult its ShouldResync function to determine whether to invoke // the Store's Resync operation; `ShouldResync==nil` means always // "yes". This enables you to use reflectors to periodically process // everything as well as incrementally processing the things that // change. 
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{ … } func getTypeDescriptionFromObject(expectedType interface{ … } func getExpectedGVKFromObject(expectedType interface{ … } var internalPackages … // Run repeatedly uses the reflector's ListAndWatch to fetch all the // objects and subsequent deltas. // Run will exit when stopCh is closed. func (r *Reflector) Run(stopCh <-chan struct{ … } var neverExitWatch … var errorStopRequested … // resyncChan returns a channel which will receive something when a resync is // required, and a cleanup function. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { … } // ListAndWatch first lists all items and gets the resource version at the moment of the call, // and then uses the resource version to watch. // It returns an error if ListAndWatch didn't even try to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{ … } // startResync periodically calls r.store.Resync() method. // Note that this method is blocking and should be // called in a separate goroutine. func (r *Reflector) startResync(stopCh <-chan struct{ … } // watchWithResync runs watch with startResync in the background. func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{ … } // watch simply starts a watch request with the server. func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{ … } // list simply lists all items and records a resource version obtained from the server at the moment of the call. // The resource version can be used for further progress notification (aka. watch). func (r *Reflector) list(stopCh <-chan struct{ … } // watchList establishes a stream to get a consistent snapshot of data // from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal // // case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan) // Establishes a consistent stream with the server. 
// That means the returned data is consistent, as if served directly from etcd via a quorum read. // It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion. // It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion. // After receiving a "Bookmark" event the reflector is considered to be synchronized. // It replaces its internal store with the collected items and // reuses the current watch requests for getting further events. // // case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan) // Establishes a stream with the server at the provided resource version. // To establish the initial state the server begins with synthetic "Added" events. // It ends with a synthetic "Bookmark" event containing the provided or newer resource version. // After receiving a "Bookmark" event the reflector is considered to be synchronized. // It replaces its internal store with the collected items and // reuses the current watch requests for getting further events. func (r *Reflector) watchList(stopCh <-chan struct{ … } // syncWith replaces the store's items with the given list. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { … } // handleListWatch consumes events from w, updates the Store, and records the // last seen ResourceVersion, to allow continuing from that ResourceVersion on // retry. If successful, the watcher will be left open after receiving the // initial set of objects, to allow watching for future events. 
func handleListWatch( start time.Time, w watch.Interface, store Store, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, expectedTypeName string, setLastSyncResourceVersion func(string), clock clock.Clock, errCh chan error, stopCh <-chan struct{ … } // handleWatch consumes events from w, updates the Store, and records the // last seen ResourceVersion, to allow continuing from that ResourceVersion on // retry. The watcher will always be stopped on exit. func handleWatch( start time.Time, w watch.Interface, store Store, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, expectedTypeName string, setLastSyncResourceVersion func(string), clock clock.Clock, errCh chan error, stopCh <-chan struct{ … } // handleAnyWatch consumes events from w, updates the Store, and records the last // seen ResourceVersion, to allow continuing from that ResourceVersion on retry. // If exitOnWatchListBookmarkReceived is true, the watch events will be consumed // until a bookmark event is received with the WatchList annotation present. // Returns true (watchListBookmarkReceived) if the WatchList bookmark was // received, even if exitOnWatchListBookmarkReceived is false. // The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is // true and watchListBookmarkReceived is true. This allows the same watch stream // to be re-used by the caller to continue watching for new events. 
func handleAnyWatch(start time.Time, w watch.Interface, store Store, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, expectedTypeName string, setLastSyncResourceVersion func(string), exitOnWatchListBookmarkReceived bool, clock clock.Clock, errCh chan error, stopCh <-chan struct{ … } // LastSyncResourceVersion is the resource version observed when last synced with the underlying store // The value returned is not synchronized with access to the underlying store and is not thread-safe func (r *Reflector) LastSyncResourceVersion() string { … } func (r *Reflector) setLastSyncResourceVersion(v string) { … } // relistResourceVersion determines the resource version the reflector should list or relist from. // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource // version no older than has already been observed in relist results or watch events, or, if the last relist resulted // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in // etcd via a quorum read. func (r *Reflector) relistResourceVersion() string { … } // rewatchResourceVersion determines the resource version the reflector should start streaming from. func (r *Reflector) rewatchResourceVersion() string { … } // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned // "expired" or "too large resource version" error. func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) { … } func isExpiredError(err error) bool { … } func isTooLargeResourceVersionError(err error) bool { … } // isWatchErrorRetriable determines if it is safe to retry // a watch error retrieved from the server. func isWatchErrorRetriable(err error) bool { … } // wrapListFuncWithContext simply wraps ListFunction into another function that accepts a context and ignores it. 
func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { … } type initialEventsEndBookmarkTicker … // newInitialEventsEndBookmarkTicker returns a noop ticker if exitOnInitialEventsEndBookmarkRequested is false. // Otherwise, it returns a ticker that exposes a method producing a warning if the bookmark event, // which marks the end of the watch stream, has not been received within the defined tick interval. // // Note that the caller controls whether to call t.C() and t.Stop(). // // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { … } func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { … } func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObserveTime time.Time) { … } func (t *initialEventsEndBookmarkTicker) warnIfExpired() { … } // produceWarningIfExpired returns an error that represents a warning when // the time elapsed since the last received event exceeds the tickInterval. // // Note that this method should be called when t.C() yields a value. func (t *initialEventsEndBookmarkTicker) produceWarningIfExpired() error { … } var _ … type noopTicker … func (t *noopTicker) C() <-chan time.Time { … } func (t *noopTicker) Stop() { … }