type DeltaFIFOOptions … type DeltaFIFO … type TransformFunc … type DeltaType … const Added … const Updated … const Deleted … const Replaced … const Sync … type Delta … type Deltas … // NewDeltaFIFO returns a Queue which can be used to process changes to items. // // keyFunc is used to figure out what key an object should have. (It is // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling // around deleted objects and queue state). // // 'knownObjects' may be supplied to modify the behavior of Delete, // Replace, and Resync. It may be nil if you do not need those // modifications. // // TODO: consider merging keyLister with this object, tracking a list of // "known" keys when Pop() is called. Have to think about how that // affects error retrying. // // NOTE: It is possible to misuse this and cause a race when using an // external known object source. // Whether there is a potential race depends on how the consumer // modifies knownObjects. In Pop(), process function is called under // lock, so it is safe to update data structures in it that need to be // in sync with the queue (e.g. knownObjects). // // Example: // In case of sharedIndexInformer being a consumer // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192), // there is no race as knownObjects (s.indexer) is modified safely // under DeltaFIFO's lock. The only exceptions are GetStore() and // GetIndexer() methods, which expose ways to modify the underlying // storage. Currently these two methods are used for creating Lister // and internal tests. // // Also see the comment on DeltaFIFO. // // Warning: This constructs a DeltaFIFO that does not differentiate between // events caused by a call to Replace (e.g., from a relist, which may // contain object updates), and synthetic events caused by a periodic resync // (which just emit the existing object). See https://issue.k8s.io/86015 for details. // // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` // instead to receive a `Replaced` event depending on the type. // // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects}) func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { … } // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to // items. See also the comment on DeltaFIFO. func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { … } var _ … var ErrZeroLengthDeltasObject … // Close the queue. func (f *DeltaFIFO) Close() { … } // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or // DeletedFinalStateUnknown objects. func (f *DeltaFIFO) KeyOf(obj interface{ … } // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, // or the first batch of items inserted by Replace() has been popped. func (f *DeltaFIFO) HasSynced() bool { … } func (f *DeltaFIFO) hasSynced_locked() bool { … } // Add inserts an item, and puts it in the queue. The item is only enqueued // if it doesn't already exist in the set. func (f *DeltaFIFO) Add(obj interface{ … } // Update is just like Add, but makes an Updated Delta. func (f *DeltaFIFO) Update(obj interface{ … } // Delete is just like Add, but makes a Deleted Delta. If the given // object does not already exist, it will be ignored. (It may have // already been deleted by a Replace (re-list), for example.) In this // method `f.knownObjects`, if not nil, provides (via GetByKey) // _additional_ objects that are considered to already exist. func (f *DeltaFIFO) Delete(obj interface{ … } // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already // present in the set, it is neither enqueued nor added to the set. // // This is useful in a single producer/consumer scenario so that the consumer can // safely retry items without contending with the producer and potentially enqueueing // stale items. // // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is // different from the Add/Update/Delete functions. func (f *DeltaFIFO) AddIfNotPresent(obj interface{ … } // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller // already holds the fifo lock. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { … } // re-listing and watching can deliver the same update multiple times in any // order. This will combine the most recent two deltas if they are the same. func dedupDeltas(deltas Deltas) Deltas { … } // If a & b represent the same event, returns the delta that ought to be kept. // Otherwise, returns nil. // TODO: is there anything other than deletions that need deduping? func isDup(a, b *Delta) *Delta { … } // keep the one with the most information if both are deletions. func isDeletionDup(a, b *Delta) *Delta { … } // queueActionLocked appends to the delta list for the object. // Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{ … } // queueActionInternalLocked appends to the delta list for the object. // The actionType is emitted and must honor emitDeltaTypeReplaced. // The internalActionType is only used within this function and must // ignore emitDeltaTypeReplaced. // Caller must lock first. func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{ … } // List returns a list of all the items; it returns the object // from the most recent Delta. // You should treat the items returned inside the deltas as immutable. func (f *DeltaFIFO) List() []interface{ … } func (f *DeltaFIFO) listLocked() []interface{ … } // ListKeys returns a list of all the keys of the objects currently // in the FIFO. func (f *DeltaFIFO) ListKeys() []string { … } // Get returns the complete list of deltas for the requested item, // or sets exists=false. // You should treat the items returned inside the deltas as immutable. func (f *DeltaFIFO) Get(obj interface{ … } // GetByKey returns the complete list of deltas for the requested item, // setting exists=false if that list is empty. // You should treat the items returned inside the deltas as immutable. func (f *DeltaFIFO) GetByKey(key string) (item interface{ … } // IsClosed checks if the queue is closed func (f *DeltaFIFO) IsClosed() bool { … } // Pop blocks until the queue has some items, and then returns one. If // multiple items are ready, they are returned in the order in which they were // added/updated. The item is removed from the queue (and the store) before it // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). // process function is called under lock, so it is safe to update data structures // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc // may return an instance of ErrRequeue with a nested error to indicate the current // item should be requeued (equivalent to calling AddIfNotPresent under the lock). // process should avoid expensive I/O operation so that other queue operations, i.e. // Add() and Get(), won't be blocked for too long. // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{ … } // Replace atomically does two things: (1) it adds the given objects // using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of // `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known // object of K. The pre-existing keys are those in the union set of the keys in // `f.items` and `f.knownObjects` (if not nil). The last known object for key K is // the one present in the last delta in `f.items`. If there is no delta for K // in `f.items`, it is the object in `f.knownObjects` func (f *DeltaFIFO) Replace(list []interface{ … } // Resync adds, with a Sync type of Delta, every object listed by // `f.knownObjects` whose key is not already queued for processing. // If `f.knownObjects` is `nil` then Resync does nothing. func (f *DeltaFIFO) Resync() error { … } func (f *DeltaFIFO) syncKeyLocked(key string) error { … } type KeyListerGetter … type KeyLister … type KeyGetter … // Oldest is a convenience function that returns the oldest delta, or // nil if there are no deltas. func (d Deltas) Oldest() *Delta { … } // Newest is a convenience function that returns the newest delta, or // nil if there are no deltas. func (d Deltas) Newest() *Delta { … } // copyDeltas returns a shallow copy of d; that is, it copies the slice but not // the objects in the slice. This allows Get/List to return an object that we // know won't be clobbered by a subsequent modifications. func copyDeltas(d Deltas) Deltas { … } type DeletedFinalStateUnknown …