kubernetes/pkg/kubelet/config/file_linux_test.go

//go:build linux
// +build linux

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	clientscheme "k8s.io/client-go/kubernetes/scheme"
	api "k8s.io/kubernetes/pkg/apis/core"
	k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
	"k8s.io/kubernetes/pkg/apis/core/validation"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/securitycontext"
)

func TestExtractFromNonExistentFile(t *testing.T) {
	ch := make(chan interface{}, 1)
	lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch)
	err := lw.doWatch()
	if err == nil {
		t.Errorf("Expected error")
	}
}

func TestUpdateOnNonExistentFile(t *testing.T) {
	ch := make(chan interface{})
	NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
	select {
	case got := <-ch:
		update := got.(kubetypes.PodUpdate)
		expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
		if !apiequality.Semantic.DeepDerivative(expected, update) {
			t.Fatalf("expected %#v, Got %#v", expected, update)
		}

	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("expected update, timeout instead")
	}
}

func TestReadPodsFromFileExistAlready(t *testing.T) {
	hostname := types.NodeName("random-test-hostname")
	var testCases = getTestCases(hostname)

	for _, testCase := range testCases {
		func() {
			dirName, err := mkTempDir("file-test")
			if err != nil {
				t.Fatalf("unable to create temp dir: %v", err)
			}
			defer os.RemoveAll(dirName)
			file := testCase.writeToFile(dirName, "test_pod_manifest", t)

			ch := make(chan interface{})
			NewSourceFile(file, hostname, time.Millisecond, ch)
			select {
			case got := <-ch:
				update := got.(kubetypes.PodUpdate)
				for _, pod := range update.Pods {
					// TODO: remove the conversion when validation is performed on versioned objects.
					internalPod := &api.Pod{}
					if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
						t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
					}
					if errs := validation.ValidatePodCreate(internalPod, validation.PodValidationOptions{}); len(errs) > 0 {
						t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
					}
				}
				if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
					t.Fatalf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update)
				}
			case <-time.After(wait.ForeverTestTimeout):
				t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
			}
		}()
	}
}

var (
	testCases = []struct {
		watchDir bool
		symlink  bool
		period   time.Duration
	}{
		// set the period to be long enough for the file to be changed
		// and short enough to trigger the event
		{true, true, 3 * time.Second},

		// set the period to avoid periodic PodUpdate event
		{true, false, 60 * time.Second},
		{false, true, 60 * time.Second},
		{false, false, 60 * time.Second},
	}
)

func TestWatchFileAdded(t *testing.T) {
	for _, testCase := range testCases {
		watchFileAdded(testCase.watchDir, testCase.symlink, t)
	}
}

func TestWatchFileChanged(t *testing.T) {
	for _, testCase := range testCases {
		watchFileChanged(testCase.watchDir, testCase.symlink, testCase.period, t)
	}
}

type testCase struct {
	lock     *sync.Mutex
	desc     string
	pod      runtime.Object
	expected kubetypes.PodUpdate
}

func getTestCases(hostname types.NodeName) []*testCase {
	grace := int64(30)
	enableServiceLinks := v1.DefaultEnableServiceLinks
	return []*testCase{
		{
			lock: &sync.Mutex{},
			desc: "Simple pod",
			pod: &v1.Pod{
				TypeMeta: metav1.TypeMeta{
					Kind:       "Pod",
					APIVersion: "",
				},
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test",
					UID:       "12345",
					Namespace: "mynamespace",
				},
				Spec: v1.PodSpec{
					Containers:      []v1.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
					SecurityContext: &v1.PodSecurityContext{},
					SchedulerName:   v1.DefaultSchedulerName,
				},
				Status: v1.PodStatus{
					Phase: v1.PodPending,
				},
			},
			expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name:        "test-" + string(hostname),
					UID:         "12345",
					Namespace:   "mynamespace",
					Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"},
				},
				Spec: v1.PodSpec{
					NodeName:                      string(hostname),
					RestartPolicy:                 v1.RestartPolicyAlways,
					DNSPolicy:                     v1.DNSClusterFirst,
					TerminationGracePeriodSeconds: &grace,
					Tolerations: []v1.Toleration{{
						Operator: "Exists",
						Effect:   "NoExecute",
					}},
					Containers: []v1.Container{{
						Name:                     "image",
						Image:                    "test/image",
						TerminationMessagePath:   "/dev/termination-log",
						ImagePullPolicy:          "Always",
						SecurityContext:          securitycontext.ValidSecurityContextWithContainerDefaults(),
						TerminationMessagePolicy: v1.TerminationMessageReadFile,
					}},
					SecurityContext:    &v1.PodSecurityContext{},
					SchedulerName:      v1.DefaultSchedulerName,
					EnableServiceLinks: &enableServiceLinks,
				},
				Status: v1.PodStatus{
					Phase: v1.PodPending,
				},
			}),
		},
	}
}

func (tc *testCase) writeToFile(dir, name string, t *testing.T) string {
	fileContents, err := runtime.Encode(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), tc.pod)
	if err != nil {
		t.Fatalf("%s: error in encoding the pod: %v", tc.desc, err)
	}

	fileName := filepath.Join(dir, name)
	if err := writeFile(fileName, []byte(fileContents)); err != nil {
		t.Fatalf("unable to write test file %#v", err)
	}
	return fileName
}

func createSymbolicLink(link, target, name string, t *testing.T) string {
	linkName := filepath.Join(link, name)
	linkedFile := filepath.Join(target, name)

	err := os.Symlink(linkedFile, linkName)
	if err != nil {
		t.Fatalf("unexpected error when create symbolic link: %v", err)
	}
	return linkName
}

