kubernetes/pkg/controller/job/job_controller.go

var controllerKind

var syncJobBatchPeriod

var DefaultJobApiBackOff

var MaxJobApiBackOff

var DefaultJobPodFailureBackOff

var MaxJobPodFailureBackOff

var MaxUncountedPods

var MaxPodCreateDeletePerSync

type Controller

type syncJobCtx

type orphanPodKeyKind

const OrphanPodKeyKindName

const OrphanPodKeyKindSelector

type orphanPodKey

// NewController creates a new Job controller that keeps the relevant pods
// in sync with their corresponding Job objects.
func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) {}
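
// A minimal wiring sketch (hedged; not the controller's real startup path,
// which lives in kube-controller-manager). The kubeconfig path is a
// placeholder, and the jobcontroller import assumes the sketch is built
// inside the kubernetes repo.
package main

import (
	"context"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
)

func main() {
	ctx := context.Background()
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	jm, err := jobcontroller.NewController(ctx,
		factory.Core().V1().Pods(),
		factory.Batch().V1().Jobs(),
		client)
	if err != nil {
		panic(err)
	}

	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())
	jm.Run(ctx, 5) // blocks; runs 5 sync workers until ctx is cancelled
}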

func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) (*Controller, error) {}

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(ctx context.Context, workers int) {}

// getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *Controller) getPodJobs(pod *v1.Pod) []*batch.Job {}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {}
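
// A hedged sketch of the usual resolveControllerRef pattern: only the Kind is
// recorded in the OwnerReference, so the Job is fetched by name and its UID is
// compared to guard against a deleted-and-recreated object with the same name.
// resolveJobRefSketch is a hypothetical standalone helper, not part of this
// file; assumes batch "k8s.io/api/batch/v1",
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1", and
// batchv1listers "k8s.io/client-go/listers/batch/v1".
func resolveJobRefSketch(jobLister batchv1listers.JobLister, namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
	if controllerRef.Kind != "Job" {
		// Not owned by a Job; nothing to resolve.
		return nil
	}
	job, err := jobLister.Jobs(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if job.UID != controllerRef.UID {
		// Same name, different object: the original owner was deleted.
		return nil
	}
	return job
}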

// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {}

// When a pod is updated, figure out what job/s manage it and wake them up.
// If the labels of the pod have changed, we need to awaken both the old
// and the new job. old and cur must be *v1.Pod types.
func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {}

// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be a *v1.Pod, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) deletePod(logger klog.Logger, obj interface{}) {}
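
// A hedged sketch of the standard tombstone handling used by deletePod-style
// handlers: the informer may hand over a cache.DeletedFinalStateUnknown
// wrapper instead of the Pod itself when the watch missed the delete event.
// podFromDeleteEventSketch is a hypothetical helper, not part of this file;
// assumes v1 "k8s.io/api/core/v1" and "k8s.io/client-go/tools/cache".
func podFromDeleteEventSketch(obj interface{}) *v1.Pod {
	if pod, ok := obj.(*v1.Pod); ok {
		return pod
	}
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		return nil // unexpected object type
	}
	pod, ok := tombstone.Obj.(*v1.Pod)
	if !ok {
		return nil // tombstone carried something other than a Pod
	}
	return pod
}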

func (jm *Controller) addJob(logger klog.Logger, obj interface{}) {}

func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {}

// deleteJob enqueues the job and all the pods associated with it that still
// have a finalizer.
func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {}

func (jm *Controller) enqueueLabelSelector(jobObj *batch.Job) {}

// enqueueSyncJobImmediately tells the Job controller to invoke syncJob
// immediately.
// It is only used for Job events (creation, deletion, spec update).
// obj could be a *batch.Job, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interface{}) {}

// enqueueSyncJobBatched tells the controller to invoke syncJob with a
// constant batching delay.
// It is used for:
// - Pod events (creation, deletion, update)
// - Job status update
// obj could be a *batch.Job, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {}

// enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
// custom delay, but not smaller than the batching delay.
// It is used when pod recreations are delayed due to pod failures.
// obj could be a *batch.Job, or a DeletedFinalStateUnknown marker item.
func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {}
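
// A hedged sketch of the delay clamping described above: a caller-supplied
// pod-failure backoff is never allowed to undercut the batching period, and
// the result is handed to the delaying work queue. clampedEnqueueSketch is a
// hypothetical helper, not this file's code; assumes "time" and
// "k8s.io/client-go/util/workqueue".
func clampedEnqueueSketch(queue workqueue.DelayingInterface, key string, delay, batchPeriod time.Duration) {
	if delay < batchPeriod {
		delay = batchPeriod
	}
	queue.AddAfter(key, delay)
}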

func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}, delay time.Duration) {}

func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *Controller) worker(ctx context.Context) {}

func (jm *Controller) processNextWorkItem(ctx context.Context) bool {}
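
// A hedged sketch of the worker/processNextWorkItem pattern: each worker
// drains the rate-limited queue, requeues keys whose sync failed, and resets
// the rate limiter for keys that synced cleanly. workerSketch is a
// hypothetical helper, not this file's code; assumes "context" and
// "k8s.io/client-go/util/workqueue".
func workerSketch(ctx context.Context, queue workqueue.RateLimitingInterface, sync func(context.Context, string) error) {
	for {
		item, shutdown := queue.Get()
		if shutdown {
			return
		}
		func() {
			defer queue.Done(item)
			key := item.(string)
			if err := sync(ctx, key); err != nil {
				// Requeue with backoff so a persistently failing Job
				// does not hot-loop the worker.
				queue.AddRateLimited(item)
				return
			}
			// Success: forget the key so its backoff resets.
			queue.Forget(item)
		}()
	}
}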

func (jm *Controller) orphanWorker(ctx context.Context) {}

func (jm *Controller) processNextOrphanPod(ctx context.Context) bool {}

// syncOrphanPod removes the tracking finalizer from an orphan pod if found.
func (jm *Controller) syncOrphanPod(ctx context.Context, key orphanPodKey) error {}

// syncOrphanPodsBySelector fetches and processes all pods matching the given label selector.
func (jm *Controller) syncOrphanPodsBySelector(ctx context.Context, namespace string, labelSelector string) error {}

// handleSingleOrphanPod processes a single orphan pod.
func (jm *Controller) handleSingleOrphanPod(ctx context.Context, sharedPod *v1.Pod) error {}

// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning, adding tracking
// finalizers.
// Note that the returned Pods are pointers into the cache.
func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) {}

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {}

func (jm *Controller) newFailureCondition(reason, message string) *batch.JobCondition {}

func (jm *Controller) newSuccessCondition() *batch.JobCondition {}

func delayTerminalCondition() bool {}

// deleteActivePods issues deletion for active Pods, preserving finalizers.
// This is done through DELETE calls that set deletion timestamps.
// The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
// which the objects can actually be deleted.
// Returns number of successfully deleted ready pods and total number of successfully deleted pods.
func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, int32, error) {}

func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {}

// deleteJobPods deletes the pods and returns the number of ready pods
// successfully removed, the total number of pods successfully removed, and any error.
func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, int32, error) {}

// trackJobStatusAndRemoveFinalizers does:
//  1. Add finished Pods to .status.uncountedTerminatedPods
//  2. Remove the finalizers from the Pods if they completed or were removed
//     or the job was removed.
//  3. Increment job counters for pods that no longer have a finalizer.
//  4. Add Complete condition if satisfied with current counters.
//
// It does this up to a limited number of Pods so that the size of .status
// doesn't grow too much and this sync doesn't starve other Jobs.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, needsFlush bool) error {}

// canRemoveFinalizer determines if the pod's finalizer can be safely removed.
// The finalizer can be removed when:
//   - the entire Job is terminating, in which case the finalizer can be
//     removed unconditionally; or
//   - the pod's index is succeeded; or
//   - the Pod is considered failed, unless its removal is delayed for the
//     purpose of transferring the JobIndexFailureCount annotations to the
//     replacement pod; or
//   - the Job met the successPolicy.
func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, considerPodFailed bool) bool {}
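
// A hedged sketch of the decision above, with the inputs flattened to booleans
// instead of reading them from syncJobCtx; canRemoveFinalizerSketch is a
// hypothetical helper, not the controller's implementation.
func canRemoveFinalizerSketch(jobTerminating, indexSucceeded, podFailed, removalDelayedForFailureCountTransfer, successPolicyMet bool) bool {
	if jobTerminating || successPolicyMet || indexSucceeded {
		return true
	}
	if podFailed && !removalDelayedForFailureCountTransfer {
		return true
	}
	return false
}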

// flushUncountedAndRemoveFinalizers does:
//  1. flush the Job status that might include new uncounted Pod UIDs.
//     Also flush the interim FailureTarget and SuccessCriteriaMet conditions if present.
//  2. perform the removal of finalizers from Pods which are in the uncounted
//     lists.
//  3. update the counters based on the Pods for which it successfully removed
//     the finalizers.
//  4. (if not all removals succeeded) flush Job status again.
//
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {}

// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
// .status.uncountedTerminatedPods for which the finalizer was successfully
// removed and increments the corresponding status counters.
// Returns whether there was any status change.
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool {}
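
// A hedged sketch of the cleanup step described above: UIDs whose finalizer
// was removed are dropped from .status.uncountedTerminatedPods and folded into
// the succeeded/failed counters, while UIDs still carrying the finalizer stay
// uncounted. Assumes batch "k8s.io/api/batch/v1",
// "k8s.io/apimachinery/pkg/types", and "k8s.io/apimachinery/pkg/util/sets";
// hypothetical helper, not this file's code.
func cleanUncountedSketch(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool {
	if status.UncountedTerminatedPods == nil {
		return false
	}
	updated := false
	split := func(uids []types.UID, counter *int32) []types.UID {
		kept := uids[:0]
		for _, uid := range uids {
			if uidsWithFinalizer.Has(string(uid)) {
				kept = append(kept, uid) // finalizer still present: keep it uncounted
			} else {
				*counter++ // finalizer gone: count the pod
				updated = true
			}
		}
		return kept
	}
	status.UncountedTerminatedPods.Succeeded = split(status.UncountedTerminatedPods.Succeeded, &status.Succeeded)
	status.UncountedTerminatedPods.Failed = split(status.UncountedTerminatedPods.Failed, &status.Failed)
	return updated
}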

// removeTrackingFinalizerFromPods removes tracking finalizers from Pods and
// returns an array of booleans where the i-th value is true if the finalizer
// of the i-th Pod was successfully removed (if the pod was already deleted when
// this function was called, that counts as a successful removal).
func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) {}

// enactJobFinished adds the Complete or Failed condition and records events.
// Returns whether the Job was considered finished.
func (jm *Controller) enactJobFinished(logger klog.Logger, jobCtx *syncJobCtx) bool {}

// recordJobFinished records events and the job_finished_total metric for a finished job.
func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {}

func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID {}

// newFailedConditionForFailureTarget creates a job Failed condition based on
// the interim FailureTarget condition.
func newFailedConditionForFailureTarget(condition *batch.JobCondition, now time.Time) *batch.JobCondition {}

// pastBackoffLimitOnFailure checks if the sum of container restartCounts exceeds BackoffLimit.
// This method applies only to pods with restartPolicy == OnFailure.
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {}
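
// A hedged sketch of the restart-count check described above: with
// restartPolicy == OnFailure, failures surface as container restarts rather
// than failed pods, so the restart counts of running/pending pods are summed
// and compared against spec.backoffLimit. Assumes batch "k8s.io/api/batch/v1"
// and v1 "k8s.io/api/core/v1"; hypothetical helper, not this file's code.
func pastBackoffLimitOnFailureSketch(job *batch.Job, pods []*v1.Pod) bool {
	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure || job.Spec.BackoffLimit == nil {
		return false
	}
	var restarts int32
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodPending {
			continue
		}
		for _, cs := range pod.Status.InitContainerStatuses {
			restarts += cs.RestartCount
		}
		for _, cs := range pod.Status.ContainerStatuses {
			restarts += cs.RestartCount
		}
	}
	if *job.Spec.BackoffLimit == 0 {
		// A zero limit tolerates no restarts at all.
		return restarts > 0
	}
	return restarts >= *job.Spec.BackoffLimit
}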

// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if
// it is exceeded. If the job is currently suspended, the function will always
// return false.
func (jm *Controller) pastActiveDeadline(job *batch.Job) bool {}
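
// A hedged sketch of the deadline check described above: no deadline, no start
// time, or a suspended job means the deadline cannot be exceeded. Assumes
// batch "k8s.io/api/batch/v1" and "time"; hypothetical helper using wall-clock
// time instead of the controller's injected clock.
func pastActiveDeadlineSketch(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	if job.Spec.Suspend != nil && *job.Spec.Suspend {
		return false
	}
	elapsed := time.Since(job.Status.StartTime.Time)
	allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return elapsed >= allowed
}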

func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) *batch.JobCondition {}

// getFailJobMessage returns a job failure message if the job should fail with the current counters.
func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {}

// getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
// in the job status. The list of failed pods can be affected by the podFailurePolicy.
func getNewFinishedPods(jobCtx *syncJobCtx) (succeededPods, failedPods []*v1.Pod) {}

// jobSuspended returns whether a Job is suspended while taking the feature
// gate into account.
func jobSuspended(job *batch.Job) bool {}

// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Respects back-off; does not create new pods if the back-off time has not passed.
// Does NOT modify <activePods>.
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {}

// getPodCreationInfoForIndependentIndexes returns the sub-list of all indexes
// to create, containing those that can already be created. If no indexes
// are ready to create pods, it returns the lowest remaining time to create pods
// out of all indexes.
func (jm *Controller) getPodCreationInfoForIndependentIndexes(logger klog.Logger, indexesToAdd []int, podsWithDelayedDeletionPerIndex map[int]*v1.Pod) ([]int, time.Duration) {}

// activePodsForRemoval returns Pods that should be removed because there
// are too many pods running or, if this is an indexed job, there are repeated
// indexes or invalid indexes or some pods don't have indexes.
// Sorts candidate pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.Pod {}
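
// A hedged sketch of the removal ordering described above, using a simple rank
// (higher = further along) instead of the controller package's ActivePods
// sorting helpers; pods with the lowest rank are deleted first. Assumes
// v1 "k8s.io/api/core/v1" and "sort"; hypothetical helpers, not this file's code.
func podRankSketch(p *v1.Pod) int {
	rank := 0
	if p.Spec.NodeName != "" {
		rank++ // scheduled beats unscheduled
	}
	if p.Status.Phase == v1.PodRunning {
		rank++ // running beats pending
	}
	for _, c := range p.Status.Conditions {
		if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
			rank++ // ready beats not-ready
			break
		}
	}
	return rank
}

func sortForRemovalSketch(pods []*v1.Pod) {
	sort.SliceStable(pods, func(i, j int) bool {
		return podRankSketch(pods[i]) < podRankSketch(pods[j])
	})
}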

// updateJobStatus calls the API to update the job status.
func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) {}

func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error {}

// getValidPodsWithFilter returns the valid pods that pass the filter.
// Pods are valid if they have a finalizer or are in the uncounted set
// and, for Indexed Jobs, a valid completion index.
func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {}

// getCompletionMode returns string representation of the completion mode. Used as a label value for metrics.
func getCompletionMode(job *batch.Job) string {}

func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string {}

func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {}
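
// A hedged sketch of the kind of patch removeTrackingFinalizerPatch produces:
// a strategic merge patch that deletes the batch.kubernetes.io/job-tracking
// finalizer from the pod's finalizer list. Assumes "encoding/json" and
// batch "k8s.io/api/batch/v1"; the exact patch shape used by the controller
// may differ.
func removeTrackingFinalizerPatchSketch() ([]byte, error) {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			// Strategic-merge-patch directive for removing an entry
			// from a primitive list such as metadata.finalizers.
			"$deleteFromPrimitiveList/finalizers": []string{batch.JobTrackingFinalizer},
		},
	}
	return json.Marshal(patch)
}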

type uncountedTerminatedPods

func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {}

func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] {}

func (u *uncountedTerminatedPods) Failed() sets.Set[string] {}

func errorFromChannel(errCh <-chan error) error {}

// ensureJobConditionStatus appends or updates an existing job condition of the
// given type with the given status value. Note that this function will not
// append to the conditions list if the new condition's status is false
// (because going from nothing to false is meaningless); it can, however,
// update the status condition to false. The function returns a bool to let the
// caller know if the list was changed (either appended or updated).
func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) ([]batch.JobCondition, bool) {}
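
// A hedged sketch of the append-or-update behaviour described above: an
// existing condition of the given type is updated in place, a new condition is
// only appended when its status is not false, and the returned bool reports
// whether the list changed. Assumes batch "k8s.io/api/batch/v1",
// v1 "k8s.io/api/core/v1", metav1 "k8s.io/apimachinery/pkg/apis/meta/v1", and
// "time"; hypothetical helper, not this file's code.
func ensureJobConditionStatusSketch(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) ([]batch.JobCondition, bool) {
	for i := range list {
		if list[i].Type != cType {
			continue
		}
		if list[i].Status == status && list[i].Reason == reason && list[i].Message == message {
			return list, false // already up to date
		}
		list[i].Status = status
		list[i].Reason = reason
		list[i].Message = message
		list[i].LastProbeTime = metav1.NewTime(now)
		list[i].LastTransitionTime = metav1.NewTime(now)
		return list, true
	}
	if status == v1.ConditionFalse {
		// Going from "no condition" to "false" carries no information.
		return list, false
	}
	return append(list, batch.JobCondition{
		Type:               cType,
		Status:             status,
		Reason:             reason,
		Message:            message,
		LastProbeTime:      metav1.NewTime(now),
		LastTransitionTime: metav1.NewTime(now),
	}), true
}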

func isPodFailed(p *v1.Pod, job *batch.Job) bool {}

func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition {}

func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.JobStatus) {}

func indexesCount(logger klog.Logger, indexesStr *string, completions int) int {}

func backoffLimitMetricsLabel(job *batch.Job) string {}

func recordJobPodFailurePolicyActions(podFailureCountByPolicyAction map[string]int) {}

func countReadyPods(pods []*v1.Pod) int32 {}

// trackTerminatingPods checks if the count of terminating pods is tracked.
// They are tracked when any of the following is true:
//   - JobPodReplacementPolicy is enabled, so the count is returned in the
//     status field and setting the Job terminal condition is delayed,
//   - JobManagedBy is enabled, to delay setting the Job terminal condition,
//   - only failed pods are replaced, because a pod failure policy is used.
func trackTerminatingPods(job *batch.Job) bool {}

// onlyReplaceFailedPods checks if we should apply the PodReplacementPolicy.
// PodReplacementPolicy controls when we recreate pods if they are marked as terminating.
// Failed means that we recreate a pod only once it has terminated.
func onlyReplaceFailedPods(job *batch.Job) bool {}
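
// A hedged sketch of the policy check described above, ignoring feature gates:
// either the Job explicitly asks for the Failed replacement policy, or a pod
// failure policy is set, which implies failed-only replacement. Assumes
// batch "k8s.io/api/batch/v1"; hypothetical helper, not this file's code.
func onlyReplaceFailedPodsSketch(job *batch.Job) bool {
	if job.Spec.PodReplacementPolicy != nil && *job.Spec.PodReplacementPolicy == batch.Failed {
		return true
	}
	return job.Spec.PodFailurePolicy != nil
}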

func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) {}

func managedByExternalController(jobObj *batch.Job) *string {}
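
// A hedged sketch of the managed-by check: return the name of an external
// controller claiming the Job via spec.managedBy, or nil when the Job is owned
// by the built-in controller. Assumes batch "k8s.io/api/batch/v1" and that the
// built-in controller's reserved name matches batch.JobControllerName;
// hypothetical helper, not this file's code, and the real version also
// consults the JobManagedBy feature gate.
func managedByExternalControllerSketch(job *batch.Job) *string {
	if name := job.Spec.ManagedBy; name != nil && *name != batch.JobControllerName {
		return name
	}
	return nil
}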