kubernetes/staging/src/k8s.io/client-go/tools/cache/reflector.go

const defaultExpectedTypeName

var defaultMinWatchTimeout

type Reflector

func (r *Reflector) Name() string {}

func (r *Reflector) TypeDescription() string {}

type ResourceVersionUpdater

type WatchErrorHandler

// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
func DefaultWatchErrorHandler(r *Reflector, err error) {}

// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}

// NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
// that is outside this package. See NewReflectorWithOptions for further information.
func NewReflector(lw ListerWatcher, expectedType interface{}

// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
// information.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}

type ReflectorOptions

// NewReflectorWithOptions creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes".  This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}

func getTypeDescriptionFromObject(expectedType interface{}

func getExpectedGVKFromObject(expectedType interface{}

var internalPackages

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}

var neverExitWatch

var errorStopRequested

// resyncChan returns a channel which will receive something when a resync is
// required, and a cleanup function.
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {}

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}

// startResync periodically calls r.store.Resync() method.
// Note that this method is blocking and should be
// called in a separate goroutine.
func (r *Reflector) startResync(stopCh <-chan struct{}

// watchWithResync runs watch with startResync in the background.
func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}

// watch simply starts a watch request with the server.
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}

// list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch).
func (r *Reflector) list(stopCh <-chan struct{}

// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
func (r *Reflector) watchList(stopCh <-chan struct{}

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {}

// handleListWatch consumes events from w, updates the Store, and records the
// last seen ResourceVersion, to allow continuing from that ResourceVersion on
// retry. If successful, the watcher will be left open after receiving the
// initial set of objects, to allow watching for future events.
func handleListWatch(
	start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedGVK *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	clock clock.Clock,
	errCh chan error,
	stopCh <-chan struct{}

// handleListWatch consumes events from w, updates the Store, and records the
// last seen ResourceVersion, to allow continuing from that ResourceVersion on
// retry. The watcher will always be stopped on exit.
func handleWatch(
	start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedGVK *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	clock clock.Clock,
	errCh chan error,
	stopCh <-chan struct{}

// handleAnyWatch consumes events from w, updates the Store, and records the last
// seen ResourceVersion, to allow continuing from that ResourceVersion on retry.
// If exitOnWatchListBookmarkReceived is true, the watch events will be consumed
// until a bookmark event is received with the WatchList annotation present.
// Returns true (watchListBookmarkReceived) if the WatchList bookmark was
// received, even if exitOnWatchListBookmarkReceived is false.
// The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is
// true and watchListBookmarkReceived is true. This allows the same watch stream
// to be re-used by the caller to continue watching for new events.
func handleAnyWatch(start time.Time,
	w watch.Interface,
	store Store,
	expectedType reflect.Type,
	expectedGVK *schema.GroupVersionKind,
	name string,
	expectedTypeName string,
	setLastSyncResourceVersion func(string),
	exitOnWatchListBookmarkReceived bool,
	clock clock.Clock,
	errCh chan error,
	stopCh <-chan struct{}

// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
// The value returned is not synchronized with access to the underlying store and is not thread-safe
func (r *Reflector) LastSyncResourceVersion() string {}

func (r *Reflector) setLastSyncResourceVersion(v string) {}

// relistResourceVersion determines the resource version the reflector should list or relist from.
// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
// versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
// etcd via a quorum read.
func (r *Reflector) relistResourceVersion() string {}

// rewatchResourceVersion determines the resource version the reflector should start streaming from.
func (r *Reflector) rewatchResourceVersion() string {}

// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// "expired" or "too large resource version" error.
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {}

func isExpiredError(err error) bool {}

func isTooLargeResourceVersionError(err error) bool {}

// isWatchErrorRetriable determines if it is safe to retry
// a watch error retrieved from the server.
func isWatchErrorRetriable(err error) bool {}

// wrapListFuncWithContext simply wraps ListFunction into another function that accepts a context and ignores it.
func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {}

type initialEventsEndBookmarkTicker

// newInitialEventsEndBookmarkTicker returns a noop ticker if exitOnInitialEventsEndBookmarkRequested is false.
// Otherwise, it returns a ticker that exposes a method producing a warning if the bookmark event,
// which marks the end of the watch stream, has not been received within the defined tick interval.
//
// Note that the caller controls whether to call t.C() and t.Stop().
//
// In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method.
func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {}

func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {}

func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObserveTime time.Time) {}

func (t *initialEventsEndBookmarkTicker) warnIfExpired() {}

// produceWarningIfExpired returns an error that represents a warning when
// the time elapsed since the last received event exceeds the tickInterval.
//
// Note that this method should be called when t.C() yields a value.
func (t *initialEventsEndBookmarkTicker) produceWarningIfExpired() error {}

var _

type noopTicker

func (t *noopTicker) C() <-chan time.Time {}

func (t *noopTicker) Stop() {}