From 2dff8b35742a03424eb1954eea7d7420a1b2a15e Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 16:55:22 -0600 Subject: [PATCH 01/28] Assign marker for tests in watertap.tools --- watertap/conftest.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/watertap/conftest.py b/watertap/conftest.py index 92f472ae8d..ba203944c0 100644 --- a/watertap/conftest.py +++ b/watertap/conftest.py @@ -12,7 +12,7 @@ import contextlib import enum from pathlib import Path -from typing import Container, Optional, Callable +from typing import Container, Optional, Callable, List import pytest from _pytest.nodes import Item @@ -29,6 +29,7 @@ class MarkerSpec(enum.Enum): requires_idaes_solver = ( "Tests that require a solver from the IDEAS extensions to pass" ) + tools = "Tests pertaining to WaterTAP tools" @property def description(self) -> str: @@ -83,3 +84,9 @@ def pytest_addoption(parser: Parser): default=False, dest="edb_no_mock", ) + + +def pytest_collection_modifyitems(items: List[Item]): + for item in items: + if "watertap/tools" in str(item.path): + item.add_marker("tools") From b1dd0b3dad4632ecafb6643c27437ea88357e6a4 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 17:00:54 -0600 Subject: [PATCH 02/28] Split test suite based on tools marker and disable capture for tools --- .github/workflows/checks.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 869cee99fd..3a846bbece 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -125,10 +125,13 @@ jobs: if: matrix.cov_report run: | - echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml" >> $GITHUB_ENV - - name: Test with pytest + echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV + - name: Test with pytest (not tools) run: | - pytest --pyargs watertap + pytest --pyargs watertap -m 'not tools' + - name: Test with pytest (tools) + run: | + pytest --pyargs watertap -m 'tools' --verbose --capture=no - name: Upload coverage report to Codecov if: matrix.cov_report uses: codecov/codecov-action@v2 From fc8747e054b78e1c49858b187a117beb0851e323 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 17:02:54 -0600 Subject: [PATCH 03/28] Reduce CI jobs while WIP --- .github/workflows/checks.yml | 146 ----------------------------------- 1 file changed, 146 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 3a846bbece..59634859b6 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -83,15 +83,9 @@ jobs: - '3.11' os: - linux - - win64 - # - macos include: - os: linux os-version: ubuntu-20.04 - - os: win64 - os-version: windows-2019 - # - os: macos - # os-version: macos-10.15 - python-version: '3.8' # limit uploading coverage report for a single Python version in the matrix cov_report: true @@ -144,143 +138,3 @@ jobs: if: 'false' run: | make -C docs linkcheck -d - - user-mode-pytest: - name: pytest (user mode) (py${{ matrix.python-version }}/${{ matrix.os }}) - runs-on: ${{ matrix.os-version }} - needs: [code-formatting] - strategy: - fail-fast: false - matrix: - python-version: - - '3.8' - - '3.11' - os: - - linux - - win64 - include: - - os: linux - os-version: ubuntu-20.04 - - os: win64 - os-version: windows-2019 - steps: - - name: Set up Python ${{ matrix.python-version }} - uses: conda-incubator/setup-miniconda@v2 - with: - activate-environment: watertap - python-version: ${{ matrix.python-version }} - - name: Define install URL (default) - env: - _repo_full_name: watertap-org/watertap - _ref_to_install: main - run: | - echo "_install_url=https://github.com/${_repo_full_name}/archive/${_ref_to_install}.zip" >> $GITHUB_ENV - - name: Define install URL (for PRs) - if: github.event.pull_request - env: - _repo_full_name: ${{ github.event.pull_request.head.repo.full_name }} - _ref_to_install: ${{ github.event.pull_request.head.sha }} - run: - echo "_install_url=https://github.com/${_repo_full_name}/archive/${_ref_to_install}.zip" >> $GITHUB_ENV - - name: Install watertap and testing dependencies - run: | - echo '::group::Output of "pip install" commands' - pip install "watertap @ ${_install_url}" pytest - echo '::endgroup::' - echo '::group::Display installed packages' - conda list - pip list - pip show idaes-pse - echo '::endgroup::' - echo '::group::Output of "idaes get-extensions" command' - idaes get-extensions --verbose - echo '::endgroup::' - - name: Run pytest - run: | - pytest --pyargs watertap - - notebooks: - name: Test notebooks (py${{ matrix.python-version }}/${{ matrix.os }}) - runs-on: ${{ matrix.os-version }} - needs: [code-formatting] - strategy: - fail-fast: false - matrix: - python-version: - - '3.8' - - '3.11' - os: - - linux - - win64 - include: - - os: linux - os-version: ubuntu-20.04 - - os: win64 - os-version: windows-2019 - steps: - - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: conda-incubator/setup-miniconda@v2 - with: - activate-environment: watertap-dev - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - echo '::group::Output of "conda install" commands' - conda install --quiet --yes pip setuptools wheel pandoc - echo '::endgroup::' - echo '::group::Output of "pip install" commands' - pip install -r requirements-dev.txt - echo '::endgroup::' - echo '::group::Display installed packages' - conda list - pip list - pip show idaes-pse - echo '::endgroup::' - echo '::group::Output of "idaes get-extensions" command' - idaes get-extensions --verbose - echo '::endgroup::' - - name: Run pytest with nbmake - run: - pytest --nbmake **/*.ipynb - - macos: - name: macOS setup (EXPERIMENTAL) - runs-on: macos-11 - needs: [code-formatting] - strategy: - fail-fast: false - matrix: - python-version: - - '3.8' - steps: - - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: conda-incubator/setup-miniconda@v2 - with: - activate-environment: watertap - python-version: ${{ matrix.python-version }} - - name: Install WaterTAP (dev) without idaes get-extensions - run: | - echo '::group::Output of "conda install" commands' - conda install --quiet --yes pip=21.1 wheel setuptools pandoc - echo '::endgroup::' - echo '::group::Output of "pip install" commands' - pip install -r requirements-dev.txt - echo '::endgroup::' - echo '::group::Display installed packages' - conda list - pip list - pip show pyomo idaes-pse - echo '::endgroup::' - - name: Install Ipopt from conda-forge - run: - conda install --quiet --yes -c conda-forge ipopt=3.14.11 - - name: Build Pyomo extensions - run: | - conda install --quiet --yes cmake - # some failures are expected, but this should succeed as long as pynumero is built correctly - pyomo build-extensions || python -c "from pyomo.contrib.pynumero.asl import AmplInterface; exit(0) if AmplInterface.available() else exit(1)" - - name: Run pytest - run: | - pytest --pyargs watertap -k 'not (nf_dspmde.nf_ui or nf_dspmde.nf_with_bypass_ui)' From f3f74f7bc8787677b0caab8e93af465b468d7faa Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 17:14:46 -0600 Subject: [PATCH 04/28] Invert job step order so that tools tests run first --- .github/workflows/checks.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 59634859b6..982bf0615b 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -120,12 +120,12 @@ jobs: run: | echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - - name: Test with pytest (not tools) - run: | - pytest --pyargs watertap -m 'not tools' - name: Test with pytest (tools) run: | pytest --pyargs watertap -m 'tools' --verbose --capture=no + - name: Test with pytest (not tools) + run: | + pytest --pyargs watertap -m 'not tools' - name: Upload coverage report to Codecov if: matrix.cov_report uses: codecov/codecov-action@v2 From d0b8631030376be7d2576248926913c4d0f24f0b Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 20:04:51 -0600 Subject: [PATCH 05/28] Remove unaffected steps while investigating --- .github/workflows/checks.yml | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 982bf0615b..762396836c 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -123,18 +123,3 @@ jobs: - name: Test with pytest (tools) run: | pytest --pyargs watertap -m 'tools' --verbose --capture=no - - name: Test with pytest (not tools) - run: | - pytest --pyargs watertap -m 'not tools' - - name: Upload coverage report to Codecov - if: matrix.cov_report - uses: codecov/codecov-action@v2 - - name: Test documentation code - run: | - make -C docs doctest -d - # TODO: this should be moved to a dedicated job/workflow - # until then, we can leave this here as a reminder - - name: Test documentation links - if: 'false' - run: | - make -C docs linkcheck -d From 82ff17c93f67b4255c93283b6d31f4417ac69b6b Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 20:09:04 -0600 Subject: [PATCH 06/28] Trigger CI From f56b54e02a1e74b3f745f54fb0d51b4ca9db7862 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 20:13:56 -0600 Subject: [PATCH 07/28] Try specifying a short step timeout --- .github/workflows/checks.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 762396836c..6160199a0a 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -123,3 +123,4 @@ jobs: - name: Test with pytest (tools) run: | pytest --pyargs watertap -m 'tools' --verbose --capture=no + timeout-minutes: 5 From 01da94144bd59f17241237d2e0c86352f9699f54 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 20:15:00 -0600 Subject: [PATCH 08/28] Remove more unaffected CI jobs while investigating --- .github/workflows/checks.yml | 17 ------- .github/workflows/mpi4py-test.yml | 73 ------------------------------- 2 files changed, 90 deletions(-) delete mode 100644 .github/workflows/mpi4py-test.yml diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 6160199a0a..da1f9f8d6c 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -52,23 +52,6 @@ jobs: run: | black --check . - pylint: - name: Code linting (pylint) - runs-on: ubuntu-latest - needs: [code-formatting] - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 - with: - python-version: '3.8' - - name: Install dev dependencies - run: | - pip install -r requirements-dev.txt - pip list - - name: Run pylint - run: | - pylint watertap - tests: name: Tests (py${{ matrix.python-version }}/${{ matrix.os }}) runs-on: ${{ matrix.os-version }} diff --git a/.github/workflows/mpi4py-test.yml b/.github/workflows/mpi4py-test.yml deleted file mode 100644 index 8ad8fdea16..0000000000 --- a/.github/workflows/mpi4py-test.yml +++ /dev/null @@ -1,73 +0,0 @@ -# This workflow will install Python dependencies, run tests and lint with a variety of Python versions -# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions - -name: MPI tests - -on: - push: - branches: [main] - pull_request: - branches: [main] - -concurrency: - # NOTE: the value of `group` should be chosen carefully, - # otherwise we might end up over- or under-canceling workflow runs - # github.head_ref is only defined for pull request events - # so, if it's not present (i.e. event was triggered by push) - # we use github.ref instead - group: ${{ github.workflow }}-${{ github.head_ref || github.ref }} - cancel-in-progress: true - -env: - # --color=yes needed for colorized output to be shown in GHA logs - # --pyargs watertap is needed to be able to define CLI options in watertap/conftest.py - PYTEST_ADDOPTS: "--color=yes" - PIP_PROGRESS_BAR: "off" - -jobs: - build: - - runs-on: ${{ matrix.os-version }} - defaults: - run: - shell: bash -l {0} - strategy: - fail-fast: false - matrix: - python-version: [3.8] - os: - - linux - - win64 - # - macos - include: - - os: linux - os-version: ubuntu-20.04 - - os: win64 - os-version: windows-2019 - # - os: macos - # os-version: macos-10.15 - - steps: - - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: conda-incubator/setup-miniconda@v2 - with: - auto-update-conda: true - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - conda install --quiet --yes -c conda-forge mpi4py - python -m pip install --upgrade pip - pip install -r requirements-dev.txt - idaes get-extensions --verbose - - name: Conda info - run: conda info - - name: Test parallel pytest w/ MPI - run: | - mpiexec -n 2 coverage run --parallel-mode -m mpi4py -m pytest watertap/tools/parameter_sweep/tests/test*parameter_sweep.py watertap/tools/analysis_tools/loop_tool/tests/test*loop_tool.py --no-cov - # single report - coverage combine - # convert to XML - coverage xml - - name: Upload coverage report to Codecov - uses: codecov/codecov-action@v2 From bbd04a04ef054397d6c85ccd0d02a45fcde6f5f9 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 20:26:09 -0600 Subject: [PATCH 09/28] When cross-section is low, increase luminosity --- .github/workflows/checks.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index da1f9f8d6c..bc7ef14cc1 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -53,12 +53,13 @@ jobs: black --check . tests: - name: Tests (py${{ matrix.python-version }}/${{ matrix.os }}) + name: Tests (py${{ matrix.python-version }}/${{ matrix.os }}/rep=${{ matrix.rep }}) runs-on: ${{ matrix.os-version }} needs: [code-formatting] strategy: fail-fast: false matrix: + rep: ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"] python-version: - '3.8' - '3.9' From 968f10a9390bb444d9d222880c40a69680ad1b98 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 20:42:32 -0600 Subject: [PATCH 10/28] Display child processes before and after each test item --- watertap/conftest.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/watertap/conftest.py b/watertap/conftest.py index ba203944c0..dba982d303 100644 --- a/watertap/conftest.py +++ b/watertap/conftest.py @@ -90,3 +90,18 @@ def pytest_collection_modifyitems(items: List[Item]): for item in items: if "watertap/tools" in str(item.path): item.add_marker("tools") + + +@pytest.hookimpl(hookwrapper=True) +def pytest_runtest_protocol(item): + import psutil + + this_process = psutil.Process() + + print("processes before:") + for proc in this_process.children(recursive=True): + print(f"\t{proc}") + yield + print(f"\nprocesses after:") + for proc in this_process.children(recursive=True): + print(f"\t{proc}") From 85636dd6994fc95f9a00d6cc61f0e650363c0852 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 20:58:40 -0600 Subject: [PATCH 11/28] Try adding explicit timeout where possible/easy --- watertap/tools/parallel/concurrent_futures_parallel_manager.py | 2 +- watertap/tools/parallel/multiprocessing_parallel_manager.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/watertap/tools/parallel/concurrent_futures_parallel_manager.py b/watertap/tools/parallel/concurrent_futures_parallel_manager.py index 5936950c1c..b157846b6b 100644 --- a/watertap/tools/parallel/concurrent_futures_parallel_manager.py +++ b/watertap/tools/parallel/concurrent_futures_parallel_manager.py @@ -102,7 +102,7 @@ def scatter( def gather(self): results = [] try: - execution_results = futures.wait(self.running_futures.keys()) + execution_results = futures.wait(self.running_futures.keys(), timeout=30) for future in execution_results.done: process_number, values = self.running_futures[future] results.append(LocalResults(process_number, values, future.result())) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index c01a81554a..2b81dc8c95 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -113,7 +113,7 @@ def gather(self): # collect result from the actors while len(results) < self.expected_samples: if self.return_queue.empty() == False: - i, values, result = self.return_queue.get() + i, values, result = self.return_queue.get(timeout=30) results.append(LocalResults(i, values, result)) # sort the results by the process number to keep a deterministic ordering From 37c6095933150f56e24a31c4d2e704c29a00b48a Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Tue, 14 Nov 2023 21:33:16 -0600 Subject: [PATCH 12/28] Try disabling ray --- .github/workflows/checks.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index bc7ef14cc1..b9a4a73175 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -88,9 +88,6 @@ jobs: echo '::group::Output of "pip install" commands' pip install -r requirements-dev.txt echo '::endgroup::' - echo '::group::Output of "pip install -U ray" command' - pip install -U ray - echo '::endgroup::' echo '::group::Display installed packages' conda list pip list From 0bef530a136d33241f8bbcaee8a42b056809ed6c Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 08:56:19 -0600 Subject: [PATCH 13/28] Try one more timeout --- watertap/tools/parallel/multiprocessing_parallel_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index 2b81dc8c95..c5f44f4d95 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -138,6 +138,6 @@ def multiProcessingActor( if queue.empty(): break else: - i, local_parameters = queue.get() + i, local_parameters = queue.get(timeout=30) result = actor.execute(local_parameters) return_queue.put([i, local_parameters, result]) From f86c3ff847f6e120b53e640c3fc0a5481bad1cb8 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 09:14:32 -0600 Subject: [PATCH 14/28] Try excluding tests in test_parallel_manager.py --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index b9a4a73175..334b0985ca 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -103,5 +103,5 @@ jobs: echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - name: Test with pytest (tools) run: | - pytest --pyargs watertap -m 'tools' --verbose --capture=no + pytest --pyargs watertap -m 'tools' -k 'not test_parallel_manager' --verbose --capture=no timeout-minutes: 5 From 1d1861a55f33d195e6c33a8b49db5cad89e27563 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 09:29:28 -0600 Subject: [PATCH 15/28] Exclude one more test using MultiProcessing --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 334b0985ca..3a0d441887 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -103,5 +103,5 @@ jobs: echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - name: Test with pytest (tools) run: | - pytest --pyargs watertap -m 'tools' -k 'not test_parallel_manager' --verbose --capture=no + pytest --pyargs watertap -m 'tools' -k 'not (test_parallel_manager or test_parameter_sweep_optimize_with_added_var)' --verbose --capture=no timeout-minutes: 5 From 882eac066a2e4bfc63a80f1d49ce054a5411dfd5 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 09:38:49 -0600 Subject: [PATCH 16/28] Try more specific exclusion for multiprocessing tests --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 3a0d441887..d5463f511c 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -103,5 +103,5 @@ jobs: echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - name: Test with pytest (tools) run: | - pytest --pyargs watertap -m 'tools' -k 'not (test_parallel_manager or test_parameter_sweep_optimize_with_added_var)' --verbose --capture=no + pytest --pyargs watertap -m 'tools' -k 'not (test_multiple_subprocesses_with_multiprocessing or test_parameter_sweep_optimize_with_added_var)' --verbose --capture=no timeout-minutes: 5 From 68b36b964e74400286b3aabfb858800329b75ea4 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 09:47:01 -0600 Subject: [PATCH 17/28] Try loosening keyword exclusion --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index d5463f511c..4726d727e0 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -103,5 +103,5 @@ jobs: echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - name: Test with pytest (tools) run: | - pytest --pyargs watertap -m 'tools' -k 'not (test_multiple_subprocesses_with_multiprocessing or test_parameter_sweep_optimize_with_added_var)' --verbose --capture=no + pytest --pyargs watertap -m 'tools' -k 'not (test_multiple_subprocesses_with_multiprocessing or keyword_without_match)' --verbose --capture=no timeout-minutes: 5 From c81300fce5de528aa4b317455d4fad2479c60a4e Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 10:00:43 -0600 Subject: [PATCH 18/28] Try different combination for test exclusion --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 4726d727e0..75dcf9157c 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -103,5 +103,5 @@ jobs: echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - name: Test with pytest (tools) run: | - pytest --pyargs watertap -m 'tools' -k 'not (test_multiple_subprocesses_with_multiprocessing or keyword_without_match)' --verbose --capture=no + pytest --pyargs watertap -m 'tools' -k 'not (keyword_without_match or test_parameter_sweep_optimize_with_added_var)' --verbose --capture=no timeout-minutes: 5 From 4d003b26cc2de35cd72786db473a816108c0f6f9 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 10:26:13 -0600 Subject: [PATCH 19/28] Add quick n dirty way to shut down multiprocessing worker processes --- .../tools/parallel/multiprocessing_parallel_manager.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index c5f44f4d95..f7a9dd0e16 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -116,10 +116,17 @@ def gather(self): i, values, result = self.return_queue.get(timeout=30) results.append(LocalResults(i, values, result)) + self._shut_down() # sort the results by the process number to keep a deterministic ordering results.sort(key=lambda result: result.process_number) return results + def _shut_down(self, timeout=5): + for worker in self.actors: + print(f"Attempting to shut down {worker}") + worker.join(timeout=5) + print(f"Shut down {len(self.actors)} workers") + def results_from_local_tree(self, results): return results From d103b1dae6ff494b01bfd85fadd5f6971a13f078 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 10:26:29 -0600 Subject: [PATCH 20/28] Try removing keyword exclusion --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 75dcf9157c..b9a4a73175 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -103,5 +103,5 @@ jobs: echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - name: Test with pytest (tools) run: | - pytest --pyargs watertap -m 'tools' -k 'not (keyword_without_match or test_parameter_sweep_optimize_with_added_var)' --verbose --capture=no + pytest --pyargs watertap -m 'tools' --verbose --capture=no timeout-minutes: 5 From 6a05e55449f24dc11150e440d9eb89f8e4ae40f1 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 10:44:27 -0600 Subject: [PATCH 21/28] Add timestamps to debug output --- watertap/conftest.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/watertap/conftest.py b/watertap/conftest.py index dba982d303..29763a5ef6 100644 --- a/watertap/conftest.py +++ b/watertap/conftest.py @@ -10,6 +10,7 @@ # "https://github.com/watertap-org/watertap/" ################################################################################# import contextlib +import datetime import enum from pathlib import Path from typing import Container, Optional, Callable, List @@ -20,6 +21,15 @@ from _pytest.config.argparsing import Parser +START = datetime.datetime.now() + + +def _get_elapsed_seconds(end=None) -> float: + end = end or datetime.datetime.now() + delta = end - START + return delta.total_seconds() + + class MarkerSpec(enum.Enum): unit = "Quick tests that do not require a solver, must run in < 2 s" component = "Quick tests that may require a solver" @@ -98,6 +108,8 @@ def pytest_runtest_protocol(item): this_process = psutil.Process() + print(f"START {item}") + print(f"{_get_elapsed_seconds()=}") print("processes before:") for proc in this_process.children(recursive=True): print(f"\t{proc}") @@ -105,3 +117,5 @@ def pytest_runtest_protocol(item): print(f"\nprocesses after:") for proc in this_process.children(recursive=True): print(f"\t{proc}") + print(f"{_get_elapsed_seconds()=}") + print(f"END {item}") From 55c10fc64b547db827a8e8855df386f66e7d9518 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 11:00:54 -0600 Subject: [PATCH 22/28] Try running pytest command thru timeout to get stack trace --- .github/workflows/checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index b9a4a73175..40cd801537 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -103,5 +103,5 @@ jobs: echo PYTEST_ADDOPTS="$PYTEST_ADDOPTS --cov-report=xml --cov-append" >> $GITHUB_ENV - name: Test with pytest (tools) run: | - pytest --pyargs watertap -m 'tools' --verbose --capture=no + timeout -s SIGINT 3m pytest --pyargs watertap -m 'tools' --verbose --capture=no --full-trace timeout-minutes: 5 From 7cc9fbf26d0c4125cf2b98252892d6fa57a60610 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 11:38:44 -0600 Subject: [PATCH 23/28] Try using EAFP instead of checking Queue.empty() --- .../multiprocessing_parallel_manager.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index f7a9dd0e16..10bf46aafa 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -19,6 +19,7 @@ ParallelManager, ) import multiprocessing +from queue import Empty as EmptyQueue class MultiprocessingParallelManager(ParallelManager): @@ -112,10 +113,11 @@ def gather(self): results = [] # collect result from the actors while len(results) < self.expected_samples: - if self.return_queue.empty() == False: - i, values, result = self.return_queue.get(timeout=30) - + try: + i, values, result = self.return_queue.get(timeout=5) results.append(LocalResults(i, values, result)) + except EmptyQueue: + break self._shut_down() # sort the results by the process number to keep a deterministic ordering results.sort(key=lambda result: result.process_number) @@ -142,9 +144,10 @@ def multiProcessingActor( ): actor = parallelActor(do_build, do_build_kwargs, do_execute, local_parameters) while True: - if queue.empty(): - break - else: - i, local_parameters = queue.get(timeout=30) - result = actor.execute(local_parameters) - return_queue.put([i, local_parameters, result]) + try: + msg = queue.get(timeout=5) + except EmptyQueue: + return + i, local_parameters = msg + result = actor.execute(local_parameters) + return_queue.put([i, local_parameters, result]) From 966e08e3dba8acf84692416b28cd18b287cb9098 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 11:50:18 -0600 Subject: [PATCH 24/28] Make timeout configurable with default value --- .../multiprocessing_parallel_manager.py | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index 10bf46aafa..06e99521b7 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -20,10 +20,22 @@ ) import multiprocessing from queue import Empty as EmptyQueue +from numbers import Number +from typing import Optional + + +TimeoutSpec = Optional[Number] + +_DEFAULT_TIMEOUT_SECONDS = 1 class MultiprocessingParallelManager(ParallelManager): - def __init__(self, number_of_subprocesses=1, **kwargs): + def __init__( + self, + number_of_subprocesses=1, + timeout: TimeoutSpec = _DEFAULT_TIMEOUT_SECONDS, + **kwargs, + ): self.max_number_of_subprocesses = number_of_subprocesses # this will be updated when child processes are kicked off @@ -32,6 +44,7 @@ def __init__(self, number_of_subprocesses=1, **kwargs): # Future -> (process number, parameters). Used to keep track of the process number and parameters for # all in-progress futures self.running_futures = dict() + self.timeout = timeout def is_root_process(self): return True @@ -105,6 +118,9 @@ def scatter( do_execute, divided_parameters[0], ), + kwargs={ + "timeout": self.timeout, + }, ) ) self.actors[-1].start() @@ -114,7 +130,7 @@ def gather(self): # collect result from the actors while len(results) < self.expected_samples: try: - i, values, result = self.return_queue.get(timeout=5) + i, values, result = self.return_queue.get(timeout=self.timeout) results.append(LocalResults(i, values, result)) except EmptyQueue: break @@ -123,10 +139,10 @@ def gather(self): results.sort(key=lambda result: result.process_number) return results - def _shut_down(self, timeout=5): + def _shut_down(self): for worker in self.actors: print(f"Attempting to shut down {worker}") - worker.join(timeout=5) + worker.join(timeout=self.timeout) print(f"Shut down {len(self.actors)} workers") def results_from_local_tree(self, results): @@ -141,11 +157,12 @@ def multiProcessingActor( do_build_kwargs, do_execute, local_parameters, + timeout: TimeoutSpec = _DEFAULT_TIMEOUT_SECONDS, ): actor = parallelActor(do_build, do_build_kwargs, do_execute, local_parameters) while True: try: - msg = queue.get(timeout=5) + msg = queue.get(timeout=timeout) except EmptyQueue: return i, local_parameters = msg From cdf1c66f54d3eed2179822044f4ecdb5b346eed5 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 11:55:49 -0600 Subject: [PATCH 25/28] Refactor to use logging --- .../parallel/multiprocessing_parallel_manager.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index 06e99521b7..bba76b10c0 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -22,8 +22,11 @@ from queue import Empty as EmptyQueue from numbers import Number from typing import Optional +import logging +_logger = logging.getLogger(__name__) + TimeoutSpec = Optional[Number] _DEFAULT_TIMEOUT_SECONDS = 1 @@ -140,10 +143,12 @@ def gather(self): return results def _shut_down(self): - for worker in self.actors: - print(f"Attempting to shut down {worker}") - worker.join(timeout=self.timeout) - print(f"Shut down {len(self.actors)} workers") + n_shut_down = 0 + for process in self.actors: + _logger.debug("Attempting to shut down %s", process) + process.join(timeout=self.timeout) + n_shut_down += 1 + _logger.debug("Shut down %d processes", n_shut_down) def results_from_local_tree(self, results): return results From 61d208643241562337cc13a048e751a07de288f7 Mon Sep 17 00:00:00 2001 From: Ludovico Bianchi Date: Wed, 15 Nov 2023 11:56:37 -0600 Subject: [PATCH 26/28] Reorder imports --- .../parallel/multiprocessing_parallel_manager.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index bba76b10c0..62b788162c 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -10,19 +10,19 @@ # "https://github.com/watertap-org/watertap/" ################################################################################# -import numpy +import logging +import multiprocessing +from numbers import Number +from queue import Empty as EmptyQueue +from typing import Optional +import numpy from watertap.tools.parallel.results import LocalResults from watertap.tools.parallel.parallel_manager import ( parallelActor, ParallelManager, ) -import multiprocessing -from queue import Empty as EmptyQueue -from numbers import Number -from typing import Optional -import logging _logger = logging.getLogger(__name__) From ac76ce27ed18be1ed034d8844c1c88595c7454b4 Mon Sep 17 00:00:00 2001 From: Bernard Knueven Date: Mon, 20 Nov 2023 19:32:49 -0700 Subject: [PATCH 27/28] remove timeout from concurrent.futures.wait --- watertap/tools/parallel/concurrent_futures_parallel_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watertap/tools/parallel/concurrent_futures_parallel_manager.py b/watertap/tools/parallel/concurrent_futures_parallel_manager.py index b157846b6b..5936950c1c 100644 --- a/watertap/tools/parallel/concurrent_futures_parallel_manager.py +++ b/watertap/tools/parallel/concurrent_futures_parallel_manager.py @@ -102,7 +102,7 @@ def scatter( def gather(self): results = [] try: - execution_results = futures.wait(self.running_futures.keys(), timeout=30) + execution_results = futures.wait(self.running_futures.keys()) for future in execution_results.done: process_number, values = self.running_futures[future] results.append(LocalResults(process_number, values, future.result())) From 6d085582bbfd264fc8d2f31f7ba8064885488ad9 Mon Sep 17 00:00:00 2001 From: Bernard Knueven Date: Mon, 20 Nov 2023 20:36:06 -0700 Subject: [PATCH 28/28] trying Pool implementation --- .../multiprocessing_parallel_manager.py | 95 ++++--------------- 1 file changed, 19 insertions(+), 76 deletions(-) diff --git a/watertap/tools/parallel/multiprocessing_parallel_manager.py b/watertap/tools/parallel/multiprocessing_parallel_manager.py index 62b788162c..42377a8144 100644 --- a/watertap/tools/parallel/multiprocessing_parallel_manager.py +++ b/watertap/tools/parallel/multiprocessing_parallel_manager.py @@ -10,33 +10,26 @@ # "https://github.com/watertap-org/watertap/" ################################################################################# +import functools import logging import multiprocessing -from numbers import Number -from queue import Empty as EmptyQueue -from typing import Optional import numpy from watertap.tools.parallel.results import LocalResults from watertap.tools.parallel.parallel_manager import ( - parallelActor, + build_and_execute, ParallelManager, ) _logger = logging.getLogger(__name__) -TimeoutSpec = Optional[Number] - -_DEFAULT_TIMEOUT_SECONDS = 1 - class MultiprocessingParallelManager(ParallelManager): def __init__( self, number_of_subprocesses=1, - timeout: TimeoutSpec = _DEFAULT_TIMEOUT_SECONDS, **kwargs, ): self.max_number_of_subprocesses = number_of_subprocesses @@ -44,11 +37,6 @@ def __init__( # this will be updated when child processes are kicked off self.actual_number_of_subprocesses = None - # Future -> (process number, parameters). Used to keep track of the process number and parameters for - # all in-progress futures - self.running_futures = dict() - self.timeout = timeout - def is_root_process(self): return True @@ -99,77 +87,32 @@ def scatter( # split the parameters prameters for async run self.expected_samples = len(all_parameters) divided_parameters = numpy.array_split(all_parameters, self.expected_samples) - # create queues, run queue will be used to store paramters we want to run - # and return_queue is used to store results - self.run_queue = multiprocessing.Queue() - self.return_queue = multiprocessing.Queue() - for i, param in enumerate(divided_parameters): - # print(param) - self.run_queue.put([i, param]) - # setup multiprocessing actors - self.actors = [] - - for cpu in range(self.actual_number_of_subprocesses): - self.actors.append( - multiprocessing.Process( - target=multiProcessingActor, - args=( - self.run_queue, - self.return_queue, - do_build, - do_build_kwargs, - do_execute, - divided_parameters[0], - ), - kwargs={ - "timeout": self.timeout, - }, - ) - ) - self.actors[-1].start() + + build_and_execute_this_run = functools.partial( + build_and_execute, do_build, do_build_kwargs, do_execute + ) + + actor = functools.partial(multiProcessingActor, build_and_execute_this_run) + + self._pool = multiprocessing.Pool(self.actual_number_of_subprocesses) + self._results = self._pool.map_async(actor, enumerate(divided_parameters)) + self._pool.close() def gather(self): - results = [] - # collect result from the actors - while len(results) < self.expected_samples: - try: - i, values, result = self.return_queue.get(timeout=self.timeout) - results.append(LocalResults(i, values, result)) - except EmptyQueue: - break - self._shut_down() + results = self._results.get() # sort the results by the process number to keep a deterministic ordering results.sort(key=lambda result: result.process_number) return results - def _shut_down(self): - n_shut_down = 0 - for process in self.actors: - _logger.debug("Attempting to shut down %s", process) - process.join(timeout=self.timeout) - n_shut_down += 1 - _logger.debug("Shut down %d processes", n_shut_down) - def results_from_local_tree(self, results): return results -# This function is used for running the actors in multprocessing def multiProcessingActor( - queue, - return_queue, - do_build, - do_build_kwargs, - do_execute, - local_parameters, - timeout: TimeoutSpec = _DEFAULT_TIMEOUT_SECONDS, + build_and_execute_this_run, + i_local_parameters, ): - actor = parallelActor(do_build, do_build_kwargs, do_execute, local_parameters) - while True: - try: - msg = queue.get(timeout=timeout) - except EmptyQueue: - return - i, local_parameters = msg - result = actor.execute(local_parameters) - return_queue.put([i, local_parameters, result]) + i, local_parameters = i_local_parameters + return LocalResults( + i, local_parameters, build_and_execute_this_run(local_parameters) + )