type ControllerParameters … // NewController creates a new PersistentVolume controller. func NewController(ctx context.Context, p ControllerParameters) (*PersistentVolumeController, error) { … } // initializeCaches fills all controller caches with initial data from etcd in // order to have the caches already filled when the first addClaim/addVolume // performs initial synchronization of the controller. func (ctrl *PersistentVolumeController) initializeCaches(logger klog.Logger, volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) { … } // enqueueWork adds a volume or claim to the given work queue. func (ctrl *PersistentVolumeController) enqueueWork(ctx context.Context, queue workqueue.TypedInterface[string], obj interface{ … } func (ctrl *PersistentVolumeController) storeVolumeUpdate(logger klog.Logger, volume interface{ … } func (ctrl *PersistentVolumeController) storeClaimUpdate(logger klog.Logger, claim interface{ … } // updateVolume runs in worker thread and handles "volume added", // "volume updated" and "periodic sync" events. func (ctrl *PersistentVolumeController) updateVolume(ctx context.Context, volume *v1.PersistentVolume) { … } // deleteVolume runs in worker thread and handles "volume deleted" event. func (ctrl *PersistentVolumeController) deleteVolume(ctx context.Context, volume *v1.PersistentVolume) { … } // updateClaim runs in worker thread and handles "claim added", // "claim updated" and "periodic sync" events. func (ctrl *PersistentVolumeController) updateClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) { … } // Unit test [5-5] [5-6] [5-7] // deleteClaim runs in worker thread and handles "claim deleted" event. 
func (ctrl *PersistentVolumeController) deleteClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) { … } // Run starts all of this controller's control loops func (ctrl *PersistentVolumeController) Run(ctx context.Context) { … } func (ctrl *PersistentVolumeController) updateClaimMigrationAnnotations(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) { … } func (ctrl *PersistentVolumeController) updateVolumeMigrationAnnotationsAndFinalizers(ctx context.Context, volume *v1.PersistentVolume) (*v1.PersistentVolume, error) { … } // modifyDeletionFinalizers updates the finalizers based on the reclaim policy and if it is an in-tree volume or not. // The in-tree PV deletion protection finalizer is only added if the reclaimPolicy associated with the PV is `Delete`. // The in-tree PV deletion protection finalizer is removed if the reclaimPolicy associated with the PV is `Retain` or // `Recycle`; removing the finalizer is necessary to reflect the reclaimPolicy updates on the PV. // The method also removes any external PV Deletion Protection finalizers added on the PV; this represents CSI migration // rollback/disable scenarios. func modifyDeletionFinalizers(logger klog.Logger, cmpm CSIMigratedPluginManager, volume *v1.PersistentVolume) ([]string, bool) { … } // updateMigrationAnnotations takes an Annotations map and checks for a // provisioner name using the provisionerKey. It will then add a // "pv.kubernetes.io/migrated-to" annotation if migration with the CSI // driver name for that provisioner is "on" based on feature flags, it will also // remove the annotation if migration is "off" for that provisioner in rollback // scenarios. Returns true if the annotations map was modified and false otherwise. func updateMigrationAnnotations(logger klog.Logger, cmpm CSIMigratedPluginManager, translator CSINameTranslator, ann map[string]string, claim bool) bool { … } // volumeWorker processes items from volumeQueue. 
It must run only once, // syncVolume is not assured to be reentrant. func (ctrl *PersistentVolumeController) volumeWorker(ctx context.Context) { … } // claimWorker processes items from claimQueue. It must run only once, // syncClaim is not reentrant. func (ctrl *PersistentVolumeController) claimWorker(ctx context.Context) { … } // resync supplements short resync period of shared informers - we don't want // all consumers of PV/PVC shared informer to have a short resync period, // therefore we do our own. func (ctrl *PersistentVolumeController) resync(ctx context.Context) { … } // setClaimProvisioner saves // claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner func (ctrl *PersistentVolumeController) setClaimProvisioner(ctx context.Context, claim *v1.PersistentVolumeClaim, provisionerName string) (*v1.PersistentVolumeClaim, error) { … } func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string { … } func getVolumeStatusForLogging(volume *v1.PersistentVolume) string { … } // storeObjectUpdate updates given cache with a new object version from Informer // callback (i.e. with events from etcd) or with an object modified by the // controller itself. Returns "true", if the cache was updated, false if the // object is an old version and should be ignored. func storeObjectUpdate(logger klog.Logger, store cache.Store, obj interface{ … }