const nsTimeFmt …

type queueSetFactory …

type promiseFactory …

type promiseFactoryFactory …

type queueSetCompleter …

type queueSet …

// NewQueueSetFactory creates a new QueueSetFactory object.
func NewQueueSetFactory(c eventclock.Interface) fq.QueueSetFactory { … }

// newTestableQueueSetFactory creates a new QueueSetFactory object with the given promiseFactoryFactory.
func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory promiseFactoryFactory) fq.QueueSetFactory { … }

func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator metrics.Gauge) (fq.QueueSetCompleter, error) { … }

// checkConfig returns a non-nil Dealer if the config is valid and
// calls for one, and returns a non-nil error if the given config is
// invalid.
func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) { … }

func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet { … }

// createQueues is a helper for initializing a slice of n queues.
func createQueues(n, baseIndex int) []*queue { … }

func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { … }

// setConfiguration is used to set the configuration for a queueSet.
// Updates to existing fields are reconciled here as well; e.g., if
// DesiredNum is increased, setConfiguration reconciles by adding more
// queues.
func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) { … }

type requestDecision …

const decisionExecute …

const decisionCancel …

// StartRequest begins the process of handling a request. We take the
// approach of updating the metrics about total requests queued and
// executing at each point where there is a change in that quantity,
// because the metrics, and only the metrics, track that quantity per
// FlowSchema.
// The queueSet's promiseFactory is invoked once if the returned Request
// is non-nil, and is not invoked if the Request is nil.
func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{ … }

// ordinaryPromiseFactoryFactory is the promiseFactoryFactory that
// a queueSetFactory would ordinarily use.
// Test code might use something different.
func ordinaryPromiseFactoryFactory(qs *queueSet) promiseFactory { … }

// MaxSeats returns the maximum number of seats this request requires; it is
// the maximum of WorkEstimate.InitialSeats and WorkEstimate.FinalSeats.
func (req *request) MaxSeats() int { … }

func (req *request) InitialSeats() int { … }

func (req *request) NoteQueued(inQueue bool) { … }

func (req *request) Finish(execFn func()) bool { … }

func (req *request) wait() (bool, bool) { … }

func (qs *queueSet) IsIdle() bool { … }

func (qs *queueSet) isIdleLocked() bool { … }

// lockAndSyncTime acquires the lock and updates the virtual time.
// Doing them together avoids the mistake of modifying some queue state
// before calling syncTimeLocked.
func (qs *queueSet) lockAndSyncTime(ctx context.Context) { … }
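// Illustrative sketch, not part of this file: one way the queue-count
// reconciliation described for createQueues and setConfiguration could look.
// The sketchQueue type and the helper names below are hypothetical; the point
// is that when the desired number of queues grows, new queues are appended
// with consecutive index fields starting at the old length.

type sketchQueue struct{ index int }

// createSketchQueues returns n queues whose index fields start at baseIndex.
func createSketchQueues(n, baseIndex int) []*sketchQueue {
	out := make([]*sketchQueue, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, &sketchQueue{index: baseIndex + i})
	}
	return out
}

// reconcileQueueCount grows current up to desiredNum queues; shrinking is
// omitted here for brevity.
func reconcileQueueCount(current []*sketchQueue, desiredNum int) []*sketchQueue {
	if extra := desiredNum - len(current); extra > 0 {
		current = append(current, createSketchQueues(extra, len(current))...)
	}
	return current
}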
// syncTimeLocked updates the virtual time based on the assumption
// that the current state of the queues has been in effect since
// `qs.lastRealTime`. Thus, it should be invoked after acquiring the
// lock and before modifying the state of any queue.
func (qs *queueSet) syncTimeLocked(ctx context.Context) { … }

const rDecrement …

const highR …

// advanceEpoch subtracts rDecrement from the global progress meter R
// and all the readings that have been taken from that meter.
// The now and incrR parameters are only used to add info to the log messages.
func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqrequest.SeatSeconds) { … }

// getVirtualTimeRatioLocked calculates the rate at which virtual time has
// been advancing, according to the logic in `doc.go`.
func (qs *queueSet) getVirtualTimeRatioLocked() float64 { … }

// shuffleShardAndRejectOrEnqueueLocked encapsulates the logic required
// to validate and enqueue a request for the queueSet/QueueSet:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject the current request if there are not enough concurrency shares
//    and we are at max queue length.
// 3) If not rejected, create a request and enqueue it.
// Returns the enqueued request on a successful enqueue.
// Returns nil in the case that there is no available concurrency or
// the queue length limit has been reached.
func (qs *queueSet) shuffleShardAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{ … }

// shuffleShardLocked uses shuffle sharding to select a queue index
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{ … }

// rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived
// request, which has been assigned to a queue. If up against the
// queue length limit and the concurrency limit then returns false.
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool { … }

// enqueueToBoundLocked enqueues a request into its queue.
func (qs *queueSet) enqueueToBoundLocked(request *request) { … }

// dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { … }

func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, flowDistinguisher, fsName string, descr1, descr2 interface{ … }

// dispatchLocked uses the Fair Queuing for Server Requests method to
// select a queue and dispatch the oldest request in that queue. The
// return value indicates whether a request was dequeued; this will
// be false when either all queues are empty or the request at the head
// of the next queue cannot be dispatched.
func (qs *queueSet) dispatchLocked() bool { … }

// canAccommodateSeatsLocked returns true if this queueSet has enough
// seats available to accommodate a request with the given number of seats,
// otherwise it returns false.
func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { … }

// findDispatchQueueToBoundLocked examines the queues in round-robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue cannot be dispatched now,
// in which case the caller does not need to follow up with `qs.boundNextDispatchLocked`.
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) { … }
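// Illustrative sketch, not part of this file: a simplified version of the
// shuffle-sharding selection that the shuffleShardLocked comment describes.
// A small "hand" of distinct queue indices is derived from the request's hash
// value, and then one queue is picked from that hand; picking the queue with
// the fewest waiting requests is used here as a plausible selection rule, and
// all names below are hypothetical. Assumes 0 < handSize <= deckSize.

// dealSketchHand derives handSize distinct indices in [0, deckSize) from hashValue.
func dealSketchHand(hashValue uint64, deckSize, handSize int) []int {
	deck := make([]int, deckSize)
	for i := range deck {
		deck[i] = i
	}
	hand := make([]int, 0, handSize)
	for i := 0; i < handSize; i++ {
		r := int(hashValue % uint64(len(deck)))
		hashValue /= uint64(len(deck))
		hand = append(hand, deck[r])
		deck = append(deck[:r], deck[r+1:]...)
	}
	return hand
}

// pickShortestQueue returns the index in hand whose queue currently holds the
// fewest waiting requests; queueLengths[i] is the length of queue i.
func pickShortestQueue(hand []int, queueLengths []int) int {
	best := hand[0]
	for _, idx := range hand[1:] {
		if queueLengths[idx] < queueLengths[best] {
			best = idx
		}
	}
	return best
}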
// finishRequestAndDispatchAsMuchAsPossible is a convenience method
// which calls finishRequest for a given request and then dispatches
// as many requests as possible. This is all that needs to be done
// once a request finishes execution or is canceled. This returns a bool
// indicating whether the QueueSet is now idle.
func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool { … }

// finishRequestLocked is a callback that should be used when a
// previously dispatched request has completed its service. This
// callback updates important state in the queueSet.
func (qs *queueSet) finishRequestLocked(r *request) { … }

// boundNextDispatchLocked applies the anti-windup hack.
// We need a hack because all non-empty queues are allocated the same
// number of seats. A queue that cannot use all those seats and does
// not go empty accumulates a progressively earlier `virtualStart` compared
// to queues that are using more than they are allocated.
// The following hack addresses the first side of that inequity,
// by insisting that dispatch in the virtual world not precede arrival.
func (qs *queueSet) boundNextDispatchLocked(queue *queue) { … }

func (qs *queueSet) removeQueueIfEmptyLocked(r *request) { … }

// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
// and then updates the 'index' field of the queues to be correct.
func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { … }

func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { … }

func OnRequestDispatched(r fq.Request) { … }
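// Illustrative sketch, not part of this file: the reslicing-plus-reindexing
// pattern that the removeQueueAndUpdateIndexes comment describes. The
// indexedQueue type and removeAndReindex helper are hypothetical names; only
// the queues at or after the removed position need their index field rewritten.

type indexedQueue struct {
	index int
	// other fields elided
}

// removeAndReindex drops queues[i] by reslicing and then fixes the index
// fields of the queues that shifted left.
func removeAndReindex(queues []*indexedQueue, i int) []*indexedQueue {
	keep := append(queues[:i], queues[i+1:]...)
	for j := i; j < len(keep); j++ {
		keep[j].index = j
	}
	return keep
}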