Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for empty meta in reboot_shutdown.py #2514 #2518

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/rockstor/scripts/scheduled_tasks/reboot_shutdown.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Copyright (c) 2012-2020 RockStor, Inc. <http://rockstor.com>
Copyright (c) 2012-2023 RockStor, Inc. <http://rockstor.com>
This file is part of RockStor.
RockStor is free software; you can redistribute it and/or modify
Expand Down Expand Up @@ -32,9 +32,9 @@
logger = logging.getLogger(__name__)


def validate_shutdown_meta(meta):
def validate_reboot_shutdown_meta(meta):
if type(meta) != dict:
raise Exception("meta must be a dictionary, not %s" % type(meta))
raise Exception("meta must be a dictionary, not {}".format(type(meta)))
return meta


Expand All @@ -47,7 +47,7 @@ def all_devices_offline(addresses):


def run_conditions_met(meta):
if meta["ping_scan"]:
if meta and meta["ping_scan"]:
address_parser = csv_reader([meta["ping_scan_addresses"]], delimiter=",")
addresses = list(address_parser)

Expand Down Expand Up @@ -99,12 +99,12 @@ def main():
aw = APIWrapper()
if tdo.task_type not in ["reboot", "shutdown", "suspend"]:
logger.error(
"task_type(%s) is not a system reboot, "
"shutdown or suspend." % tdo.task_type
"task_type({}) is not a system reboot, "
"shutdown or suspend.".format(tdo.task_type)
)
return
meta = json.loads(tdo.json_meta)
validate_shutdown_meta(meta)
validate_reboot_shutdown_meta(meta)

if not run_conditions_met(meta):
logger.debug(
Expand All @@ -119,7 +119,7 @@ def main():
try:
# set default command url before checking if it's a shutdown
# and if we have an rtc wake up
url = "commands/%s" % tdo.task_type
url = "commands/{}".format(tdo.task_type)

# if task_type is shutdown and rtc wake up true
# parse crontab hour & minute vs rtc hour & minute to state
Expand All @@ -144,15 +144,15 @@ def main():
epoch += timedelta(days=1)

epoch = epoch.strftime("%s")
url = "%s/%s" % (url, epoch)
url = "{}/{}".format(url, epoch)

aw.api_call(url, data=None, calltype="post", save_error=False)
logger.debug("System %s scheduled" % tdo.task_type)
logger.debug("System {} scheduled".format(tdo.task_type))
t.state = "finished"

except Exception as e:
t.state = "failed"
logger.error("Failed to schedule system %s" % tdo.task_type)
logger.error("Failed to schedule system {}".format(tdo.task_type))
logger.exception(e)

finally:
Expand Down
Empty file.
162 changes: 162 additions & 0 deletions src/rockstor/scripts/tests/test_reboot_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
Copyright (c) 2012-2023 RockStor, Inc. <http://rockstor.com>
This file is part of RockStor.
RockStor is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 2 of the License,
or (at your option) any later version.
RockStor is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import unittest
from mock import patch

from scripts.scheduled_tasks.reboot_shutdown import (
validate_reboot_shutdown_meta,
all_devices_offline,
run_conditions_met,
)


class RebootShutdownScriptTests(unittest.TestCase):
"""
To run the tests:
export DJANGO_SETTINGS_MODULE="settings"
cd src/rockstor && poetry run django-admin test -v 2 -p test_reboot_shutdown.py
"""

def setUp(self):
self.patch_is_network_device_responding = patch(
"scripts.scheduled_tasks.reboot_shutdown.is_network_device_responding"
)
self.mock_is_network_device_responding = (
self.patch_is_network_device_responding.start()
)

def tearDown(self):
pass

def test_validate_reboot_shutdown_meta(self):
"""
Test the following scenarios:
1. Reboot task valid
2. Suspend/Shutdown valid
3. Reboot/Suspend/Shutdown task invalid
"""
# 1. Valid meta is a Dict (empty if reboot task)
meta = {}
expected = meta
returned = validate_reboot_shutdown_meta(meta)
self.assertEqual(
returned,
expected,
msg="Un-expected validate_reboot_shutdown_meta result:\n "
"returned = ({}).\n "
"expected = ({}).".format(returned, expected),
)

# 2. Valid Shutdown meta
meta = {
"ping_scan_addresses": "1.1.1.1,8.8.8.8",
"ping_scan_interval": "5",
"rtc_minute": 0,
"ping_scan": True,
"wakeup": False,
"rtc_hour": 0,
"ping_scan_iterations": "3",
}
expected = meta
returned = validate_reboot_shutdown_meta(meta)
self.assertEqual(
returned,
expected,
msg="Un-expected validate_reboot_shutdown_meta result:\n "
"returned = ({}).\n "
"expected = ({}).".format(returned, expected),
)

# 3. Invalid meta (not a Dict)
meta = []
with self.assertRaises(Exception):
validate_reboot_shutdown_meta(meta)

def test_all_devices_offline(self):
"""
Test the following scenarios:
1. Target devices are OFFLINE
2. Target devices are ONLINE
"""
addresses = ["1.1.1.1", "8.8.8.8"]
# 1. Mock target devices as OFFLINE: should return True
self.mock_is_network_device_responding.return_value = False
returned = all_devices_offline(addresses)
self.assertTrue(
returned,
msg="Un-expected all_devices_offline result:\n "
"returned = ({}).\n "
"expected = True.".format(returned),
)

# 2. Mock target devices as ONLINE: should return False
self.mock_is_network_device_responding.return_value = True
returned = all_devices_offline(addresses)
self.assertFalse(
returned,
msg="Un-expected all_devices_offline result:\n "
"returned = ({}).\n "
"expected = False.".format(returned),
)

def test_run_conditions_met(self):
"""
Test the following scenarios:
1. Reboot task: empty meta
2. Shutdown/Suspend task, valid meta, ping targets OFFLINE
3. Shutdown/Suspend task, valid meta, ping targets ONLINE
Note that for Shutdown/Suspend tasks, an invalid meta will be caught
by validate_reboot_shutdown_meta() before run_conditions_met()
so no need to test for an invalid meta here.
"""
# 1. Reboot task, empty meta: should return True
meta = {}
returned = run_conditions_met(meta)
self.assertTrue(
returned,
msg="Un-expected run_conditions_met result:\n "
"returned = ({}).\n "
"expected = True.".format(returned),
)

# Shutdown/Suspend task
meta = {
"ping_scan_addresses": "1.1.1.1,8.8.8.8",
"ping_scan_interval": "5",
"rtc_minute": 0,
"ping_scan": True,
"wakeup": False,
"rtc_hour": 0,
"ping_scan_iterations": "1",
}
# 2. ping targets OFFLINE: should return True
self.mock_is_network_device_responding.return_value = False
returned = run_conditions_met(meta)
self.assertTrue(
returned,
msg="Un-expected run_conditions_met result:\n "
"returned = ({}).\n "
"expected = True.".format(returned),
)
# 3. ping targets ONLINE: should return False
self.mock_is_network_device_responding.return_value = True
returned = run_conditions_met(meta)
self.assertFalse(
returned,
msg="Un-expected run_conditions_met result:\n "
"returned = ({}).\n "
"expected = False.".format(returned),
)