chromium/tools/binary_size/libsupersize/parallel_test.py

#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import threading
import unittest

import parallel


def _ForkTestHelper(arg1, arg2, pickle_me_not, test_instance, parent_pid):
  _ = pickle_me_not  # Suppress lint warning.
  test_instance.assertNotEqual(os.getpid(), parent_pid)
  return arg1 + arg2


class Unpicklable:
  """Ensures that pickle() is not called on parameters."""

  def __getstate__(self):
    raise AssertionError('Tried to pickle')


class ConcurrentTest(unittest.TestCase):
  def testEncodeDictOfLists_Empty(self):
    test_dict = {}
    encoded = parallel.EncodeDictOfLists(test_dict)
    decoded = parallel.DecodeDictOfLists(encoded)
    self.assertEqual(test_dict, decoded)

  def testEncodeDictOfLists_EmptyValue(self):
    test_dict = {'foo': []}
    encoded = parallel.EncodeDictOfLists(test_dict)
    decoded = parallel.DecodeDictOfLists(encoded)
    self.assertEqual(test_dict, decoded)

  def testEncodeDictOfLists_AllStrings(self):
    test_dict = {'foo': ['a', 'b', 'c'], 'foo2': ['a', 'b']}
    encoded = parallel.EncodeDictOfLists(test_dict)
    decoded = parallel.DecodeDictOfLists(encoded)
    self.assertEqual(test_dict, decoded)

  def testEncodeDictOfLists_KeyTransform(self):
    test_dict = {0: ['a', 'b', 'c'], 9: ['a', 'b']}
    encoded = parallel.EncodeDictOfLists(test_dict, key_transform=str)
    decoded = parallel.DecodeDictOfLists(encoded, key_transform=int)
    self.assertEqual(test_dict, decoded)

  def testEncodeDictOfLists_ValueTransform(self):
    test_dict = {'a': ['0', '1', '2'], 'b': ['3', '4']}
    expected = {'a': [0, 1, 2], 'b': [3, 4]}
    encoded = parallel.EncodeDictOfLists(test_dict)
    decoded = parallel.DecodeDictOfLists(encoded, value_transform=int)
    self.assertEqual(expected, decoded)

  def testEncodeDictOfLists_Join_Empty(self):
    test_dict1 = {}
    test_dict2 = {}
    expected = {}
    encoded1 = parallel.EncodeDictOfLists(test_dict1)
    encoded2 = parallel.EncodeDictOfLists(test_dict2)
    encoded = parallel.JoinEncodedDictOfLists([encoded1, encoded2])
    decoded = parallel.DecodeDictOfLists(encoded)
    self.assertEqual(expected, decoded)

  def testEncodeDictOfLists_Join_Singl(self):
    test_dict1 = {'key1': ['a']}
    encoded1 = parallel.EncodeDictOfLists(test_dict1)
    encoded = parallel.JoinEncodedDictOfLists([encoded1])
    decoded = parallel.DecodeDictOfLists(encoded)
    self.assertEqual(test_dict1, decoded)

  def testEncodeDictOfLists_JoinMultiple(self):
    test_dict1 = {'key1': ['a']}
    test_dict2 = {'key2': ['b']}
    expected = {'key1': ['a'], 'key2': ['b']}
    encoded1 = parallel.EncodeDictOfLists(test_dict1)
    encoded2 = parallel.EncodeDictOfLists({})
    encoded3 = parallel.EncodeDictOfLists(test_dict2)
    encoded = parallel.JoinEncodedDictOfLists([encoded1, encoded2, encoded3])
    decoded = parallel.DecodeDictOfLists(encoded)
    self.assertEqual(expected, decoded)

  def testCallOnThread(self):
    main_thread = threading.current_thread()

    def callback(arg1, arg2):
      self.assertEqual(1, arg1)
      self.assertEqual(2, arg2)
      my_thread = threading.current_thread()
      self.assertNotEqual(my_thread, main_thread)
      return 3

    result = parallel.CallOnThread(callback, 1, arg2=2)
    self.assertEqual(3, result.get())

  def testForkAndCall_normal(self):
    parent_pid = os.getpid()
    result = parallel.ForkAndCall(_ForkTestHelper,
                                  (1, 2, Unpicklable(), self, parent_pid))
    self.assertEqual(3, result.get())

  def testForkAndCall_exception(self):
    parent_pid = os.getpid()
    result = parallel.ForkAndCall(_ForkTestHelper,
                                  (1, 'a', None, self, parent_pid))
    self.assertRaises(TypeError, result.get)

  def testBulkForkAndCall_none(self):
    results = parallel.BulkForkAndCall(_ForkTestHelper, [])
    self.assertEqual([], list(results))

  def testBulkForkAndCall_few(self):
    parent_pid = os.getpid()
    results = parallel.BulkForkAndCall(_ForkTestHelper,
                                       [(1, 2, Unpicklable(), self, parent_pid),
                                        (3, 4, None, self, parent_pid)])
    self.assertEqual({3, 7}, set(results))

  def testBulkForkAndCall_few_kwargs(self):
    parent_pid = os.getpid()
    results = parallel.BulkForkAndCall(
        _ForkTestHelper, [(1, 2, Unpicklable()), (3, 4, None)],
        test_instance=self,
        parent_pid=parent_pid)
    self.assertEqual({3, 7}, set(results))

  def testBulkForkAndCall_many(self):
    parent_pid = os.getpid()
    args = [(1, 2, Unpicklable(), self, parent_pid) for _ in range(100)]
    results = parallel.BulkForkAndCall(_ForkTestHelper, args)
    self.assertEqual([3] * 100, list(results))

  def testBulkForkAndCall_many_kwargs(self):
    parent_pid = os.getpid()
    args = [(1, 2) for _ in range(100)]
    results = parallel.BulkForkAndCall(
        _ForkTestHelper,
        args,
        pickle_me_not=Unpicklable(),
        test_instance=self,
        parent_pid=parent_pid)
    self.assertEqual([3] * 100, list(results))

  def testBulkForkAndCall_exception(self):
    parent_pid = os.getpid()
    results = parallel.BulkForkAndCall(_ForkTestHelper,
                                       [(1, 'a', None, self, parent_pid)])
    self.assertRaises(TypeError, results.__next__)


if __name__ == '__main__':
  unittest.main()