func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
	hostname := types.NodeName("random-test-hostname")
	var testCases = getTestCases(hostname)

	fileNamePre := "test_pod_manifest"
	for index, testCase := range testCases {
		func() {
			dirName, err := mkTempDir("dir-test")
			if err != nil {
				t.Fatalf("unable to create temp dir: %v", err)
			}
			defer removeAll(dirName, t)

			fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
			var linkedDirName string
			if symlink {
				linkedDirName, err = mkTempDir("linked-dir-test")
				if err != nil {
					t.Fatalf("unable to create temp dir for linked files: %v", err)
				}
				defer removeAll(linkedDirName, t)
				createSymbolicLink(dirName, linkedDirName, fileName, t)
			}

			ch := make(chan interface{})
			if watchDir {
				NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
			} else {
				NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch)
			}
			expectEmptyUpdate(t, ch)

			addFile := func() {
				// Add a file
				if symlink {
					testCase.writeToFile(linkedDirName, fileName, t)
					return
				}

				testCase.writeToFile(dirName, fileName, t)
			}

			go addFile()

			// For !watchDir: expect an update by SourceFile.reloadConfig().
			// For watchDir: expect at least one update from CREATE & MODIFY inotify event.
			// Shouldn't expect two updates from CREATE & MODIFY because CREATE doesn't guarantee file written.
			// In that case no update will be sent from CREATE event.
			expectUpdate(t, ch, testCase)
		}()
	}
}

func watchFileChanged(watchDir bool, symlink bool, period time.Duration, t *testing.T) {
	hostname := types.NodeName("random-test-hostname")
	var testCases = getTestCases(hostname)

	fileNamePre := "test_pod_manifest"
	for index, testCase := range testCases {
		func() {
			dirName, err := mkTempDir("dir-test")
			fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
			if err != nil {
				t.Fatalf("unable to create temp dir: %v", err)
			}
			defer removeAll(dirName, t)

			var linkedDirName string
			if symlink {
				linkedDirName, err = mkTempDir("linked-dir-test")
				if err != nil {
					t.Fatalf("unable to create temp dir for linked files: %v", err)
				}
				defer removeAll(linkedDirName, t)
				createSymbolicLink(dirName, linkedDirName, fileName, t)
			}

			var file string
			ch := make(chan interface{})
			func() {
				testCase.lock.Lock()
				defer testCase.lock.Unlock()

				if symlink {
					file = testCase.writeToFile(linkedDirName, fileName, t)
					return
				}

				file = testCase.writeToFile(dirName, fileName, t)
			}()

			if watchDir {
				NewSourceFile(dirName, hostname, period, ch)
			} else {
				NewSourceFile(file, hostname, period, ch)
			}

			// await fsnotify to be ready
			time.Sleep(time.Second)

			// expect an update by SourceFile.resetStoreFromPath()
			expectUpdate(t, ch, testCase)

			pod := testCase.pod.(*v1.Pod)
			pod.Spec.Containers[0].Name = "image2"

			testCase.expected.Pods[0].Spec.Containers[0].Name = "image2"
			changeFile := func() {
				// Edit the file content
				if symlink {
					file = testCase.writeToFile(linkedDirName, fileName, t)
					return
				}

				file = testCase.writeToFile(dirName, fileName, t)
			}

			go changeFile()
			// expect an update by MODIFY inotify event
			expectUpdate(t, ch, testCase)

			if watchDir {
				go changeFileName(dirName, fileName, fileName+"_ch", t)
				// expect an update by MOVED_FROM inotify event cause changing file name
				expectEmptyUpdate(t, ch)
				// expect an update by MOVED_TO inotify event cause changing file name
				expectUpdate(t, ch, testCase)
			}
		}()
	}
}

func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
	timer := time.After(5 * time.Second)
	for {
		select {
		case got := <-ch:
			update := got.(kubetypes.PodUpdate)
			if len(update.Pods) == 0 {
				// filter out the empty updates from reading a non-existing path
				continue
			}
			for _, pod := range update.Pods {
				// TODO: remove the conversion when validation is performed on versioned objects.
				internalPod := &api.Pod{}
				if err := k8s_api_v1.Convert_v1_Pod_To_core_Pod(pod, internalPod, nil); err != nil {
					t.Fatalf("%s: Cannot convert pod %#v, %#v", testCase.desc, pod, err)
				}
				if errs := validation.ValidatePodCreate(internalPod, validation.PodValidationOptions{}); len(errs) > 0 {
					t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, internalPod, errs)
				}
			}

			if !apiequality.Semantic.DeepEqual(testCase.expected, update) {
				t.Fatalf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
			}
			return
		case <-timer:
			t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
		}
	}
}

func expectEmptyUpdate(t *testing.T, ch chan interface{}) {
	timer := time.After(5 * time.Second)
	for {
		select {
		case got := <-ch:
			update := got.(kubetypes.PodUpdate)
			if len(update.Pods) != 0 {
				t.Fatalf("expected empty update, got %#v", update)
			}
			return
		case <-timer:
			t.Fatalf("expected empty update, timeout instead")
		}
	}
}

func writeFile(filename string, data []byte) error {
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	n, err := f.Write(data)
	if err == nil && n < len(data) {
		err = io.ErrShortWrite
	}
	if err1 := f.Close(); err == nil {
		err = err1
	}
	return err
}

func changeFileName(dir, from, to string, t *testing.T) {
	fromPath := filepath.Join(dir, from)
	toPath := filepath.Join(dir, to)
	if err := os.Rename(fromPath, toPath); err != nil {
		t.Errorf("Fail to change file name: %s", err)
	}
}