const maxLruCacheEntries … const defaultAggregateMaxEvents … const defaultAggregateIntervalInSeconds … const defaultSpamBurst … const defaultSpamQPS … // getEventKey builds a unique event key based on source, involvedObject, reason, message func getEventKey(event *v1.Event) string { … } // getSpamKey builds a unique spam key based on source, involvedObject func getSpamKey(event *v1.Event) string { … } type EventSpamKeyFunc … type EventFilterFunc … type EventSourceObjectSpamFilter … // NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill. func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter { … } type spamRecord … // Filter checks that a given source+object is not exceeding the allowed rate. func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool { … } type EventAggregatorKeyFunc … // EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type, // event.Reason, event.ReportingController and event.ReportingInstance func EventAggregatorByReasonFunc(event *v1.Event) (string, string) { … } type EventAggregatorMessageFunc … // EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message func EventAggregatorByReasonMessageFunc(event *v1.Event) string { … } type EventAggregator … // NewEventAggregator returns a new instance of an EventAggregator func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc, maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator { … } type aggregateRecord … // EventAggregate checks if a similar event has been seen according to the // aggregation configuration (max events, max interval, etc.) and returns: // // - The (potentially modified) event that should be created // - The cache key for the event, for 
correlation purposes. This will be set to // the full key for normal events, and to the result of // EventAggregatorKeyFunc for aggregate events. func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) { … } type eventLog … type eventLogger … // newEventLogger returns an eventLogger that observes events and counts their frequencies func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger { … } // eventObserve records an event, or updates an existing one if key is a cache hit func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) { … } // updateState updates its internal tracking information based on latest server state func (e *eventLogger) updateState(event *v1.Event) { … } // lastEventObservationFromCache returns the event from the cache; reads must be protected via an external lock func (e *eventLogger) lastEventObservationFromCache(key string) eventLog { … } type EventCorrelator … type EventCorrelateResult … // NewEventCorrelator returns an EventCorrelator configured with default values. // // The EventCorrelator is responsible for event filtering, aggregating, and counting // prior to interacting with the API server to record the event. // // The default behavior is as follows: // - Aggregation is performed if a similar event is recorded 10 times // in a 10-minute rolling interval. A similar event is an event that varies only by // the Event.Message field. Rather than recording the precise event, aggregation // will create a new event whose message reports that it has combined events with // the same reason. // - Events are incrementally counted if the exact same event is encountered multiple // times. // - A source may burst 25 events about an object, but has a refill rate budget // per object of 1 event every 5 minutes to control the long tail of spam. 
func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator { … } // NewEventCorrelatorWithOptions returns an EventCorrelator configured with the given options. func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator { … } // populateDefaults populates the zero-valued options with defaults func populateDefaults(options CorrelatorOptions) CorrelatorOptions { … } // EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) { … } // UpdateState updates the correlator's state based on the latest observed state from the server func (c *EventCorrelator) UpdateState(event *v1.Event) { … }