diff --git a/common/test/__init__.py b/common/test/__init__.py index 33eb5c98e..e69de29bb 100644 --- a/common/test/__init__.py +++ b/common/test/__init__.py @@ -1,27 +0,0 @@ -# Back In Time -# Copyright (C) 2015-2022 Germar Reitze -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -# This main file is required to run all unit tests in the "test" folder -# without specifying each test file name in the command line. -# See also: -# https://stackoverflow.com/a/43733357 -# https://docs.python.org/3/library/unittest.html#test-discovery - -from test import * - -if __name__ == '__main__': - pass diff --git a/common/test/generic.py b/common/test/generic.py index f0adc265a..edfe953a6 100644 --- a/common/test/generic.py +++ b/common/test/generic.py @@ -15,6 +15,13 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +"""This module offers some helpers and tools for unittesting. + +Most of the conent are `unittest.TestCase` derived classed doing basic setup +for the testing environment. They are dealing with snapshot path's, SSH, +config files and things like set. +""" + import os import sys import unittest @@ -33,12 +40,17 @@ tools.registerBackintimePath('qt', 'plugins') TMP_FLOCK = NamedTemporaryFile() -PRIV_KEY_FILE = os.path.expanduser(os.path.join("~",".ssh","id_rsa")) -AUTHORIZED_KEYS_FILE = os.path.expanduser(os.path.join("~",".ssh","authorized_keys")) +PRIV_KEY_FILE = os.path.expanduser( + os.path.join("~", ".ssh", "id_rsa")) +AUTHORIZED_KEYS_FILE = os.path.expanduser( + os.path.join("~", ".ssh", "authorized_keys")) DUMMY = 'dummy_test_process.sh' -if os.path.exists(PRIV_KEY_FILE + '.pub') and os.path.exists(AUTHORIZED_KEYS_FILE): +if os.path.exists(PRIV_KEY_FILE + '.pub') \ + and os.path.exists(AUTHORIZED_KEYS_FILE): + with open(PRIV_KEY_FILE + '.pub', 'rb') as pub: + with open(AUTHORIZED_KEYS_FILE, 'rb') as auth: KEY_IN_AUTH = pub.read() in auth.readlines() else: @@ -49,7 +61,7 @@ try: with socket.create_connection(('localhost', '22'), 2.0) as s: sshdPortAvailable = not bool(s.connect_ex(s.getpeername())) -except: +except ConnectionRefusedError: sshdPortAvailable = False LOCAL_SSH = all((tools.processExists('sshd'), @@ -60,27 +72,54 @@ ON_TRAVIS = os.environ.get('TRAVIS', 'None').lower() == 'true' ON_RTD = os.environ.get('READTHEDOCS', 'None').lower() == 'true' + class TestCase(unittest.TestCase): + """Base class for Back In Time unittesting. + + """ + def __init__(self, methodName): + """Initialize. + + Args: + methodName: Unkown. + """ + + # note by buhtz: This is not recommended. Unittest module handle + # that itself. The default locale is "C". Need further investigation. os.environ['LANGUAGE'] = 'en_US.UTF-8' - self.cfgFile = os.path.abspath(os.path.join(__file__, os.pardir, 'config')) + + # config file path + self.cfgFile = os.path.abspath( + os.path.join(__file__, os.pardir, 'config')) + logger.APP_NAME = 'BIT_unittest' logger.openlog() + super(TestCase, self).__init__(methodName) def setUp(self): + """ + """ logger.DEBUG = '-v' in sys.argv self.run = False self.sharePathObj = TemporaryDirectory() self.sharePath = self.sharePathObj.name def tearDown(self): + """ + """ self.sharePathObj.cleanup() def callback(self, func, *args): + """ + """ func(*args) self.run = True + # the next six assert methods are deprecated and can be replaced + # by using Pythons in-build "pathlib.Path" + def assertExists(self, *path): full_path = os.path.join(*path) if not os.path.exists(full_path): @@ -111,7 +150,11 @@ def assertIsLink(self, *path): if not os.path.islink(full_path): self.fail('Not a symlink: {}'.format(full_path)) + class TestCaseCfg(TestCase): + """Base class setting up config file. + """ + def setUp(self): super(TestCaseCfg, self).setUp() self.cfg = config.Config(self.cfgFile, self.sharePath) @@ -122,31 +165,60 @@ def setUp(self): self.cfg.PLUGIN_MANAGER.load() + class TestCaseSnapshotPath(TestCaseCfg): + """Base class for snapshot test cases. + + It setup a temporary directory as snapshot paths for each test method. + """ + def setUp(self): + """ + """ super(TestCaseSnapshotPath, self).setUp() - #use a new TemporaryDirectory for snapshotPath to avoid - #side effects on leftovers + + # use a new TemporaryDirectory for snapshotPath to avoid + # side effects on leftovers self.tmpDir = TemporaryDirectory() self.cfg.dict['profile1.snapshots.path'] = self.tmpDir.name self.snapshotPath = self.cfg.snapshotsFullPath() def tearDown(self): + """ + """ super(TestCaseSnapshotPath, self).tearDown() + self.tmpDir.cleanup() + class SnapshotsTestCase(TestCaseSnapshotPath): + """Base class for snapshot testing unittest classes. + + Unclear why this need to be separated from `TestCaseSnapshotPath`. + """ + def setUp(self): + """ + """ super(SnapshotsTestCase, self).setUp() + os.makedirs(self.snapshotPath) self.sn = snapshots.Snapshots(self.cfg) - #use a tmp-file for flock because test_flockExclusive would deadlock - #otherwise if a regular snapshot is running in background + + # use a tmp-file for flock because test_flockExclusive would deadlock + # otherwise if a regular snapshot is running in background self.sn.GLOBAL_FLOCK = TMP_FLOCK.name + class SnapshotsWithSidTestCase(SnapshotsTestCase): + """ + """ + def setUp(self): + """ + """ super(SnapshotsWithSidTestCase, self).setUp() + self.sid = snapshots.SID('20151219-010324-123', self.cfg) self.sid.makeDirs() @@ -156,69 +228,126 @@ def setUp(self): self.testFileFullPath = self.sid.pathBackup(self.testFile) self.sid.makeDirs(self.testDir) - with open(self.sid.pathBackup(self.testFile), 'wt') as f: + with open(self.sid.pathBackup(self.testFile), 'wt'): pass + class SSHTestCase(TestCaseCfg): - # running this test requires that user has public / private key pair created and ssh server running + """Base class for test cases using the SSH features. + + Running this test requires that user has public / private key pair + created and SSH server (at localhost) running. + """ def setUp(self): + """ + """ super(SSHTestCase, self).setUp() + logger.DEBUG = '-v' in sys.argv + self.cfg.setSnapshotsMode('ssh') self.cfg.setSshHost('localhost') self.cfg.setSshPrivateKeyFile(PRIV_KEY_FILE) + # use a TemporaryDirectory for remote snapshot path self.tmpDir = TemporaryDirectory() self.remotePath = os.path.join(self.tmpDir.name, 'foo') - self.remoteFullPath = os.path.join(self.remotePath, 'backintime', *self.cfg.hostUserProfile()) + self.remoteFullPath = os.path.join( + self.remotePath, 'backintime', *self.cfg.hostUserProfile()) self.cfg.setSshSnapshotsPath(self.remotePath) self.mount_kwargs = {} def tearDown(self): + """ + """ super(SSHTestCase, self).tearDown() self.tmpDir.cleanup() + class SSHSnapshotTestCase(SSHTestCase): + """ + """ + def setUp(self): + """ + """ super(SSHSnapshotTestCase, self).setUp() + self.snapshotPath = self.cfg.sshSnapshotsFullPath() os.makedirs(self.snapshotPath) self.sn = snapshots.Snapshots(self.cfg) - #use a tmp-file for flock because test_flockExclusive would deadlock - #otherwise if a regular snapshot is running in background + + # use a tmp-file for flock because test_flockExclusive would deadlock + # otherwise if a regular snapshot is running in background self.sn.GLOBAL_FLOCK = TMP_FLOCK.name + class SSHSnapshotsWithSidTestCase(SSHSnapshotTestCase): + """ + """ + def setUp(self): + """ + """ super(SSHSnapshotsWithSidTestCase, self).setUp() + self.sid = snapshots.SID('20151219-010324-123', self.cfg) - self.remoteSIDBackupPath = os.path.join(self.snapshotPath, self.sid.sid, 'backup') + self.remoteSIDBackupPath = os.path.join( + self.snapshotPath, self.sid.sid, 'backup') + os.makedirs(self.remoteSIDBackupPath) + self.testDir = 'foo/bar' - self.testDirFullPath = os.path.join(self.remoteSIDBackupPath, self.testDir) + self.testDirFullPath = os.path.join( + self.remoteSIDBackupPath, self.testDir) self.testFile = 'foo/bar/baz' - self.testFileFullPath = os.path.join(self.remoteSIDBackupPath, self.testFile) + self.testFileFullPath = os.path.join( + self.remoteSIDBackupPath, self.testFile) os.makedirs(self.testDirFullPath) - with open(self.testFileFullPath, 'wt') as f: + + with open(self.testFileFullPath, 'wt'): pass + def create_test_files(path): + """Create folders and files. + + Args: + path: Destination folder. + + That is the resulting folder structure :: + + foo + └── bar + ├── baz + ├── file with spaces + └── test + + """ os.makedirs(os.path.join(path, 'foo', 'bar')) + with open(os.path.join(path, 'foo', 'bar', 'baz'), 'wt') as f: f.write('foo') + with open(os.path.join(path, 'test'), 'wt') as f: f.write('bar') + with open(os.path.join(path, 'file with spaces'), 'wt') as f: f.write('asdf') + @contextmanager -def mockPermissions(path, mode = 0o000): +def mockPermissions(path, mode=0o000): + """ + """ + st = os.stat(path) os.chmod(path, mode) yield + # fix permissions so it can be removed os.chmod(path, st.st_mode) diff --git a/common/test/test_backintime.py b/common/test/test_backintime.py index 0656bbbe4..5eae78aea 100644 --- a/common/test/test_backintime.py +++ b/common/test/test_backintime.py @@ -79,7 +79,7 @@ def test_local_snapshot_is_successful(self): self.assertEqual(proc.returncode, 0, msg) - self.assertRegex(output.decode(), re.compile(''' + self.assertRegex(output.decode(), re.compile(r''' Back In Time Version: \d+.\d+.\d+.* @@ -113,7 +113,7 @@ def test_local_snapshot_is_successful(self): .format(proc.returncode, error.decode(), output.decode()) self.assertEqual(proc.returncode, 0, msg) - self.assertRegex(output.decode(), re.compile(''' + self.assertRegex(output.decode(), re.compile(r''' Back In Time Version: \d+.\d+.\d+.* @@ -152,7 +152,7 @@ def test_local_snapshot_is_successful(self): .format(proc.returncode, error.decode(), output.decode()) self.assertEqual(proc.returncode, 0, msg) - self.assertRegex(output.decode(), re.compile(''' + self.assertRegex(output.decode(), re.compile(r''' Back In Time Version: \d+.\d+.\d+.* diff --git a/common/test/test_tools.py b/common/test/test_tools.py index 8a83e7123..a77f3edb5 100644 --- a/common/test/test_tools.py +++ b/common/test/test_tools.py @@ -1,5 +1,6 @@ # Back In Time -# Copyright (C) 2008-2022 Oprea Dan, Bart de Koning, Richard Bailey, Germar Reitze, Taylor Raack +# Copyright (C) 2008-2022 Oprea Dan, Bart de Koning, Richard Bailey, +# Germar Reitze, Taylor Raack # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -35,13 +36,14 @@ import config import configfile -#chroot jails used for building may have no UUID devices (because of tmpfs) -#we need to skip tests that require UUIDs +# chroot jails used for building may have no UUID devices (because of tmpfs) +# we need to skip tests that require UUIDs DISK_BY_UUID_AVAILABLE = os.path.exists(tools.DISK_BY_UUID) -UDEVADM_HAS_UUID = subprocess.Popen(['udevadm', 'info', '-e'], - stdout = subprocess.PIPE, - stderr = subprocess.DEVNULL - ).communicate()[0].find(b'ID_FS_UUID=') > 0 + +UDEVADM_HAS_UUID = subprocess.Popen( + ['udevadm', 'info', '-e'], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL).communicate()[0].find(b'ID_FS_UUID=') > 0 RSYNC_INSTALLED = tools.checkCommand('rsync') @@ -57,6 +59,7 @@ are welcome to redistribute it under certain conditions. See the GNU General Public License for details. """ + RSYNC_310_VERSION = """rsync version 3.1.0 protocol version 31 Copyright (C) 1996-2013 by Andrew Tridgell, Wayne Davison, and others. Web site: http://rsync.samba.org/ @@ -70,10 +73,12 @@ General Public License for details. """ + class TestTools(generic.TestCase): """ All functions test here come from tools.py """ + def setUp(self): super(TestTools, self).setUp() self.subproc = None @@ -156,7 +161,7 @@ def test_readFile(self): f.flush() self.assertIsInstance(tools.readFile(tmp_gz), str) self.assertEqual(tools.readFile(tmp_gz), 'foo\nbar') - os.remove(tmp_gz+ '.gz') + os.remove(tmp_gz + '.gz') def test_readFileLines(self): """ @@ -184,7 +189,7 @@ def test_readFileLines(self): f.flush() self.assertIsInstance(tools.readFileLines(tmp_gz), list) self.assertEqual(tools.readFileLines(tmp_gz), ['foo', 'bar']) - os.remove(tmp_gz+ '.gz') + os.remove(tmp_gz + '.gz') def test_checkCommand(self): """ @@ -214,7 +219,8 @@ def test_makeDirs(self): def test_makeDirs_not_writable(self): with TemporaryDirectory() as d: os.chmod(d, stat.S_IRUSR) - path = os.path.join(d, 'foobar{}'.format(random.randrange(100, 999))) + path = os.path.join( + d, 'foobar{}'.format(random.randrange(100, 999))) self.assertFalse(tools.makeDirs(path)) def test_mkdir(self): @@ -223,10 +229,13 @@ def test_mkdir(self): path = os.path.join(d, 'foo') self.assertTrue(tools.mkdir(path)) for mode in (0o700, 0o644, 0o777): - msg = 'new path should have octal permissions {0:#o}'.format(mode) + msg = 'new path should have octal permissions {0:#o}' \ + .format(mode) path = os.path.join(d, '{0:#o}'.format(mode)) self.assertTrue(tools.mkdir(path, mode), msg) - self.assertEqual('{0:o}'.format(os.stat(path).st_mode & 0o777), '{0:o}'.format(mode), msg) + self.assertEqual( + '{0:o}'.format(os.stat(path).st_mode & 0o777), + '{0:o}'.format(mode), msg) def test_pids(self): pids = tools.pids() @@ -236,7 +245,8 @@ def test_pids(self): def test_processStat(self): pid = self.createProcess() stat = tools.processStat(pid) - self.assertRegex(stat, r'{} \({}\) \w .*'.format(pid, generic.DUMMY[:15])) + self.assertRegex( + stat, r'{} \({}\) \w .*'.format(pid, generic.DUMMY[:15])) @patch('builtins.open') def test_processStat_exception(self, mock_open): @@ -282,14 +292,14 @@ def test_pidsWithName(self): def test_processExists(self): self.assertFalse(tools.processExists('nonExistingProcess')) - pid = self.createProcess() + self.createProcess() self.assertTrue(tools.processExists(generic.DUMMY)) def test_processAlive(self): """ Test the function processAlive """ - #TODO: add test (in chroot) running proc as root and kill as non-root + # TODO: add test (in chroot) running proc as root and kill as non-root self.assertTrue(tools.processAlive(os.getpid())) pid = self.createProcess() self.assertTrue(tools.processAlive(pid)) @@ -304,7 +314,8 @@ def test_checkXServer(self): try: tools.checkXServer() except Exception as e: - self.fail('tools.ckeck_x_server() raised exception {}'.format(str(e))) + self.fail( + 'tools.ckeck_x_server() raised exception {}'.format(str(e))) def test_preparePath(self): path_with_slash_at_begin = "/test/path" @@ -337,7 +348,7 @@ def test_rsyncCaps(self): self.assertIsInstance(caps, list) self.assertGreaterEqual(len(caps), 1) - self.assertListEqual(tools.rsyncCaps(data = RSYNC_307_VERSION), + self.assertListEqual(tools.rsyncCaps(data=RSYNC_307_VERSION), ['64-bit files', '64-bit inums', '32-bit timestamps', @@ -354,7 +365,7 @@ def test_rsyncCaps(self): 'iconv', 'symtimes']) - self.assertListEqual(tools.rsyncCaps(data = RSYNC_310_VERSION), + self.assertListEqual(tools.rsyncCaps(data=RSYNC_310_VERSION), ['progress2', '64-bit files', '64-bit inums', @@ -404,7 +415,7 @@ def test_checkCronPattern(self): def test_checkHomeEncrypt(self): pass - #envLoad and envSave tests are in TestToolsEnviron below + # envLoad and envSave tests are in TestToolsEnviron below @unittest.skip('Not yet implemented') def test_keyringSupported(self): @@ -426,8 +437,9 @@ def test_mountpoint(self): def test_decodeOctalEscape(self): self.assertEqual(tools.decodeOctalEscape('/mnt/normalPath'), '/mnt/normalPath') - self.assertEqual(tools.decodeOctalEscape('/mnt/path\\040with\\040space'), - '/mnt/path with space') + self.assertEqual( + tools.decodeOctalEscape('/mnt/path\\040with\\040space'), + '/mnt/path with space') def test_mountArgs(self): rootArgs = tools.mountArgs('/') @@ -450,8 +462,9 @@ def test_device(self): def test_filesystem(self): self.assertEqual(tools.filesystem('/proc'), 'proc') self.assertRegex(tools.filesystem('/sys'), r'sys.*') - self.assertRegex(tools.filesystem('/nonExistingFolder/foo/bar').lower(), - r'(:?ext[2-4]|xfs|zfs|jfs|raiserfs|btrfs|tmpfs|shiftfs)') + self.assertRegex( + tools.filesystem('/nonExistingFolder/foo/bar').lower(), + r'(:?ext[2-4]|xfs|zfs|jfs|raiserfs|btrfs|tmpfs|shiftfs)') # tools.uuidFromDev() get called from tools.uuidFromPath. # So we skip an extra unittest as it's hard to find a dev on all systems @@ -548,8 +561,9 @@ def test_readWriteCrontab(self): oldCrontab = tools.readCrontab() self.assertIsInstance(oldCrontab, list) - testLine = '#BackInTime Unittest from {}. Test probably failed. You can remove this line.'.format(now) - self.assertTrue(tools.writeCrontab(oldCrontab + [testLine,])) + testLine = '#BackInTime Unittest from {}. Test probably failed. ' \ + 'You can remove this line.'.format(now) + self.assertTrue(tools.writeCrontab(oldCrontab + [testLine, ])) newCrontab = tools.readCrontab() self.assertIn(testLine, newCrontab) @@ -561,33 +575,36 @@ def test_readWriteCrontab(self): def test_splitCommands(self): ret = list(tools.splitCommands(['echo foo;'], - head = 'echo start;', - tail = 'echo end', - maxLength = 40)) + head='echo start;', + tail='echo end', + maxLength=40)) self.assertEqual(len(ret), 1) self.assertEqual(ret[0], 'echo start;echo foo;echo end') ret = list(tools.splitCommands(['echo foo;']*3, - head = 'echo start;', - tail = 'echo end', - maxLength = 40)) + head='echo start;', + tail='echo end', + maxLength=40)) self.assertEqual(len(ret), 2) self.assertEqual(ret[0], 'echo start;echo foo;echo foo;echo end') self.assertEqual(ret[1], 'echo start;echo foo;echo end') ret = list(tools.splitCommands(['echo foo;']*3, - head = 'echo start;', - tail = 'echo end', - maxLength = 0)) + head='echo start;', + tail='echo end', + maxLength=0)) self.assertEqual(len(ret), 1) - self.assertEqual(ret[0], 'echo start;echo foo;echo foo;echo foo;echo end') + self.assertEqual(ret[0], + 'echo start;echo foo;echo foo;echo foo;echo end') - ret = list(tools.splitCommands(['echo foo;']*3, - head = 'echo start;', - tail = 'echo end', - maxLength = -10)) + ret = list(tools.splitCommands(['echo foo;'] * 3, + head='echo start;', + tail='echo end', + maxLength=-10)) self.assertEqual(len(ret), 1) - self.assertEqual(ret[0], 'echo start;echo foo;echo foo;echo foo;echo end') + self.assertEqual( + ret[0], + 'echo start;echo foo;echo foo;echo foo;echo end') def test_isIPv6Address(self): self.assertTrue(tools.isIPv6Address('fd00:0::5')) @@ -598,8 +615,9 @@ def test_isIPv6Address(self): def test_excapeIPv6Address(self): self.assertEqual(tools.escapeIPv6Address('fd00:0::5'), '[fd00:0::5]') - self.assertEqual(tools.escapeIPv6Address('2001:db8:0:8d3:0:8a2e:70:7344'), - '[2001:db8:0:8d3:0:8a2e:70:7344]') + self.assertEqual( + tools.escapeIPv6Address('2001:db8:0:8d3:0:8a2e:70:7344'), + '[2001:db8:0:8d3:0:8a2e:70:7344]') self.assertEqual(tools.escapeIPv6Address('foo.bar'), 'foo.bar') self.assertEqual(tools.escapeIPv6Address('192.168.1.1'), '192.168.1.1') self.assertEqual(tools.escapeIPv6Address('fd00'), 'fd00') @@ -610,7 +628,11 @@ def test_camelCase(self): self.assertEqual(tools.camelCase('foo_bar'), 'FooBar') self.assertEqual(tools.camelCase('foo_Bar'), 'FooBar') + class TestToolsEnviron(generic.TestCase): + """??? + """ + def __init__(self, *args, **kwargs): super(TestToolsEnviron, self).__init__(*args, **kwargs) self.env = deepcopy(os.environ) @@ -632,7 +654,7 @@ def test_envLoad_without_previous_values(self): test_env.setStrValue('ASDF', 'qwertz') test_env.save(self.temp_file) - #make sure environ is clean + # make sure environ is clean self.assertNotIn('FOO', os.environ) self.assertNotIn('ASDF', os.environ) @@ -648,7 +670,7 @@ def test_envLoad_do_not_overwrite_previous_values(self): test_env.setStrValue('ASDF', 'qwertz') test_env.save(self.temp_file) - #add some environ vars that should not get overwritten + # add some environ vars that should not get overwritten os.environ['FOO'] = 'defaultFOO' os.environ['ASDF'] = 'defaultASDF' @@ -659,10 +681,16 @@ def test_envLoad_do_not_overwrite_previous_values(self): self.assertEqual(os.environ['ASDF'], 'defaultASDF') def test_envSave(self): - keys = ('GNOME_KEYRING_CONTROL', 'DBUS_SESSION_BUS_ADDRESS', \ - 'DBUS_SESSION_BUS_PID', 'DBUS_SESSION_BUS_WINDOWID', \ - 'DISPLAY', 'XAUTHORITY', 'GNOME_DESKTOP_SESSION_ID', \ - 'KDE_FULL_SESSION') + keys = ( + 'GNOME_KEYRING_CONTROL', + 'DBUS_SESSION_BUS_ADDRESS', + 'DBUS_SESSION_BUS_PID', + 'DBUS_SESSION_BUS_WINDOWID', + 'DISPLAY', + 'XAUTHORITY', + 'GNOME_DESKTOP_SESSION_ID', + 'KDE_FULL_SESSION') + for i, k in enumerate(keys): os.environ[k] = str(i) @@ -673,13 +701,14 @@ def test_envSave(self): test_env = configfile.ConfigFile() test_env.load(self.temp_file) for i, k in enumerate(keys): - with self.subTest(i = i, k = k): - #workaround for py.test3 2.5.1 doesn't support subTest - msg = 'i = %s, k = %s' %(i, k) + with self.subTest(i=i, k=k): + # workaround for py.test3 2.5.1 doesn't support subTest + msg = 'i = %s, k = %s' % (i, k) self.assertEqual(test_env.strValue(k), str(i), msg) + class TestToolsUniquenessSet(generic.TestCase): - #TODO: add test for follow_symlink + # TODO: add test for follow_symlink def test_checkUnique(self): with TemporaryDirectory() as d: for i in range(1, 5): @@ -696,37 +725,37 @@ def test_checkUnique(self): with open(i, 'wt') as f: f.write('42') - #fix timestamps because otherwise test will fail on slow machines - obj = os.stat(t1) - os.utime(t2, times = (obj.st_atime, obj.st_mtime)) - obj = os.stat(t3) - os.utime(t4, times = (obj.st_atime, obj.st_mtime)) + # fix timestamps because otherwise test will fail on slow machines + obj = os.stat(t1) + os.utime(t2, times=(obj.st_atime, obj.st_mtime)) + obj = os.stat(t3) + os.utime(t4, times=(obj.st_atime, obj.st_mtime)) - #same size and mtime - uniqueness = tools.UniquenessSet(dc = False, - follow_symlink = False, - list_equal_to = '') + # same size and mtime + uniqueness = tools.UniquenessSet(dc=False, + follow_symlink=False, + list_equal_to='') self.assertTrue(uniqueness.check(t1)) self.assertFalse(uniqueness.check(t2)) self.assertTrue(uniqueness.check(t3)) self.assertFalse(uniqueness.check(t4)) - os.utime(t1, times = (0, 0)) - os.utime(t3, times = (0, 0)) + os.utime(t1, times=(0, 0)) + os.utime(t3, times=(0, 0)) - #same size different mtime - uniqueness = tools.UniquenessSet(dc = False, - follow_symlink = False, - list_equal_to = '') + # same size different mtime + uniqueness = tools.UniquenessSet(dc=False, + follow_symlink=False, + list_equal_to='') self.assertTrue(uniqueness.check(t1)) self.assertTrue(uniqueness.check(t2)) self.assertTrue(uniqueness.check(t3)) self.assertTrue(uniqueness.check(t4)) - #same size different mtime use deep_check - uniqueness = tools.UniquenessSet(dc = True, - follow_symlink = False, - list_equal_to = '') + # same size different mtime use deep_check + uniqueness = tools.UniquenessSet(dc=True, + follow_symlink=False, + list_equal_to='') self.assertTrue(uniqueness.check(t1)) self.assertFalse(uniqueness.check(t2)) self.assertTrue(uniqueness.check(t3)) @@ -751,9 +780,9 @@ def test_checkUnique_hardlinks(self): os.link(t3, t4) self.assertEqual(os.stat(t3).st_ino, os.stat(t4).st_ino) - uniqueness = tools.UniquenessSet(dc = True, - follow_symlink = False, - list_equal_to = '') + uniqueness = tools.UniquenessSet(dc=True, + follow_symlink=False, + list_equal_to='') self.assertTrue(uniqueness.check(t1)) self.assertFalse(uniqueness.check(t2)) self.assertTrue(uniqueness.check(t3)) @@ -775,38 +804,39 @@ def test_checkEqual(self): with open(i, 'wt') as f: f.write('42') - #fix timestamps because otherwise test will fail on slow machines - obj = os.stat(t1) - os.utime(t2, times = (obj.st_atime, obj.st_mtime)) - obj = os.stat(t3) - os.utime(t4, times = (obj.st_atime, obj.st_mtime)) + # fix timestamps because otherwise test will fail on slow machines + obj = os.stat(t1) + os.utime(t2, times=(obj.st_atime, obj.st_mtime)) + obj = os.stat(t3) + os.utime(t4, times=(obj.st_atime, obj.st_mtime)) - #same size and mtime - uniqueness = tools.UniquenessSet(dc = False, - follow_symlink = False, - list_equal_to = t1) + # same size and mtime + uniqueness = tools.UniquenessSet(dc=False, + follow_symlink=False, + list_equal_to=t1) self.assertTrue(uniqueness.check(t1)) self.assertTrue(uniqueness.check(t2)) self.assertFalse(uniqueness.check(t3)) - os.utime(t1, times = (0, 0)) + os.utime(t1, times=(0, 0)) - #same size different mtime - uniqueness = tools.UniquenessSet(dc = False, - follow_symlink = False, - list_equal_to = t1) + # same size different mtime + uniqueness = tools.UniquenessSet(dc=False, + follow_symlink=False, + list_equal_to=t1) self.assertTrue(uniqueness.check(t1)) self.assertFalse(uniqueness.check(t2)) self.assertFalse(uniqueness.check(t3)) - #same size different mtime use deep_check - uniqueness = tools.UniquenessSet(dc = True, - follow_symlink = False, - list_equal_to = t1) + # same size different mtime use deep_check + uniqueness = tools.UniquenessSet(dc=True, + follow_symlink=False, + list_equal_to=t1) self.assertTrue(uniqueness.check(t1)) self.assertTrue(uniqueness.check(t2)) self.assertFalse(uniqueness.check(t3)) + class TestToolsExecuteSubprocess(generic.TestCase): # new method with subprocess def test_returncode(self): @@ -815,20 +845,20 @@ def test_returncode(self): def test_callback(self): c = lambda x, y: self.callback(self.assertEqual, x, 'foo') - tools.Execute(['echo', 'foo'], callback = c).run() + tools.Execute(['echo', 'foo'], callback=c).run() self.assertTrue(self.run) self.run = False # give extra user_data for callback c = lambda x, y: self.callback(self.assertEqual, x, y) - tools.Execute(['echo', 'foo'], callback = c, user_data = 'foo').run() + tools.Execute(['echo', 'foo'], callback=c, user_data='foo').run() self.assertTrue(self.run) self.run = False # no output c = lambda x, y: self.callback(self.fail, 'callback was called unexpectedly') - tools.Execute(['true'], callback = c).run() + tools.Execute(['true'], callback=c).run() self.assertFalse(self.run) self.run = False @@ -836,6 +866,7 @@ def test_pausable(self): proc = tools.Execute(['true']) self.assertTrue(proc.pausable) + class TestToolsExecuteOsSystem(generic.TestCase): # old method with os.system def test_returncode(self): @@ -844,26 +875,23 @@ def test_returncode(self): def test_callback(self): c = lambda x, y: self.callback(self.assertEqual, x, 'foo') - tools.Execute('echo foo', callback = c).run() + tools.Execute('echo foo', callback=c).run() self.assertTrue(self.run) self.run = False # give extra user_data for callback c = lambda x, y: self.callback(self.assertEqual, x, y) - tools.Execute('echo foo', callback = c, user_data = 'foo').run() + tools.Execute('echo foo', callback=c, user_data='foo').run() self.assertTrue(self.run) self.run = False # no output c = lambda x, y: self.callback(self.fail, 'callback was called unexpectedly') - tools.Execute('true', callback = c).run() + tools.Execute('true', callback=c).run() self.assertFalse(self.run) self.run = False def test_pausable(self): proc = tools.Execute('true') self.assertFalse(proc.pausable) - -if __name__ == '__main__': - unittest.main()