kubernetes/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go

const nsTimeFmt

type queueSetFactory

type promiseFactory

type promiseFactoryFactory

type queueSetCompleter

type queueSet

// NewQueueSetFactory creates a new QueueSetFactory object
func NewQueueSetFactory(c eventclock.Interface) fq.QueueSetFactory {}

// newTestableQueueSetFactory creates a new QueueSetFactory object with the given promiseFactoryFactory
func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory promiseFactoryFactory) fq.QueueSetFactory {}

func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator metrics.Gauge) (fq.QueueSetCompleter, error) {}

// checkConfig returns a non-nil Dealer if the config is valid and
// calls for one, and returns a non-nil error if the given config is
// invalid.
func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {}

func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {}

// createQueues is a helper method for initializing an array of n queues
func createQueues(n, baseIndex int) []*queue {}

func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {}

// setConfiguration is used to set the configuration for a queueSet.
// Changes to previously configured fields are reconciled here as well;
// e.g., if DesiredNum is increased, setConfiguration reconciles by
// adding more queues.
func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {}

type requestDecision

const decisionExecute

const decisionCancel

// StartRequest begins the process of handling a request.  We take the
// approach of updating the metrics about total requests queued and
// executing at each point where there is a change in that quantity,
// because the metrics --- and only the metrics --- track that
// quantity per FlowSchema.
// The queueSet's promiseFactory is invoked once if the returned Request is non-nil,
// not invoked if the Request is nil.
func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}

// ordinaryPromiseFactoryFactory is the promiseFactoryFactory that
// a queueSetFactory would ordinarily use.
// Test code might use something different.
func ordinaryPromiseFactoryFactory(qs *queueSet) promiseFactory {}

// MaxSeats returns the maximum number of seats this request requires; it is
// the maximum of the two values WorkEstimate.InitialSeats and WorkEstimate.FinalSeats.
func (req *request) MaxSeats() int {}

func (req *request) InitialSeats() int {}

func (req *request) NoteQueued(inQueue bool) {}

func (req *request) Finish(execFn func()) bool {}

func (req *request) wait() (bool, bool) {}

func (qs *queueSet) IsIdle() bool {}

func (qs *queueSet) isIdleLocked() bool {}

// lockAndSyncTime acquires the lock and updates the virtual time.
// Doing them together avoids the mistake of modifying some queue state
// before calling syncTimeLocked.
func (qs *queueSet) lockAndSyncTime(ctx context.Context) {}

// syncTimeLocked updates the virtual time based on the assumption
// that the current state of the queues has been in effect since
// `qs.lastRealTime`.  Thus, it should be invoked after acquiring the
// lock and before modifying the state of any queue.
func (qs *queueSet) syncTimeLocked(ctx context.Context) {}

const rDecrement

const highR

// advanceEpoch subtracts rDecrement from the global progress meter R
// and all the readings that have been taken from that meter.
// The now and incrR parameters are only used to add info to the log messages.
func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqrequest.SeatSeconds) {}

// getVirtualTimeRatioLocked calculates the rate at which virtual time has
// been advancing, according to the logic in `doc.go`.
func (qs *queueSet) getVirtualTimeRatioLocked() float64 {}

// shuffleShardAndRejectOrEnqueueLocked encapsulates the logic required
// to validate and enqueue a request for the queueSet/QueueSet:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 3) If not rejected, create a request and enqueue
// Returns the enqueued request on a successful enqueue.
// Returns nil in the case that there is no available concurrency and
// the queue length limit has been reached.
func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}

// shuffleShardLocked uses shuffle sharding to select a queue index
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}

// rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived
// request, which has been assigned to a queue.  If up against the
// queue length limit and the concurrency limit then returns false.
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {}

// enqueueToBoundLocked enqueues a request into its queue.
func (qs *queueSet) enqueueToBoundLocked(request *request) {}

// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {}

func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, flowDistinguisher, fsName string, descr1, descr2 interface{}

// dispatchLocked uses the Fair Queuing for Server Requests method to
// select a queue and dispatch the oldest request in that queue.  The
// return value indicates whether a request was dequeued; this will
// be false when either all queues are empty or the request at the head
// of the next queue cannot be dispatched.
func (qs *queueSet) dispatchLocked() bool {}

// canAccommodateSeatsLocked returns true if this queueSet has enough
// seats available to accommodate a request with the given number of seats,
// otherwise it returns false.
func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {}

// findDispatchQueueToBoundLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now,
// in which case the caller does not need to follow up with `qs.boundNextDispatchLocked`.
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {}

// finishRequestAndDispatchAsMuchAsPossible is a convenience method
// which calls finishRequest for a given request and then dispatches
// as many requests as possible.  This is all of what needs to be done
// once a request finishes execution or is canceled.  This returns a bool
// indicating whether the QueueSet is now idle.
func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool {}

// finishRequestLocked is a callback that should be used when a
// previously dispatched request has completed its service.  This
// callback updates important state in the queueSet.
func (qs *queueSet) finishRequestLocked(r *request) {}

// boundNextDispatchLocked applies the anti-windup hack.
// We need a hack because all non-empty queues are allocated the same
// number of seats.  A queue that can not use all those seats and does
// not go empty accumulates a progressively earlier `virtualStart` compared
// to queues that are using more than they are allocated.
// The following hack addresses the first side of that inequity,
// by insisting that dispatch in the virtual world not precede arrival.
func (qs *queueSet) boundNextDispatchLocked(queue *queue) {}

func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {}

// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
// and then updates the 'index' field of the queues to be correct
func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {}

func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {}

func OnRequestDispatched(r fq.Request) {}