kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go

const incomingBufSize

const outgoingBufSize

const processEventConcurrency

var defaultWatcherMaxLimit

var fatalOnDecodeError

// init is the package initializer; the Go runtime invokes it automatically.
// NOTE(review): the body is not shown in this outline, so what it sets up is
// not visible here — confirm against the implementation.
func init() {}

// TestOnlySetFatalOnDecodeError should only be used in cases where decode
// errors are expected and need to be tested, e.g. conversion webhooks.
// It takes the desired setting b and returns nothing.
// NOTE(review): presumably b is stored in the package-level fatalOnDecodeError
// variable — confirm against the body, which is not shown in this outline.
func TestOnlySetFatalOnDecodeError(b bool) {}

type watcher

type watchChan

// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
// If rev is zero, it will return the existing object(s) and then start watching from
// the maximum revision+1 from returned objects.
// If rev is non-zero, it will watch events that happened after the given revision.
// If opts.Recursive is false, it watches on the given key.
// If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself.
// The predicate is supplied through opts.Predicate (there is no separate pred
// parameter) and must be non-nil. A change is returned only if opts.Predicate matches it.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) {}

// createWatchChan returns a *watchChan built from the given context, key,
// starting revision rev, the recursive and progressNotify flags, and the
// selection predicate pred.
// NOTE(review): the body is not shown in this outline; how these inputs are
// stored on the watchChan, and whether the channel is started here, is not
// visible — confirm against the implementation.
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {}

// getStartWatchResourceVersion returns a ResourceVersion
// the watch will be started from.
// Depending on the input parameters the semantics of the returned ResourceVersion are:
//   - start at Exact (return resourceVersion)
//   - start at Most Recent (return an RV from etcd)
//
// The second result is an error.
// NOTE(review): the conditions that select each mode, and the failure cases,
// are not shown in this outline — confirm against the implementation.
func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) {}

// isInitialEventsEndBookmarkRequired reports, for the given opts, whether an
// initial-events-end bookmark is required.
//
// Since there is no way to directly set opts.ProgressNotify from the API, and
// the etcd3 impl doesn't support notification for external clients, we simply
// return initialEventsEndBookmarkRequired to only send the bookmark event
// after the initial list call.
//
// see: https://github.com/kubernetes/kubernetes/issues/120348
func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool {}

// areInitialEventsRequired returns true if all events from etcd should be returned.
// NOTE(review): the decision is presumably derived from resourceVersion and
// opts; the exact rule is not shown in this outline — confirm.
func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool {}

type etcdError

type grpcError

// isCancelError takes an error and returns a bool.
// NOTE(review): presumably it reports whether err represents a cancellation
// (context or gRPC); the body is not shown in this outline — confirm.
func isCancelError(err error) bool {}

// run takes two flags, initialEventsEndBookmarkRequired and
// forceInitialEvents, and returns nothing.
// NOTE(review): presumably this drives the watch for its whole lifetime and
// forwards the flags to the watching/sync logic; the body is not shown in this
// outline — confirm.
func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) {}

// Stop takes no arguments and returns nothing.
// NOTE(review): presumably this implements watch.Interface's Stop and ends
// the watch; the body is not shown in this outline — confirm.
func (wc *watchChan) Stop() {}

// ResultChan returns a receive-only channel of watch.Event.
// NOTE(review): presumably this is the channel on which transformed events
// are delivered to the consumer; the body is not shown in this outline — confirm.
func (wc *watchChan) ResultChan() <-chan watch.Event {}

// RequestWatchProgress takes no arguments and returns an error.
// NOTE(review): presumably it asks etcd for a progress notification on this
// watch; the body is not shown in this outline — confirm.
func (wc *watchChan) RequestWatchProgress() error {}

// sync tries to retrieve existing data and send it to process.
// The revision to watch will be set to the revision in the response.
// All events sent will have isCreated=true.
// It returns an error.
// NOTE(review): the failure cases are not shown in this outline — confirm.
func (wc *watchChan) sync() error {}

// logWatchChannelErr takes an error and returns nothing.
// NOTE(review): presumably it logs err; the log level and any special-casing
// are not shown in this outline — confirm.
func logWatchChannelErr(err error) {}

// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
//
// initialEventsEndBookmarkSent helps us keep track
// of whether we have sent an annotated bookmark event.
//
// it's important to note that we don't
// need to track the actual RV because
// we only send the bookmark event
// after the initial list call.
//
// when this variable is set to false,
// it means we don't have any specific
// preferences for delivering bookmark events.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}

// processEvents processes events from etcd watcher and sends results to resultChan.
// wg is a WaitGroup supplied by the caller.
// NOTE(review): presumably wg.Done is called when processing ends, and the
// work is delegated to serialProcessEvents or concurrentProcessEvents; the
// body is not shown in this outline — confirm.
func (wc *watchChan) processEvents(wg *sync.WaitGroup) {}

// serialProcessEvents takes the caller's WaitGroup and returns nothing.
// NOTE(review): presumably the one-event-at-a-time variant of event
// processing; the body is not shown in this outline — confirm.
func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {}

// concurrentProcessEvents takes the caller's WaitGroup and returns nothing.
// NOTE(review): presumably the variant that processes events concurrently
// (see processEventConcurrency and concurrentOrderedEventProcessing); the body
// is not shown in this outline — confirm.
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {}

type concurrentOrderedEventProcessing

// scheduleEventProcessing takes a context and a WaitGroup and returns nothing.
// NOTE(review): presumably it dispatches incoming events for processing until
// ctx is done; the body is not shown in this outline — confirm.
func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {}

// collectEventProcessing takes a context and returns nothing.
// NOTE(review): presumably it gathers processed events in their original
// order until ctx is done; the body is not shown in this outline — confirm.
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {}

// filter takes a runtime.Object and returns a bool.
// NOTE(review): presumably it reports whether obj matches the watch's
// selection predicate; the body is not shown in this outline — confirm.
func (wc *watchChan) filter(obj runtime.Object) bool {}

// acceptAll takes no arguments and returns a bool.
// NOTE(review): presumably it reports whether the watch's predicate matches
// every object, so filtering can be skipped; the body is not shown in this
// outline — confirm.
func (wc *watchChan) acceptAll() bool {}

// transform transforms an event into a result for the user if it is not filtered out.
// NOTE(review): presumably res is nil when the event is filtered out; the
// body is not shown in this outline — confirm.
func (wc *watchChan) transform(e *event) (res *watch.Event) {}

// transformErrorToEvent takes an error and returns a *watch.Event.
// NOTE(review): presumably the returned event carries err as an error-type
// watch event; the body is not shown in this outline — confirm.
func transformErrorToEvent(err error) *watch.Event {}

// sendError takes an error and returns nothing.
// NOTE(review): presumably it hands err to the watchChan's error path without
// blocking past cancellation; the body is not shown in this outline — confirm.
func (wc *watchChan) sendError(err error) {}

// sendEvent takes an *event and returns nothing.
// NOTE(review): presumably it queues e for processing; the body is not shown
// in this outline — confirm.
func (wc *watchChan) sendEvent(e *event) {}

// prepareObjs takes an *event and returns the current object (curObj), the
// previous object (oldObj) and an error.
// NOTE(review): presumably the objects are decoded from the event's current
// and previous values; which of them may be nil is not shown in this
// outline — confirm.
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {}

// decodeObj takes a codec, a versioner, the raw bytes data and a revision rev,
// and returns a runtime.Object and an error.
// NOTE(review): presumably data is decoded with codec and rev is recorded on
// the object through versioner; the body is not shown in this outline — confirm.
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {}