const DefaultPodMaxInUnschedulablePodsDuration …
const activeQ …
const backoffQ …
const unschedulablePods …
const preEnqueue …
const DefaultPodInitialBackoffDuration …
const DefaultPodMaxBackoffDuration …
type PreEnqueueCheck …
type SchedulingQueue …

// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option) SchedulingQueue { … }

type PriorityQueue …
type QueueingHintFunction …
type clusterEvent …
type priorityQueueOptions …
type Option …

// WithClock sets clock for PriorityQueue, the default clock is clock.RealClock.
func WithClock(clock clock.Clock) Option { … }

// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue.
func WithPodInitialBackoffDuration(duration time.Duration) Option { … }

// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue.
func WithPodMaxBackoffDuration(duration time.Duration) Option { … }

// WithPodLister sets pod lister for PriorityQueue.
func WithPodLister(pl listersv1.PodLister) Option { … }

// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option { … }

type QueueingHintMapPerProfile …
type QueueingHintMap …

// WithQueueingHintMapPerProfile sets queueingHintMap for PriorityQueue.
func WithQueueingHintMapPerProfile(m QueueingHintMapPerProfile) Option { … }

// WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue.
func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option { … }

// WithMetricsRecorder sets metrics recorder.
func WithMetricsRecorder(recorder metrics.MetricAsyncRecorder) Option { … }

// WithPluginMetricsSamplePercent sets the percentage of plugin metrics to be sampled.
func WithPluginMetricsSamplePercent(percent int) Option { … }

var defaultPriorityQueueOptions …
var _ …

// newQueuedPodInfoForLookup builds a QueuedPodInfo object for a lookup in the queue.
func newQueuedPodInfoForLookup(pod *v1.Pod, plugins ...string) *framework.QueuedPodInfo { … }

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option,
) *PriorityQueue { … }

// Run starts the goroutine to pump from podBackoffQ to activeQ.
func (p *PriorityQueue) Run(logger klog.Logger) { … }

type queueingStrategy …
const queueSkip …
const queueAfterBackoff …
const queueImmediately …

// isEventOfInterest returns true if the event is of interest to some plugins.
func (p *PriorityQueue) isEventOfInterest(logger klog.Logger, event framework.ClusterEvent) bool { … }

// isPodWorthRequeuing calls QueueingHintFn of only plugins registered in pInfo.unschedulablePlugins and pInfo.PendingPlugins.
//
// If any of pInfo.PendingPlugins return Queue,
// the scheduling queue is supposed to enqueue this Pod to activeQ, skipping backoffQ.
// If any of pInfo.unschedulablePlugins return Queue,
// the scheduling queue is supposed to enqueue this Pod to activeQ/backoffQ depending on the remaining backoff time of the Pod.
// If all QueueingHintFns return Skip, the scheduling queue enqueues the Pod back to unschedulable Pod pool
// because no plugin changes the scheduling result via the event.
func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{ … }

// queueingHintToLabel converts a hint and an error from QHint to a label string.
func queueingHintToLabel(hint framework.QueueingHint, err error) string { … }

// runPreEnqueuePlugins iterates over the PreEnqueue function of each registered PreEnqueuePlugin.
// It returns true if all PreEnqueue functions run successfully; otherwise returns false
// upon the first failure.
// Note: we need to associate the failed plugin with `pInfo`, so that the pod can be moved back
// to activeQ by a related cluster event.
func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framework.QueuedPodInfo) bool { … }

func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.PreEnqueuePlugin, pod *v1.Pod, shouldRecordMetric bool) *framework.Status { … }

// moveToActiveQ tries to add pod to active queue and remove it from unschedulable and backoff queues.
// It returns a boolean flag to indicate whether the pod is added successfully.
func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) bool { … }

// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues.
func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) { … }

// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) { … }

func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool { … }

// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool { … }

// SchedulingCycle returns current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 { … }

// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy { … }

// addUnschedulableWithoutQueueingHint inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
// TODO: This function is called only when p.isSchedulingQueueHintEnabled is false,
// and this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed.
func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error { … }

// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error { … }

// flushBackoffQCompleted moves all pods from backoffQ which have completed backoff into activeQ.
func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) { … }

// flushUnschedulablePodsLeftover moves pods which stay in unschedulablePods
// longer than podMaxInUnschedulablePodsDuration to backoffQ or activeQ.
func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) { … }

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
// Note: This method should NOT be locked by the p.lock at any moment,
// as it would lead to scheduling throughput degradation.
func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) { … }

// Done must be called for pod returned by Pop. This allows the queue to
// keep track of which pods are currently being processed.
func (p *PriorityQueue) Done(pod types.UID) { … }

func (p *PriorityQueue) InFlightPods() []*v1.Pod { … }

// isPodUpdated checks if the pod is updated in a way that it may have become
// schedulable. It drops status of the pod and compares it with old version,
// except for pod.status.resourceClaimStatuses: changing that may have an
// effect on scheduling.
func isPodUpdated(oldPod, newPod *v1.Pod) bool { … }

// Update updates a pod in the active or backoff queue if present. Otherwise, it removes
// the item from the unschedulable queue if pod is updated in a way that it may
// become schedulable and adds the updated one to the active queue.
// If pod is not present in any of the queues, it is added to the active queue.
func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) { … }

// Delete deletes the item from either of the two queues. It assumes the pod is
// only in one queue.
func (p *PriorityQueue) Delete(pod *v1.Pod) { … }

// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) { … }

// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) { … }

// NOTE: this function assumes a lock has been acquired in the caller.
// moveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{ … }

// MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{ … }

// requeuePodViaQueueingHint tries to requeue Pod to activeQ, backoffQ or unschedulable pod pool based on schedulingHint.
// It returns the name of the queue the Pod goes to.
//
// NOTE: this function assumes a lock has been acquired in the caller.
func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, strategy queueingStrategy, event string) string { … }

// NOTE: this function assumes a lock has been acquired in the caller.
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{ … }

// getUnschedulablePodsWithCrossTopologyTerm returns unschedulable pods for which either of the following conditions is met:
// - have any affinity term that matches "pod".
// - rejected by PodTopologySpread plugin.
// NOTE: this function assumes a lock has been acquired in the caller.
func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo { … }

// PodsInActiveQ returns all the Pods in the activeQ.
func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod { … }

var pendingPodsSummary …

// GetPod searches for a pod in the activeQ, backoffQ, and unschedulablePods.
func (p *PriorityQueue) GetPod(name, namespace string) (pInfo *framework.QueuedPodInfo, ok bool) { … }

// PendingPods returns all the pending pods in the queue, accompanied by a debugging string
// recording the number of pods in each queue respectively.
// This function is used for debugging purposes in the scheduler cache dumper and comparer.
func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { … }

// Note: this function assumes the caller locks both p.lock.RLock and p.activeQ.getLock().RLock.
func (p *PriorityQueue) nominatedPodToInfo(np podRef, unlockedActiveQ unlockedActiveQueueReader) *framework.PodInfo { … }

// Close closes the priority queue.
func (p *PriorityQueue) Close() { … }

// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
// CAUTION: Make sure you don't call this function while taking any queue's lock in any scenario.
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*framework.PodInfo { … }

func (p *PriorityQueue) podsCompareBackoffCompleted(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { … }

// newQueuedPodInfo builds a QueuedPodInfo object.
func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod, plugins ...string) *framework.QueuedPodInfo { … }

// getBackoffTime returns the time at which podInfo completes backoff.
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time { … }

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration { … }

type UnschedulablePods …

// addOrUpdate adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { … }

// delete deletes a pod from the unschedulable podInfoMap.
// The `gated` parameter is used to figure out which metric should be decreased.
func (u *UnschedulablePods) delete(pod *v1.Pod, gated bool) { … }

// get returns the QueuedPodInfo if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise.
func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo { … }

// clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePods) clear() { … }

// newUnschedulablePods initializes a new object of UnschedulablePods.
func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods { … }

func podInfoKeyFunc(pInfo *framework.QueuedPodInfo) string { … }