const maxWaitForContainerRuntime … const nodeStatusUpdateRetry … const nodeReadyGracePeriod … const DefaultContainerLogsDir … const MaxContainerBackOff … const housekeepingPeriod … const housekeepingWarningDuration … const runtimeCacheRefreshPeriod … const evictionMonitoringPeriod … const linuxEtcHostsPath … const windowsEtcHostsPath … const plegChannelCapacity … const genericPlegRelistPeriod … const genericPlegRelistThreshold … const eventedPlegRelistPeriod … const eventedPlegRelistThreshold … const eventedPlegMaxStreamRetries … const backOffPeriod … const ContainerGCPeriod … const ImageGCPeriod … const minDeadContainerInPod … const nodeLeaseRenewIntervalFraction … const instrumentationScope … var ContainerLogsDir … var etcHostsPath … func getContainerEtcHostsPath() string { … } type SyncHandler … type Option … type Bootstrap … type Dependencies … // makePodSourceConfig creates a config.PodConfig from the given // KubeletConfiguration or returns an error. func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) { … } // PreInitRuntimeService will init runtime service before RunKubelet. func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) error { … } // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules. // No initialization of Kubelet and its modules should happen here. func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions, hostname string, hostnameOverridden bool, nodeName types.NodeName, nodeIPs []net.IP, providerID string, cloudProvider string, certDirectory string, rootDirectory string, podLogsDirectory string, imageCredentialProviderConfigFile string, imageCredentialProviderBinDir string, registerNode bool, registerWithTaints []v1.Taint, allowedUnsafeSysctls []string, experimentalMounterPath string, kernelMemcgNotification bool, experimentalNodeAllocatableIgnoreEvictionThreshold bool, minimumGCAge metav1.Duration, maxPerPodContainerCount int32, maxContainerCount int32, registerSchedulable bool, nodeLabels map[string]string, nodeStatusMaxImages int32, seccompDefault bool, ) (*Kubelet, error) { … } type serviceLister … type Kubelet … // ListPodStats is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) ListPodStats(ctx context.Context) ([]statsapi.PodStats, error) { … } // ListPodCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) ListPodCPUAndMemoryStats(ctx context.Context) ([]statsapi.PodStats, error) { … } // ListPodStatsAndUpdateCPUNanoCoreUsage is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) ListPodStatsAndUpdateCPUNanoCoreUsage(ctx context.Context) ([]statsapi.PodStats, error) { … } // ImageFsStats is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) ImageFsStats(ctx context.Context) (*statsapi.FsStats, *statsapi.FsStats, error) { … } // GetCgroupStats is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) { … } // GetCgroupCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) { … } // RootFsStats is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) RootFsStats() (*statsapi.FsStats, error) { … } // RlimitStats is delegated to StatsProvider, which implements stats.Provider interface func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) { … } // setupDataDirs creates: // 1. the root directory // 2. the pods directory // 3. the plugins directory // 4. the pod-resources directory // 5. the checkpoint directory // 6. the pod logs root directory func (kl *Kubelet) setupDataDirs() error { … } // StartGarbageCollection starts garbage collection threads. func (kl *Kubelet) StartGarbageCollection() { … } // initializeModules will initialize internal modules that do not require the container runtime to be up. // Note that the modules here must not depend on modules that are not initialized here. func (kl *Kubelet) initializeModules() error { … } // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up. func (kl *Kubelet) initializeRuntimeDependentModules() { … } // Run starts the kubelet reacting to config updates func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { … } // SyncPod is the transaction script for the sync of a single pod (setting up) // a pod. This method is reentrant and expected to converge a pod towards the // desired state of the spec. The reverse (teardown) is handled in // SyncTerminatingPod and SyncTerminatedPod. If SyncPod exits without error, // then the pod runtime state is in sync with the desired configuration state // (pod is running). If SyncPod exits with a transient error, the next // invocation of SyncPod is expected to make progress towards reaching the // desired state. SyncPod exits with isTerminal when the pod was detected to // have reached a terminal lifecycle phase due to container exits (for // RestartNever or RestartOnFailure) and the next method invoked will be // SyncTerminatingPod. If the pod terminates for any other reason, SyncPod // will receive a context cancellation and should exit as soon as possible. // // Arguments: // // updateType - whether this is a create (first time) or an update, should // only be used for metrics since this method must be reentrant // // pod - the pod that is being set up // // mirrorPod - the mirror pod known to the kubelet for this pod, if any // // podStatus - the most recent pod status observed for this pod which can // be used to determine the set of actions that should be taken during // this loop of SyncPod // // The workflow is: // - If the pod is being created, record pod worker start latency // - Call generateAPIPodStatus to prepare an v1.PodStatus for the pod // - If the pod is being seen as running for the first time, record pod // start latency // - Update the status of the pod in the status manager // - Stop the pod's containers if it should not be running due to soft // admission // - Ensure any background tracking for a runnable pod is started // - Create a mirror pod if the pod is a static pod, and does not // already have a mirror pod // - Create the data directories for the pod if they do not exist // - Wait for volumes to attach/mount // - Fetch the pull secrets for the pod // - Call the container runtime's SyncPod callback // - Update the traffic shaping for the pod's ingress and egress limits // // If any step of this workflow errors, the error is returned, and is repeated // on the next SyncPod call. // // This operation writes all events that are dispatched in order to provide // the most accurate information possible about an error situation to aid debugging. // Callers should not write an event if this operation returns an error. func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) { … } // SyncTerminatingPod is expected to terminate all running containers in a pod. Once this method // returns without error, the pod is considered to be terminated and it will be safe to clean up any // pod state that is tied to the lifetime of running containers. The next method invoked will be // SyncTerminatedPod. This method is expected to return with the grace period provided and the // provided context may be cancelled if the duration is exceeded. The method may also be interrupted // with a context cancellation if the grace period is shortened by the user or the kubelet (such as // during eviction). This method is not guaranteed to be called if a pod is force deleted from the // configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned // pods. func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { … } // preserveDataFromBeforeStopping preserves data, like IPs, which are expected // to be sent to the API server after termination, but are no longer returned by // containerRuntime.GetPodStatus for a stopped pod. // Note that Kubelet restart, after the pod is stopped, may still cause losing // track of the data. func preserveDataFromBeforeStopping(stoppedPodStatus, podStatus *kubecontainer.PodStatus) { … } // SyncTerminatingRuntimePod is expected to terminate running containers in a pod that we have no // configuration for. Once this method returns without error, any remaining local state can be safely // cleaned up by background processes in each subsystem. Unlike syncTerminatingPod, we lack // knowledge of the full pod spec and so cannot perform lifecycle related operations, only ensure // that the remnant of the running pod is terminated and allow garbage collection to proceed. We do // not update the status of the pod because with the source of configuration removed, we have no // place to send that status. func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error { … } // SyncTerminatedPod cleans up a pod that has terminated (has no running containers). // The invocations in this call are expected to tear down all pod resources. // When this method exits the pod is expected to be ready for cleanup. This method // reduces the latency of pod cleanup but is not guaranteed to get called in all scenarios. // // Because the kubelet has no local store of information, all actions in this method that modify // on-disk state must be reentrant and be garbage collected by HandlePodCleanups or a separate loop. // This typically occurs when a pod is force deleted from configuration (local disk or API) and the // kubelet restarts in the middle of the action. func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error { … } // Get pods which should be resynchronized. Currently, the following pod should be resynchronized: // - pod whose work is ready. // - internal modules that request sync of a pod. // // This method does not return orphaned pods (those known only to the pod worker that may have // been deleted from configuration). Those pods are synced by HandlePodCleanups as a consequence // of driving the state machine to completion. // // TODO: Consider synchronizing all pods which have not recently been acted on to be resilient // to bugs that might prevent updates from being delivered (such as the previous bug with // orphaned pods). Instead of asking the work queue for pending work, consider asking the // PodWorker which pods should be synced. func (kl *Kubelet) getPodsToSync() []*v1.Pod { … } // deletePod deletes the pod from the internal state of the kubelet by: // 1. stopping the associated pod worker asynchronously // 2. signaling to kill the pod by sending on the podKillingCh channel // // deletePod returns an error if not all sources are ready or the pod is not // found in the runtime cache. func (kl *Kubelet) deletePod(pod *v1.Pod) error { … } // rejectPod records an event about the pod with the given reason and message, // and updates the pod to the failed phase in the status manager. func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { … } // canAdmitPod determines if a pod can be admitted, and gives a reason if it // cannot. "pod" is new pod, while "pods" are all admitted pods // The function returns a boolean value indicating whether the pod // can be admitted, a brief single-word reason and a message explaining why // the pod cannot be admitted. // allocatedPods should represent the pods that have already been admitted, along with their // admitted (allocated) resources. func (kl *Kubelet) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, string, string) { … } // syncLoop is the main loop for processing changes. It watches for changes from // three channels (file, apiserver, and http) and creates a union of them. For // any new change seen, will run a sync against desired state and running state. If // no changes are seen to the configuration, will synchronize the last known desired // state every sync-frequency seconds. Never returns. func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) { … } // syncLoopIteration reads from various channels and dispatches pods to the // given handler. // // Arguments: // 1. configCh: a channel to read config events from // 2. handler: the SyncHandler to dispatch pods to // 3. syncCh: a channel to read periodic sync events from // 4. housekeepingCh: a channel to read housekeeping events from // 5. plegCh: a channel to read PLEG updates from // // Events are also read from the kubelet liveness manager's update channel. // // The workflow is to read from one of the channels, handle that event, and // update the timestamp in the sync loop monitor. // // Here is an appropriate place to note that despite the syntactical // similarity to the switch statement, the case statements in a select are // evaluated in a pseudorandom order if there are multiple channels ready to // read from when the select is evaluated. In other words, case statements // are evaluated in random order, and you can not assume that the case // statements evaluate in order if multiple channels have events. // // With that in mind, in truly no particular order, the different channels // are handled as follows: // // - configCh: dispatch the pods for the config change to the appropriate // handler callback for the event type // - plegCh: update the runtime cache; sync pod // - syncCh: sync all pods waiting for sync // - housekeepingCh: trigger cleanup of pods // - health manager: sync pods that have failed or in which one or more // containers have failed health checks func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { … } func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) { … } // HandlePodAdditions is the callback in SyncHandler for pods being added from // a config source. func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { … } // HandlePodUpdates is the callback in the SyncHandler interface for pods // being updated from a config source. func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { … } // HandlePodRemoves is the callback in the SyncHandler interface for pods // being removed from a config source. func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { … } // HandlePodReconcile is the callback in the SyncHandler interface for pods // that should be reconciled. Pods are reconciled when only the status of the // pod is updated in the API. func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { … } // HandlePodSyncs is the callback in the syncHandler interface for pods // that should be dispatched to pod workers for sync. func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { … } func isPodResizeInProgress(pod *v1.Pod, podStatus *kubecontainer.PodStatus) bool { … } // canResizePod determines if the requested resize is currently feasible. // pod should hold the desired (pre-allocated) spec. // Returns true if the resize can proceed. func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus) { … } func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod) (*v1.Pod, error) { … } // LatestLoopEntryTime returns the last time in the sync loop monitor. func (kl *Kubelet) LatestLoopEntryTime() time.Time { … } // SyncLoopHealthCheck checks if kubelet's sync loop that updates containers is working. func (kl *Kubelet) SyncLoopHealthCheck(req *http.Request) error { … } // updateRuntimeUp calls the container runtime status callback, initializing // the runtime dependent modules when the container runtime first comes up, // and returns an error if the status check fails. If the status check is OK, // update the container runtime uptime in the kubelet runtimeState. func (kl *Kubelet) updateRuntimeUp() { … } // GetConfiguration returns the KubeletConfiguration used to configure the kubelet. func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration { … } // BirthCry sends an event that the kubelet has started up. func (kl *Kubelet) BirthCry() { … } // ListenAndServe runs the kubelet HTTP server. func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider) { … } // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, tp trace.TracerProvider) { … } // ListenAndServePodResources runs the kubelet podresources grpc service func (kl *Kubelet) ListenAndServePodResources() { … } // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) { … } // fastStatusUpdateOnce starts a loop that checks if the current state of kubelet + container runtime // would be able to turn the node ready, and sync the ready state to the apiserver as soon as possible. // Function returns after the node status update after such event, or when the node is already ready. // Function is executed only during Kubelet start which improves latency to ready node by updating // kubelet state, runtime status and node statuses ASAP. func (kl *Kubelet) fastStatusUpdateOnce() { … } // CheckpointContainer tries to checkpoint a container. The parameters are used to // look up the specified container. If the container specified by the given parameters // cannot be found an error is returned. If the container is found the container // engine will be asked to checkpoint the given container into the kubelet's default // checkpoint directory. func (kl *Kubelet) CheckpointContainer( ctx context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest, ) error { … } // ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics. func (kl *Kubelet) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) { … } // ListPodSandboxMetrics retrieves the metrics for all pod sandboxes. func (kl *Kubelet) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) { … } func (kl *Kubelet) supportLocalStorageCapacityIsolation() bool { … } // isSyncPodWorthy filters out events that are not worthy of pod syncing func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { … } // PrepareDynamicResources calls the container Manager PrepareDynamicResources API // This method implements the RuntimeHelper interface func (kl *Kubelet) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error { … } // UnprepareDynamicResources calls the container Manager UnprepareDynamicResources API // This method implements the RuntimeHelper interface func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error { … }