const timeFmt … const priorityLevelMaxSeatsPercent … const borrowingAdjustmentPeriod … const seatDemandSmoothingCoefficient … type StartFunction … type RequestDigest … type configController … type updateAttempt … type priorityLevelState … type seatDemandStats … func (stats *seatDemandStats) update(obs fq.IntegratorResults) { … } // NewTestableController is extra flexible to facilitate testing func newTestableController(config TestableConfig) *configController { … } func (cfgCtlr *configController) Run(stopCh <-chan struct{ … } func (cfgCtlr *configController) updateBorrowing() { … } func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) { … } // runWorker is the logic of the one and only worker goroutine. We // limit the number to one in order to obviate explicit // synchronization around access to `cfgCtlr.mostRecentUpdates`. func (cfgCtlr *configController) runWorker() { … } // processNextWorkItem works on one entry from the work queue. // Only invoke this in the one and only worker goroutine. func (cfgCtlr *configController) processNextWorkItem() bool { … } // syncOne does one full synchronization. It reads all the API // objects that configure API Priority and Fairness and updates the // local configController accordingly. // Only invoke this in the one and only worker goroutine func (cfgCtlr *configController) syncOne() (specificDelay time.Duration, err error) { … } type cfgMeal … type fsStatusUpdate … // digestConfigObjects is given all the API objects that configure // cfgCtlr and writes its consequent new configState. // Only invoke this in the one and only worker goroutine func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) (time.Duration, error) { … } func apply(client flowcontrolclient.FlowSchemaInterface, fsu fsStatusUpdate, asFieldManager string) error { … } func toFlowSchemaApplyConfiguration(fsUpdate fsStatusUpdate) *flowcontrolapplyconfiguration.FlowSchemaApplyConfiguration { … } // shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed. // Only invoke this in the one and only worker goroutine func (cfgCtlr *configController) shouldDelayUpdate(flowSchemaName string) bool { … } // addUpdateResult adds the result. It isn't a ring buffer because // this is small and rate limited. // Only invoke this in the one and only worker goroutine func (cfgCtlr *configController) addUpdateResult(result updateAttempt) { … } func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate { … } // Digest the new set of PriorityLevelConfiguration objects. // Pretend broken ones do not exist. func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfiguration) { … } // Digest the given FlowSchema objects. Ones that reference a missing // or broken priority level are not to be passed on to the filter for // use. We do this before holding over old priority levels so that // requests stop going to those levels and FlowSchemaStatus values // reflect this. This function also adds any missing mandatory // FlowSchema objects. The given objects must all have distinct // names. func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) { … } // Consider all the priority levels in the previous configuration. // Keep the ones that are in the new config, supply mandatory // behavior, or are still busy; for the rest: drop it if it has no // queues, otherwise start the quiescing process if that has not // already been started. func (meal *cfgMeal) processOldPLsLocked() { … } // For all the priority levels of the new config, divide up the // server's total concurrency limit among them and create/update their // QueueSets. func (meal *cfgMeal) finishQueueSetReconfigsLocked() { … } // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the // given priority level configuration. Returns nil and an error if the given // object is malformed in a way that is a problem for this package. func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) { … } func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangling bool, plName string) { … } // imaginePL adds a priority level based on one of the mandatory ones // that does not actually exist (right now) as a real API object. func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration) { … } // startRequest classifies and, if appropriate, enqueues the request. // Returns a nil Request if and only if the request is to be rejected. // The returned bool indicates whether the request is exempt from // limitation. The startWaitingTime is when the request started // waiting in its queue, or `Time{}` if this did not happen. func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), workEstimator func() fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { … } // maybeReap will remove the last internal traces of the named // priority level if it has no more use. Call this after getting a // clue that the given priority level is undesired and idle. func (cfgCtlr *configController) maybeReap(plName string) { … } // maybeReapLocked requires the cfgCtlr's lock to already be held and // will remove the last internal traces of the named priority level if // it has no more use. Call this if both (1) plState.queues is // non-nil and reported being idle, and (2) cfgCtlr's lock has not // been released since then. func (cfgCtlr *configController) maybeReapReadLocked(plName string, plState *priorityLevelState) { … } // computeFlowDistinguisher extracts the flow distinguisher according to the given method func computeFlowDistinguisher(rd RequestDigest, method *flowcontrol.FlowDistinguisherMethod) string { … } func hashFlowID(fsName, fDistinguisher string) uint64 { … } func relDiff(x, y float64) float64 { … } // plSpecCommons returns the (NominalConcurrencyShares, LendablePercent, BorrowingLimitPercent) of the given priority level config func plSpecCommons(pl *flowcontrol.PriorityLevelConfiguration) (*int32, *int32, *int32) { … }