var controllerKind …
var syncJobBatchPeriod …
var DefaultJobApiBackOff …
var MaxJobApiBackOff …
var DefaultJobPodFailureBackOff …
var MaxJobPodFailureBackOff …
var MaxUncountedPods …
var MaxPodCreateDeletePerSync …

type Controller …

type syncJobCtx …

type orphanPodKeyKind …

const OrphanPodKeyKindName …
const OrphanPodKeyKindSelector …

type orphanPodKey …

// NewController creates a new Job controller that keeps the relevant pods
// in sync with their corresponding Job objects.
func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) { … }

func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) (*Controller, error) { … }

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(ctx context.Context, workers int) { … }

// getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *Controller) getPodJobs(pod *v1.Pod) []*batch.Job { … }

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job { … }

// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *Controller) addPod(logger klog.Logger, obj interface{ … }

// When a pod is updated, figure out what job(s) manage it and wake them up.
// If the labels of the pod have changed we need to awaken both the old
// and new job. old and cur must be *v1.Pod types.
func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{ … }

// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be a *v1.Pod, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) deletePod(logger klog.Logger, obj interface{ … }

func (jm *Controller) addJob(logger klog.Logger, obj interface{ … }

func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{ … }

// deleteJob enqueues the job and all the pods associated with it that still
// have a finalizer.
func (jm *Controller) deleteJob(logger klog.Logger, obj interface{ … }

func (jm *Controller) enqueueLabelSelector(jobObj *batch.Job) { … }

// enqueueSyncJobImmediately tells the Job controller to invoke syncJob
// immediately.
// It is only used for Job events (creation, deletion, spec update).
// obj could be a *batch.Job, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interface{ … }

// enqueueSyncJobBatched tells the controller to invoke syncJob with a
// constant batching delay.
// It is used for:
// - Pod events (creation, deletion, update)
// - Job status update
// obj could be a *batch.Job, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{ … }
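// enqueueSyncJobImmediately and enqueueSyncJobBatched above differ only in how
// soon the job key becomes available to a worker. The standalone program below
// is a hedged, illustrative sketch of the batching idea (not the controller's
// code): duplicate adds of the same key within the batch period collapse into
// a single item, so a burst of pod events triggers one sync. The batch period
// value here is made up.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	const batchPeriod = 500 * time.Millisecond // illustrative value only

	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// Three rapid "pod events" for the same Job re-enqueue the same key; the
	// queue de-duplicates it, so a worker processes it once.
	for i := 0; i < 3; i++ {
		q.AddAfter("default/pi", batchPeriod)
	}

	key, _ := q.Get() // blocks until the batch period elapses
	fmt.Println("syncing", key, "remaining items:", q.Len())
	q.Done(key)
}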
// enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
// custom delay, but not smaller than the batching delay.
// It is used when pod recreations are delayed due to pod failures.
// obj could be a *batch.Job, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{ … }

func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{ … }

func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) { … }

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *Controller) worker(ctx context.Context) { … }

func (jm *Controller) processNextWorkItem(ctx context.Context) bool { … }

func (jm *Controller) orphanWorker(ctx context.Context) { … }

func (jm *Controller) processNextOrphanPod(ctx context.Context) bool { … }

// syncOrphanPod removes the tracking finalizer from an orphan pod if found.
func (jm *Controller) syncOrphanPod(ctx context.Context, key orphanPodKey) error { … }

// syncOrphanPodsBySelector fetches and processes all pods matching the given label selector.
func (jm *Controller) syncOrphanPodsBySelector(ctx context.Context, namespace string, labelSelector string) error { … }

// handleSingleOrphanPod processes a single orphan pod.
func (jm *Controller) handleSingleOrphanPod(ctx context.Context, sharedPod *v1.Pod) error { … }

// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning and adding tracking
// finalizers.
// Note that the returned Pods are pointers into the cache.
func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) { … }

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { … }

func (jm *Controller) newFailureCondition(reason, message string) *batch.JobCondition { … }

func (jm *Controller) newSuccessCondition() *batch.JobCondition { … }

func delayTerminalCondition() bool { … }

// deleteActivePods issues deletion for active Pods, preserving finalizers.
// This is done through DELETE calls that set deletion timestamps.
// The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
// which the objects can actually be deleted.
// Returns the number of successfully deleted ready pods and the total number of successfully deleted pods.
func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, int32, error) { … }

func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int { … }

// deleteJobPods deletes the given pods and returns the number of successfully
// removed ready pods, the total number of successfully removed pods, and any error.
func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, int32, error) { … }
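// getPodsForJob and syncOrphanPod above both hinge on whether a pod's
// controller OwnerReference points back at the Job. The standalone program
// below is a hedged sketch of that ownership test; jobOwnsPod is a
// hypothetical helper, not part of the controller, and the real code also
// consults its informer cache and adoption rules.
package main

import (
	"fmt"

	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// jobOwnsPod reports whether pod is controlled by job: same Kind, name and UID.
// Pods without a Job controller reference are orphans, matched only via the
// Job's label selector (and candidates for adoption or finalizer removal).
func jobOwnsPod(job *batch.Job, pod *v1.Pod) bool {
	ref := metav1.GetControllerOf(pod)
	if ref == nil {
		return false
	}
	return ref.Kind == "Job" && ref.Name == job.Name && ref.UID == job.UID
}

func main() {
	controller := true
	job := &batch.Job{ObjectMeta: metav1.ObjectMeta{Name: "pi", UID: types.UID("1234")}}
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name: "pi-abc12",
		OwnerReferences: []metav1.OwnerReference{{
			APIVersion: "batch/v1",
			Kind:       "Job",
			Name:       "pi",
			UID:        types.UID("1234"),
			Controller: &controller,
		}},
	}}
	fmt.Println(jobOwnsPod(job, pod)) // true
}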
// trackJobStatusAndRemoveFinalizers does:
// 1. Add finished Pods to .status.uncountedTerminatedPods
// 2. Remove the finalizers from the Pods if they completed or were removed
//    or the job was removed.
// 3. Increment job counters for pods that no longer have a finalizer.
// 4. Add Complete condition if satisfied with current counters.
//
// It does this up to a limited number of Pods so that the size of .status
// doesn't grow too much and this sync doesn't starve other Jobs.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, needsFlush bool) error { … }

// canRemoveFinalizer determines if the pod's finalizer can be safely removed.
// The finalizer can be removed when:
// - the entire Job is terminating; or
// - the pod's index is succeeded; or
// - the Pod is considered failed, unless its removal is delayed for the
//   purpose of transferring the JobIndexFailureCount annotations to the
//   replacement pod; if the entire Job is terminating, the finalizer can be
//   removed unconditionally; or
// - the Job met successPolicy.
func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, considerPodFailed bool) bool { … }

// flushUncountedAndRemoveFinalizers does:
// 1. Flush the Job status that might include new uncounted Pod UIDs.
//    Also flush the interim FailureTarget and SuccessCriteriaMet conditions if present.
// 2. Perform the removal of finalizers from Pods which are in the uncounted
//    lists.
// 3. Update the counters based on the Pods for which it successfully removed
//    the finalizers.
// 4. (If not all removals succeeded) flush the Job status again.
//
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[types.UID], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) { … }

// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
// .status.uncountedTerminatedPods for which the finalizer was successfully
// removed and increments the corresponding status counters.
// Returns whether there was any status change.
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[types.UID]) bool { … }

// removeTrackingFinalizerFromPods removes tracking finalizers from Pods and
// returns a slice of booleans where the i-th value is true if the finalizer
// of the i-th Pod was successfully removed (if the pod was already deleted when
// this function was called, the finalizer is considered successfully removed).
func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) { … }

// enactJobFinished adds the Complete or Failed condition and records events.
// Returns whether the Job was considered finished.
func (jm *Controller) enactJobFinished(logger klog.Logger, jobCtx *syncJobCtx) bool { … }

// recordJobFinished records events and the job_finished_total metric for a finished job.
func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool { … }

func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[types.UID]) []types.UID { … }

// newFailedConditionForFailureTarget creates a job Failed condition based on
// the interim FailureTarget condition.
func newFailedConditionForFailureTarget(condition *batch.JobCondition, now time.Time) *batch.JobCondition { … }

// pastBackoffLimitOnFailure checks if the sum of container restart counts
// exceeds BackoffLimit. This method applies only to pods with
// restartPolicy == OnFailure.
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool { … }
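// With restartPolicy=OnFailure, pod failures surface as container restarts, so
// pastBackoffLimitOnFailure (declared above) compares the summed restart counts
// against spec.backoffLimit. The standalone program below is a simplified,
// hedged sketch of that check, not the controller's exact implementation.
package main

import (
	"fmt"

	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
)

// restartsExceedBackoffLimit sums init and regular container restarts of
// running/pending pods and compares the total against the Job's backoffLimit.
func restartsExceedBackoffLimit(job *batch.Job, pods []*v1.Pod) bool {
	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure || job.Spec.BackoffLimit == nil {
		return false
	}
	var restarts int32
	for _, p := range pods {
		if p.Status.Phase != v1.PodRunning && p.Status.Phase != v1.PodPending {
			continue
		}
		for _, cs := range p.Status.InitContainerStatuses {
			restarts += cs.RestartCount
		}
		for _, cs := range p.Status.ContainerStatuses {
			restarts += cs.RestartCount
		}
	}
	if *job.Spec.BackoffLimit == 0 {
		return restarts > 0
	}
	return restarts >= *job.Spec.BackoffLimit
}

func main() {
	limit := int32(2)
	job := &batch.Job{}
	job.Spec.BackoffLimit = &limit
	job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure

	pod := &v1.Pod{}
	pod.Status.Phase = v1.PodRunning
	pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: 3}}

	fmt.Println(restartsExceedBackoffLimit(job, []*v1.Pod{pod})) // true: 3 restarts >= limit of 2
}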
// pastActiveDeadline checks if the job has the ActiveDeadlineSeconds field set
// and if it is exceeded. If the job is currently suspended, the function
// always returns false.
func (jm *Controller) pastActiveDeadline(job *batch.Job) bool { … }

func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) *batch.JobCondition { … }

// getFailJobMessage returns a job failure message if the job should fail with the current counters.
func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string { … }

// getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted for
// in the job status. The list of failed pods can be affected by the podFailurePolicy.
func getNewFinishedPods(jobCtx *syncJobCtx) (succeededPods, failedPods []*v1.Pod) { … }

// jobSuspended returns whether a Job is suspended while taking the feature
// gate into account.
func jobSuspended(job *batch.Job) bool { … }

// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Respects back-off; does not create new pods if the back-off time has not passed.
// Does NOT modify <activePods>.
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) { … }

// getPodCreationInfoForIndependentIndexes returns the subset of indexesToAdd
// for which pods can already be created. If no indexes are ready, it returns
// the lowest remaining time until a pod can be created for any of the indexes.
func (jm *Controller) getPodCreationInfoForIndependentIndexes(logger klog.Logger, indexesToAdd []int, podsWithDelayedDeletionPerIndex map[int]*v1.Pod) ([]int, time.Duration) { … }

// activePodsForRemoval returns Pods that should be removed because there
// are too many pods running or, if this is an indexed job, because there are
// repeated indexes, invalid indexes, or pods without an index.
// Sorts candidate pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.Pod { … }

// updateJobStatus calls the API to update the job status.
func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) { … }

func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error { … }

// getValidPodsWithFilter returns the valid pods that pass the filter.
// Pods are valid if they have a finalizer or are in the uncounted set
// and, for Indexed Jobs, have a valid completion index.
func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[types.UID], filter func(*v1.Pod) bool) []*v1.Pod { … }

// getCompletionMode returns the string representation of the completion mode. Used as a label value for metrics.
func getCompletionMode(job *batch.Job) string { … }

func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string { … }

func removeTrackingFinalizerPatch(pod *v1.Pod) []byte { … }

type uncountedTerminatedPods …

func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods { … }

func (u *uncountedTerminatedPods) Succeeded() sets.Set[types.UID] { … }

func (u *uncountedTerminatedPods) Failed() sets.Set[types.UID] { … }

func errorFromChannel(errCh <-chan error) error { … }
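// appendJobCompletionFinalizerIfNotFound and removeTrackingFinalizerPatch above
// manage the batch.kubernetes.io/job-tracking finalizer on pods. The standalone
// sketch below illustrates that bookkeeping under simplifying assumptions (the
// helper names are hypothetical, and the real removal is issued as a patch to
// the API server rather than by editing a slice).
package main

import (
	"fmt"

	batch "k8s.io/api/batch/v1"
)

// addTrackingFinalizer returns finalizers with the Job tracking finalizer
// appended, unless it is already present.
func addTrackingFinalizer(finalizers []string) []string {
	for _, f := range finalizers {
		if f == batch.JobTrackingFinalizer {
			return finalizers
		}
	}
	return append(finalizers, batch.JobTrackingFinalizer)
}

// dropTrackingFinalizer returns finalizers with the Job tracking finalizer
// removed, so the pod object can actually be deleted.
func dropTrackingFinalizer(finalizers []string) []string {
	out := make([]string, 0, len(finalizers))
	for _, f := range finalizers {
		if f != batch.JobTrackingFinalizer {
			out = append(out, f)
		}
	}
	return out
}

func main() {
	f := addTrackingFinalizer(nil)
	fmt.Println(f)                        // [batch.kubernetes.io/job-tracking]
	fmt.Println(addTrackingFinalizer(f))  // unchanged: already present
	fmt.Println(dropTrackingFinalizer(f)) // []
}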
// ensureJobConditionStatus appends or updates an existing job condition of the
// given type with the given status value. Note that this function will not
// append to the conditions list if the new condition's status is false
// (because going from nothing to false is meaningless); it can, however,
// update an existing condition's status to false. The function returns a bool
// to let the caller know if the list was changed (either appended or updated).
func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) ([]batch.JobCondition, bool) { … }

func isPodFailed(p *v1.Pod, job *batch.Job) bool { … }

func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition { … }

func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.JobStatus) { … }

func indexesCount(logger klog.Logger, indexesStr *string, completions int) int { … }

func backoffLimitMetricsLabel(job *batch.Job) string { … }

func recordJobPodFailurePolicyActions(podFailureCountByPolicyAction map[string]int) { … }

func countReadyPods(pods []*v1.Pod) int32 { … }

// trackTerminatingPods checks if the count of terminating pods is tracked.
// They are tracked when any of the following is true:
// - JobPodReplacementPolicy is enabled, so the count is returned in the status
//   field and setting the Job's terminal condition is delayed;
// - JobManagedBy is enabled, to delay setting the Job's terminal condition;
// - only failed pods are replaced, because a pod failure policy is used.
func trackTerminatingPods(job *batch.Job) bool { … }

// onlyReplaceFailedPods checks if we should apply PodReplacementPolicy.
// PodReplacementPolicy controls when we recreate pods if they are marked as
// terminating. Failed means that we recreate a pod only once it has fully
// terminated.
func onlyReplaceFailedPods(job *batch.Job) bool { … }

func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) { … }

func managedByExternalController(jobObj *batch.Job) *string { … }
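// The append-or-update semantics documented for ensureJobConditionStatus above
// can be illustrated with the standalone, simplified sketch below (not the
// controller's actual code; ensureCondition is a hypothetical helper): an
// existing condition of the given type is updated in place, while a brand-new
// condition is only appended when its status is True.
package main

import (
	"fmt"
	"time"

	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func ensureCondition(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) ([]batch.JobCondition, bool) {
	for i := range list {
		if list[i].Type != cType {
			continue
		}
		if list[i].Status == status && list[i].Reason == reason && list[i].Message == message {
			return list, false // nothing changed
		}
		if list[i].Status != status {
			list[i].LastTransitionTime = metav1.NewTime(now)
		}
		list[i].Status = status
		list[i].Reason = reason
		list[i].Message = message
		return list, true
	}
	// Not present yet: appending a False condition would add no information.
	if status != v1.ConditionTrue {
		return list, false
	}
	return append(list, batch.JobCondition{
		Type:               cType,
		Status:             status,
		LastProbeTime:      metav1.NewTime(now),
		LastTransitionTime: metav1.NewTime(now),
		Reason:             reason,
		Message:            message,
	}), true
}

func main() {
	now := time.Now()
	conds, changed := ensureCondition(nil, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", now)
	fmt.Println(changed, len(conds)) // true 1
	conds, changed = ensureCondition(conds, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed", now)
	fmt.Println(changed, conds[0].Status) // true False
}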