const incomingBufSize … const outgoingBufSize … const processEventConcurrency … var defaultWatcherMaxLimit … var fatalOnDecodeError … func init() { … } // TestOnlySetFatalOnDecodeError should only be used for cases where decode errors are expected and need to be tested. e.g. conversion webhooks. func TestOnlySetFatalOnDecodeError(b bool) { … } type watcher … type watchChan … // Watch watches on a key and returns a watch.Interface that transfers relevant notifications. // If rev is zero, it will return the existing object(s) and then start watching from // the maximum revision+1 from returned objects. // If rev is non-zero, it will watch events happened after given revision. // If opts.Recursive is false, it watches on given key. // If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself. // pred must be non-nil. Only if opts.Predicate matches the change, it will be returned. func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) { … } func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan { … } // getStartWatchResourceVersion returns a ResourceVersion // the watch will be started from. // Depending on the input parameters the semantics of the returned ResourceVersion are: // - start at Exact (return resourceVersion) // - start at Most Recent (return an RV from etcd) func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) { … } // isInitialEventsEndBookmarkRequired since there is no way to directly set // opts.ProgressNotify from the API and the etcd3 impl doesn't support // notification for external clients we simply return initialEventsEndBookmarkRequired // to only send the bookmark event after the initial list call. // // see: https://github.com/kubernetes/kubernetes/issues/120348 func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool { … } // areInitialEventsRequired returns true if all events from the etcd should be returned. func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool { … } type etcdError … type grpcError … func isCancelError(err error) bool { … } func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) { … } func (wc *watchChan) Stop() { … } func (wc *watchChan) ResultChan() <-chan watch.Event { … } func (wc *watchChan) RequestWatchProgress() error { … } // sync tries to retrieve existing data and send them to process. // The revision to watch will be set to the revision in response. // All events sent will have isCreated=true func (wc *watchChan) sync() error { … } func logWatchChannelErr(err error) { … } // startWatching does: // - get current objects if initialRev=0; set initialRev to current rev // - watch on given key and send events to process. // // initialEventsEndBookmarkSent helps us keep track // of whether we have sent an annotated bookmark event. // // it's important to note that we don't // need to track the actual RV because // we only send the bookmark event // after the initial list call. // // when this variable is set to false, // it means we don't have any specific // preferences for delivering bookmark events. func (wc *watchChan) startWatching(watchClosedCh chan struct{ … } // processEvents processes events from etcd watcher and sends results to resultChan. func (wc *watchChan) processEvents(wg *sync.WaitGroup) { … } func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) { … } func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { … } type concurrentOrderedEventProcessing … func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) { … } func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) { … } func (wc *watchChan) filter(obj runtime.Object) bool { … } func (wc *watchChan) acceptAll() bool { … } // transform transforms an event into a result for user if not filtered. func (wc *watchChan) transform(e *event) (res *watch.Event) { … } func transformErrorToEvent(err error) *watch.Event { … } func (wc *watchChan) sendError(err error) { … } func (wc *watchChan) sendEvent(e *event) { … } func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { … } func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) { … }