const nodeWithoutTopology … type ActivePodsFunc … type ManagerImpl … type endpointInfo … type sourcesReadyStub … type PodReusableDevices … func (s *sourcesReadyStub) AddSource(source string) { … } func (s *sourcesReadyStub) AllReady() bool { … } // NewManagerImpl creates a new manager. func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { … } func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { … } func (m *ManagerImpl) Updates() <-chan resourceupdates.Update { … } // CleanupPluginDirectory removes all existing unix sockets // from /var/lib/kubelet/device-plugins on Device Plugin Manager start. func (m *ManagerImpl) CleanupPluginDirectory(dir string) error { … } // PluginConnected connects a plugin to a new endpoint. // This is done as part of device plugin registration. func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin) error { … } // PluginDisconnected disconnects a plugin from an endpoint. // This is done as part of device plugin deregistration. func (m *ManagerImpl) PluginDisconnected(resourceName string) { … } // PluginListAndWatchReceiver receives a ListAndWatchResponse from a device plugin // and ensures that an up-to-date state (e.g. number of devices and device health) // is captured. Also, registered device and device-to-container allocation // information is checkpointed to disk. func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) { … } func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) { … } // GetWatcherHandler returns the plugin handler. func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler { … } // checkpointFile returns the device plugin checkpoint file path.
func (m *ManagerImpl) checkpointFile() string { … } // Start starts the Device Plugin Manager, initializes // podDevices and allocatedDevices information from checkpointed state, and // starts the device plugin registration service. func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.Set[string]) error { … } // Stop stops the plugin server. // It can be called concurrently, more than once, and is safe to call // without a prior Start. func (m *ManagerImpl) Stop() error { … } // Allocate allocates a set of devices // from the registered device plugins. func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error { … } // UpdatePluginResources updates node resources based on devices already allocated to pods. func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { … } func (m *ManagerImpl) markResourceUnhealthy(resourceName string) { … } // GetCapacity is expected to be called when Kubelet updates its node status. // The first returned variable contains the registered device plugin resource capacity. // The second returned variable contains the registered device plugin resource allocatable. // The third returned variable contains previously registered resources that are no longer active. // Kubelet uses this information to update resource capacity/allocatable in its node status. // After the call, device plugin can remove the inactive resources from its internal list as the // change is already reflected in Kubelet node status. // Note that in the special case after Kubelet restarts, device plugin resource capacities can // temporarily drop to zero until the corresponding device plugins re-register.
This is OK because // cm.UpdatePluginResources() run during predicate Admit guarantees we adjust nodeinfo // capacity for already allocated pods so that they can continue to run. However, new pods // requiring device plugin resources will not be scheduled until the device plugin re-registers. func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) { … } // writeCheckpoint checkpoints device-to-container allocation information to disk. func (m *ManagerImpl) writeCheckpoint() error { … } // readCheckpoint reads device-to-container allocation information from disk, and populates // m.allocatedDevices accordingly. func (m *ManagerImpl) readCheckpoint() error { … } func (m *ManagerImpl) getCheckpoint() (checkpoint.DeviceManagerCheckpoint, error) { … } // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. func (m *ManagerImpl) UpdateAllocatedDevices() { … } // devicesToAllocate returns the list of device IDs we need to allocate with the Allocate rpc call. // Returns an empty list in case we don't need to issue the Allocate rpc call. func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.Set[string]) (sets.Set[string], error) { … } func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.Set[string]) (sets.Set[string], sets.Set[string], sets.Set[string]) { … } // allocateContainerResources attempts to allocate all of the required device // plugin resources for the input container, issues an Allocate rpc request // for each new device resource requirement, processes their AllocateResponses, // and updates the cached containerDevices on success.
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.Set[string]) error { … } // checkPodActive checks if the given pod is still in the activePods list. func (m *ManagerImpl) checkPodActive(pod *v1.Pod) bool { … } // GetDeviceRunContainerOptions checks whether we have cached containerDevices // for the passed-in <pod, container> and returns its DeviceRunContainerOptions // for the found one. An empty struct is returned in case no cached state is found. func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) { … } // callPreStartContainerIfNeeded issues a PreStartContainer grpc call for a device plugin resource // with the PreStartRequired option set. func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error { … } // callGetPreferredAllocationIfAvailable issues a GetPreferredAllocation grpc // call for a device plugin resource with the GetPreferredAllocationAvailable option set. func (m *ManagerImpl) callGetPreferredAllocationIfAvailable(podUID, contName, resource string, available, mustInclude sets.Set[string], size int) (sets.Set[string], error) { … } // sanitizeNodeAllocatable scans through allocatedDevices in the device manager // and, if necessary, updates allocatableResource in nodeInfo to be at least equal to // the allocated capacity. This allows pods that have already been scheduled on // the node to pass GeneralPredicates admission checking even upon device plugin failure.
func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo) { … } func (m *ManagerImpl) isDevicePluginResource(resource string) bool { … } // GetAllocatableDevices returns information about all the healthy devices known to the manager. func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances { … } // GetDevices returns the devices used by the specified container. func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceInstances { … } func (m *ManagerImpl) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) { … } // ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not, // depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node // has been recreated. func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool { … } func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool { … }