type FullChannelBehavior … const WaitIfChannelFull … const DropIfChannelFull … const incomingQueueLength … type Broadcaster … // NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher. // It is guaranteed that events will be distributed in the order in which they occur, // but the order in which a single event is distributed among all of the watchers is unspecified. func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { … } // NewLongQueueBroadcaster functions nearly identically to NewBroadcaster, // except that the incoming queue is the same size as the outgoing queues // (specified by queueLength). func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { … } const internalRunFunctionMarker … type functionFakeRuntimeObject … func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind { … } func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object { … } // Execute f, blocking the incoming queue (and waiting for it to drain first). // The purpose of this terrible hack is so that watchers added after an event // won't ever see that event, and will always see any event after they are // added. func (m *Broadcaster) blockQueue(f func()) { … } // Watch adds a new watcher to the list and returns an Interface for it. // Note: new watchers will only receive new events. They won't get an entire history // of previous events. It will block until the watcher is actually added to the // broadcaster. func (m *Broadcaster) Watch() (Interface, error) { … } // WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends // queuedEvents down the new watch before beginning to send ordinary events from Broadcaster. // The returned watch will have a queue length that is at least large enough to accommodate // all of the items in queuedEvents. 
It will block until the watcher is actually added to // the broadcaster. func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) (Interface, error) { … } // stopWatching stops the given watcher and removes it from the list. func (m *Broadcaster) stopWatching(id int64) { … } // closeAll disconnects all watchers (presumably in response to a Shutdown call). func (m *Broadcaster) closeAll() { … } // Action distributes the given event among all watchers. func (m *Broadcaster) Action(action EventType, obj runtime.Object) error { … } // ActionOrDrop distributes the given event among all watchers, or drops it on the floor // if too many incoming actions are queued up. Returns true if the action was sent, // false if dropped. func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) { … } // Shutdown disconnects all watchers (but any queued events will still be distributed). // You must not call Action or Watch* after calling Shutdown. This call blocks // until all events have been distributed through the outbound channels. Note // that since they can be buffered, this means that the watchers might not // have received the data yet as it can remain sitting in the buffered // channel. It will block until the broadcaster stop request is actually executed. func (m *Broadcaster) Shutdown() { … } // loop receives from m.incoming and distributes to all watchers. func (m *Broadcaster) loop() { … } // distribute sends event to all watchers. Blocking. func (m *Broadcaster) distribute(event Event) { … } type broadcasterWatcher … // ResultChan returns a channel to use for waiting on events. func (mw *broadcasterWatcher) ResultChan() <-chan Event { … } // Stop stops watching and removes mw from its list. // It will block until the watcher stop request is actually executed. func (mw *broadcasterWatcher) Stop() { … }