Fix copy manager functionality so it works under Python >= 3.8 on OS X #818

Merged: 6 commits, Jan 20, 2022
24 changes: 15 additions & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -821,12 +821,12 @@ jobs:
python_version: "3.5"
tox_target: "py3.5-unit-tests"

unittest-37-osx:
unittest-38-osx:
working_directory: ~/scalyr-agent-2
macos:
xcode: "13.2.1"
environment:
PYTHON: 3.7.10
PYTHON: 3.8.12
# Updating homebrew is slow so we skip it
HOMEBREW_NO_AUTO_UPDATE: 1
steps:
@@ -838,7 +838,7 @@ jobs:
brew install pyenv
- restore_cache:
name: Restore pyenv cache
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.7-osx-venv-{{ checksum "dev-requirements.txt" }}-pyenv
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.8-osx-venv-{{ checksum "dev-requirements.txt" }}-pyenv
- run:
name: Install Python
command: |
@@ -849,12 +849,12 @@ jobs:
pyenv install "$PYTHON" -s
- save_cache:
name: Save pyenv cache
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.7-osx-venv-{{ checksum "dev-requirements.txt" }}-pyenv
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.8-osx-venv-{{ checksum "dev-requirements.txt" }}-pyenv
paths:
- ~/.pyenv
- restore_cache:
name: Restore pip cache
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.7-osx-venv-{{ checksum "dev-requirements.txt" }}-pip
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.8-osx-venv-{{ checksum "dev-requirements.txt" }}-pip
- run:
name: Install Dependencies
command: |
@@ -870,10 +870,10 @@ jobs:
command: |
pyenv local $PYTHON
source ./venv/bin/activate
tox -epy3.7-unit-tests
tox -epy3.8-unit-tests
- save_cache:
name: Save pip cache
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.7-osx-venv-{{ checksum "dev-requirements.txt" }}-pip
key: deps-<< pipeline.parameters.cache_version_py_dependencies >>-xcode-13.2.1-tox-{{ .Branch }}-py-3.8-osx-venv-{{ checksum "dev-requirements.txt" }}-pip
paths:
- ~/.cache/pip
- slack/status:
@@ -2276,7 +2276,9 @@ workflows:
context: scalyr-agent
- unittest-35:
context: scalyr-agent
- unittest-37-osx:
- unittest-38:
context: scalyr-agent
- unittest-38-osx:
context: scalyr-agent
- unittest-38-windows:
context: scalyr-agent
@@ -2287,6 +2289,8 @@ workflows:
context: scalyr-agent
- smoke-standalone-35:
context: scalyr-agent
- smoke-standalone-38:
context: scalyr-agent
- smoke-standalone-35-rate-limit:
context: scalyr-agent
# NOTE: smoke test jobs below still use old smoke tests framework
@@ -2423,7 +2427,9 @@ workflows:
context: scalyr-agent
- unittest-35:
context: scalyr-agent
- unittest-37-osx:
- unittest-38:
context: scalyr-agent
- unittest-38-osx:
context: scalyr-agent
- unittest-38-windows:
context: scalyr-agent
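The CI change above moves the OS X unit-test job from Python 3.7 to 3.8. That matters because Python 3.8 changed the default `multiprocessing` start method on macOS from `fork` to `spawn`. The following is a minimal sketch (Python 3 only, not part of this PR) for checking which start method an interpreter uses; `describe_start_methods` is a hypothetical helper name.

```python
import multiprocessing


def describe_start_methods():
    """Report the default and the available multiprocessing start methods.

    Hypothetical helper for illustration; it is not part of scalyr-agent-2.
    """
    return {
        # On macOS with Python >= 3.8 this is expected to be "spawn";
        # on Linux it has traditionally been "fork".
        "default": multiprocessing.get_start_method(),
        "available": multiprocessing.get_all_start_methods(),
    }


if __name__ == "__main__":
    info = describe_start_methods()
    print("default start method:", info["default"])
    print("available start methods:", info["available"])
```

Running this under the interpreter that the CI job installs is a quick way to confirm whether a job actually exercises the `spawn` code path.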
141 changes: 74 additions & 67 deletions scalyr_agent/copying_manager/worker.py
Original file line number Diff line number Diff line change
@@ -1369,86 +1369,93 @@ class CopyingManagerWorkerSessionProxy(_CopyingManagerWorkerSessionProxy): # ty
pass


def create_shared_object_manager(worker_session_class, worker_session_proxy_class):
# NOTE: This class must be defined at module level. Since Python 3.8 the default multiprocessing
# start method on OS X is "spawn", which relies on pickling when the manager process is started,
# and classes defined inside a function cannot be pickled.
# - https://gist.github.com/Kami/8386c79d2db7c95329e6c182ec639f49
# - https://github.com/spack/spack/issues/14102
class _SharedObjectManager(scalyr_util.ParentProcessAwareSyncManager):
"""
Creates and returns an instance of the subclass of the 'scalyr_utils.ParentAwareSyncManager' and also registers
all proxy types that will be needed for the multiprocess worker session.
This is done in function, only to be reusable by the tests.
:param worker_session_class: The worker session class to "proxify"
:param worker_session_proxy_class: The predefined worker session proxy class.
:return: a new instance of the 'scalyr_utils.ParentAwareSyncManager' with registered proxies.
The subclass of the 'scalyr_util.ParentAwareSyncManager' which also has access to the worker session
instance in order to stop it if the parent process is killed.

According to the fact that the worker session runs in manager's process in a separate thread, we have to
handle the situation where the agent was killed and worker session remains alive in the manager's process
and keeps sending logs.
"""

class _SharedObjectManager(scalyr_util.ParentProcessAwareSyncManager):
"""
The subclass of the 'scalyr_util.ParentAwareSyncManager' which also has access to the worker session
instance in order to stop it if the parent process is killed.
def __init__(self, worker_session_class, *args, **kwargs):
super(_SharedObjectManager, self).__init__(*args, **kwargs)

According to the fact that the worker session runs in manager's process in a separate thread, we have to
handle the situation where the agent was killed and worker session remains alive in the manager's process
and keeps sending logs.
"""
self._worker_session_class = worker_session_class

def __init__(self, *args, **kwargs):
super(_SharedObjectManager, self).__init__(*args, **kwargs)
self._worker_session = (
None
) # type: Optional[CopyingManagerWorkerSessionInterface]

self._worker_session = (
None
) # type: Optional[CopyingManagerWorkerSessionInterface]
def _create_worker_session(
self, configuration, worker_config_entry, worker_session_id
):
# type: (Configuration, Dict, six.text_type) -> CopyingManagerWorkerSessionInterface
"""
Create a new worker session and save it as an attribute to be able to access the
worker session's instance within the local process.

def _create_worker_session(
self, configuration, worker_config_entry, worker_session_id
):
# type: (Configuration, Dict, six.text_type) -> CopyingManagerWorkerSessionInterface
"""
Create a new worker session and save it as an attribute to be able to access the
worker session's instance within the local process.
The arguments are the same as in the worker session's constructor.
:return: the proxy object for the worker session instance.
"""

The arguments are the same as in the worker session's constructor.
:return: the proxy object for the worker session instance.
"""
# we set 'is_daemon' as True in order to be able to stop the
# worker session's thread if the manager's main thread is exited.
# but it is just a 'last stand' option when the graceful worker session stop is failed.
self._worker_session = self._worker_session_class(
configuration, worker_config_entry, worker_session_id, is_daemon=True
)

# we set 'is_daemon' as True in order to be able to stop the
# worker session's thread if the manager's main thread is exited.
# but it is just a 'last stand' option when the graceful worker session stop is failed.
self._worker_session = worker_session_class(
configuration, worker_config_entry, worker_session_id, is_daemon=True
)
return self._worker_session # type: ignore

def _on_parent_process_kill(self):
"""
Override the callback which is invoked when the parent process is killed,
so we have to stop the worker session before this process will be terminated.
"""
log.error(
"The main agent process does not exist. Probably it was forcibly killed. "
"Checking if the worker session is still alive."
)
if self._worker_session and self._worker_session.is_alive():
log.error("The worker session is alive. Stopping it.")
try:
self._worker_session.stop_worker_session()
except:
log.exception(
"Can not stop the worker session. Waiting before killing the process..."
)
# can not stop worker session gracefully, just wait for the main thread of the process exits and
# the worker session's thread(since it is a daemon) will be terminated too.

return self._worker_session # type: ignore
@classmethod
def _on_exit(cls, error=None):
"""
Just add more log messages before the process is terminated.
:return:
"""
if error:
log.error("The shared object manager thread has ended with an error.")
else:
log.info("The shared object manager of the worker session has stopped.")

def _on_parent_process_kill(self):
"""
Override the callback which is invoked when the parent process is killed,
so we have to stop the worker session before this process will be terminated.
"""
log.error(
"The main agent process does not exist. Probably it was forcibly killed. "
"Checking if the worker session is still alive."
)
if self._worker_session and self._worker_session.is_alive():
log.error("The worker session is alive. Stopping it.")
try:
self._worker_session.stop_worker_session()
except:
log.exception(
"Can not stop the worker session. Waiting before killing the process..."
)
# can not stop worker session gracefully, just wait for the main thread of the process exits and
# the worker session's thread(since it is a daemon) will be terminated too.

@classmethod
def _on_exit(cls, error=None):
"""
Just add more log messages before the process is terminated.
:return:
"""
if error:
log.error("The shared object manager thread has ended with an error.")
else:
log.info("The shared object manager of the worker session has stopped.")
def create_shared_object_manager(worker_session_class, worker_session_proxy_class):
"""
Creates and returns an instance of a subclass of 'scalyr_util.ParentProcessAwareSyncManager' and also registers
all proxy types that are needed for the multiprocess worker session.
This is done in a function only so that the tests can reuse it.
:param worker_session_class: The worker session class to "proxify"
:param worker_session_proxy_class: The predefined worker session proxy class.
:return: a new instance of the 'scalyr_util.ParentProcessAwareSyncManager' subclass with registered proxies.
"""

manager = _SharedObjectManager()
manager = _SharedObjectManager(worker_session_class=worker_session_class)

# pylint: disable=E1101
manager.register(
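The refactoring above can be illustrated with a small, self-contained sketch. It assumes nothing about the agent's real classes: `ModuleLevelManager`, `make_local_manager` and `can_pickle` are hypothetical names used only to show why a class defined inside a function cannot be pickled, while a module-level class that receives its dependency through the constructor can.

```python
import pickle


class ModuleLevelManager(object):
    """Hypothetical stand-in for a manager class defined at module scope."""

    def __init__(self, worker_session_class):
        # The dependency is passed in explicitly instead of being captured
        # from an enclosing function, mirroring the new constructor argument.
        self.worker_session_class = worker_session_class


def make_local_manager():
    """Mimic the old layout: the class is defined inside a factory function."""

    class LocalManager(object):
        pass

    return LocalManager()


def can_pickle(obj):
    """Return True if `obj` survives a pickle round trip, False otherwise."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError):
        # Which exception is raised for a local class depends on the
        # Python version, so both are handled here.
        return False


if __name__ == "__main__":
    print("module-level class picklable:", can_pickle(ModuleLevelManager(dict)))
    print("local class picklable:", can_pickle(make_local_manager()))
```

Pickle stores classes by their importable name, so an object whose class lives only in a function's local scope cannot be located again when it is loaded. Moving the class to module scope and passing `worker_session_class` in as an argument removes the need for the closure.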
7 changes: 5 additions & 2 deletions tests/unit/copying_manager_tests/copying_manager_new_test.py
Original file line number Diff line number Diff line change
@@ -67,8 +67,11 @@ def pytest_generate_tests(metafunc):
"""
if "worker_type" in metafunc.fixturenames:
test_params = [["thread", 1, 1], ["thread", 2, 2]]
# if the OS is not Windows and python version > 2.7 then also do the multiprocess workers testing.
if platform.system() != "Windows" and sys.version_info >= (2, 7):
# if the OS is not Windows / OS X and the Python version is >= 2.7, then also test the multiprocess workers.
if platform.system() not in ["Windows", "Darwin"] and sys.version_info >= (
2,
7,
):
test_params.extend([["process", 1, 1], ["process", 2, 2]])

metafunc.parametrize(
7 changes: 5 additions & 2 deletions tests/unit/copying_manager_tests/copying_manager_test.py
Original file line number Diff line number Diff line change
@@ -64,8 +64,11 @@ def pytest_generate_tests(metafunc):
"""
if "worker_type" in metafunc.fixturenames:
test_params = [["thread", 1, 1], ["thread", 2, 2]]
# if the OS is not Windows and python version > 2.7 then also do the multiprocess workers testing.
if platform.system() != "Windows" and sys.version_info >= (2, 7):
# if the OS is not Windows / OS X and the Python version is >= 2.7, then also test the multiprocess workers.
if platform.system() not in ["Windows", "Darwin"] and sys.version_info >= (
2,
7,
):
test_params.extend([["process", 1, 1], ["process", 2, 2]])

metafunc.parametrize(
Original file line number Diff line number Diff line change
@@ -60,8 +60,11 @@ def pytest_addoption(parser):
def pytest_generate_tests(metafunc):
if "worker_type" in metafunc.fixturenames:
test_params = ["thread"]
# if the OS is not Windows and python version > 2.7 then also do the multiprocess workers testing.
if platform.system() != "Windows" and sys.version_info >= (2, 7):
# if the OS is not Windows / OS X and the Python version is >= 2.7, then also test the multiprocess workers.
if platform.system() not in ["Windows", "Darwin"] and sys.version_info >= (
2,
7,
):
test_params.append("process")

metafunc.parametrize("worker_type", test_params)
Original file line number Diff line number Diff line change
@@ -60,8 +60,11 @@
def pytest_generate_tests(metafunc):
if "worker_type" in metafunc.fixturenames:
test_params = ["thread"]
# if the OS is not Windows and python version > 2.7 then also do the multiprocess workers testing.
if platform.system() != "Windows" and sys.version_info >= (2, 7):
# if the OS is not Windows / OS X and the Python version is >= 2.7, then also test the multiprocess workers.
if platform.system() not in ["Windows", "Darwin"] and sys.version_info >= (
2,
7,
):
test_params.append("process")

metafunc.parametrize("worker_type", test_params)
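The same platform gate is repeated in each `pytest_generate_tests` hook above. As a rough sketch of the rule it encodes, the check could be expressed as a small pure function; `worker_types` and its optional arguments are hypothetical and exist only so the rule can be exercised without depending on the host platform.

```python
import platform
import sys


def worker_types(system=None, version_info=None):
    """Return the worker types to parametrize tests with.

    Hypothetical helper: `system` and `version_info` default to the values of
    the running interpreter, but can be overridden for illustration.
    """
    system = platform.system() if system is None else system
    version_info = sys.version_info if version_info is None else version_info

    params = ["thread"]
    # Multiprocess workers are skipped on Windows and on OS X ("Darwin").
    if system not in ("Windows", "Darwin") and tuple(version_info) >= (2, 7):
        params.append("process")
    return params


if __name__ == "__main__":
    print(worker_types())
```

With this rule, the new `unittest-38-osx` CI job runs only the thread-based worker tests, while Linux jobs keep running both variants.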