diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9d7f042..3baae83 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,7 +26,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [ 3.7, 3.8, 3.9, '3.10' ] + python-version: [ "3.8", "3.9", "3.10", "3.11" ] steps: - uses: actions/checkout@v2 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 31ff62c..b591d56 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,15 +1,10 @@ exclude: ^(docsrc/|docs/|examples/) repos: - repo: https://github.com/psf/black - rev: 23.1.0 + rev: 23.3.0 hooks: - id: black - name: black - language: system - entry: black - minimum_pre_commit_version: 2.9.2 - require_serial: true - types: [python] + language_version: python3.11 - repo: https://github.com/PyCQA/flake8 rev: 6.0.0 diff --git a/dev_requirements.txt b/dev_requirements.txt index e140242..def83e9 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,9 +1,10 @@ -furo==2022.12.7 -pre-commit==3.0.4 -pytest==7.2.1 -python-box==7.0.0 +furo==2023.5.20 +pre-commit==3.3.3 +pytest==7.3.2 +python-box==7.0.1 restfly==1.4.7 -requests==2.28.2 -responses==0.22.0 -sphinx==6.1.3 -toml==0.10.2 \ No newline at end of file +requests==2.31.0 +responses==0.23.1 +sphinx==7.0.1 +toml==0.10.2 +urllib3<2 \ No newline at end of file diff --git a/docsrc/conf.py b/docsrc/conf.py index 990de03..2f62cf0 100644 --- a/docsrc/conf.py +++ b/docsrc/conf.py @@ -25,9 +25,9 @@ html_title = "" # The short X.Y version -version = '1.4' +version = '1.5' # The full version, including alpha/beta/rc tags -release = '1.4.1' +release = '1.5.0' # -- General configuration --------------------------------------------------- @@ -95,7 +95,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ["_static"] +html_static_path = [] # Custom sidebar templates, must be a dictionary that maps document names # to template names. @@ -182,7 +182,10 @@ # -- Options for intersphinx extension --------------------------------------- # Example configuration for intersphinx: refer to the Python standard library. -intersphinx_mapping = {"https://docs.python.org/": None} +intersphinx_mapping = {'python': ('https://docs.python.org/3', None), + 'restfly': ('https://restfly.readthedocs.io/en/latest/', None), + 'box': ('https://box.readthedocs.io/en/latest', None), + } # -- Options for todo extension ---------------------------------------------- diff --git a/docsrc/index.rst b/docsrc/index.rst index 9eb8bf7..23b0775 100644 --- a/docsrc/index.rst +++ b/docsrc/index.rst @@ -9,6 +9,7 @@ zs/zia/index zs/zpa/index zs/zcc/index + zs/zdx/index pyZscaler SDK - Library Reference ===================================================================== @@ -41,7 +42,7 @@ Products - :doc:`Zscaler Private Access (ZPA) ` - :doc:`Zscaler Internet Access (ZIA) ` - :doc:`Zscaler Mobile Admin Portal ` -- Cloud Security Posture Management (CSPM) - (work in progress) +- :doc:`Zscaler Digital Experience (ZDX) ` Installation ============== @@ -98,6 +99,18 @@ Quick ZCC Example for device in zcc.devices.list_devices(): pprint(device) +Quick ZDX Example +^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from pyzscaler import ZDX + from pprint import pprint + + zcc = ZDX(client_id='CLIENT_ID', client_secret='CLIENT_SECRET') + for device in zdx.devices.list_devices(): + pprint(device) + .. automodule:: pyzscaler :members: diff --git a/docsrc/zs/zcc/index.rst b/docsrc/zs/zcc/index.rst index 487bf03..aad87db 100644 --- a/docsrc/zs/zcc/index.rst +++ b/docsrc/zs/zcc/index.rst @@ -2,19 +2,6 @@ ZCC ========== This package covers the ZCC interface. -Retrieving the ZCC Company ID. -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The ZCC Company ID can be obtained by following these instructions: - 1. Navigate to the Zscaler Mobile Admin Portal in a web browser. - 2. Open the Browser console (typically ``F12``) and click on **Network**. - 3. From the top navigation, click on **Enrolled Devices**. - 4. Look for the API call ``mobileadmin.zscaler.net/webservice/api/web/usersByCompany`` in the 'Networks' tab - of the Browser Console. Click on this entry. - 5. Click on either **Preview** or **Response** to see the data that was returned by the Mobile Admin Portal. - 6. The Company ID is represented as an ``int`` and can be found under the ``companyId`` key in the object returned - for each user. - .. toctree:: :maxdepth: 1 :glob: diff --git a/docsrc/zs/zdx/admin.rst b/docsrc/zs/zdx/admin.rst new file mode 100644 index 0000000..0ef1182 --- /dev/null +++ b/docsrc/zs/zdx/admin.rst @@ -0,0 +1,12 @@ +admin +------ + +The following methods allow for interaction with the ZDX +Admin API endpoints. + +Methods are accessible via ``zdx.admin`` + +.. _zdx-admin: + +.. automodule:: pyzscaler.zdx.admin + :members: \ No newline at end of file diff --git a/docsrc/zs/zdx/apps.rst b/docsrc/zs/zdx/apps.rst new file mode 100644 index 0000000..380f58d --- /dev/null +++ b/docsrc/zs/zdx/apps.rst @@ -0,0 +1,12 @@ +apps +------ + +The following methods allow for interaction with the ZDX +Application API endpoints. + +Methods are accessible via ``zdx.apps`` + +.. _zdx-apps: + +.. automodule:: pyzscaler.zdx.apps + :members: \ No newline at end of file diff --git a/docsrc/zs/zdx/devices.rst b/docsrc/zs/zdx/devices.rst new file mode 100644 index 0000000..6fd3c2d --- /dev/null +++ b/docsrc/zs/zdx/devices.rst @@ -0,0 +1,12 @@ +devices +------- + +The following methods allow for interaction with the ZDX +Devices API endpoints. + +Methods are accessible via ``zdx.devices`` + +.. _zdx-devices: + +.. automodule:: pyzscaler.zdx.devices + :members: \ No newline at end of file diff --git a/docsrc/zs/zdx/index.rst b/docsrc/zs/zdx/index.rst new file mode 100644 index 0000000..b53823c --- /dev/null +++ b/docsrc/zs/zdx/index.rst @@ -0,0 +1,13 @@ +ZDX +========== +This package covers the ZDX interface. + +.. toctree:: + :maxdepth: 1 + :glob: + :hidden: + + * + +.. automodule:: pyzscaler.zdx + :members: diff --git a/docsrc/zs/zdx/session.rst b/docsrc/zs/zdx/session.rst new file mode 100644 index 0000000..51b4101 --- /dev/null +++ b/docsrc/zs/zdx/session.rst @@ -0,0 +1,12 @@ +session +------- + +The following methods allow for interaction with the ZDX +Session API endpoints. + +Methods are accessible via ``zdx.session`` + +.. _zdx-session: + +.. automodule:: pyzscaler.zdx.session + :members: \ No newline at end of file diff --git a/docsrc/zs/zdx/users.rst b/docsrc/zs/zdx/users.rst new file mode 100644 index 0000000..8448712 --- /dev/null +++ b/docsrc/zs/zdx/users.rst @@ -0,0 +1,12 @@ +users +------- + +The following methods allow for interaction with the ZDX +Users API endpoints. + +Methods are accessible via ``zdx.users`` + +.. _zdx-users: + +.. automodule:: pyzscaler.zdx.users + :members: \ No newline at end of file diff --git a/docsrc/zs/zia/index.rst b/docsrc/zs/zia/index.rst index 0580de8..06f7003 100644 --- a/docsrc/zs/zia/index.rst +++ b/docsrc/zs/zia/index.rst @@ -3,25 +3,10 @@ ZIA This package covers the ZIA interface. .. toctree:: - :maxdepth: 2 + :glob: + :hidden: - admin_and_role_management - audit_logs - config - dlp - firewall - locations - rule_labels - sandbox - security - session - ssl_inspection - traffic - url_categories - url_filters - users - vips - web_dlp + * .. automodule:: pyzscaler.zia :members: \ No newline at end of file diff --git a/docsrc/zs/zpa/connector_groups.rst b/docsrc/zs/zpa/connector_groups.rst deleted file mode 100644 index 6247278..0000000 --- a/docsrc/zs/zpa/connector_groups.rst +++ /dev/null @@ -1,12 +0,0 @@ -connector_groups ------------------ - -The following methods allow for interaction with the ZPA -Connector Groups API endpoints. - -Methods are accessible via ``zpa.connector_groups`` - -.. _zpa-connector_groups: - -.. automodule:: pyzscaler.zpa.connector_groups - :members: \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index d6c7174..9632398 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pyzscaler" -version = "1.4.1" +version = "1.5.0" description = "A python SDK for the Zscaler API." authors = ["Mitch Kelly "] license = "MIT" @@ -16,10 +16,10 @@ classifiers = [ "License :: OSI Approved :: MIT License", "Natural Language :: English", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", "Topic :: Security", "Topic :: Software Development :: Libraries :: Python Modules", ] include = [ @@ -30,21 +30,22 @@ include = [ "Bug Tracker" = "https://github.com/mitchos/pyZscaler/issues" [tool.poetry.dependencies] -python = "^3.7" +python = "^3.8" restfly = "1.4.7" -python-box = "7.0.0" +python-box = "7.0.1" [tool.poetry.dev-dependencies] -python = "^3.7" +python = "^3.8" restfly = "1.4.7" -python-box = "7.0.0" -sphinx = "6.1.3" -furo = "2022.12.7" -pytest = "7.2.1" -requests = "2.28.2" -pre-commit = "3.0.4" -responses = "0.22.0" +python-box = "7.0.1" +sphinx = "7.0.1" +furo = "2023.5.20" +pytest = "7.3.2" +requests = "2.29.0" +pre-commit = "3.3.3" +responses = "0.23.1" toml = "0.10.2" +urllib3 = "1.26.16" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/pyzscaler/__init__.py b/pyzscaler/__init__.py index 33e619b..12f25c7 100644 --- a/pyzscaler/__init__.py +++ b/pyzscaler/__init__.py @@ -4,8 +4,9 @@ "Dax Mickelson", "Jacob GĂ„rder", ] -__version__ = "1.4.1" +__version__ = "1.5.0" from pyzscaler.zcc import ZCC # noqa +from pyzscaler.zdx import ZDX # noqa from pyzscaler.zia import ZIA # noqa from pyzscaler.zpa import ZPA # noqa diff --git a/pyzscaler/utils.py b/pyzscaler/utils.py index 6d7631a..46d924f 100644 --- a/pyzscaler/utils.py +++ b/pyzscaler/utils.py @@ -1,3 +1,4 @@ +import functools import time from box import Box, BoxList @@ -79,13 +80,7 @@ def obfuscate_api_key(seed: list): def pick_version_profile(kwargs: list, payload: list): - # Used in ZPA endpoints. - # This function is used to convert the name of the version profile to - # the version profile id. This means our users don't need to look up the - # version profile id mapping themselves. - - version_profile = kwargs.pop("version_profile", None) - if version_profile: + if version_profile := kwargs.pop("version_profile", None): payload["overrideVersionProfile"] = True if version_profile == "default": payload["versionProfileId"] = 0 @@ -133,6 +128,51 @@ def _get_page(self) -> None: time.sleep(1) +class ZDXIterator(APIIterator): + """ + Iterator class for ZDX endpoints. + + """ + + def __init__(self, api, endpoint, limit=None, **kwargs): + super().__init__(api, **kwargs) + self.endpoint = endpoint + self.limit = limit + self.next_offset = None + self.total = 0 + + # Load the first page + self._get_page() + + def __next__(self): + try: + item = super().__next__() + except StopIteration: + if self.next_offset is None: + # There is no next page, so we're done iterating + raise + # There is another page, so get it and continue iterating + self._get_page() + item = super().__next__() + return item + + def _get_page(self): + params = {"limit": self.limit, "offset": self.next_offset} if self.next_offset else {} + + # Request the next page + response = self._api.get(self.endpoint, params=params) + + # Extract the next offset and the data items from the response + self.next_offset = response.get("next_offset") + self.page = response["users"] + + # Update the total number of records + self.total += len(self.page) + + # Reset page_count for the new page + self.page_count = 0 + + # Maps ZCC numeric os_type and registration_type arguments to a human-readable string zcc_param_map = { "os": { @@ -151,3 +191,44 @@ def _get_page(self) -> None: "quarantined": 6, }, } + + +def calculate_epoch(hours: int): + current_time = int(time.time()) + past_time = int(current_time - (hours * 3600)) + return current_time, past_time + + +def zdx_params(func): + """ + Decorator to add custom parameter functionality for ZDX API calls. + + Args: + func: The function to decorate. + + Returns: + The decorated function. + + """ + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + since = kwargs.pop("since", None) + search = kwargs.pop("search", None) + location_id = kwargs.pop("location_id", None) + department_id = kwargs.pop("department_id", None) + geo_id = kwargs.pop("geo_id", None) + + if since: + current_time, past_time = calculate_epoch(since) + kwargs["to"] = current_time + kwargs["from"] = past_time + + kwargs["q"] = search or kwargs.get("q") + kwargs["loc"] = location_id or kwargs.get("loc") + kwargs["dept"] = department_id or kwargs.get("dept") + kwargs["geo"] = geo_id or kwargs.get("geo") + + return func(self, *args, **kwargs) + + return wrapper diff --git a/pyzscaler/zcc/__init__.py b/pyzscaler/zcc/__init__.py index d8d3dcf..dee639b 100644 --- a/pyzscaler/zcc/__init__.py +++ b/pyzscaler/zcc/__init__.py @@ -27,9 +27,6 @@ class ZCC(APISession): * ``zscalerthree`` * ``zscloud`` * ``zscalerbeta`` - company_id (str): - The ZCC Company ID. There seems to be no easy way to obtain this at present. See the note - at the top of this page for information on how to retrieve the Company ID. override_url (str): If supplied, this attribute can be used to override the production URL that is derived from supplying the `cloud` attribute. Use this attribute if you have a non-standard tenant URL @@ -57,7 +54,6 @@ def __init__(self, **kw): kw.get("override_url", os.getenv(f"{self._env_base}_OVERRIDE_URL")) or f"https://api-mobile.{self._env_cloud}.net/papi" ) - self.company_id = kw.get("company_id", os.getenv(f"{self._env_base}_COMPANY_ID")) self.conv_box = True super(ZCC, self).__init__(**kw) diff --git a/pyzscaler/zdx/__init__.py b/pyzscaler/zdx/__init__.py new file mode 100644 index 0000000..bd0edf7 --- /dev/null +++ b/pyzscaler/zdx/__init__.py @@ -0,0 +1,84 @@ +import os + +from box import Box +from restfly.session import APISession + +from pyzscaler import __version__ +from pyzscaler.zdx.admin import AdminAPI +from pyzscaler.zdx.apps import AppsAPI +from pyzscaler.zdx.devices import DevicesAPI +from pyzscaler.zdx.session import SessionAPI +from pyzscaler.zdx.users import UsersAPI + + +class ZDX(APISession): + """ + A Controller to access Endpoints in the Zscaler Digital Experience (ZDX) API. + + The ZDX object stores the session token and simplifies access to CRUD options within the ZDX Portal. + + Attributes: + client_id (str): The ZDX Client ID generated from the ZDX Portal. + client_secret (str): The ZDX Client Secret generated from the ZDX Portal. + cloud (str): The Zscaler cloud for your tenancy, accepted values are below. Defaults to ``zdxcloud``. + + * ``zdxcloud`` + * ``zdxbeta`` + + override_url (str): + If supplied, this attribute can be used to override the production URL that is derived + from supplying the `cloud` attribute. Use this attribute if you have a non-standard tenant URL + (e.g. internal test instance etc). When using this attribute, there is no need to supply the `cloud` + attribute. The override URL will be prepended to the API endpoint suffixes. The protocol must be included + i.e. http:// or https://. + + """ + + _vendor = "Zscaler" + _product = "pyZscaler" + _backoff = 3 + _build = __version__ + _box = True + _box_attrs = {"camel_killer_box": True} + _env_base = "ZDX" + _env_cloud = "zdxcloud" + _url = "https://api.zdxcloud.net/v1" + + def __init__(self, **kw): + self._client_id = kw.get("client_id", os.getenv(f"{self._env_base}_CLIENT_ID")) + self._client_secret = kw.get("client_secret", os.getenv(f"{self._env_base}_CLIENT_SECRET")) + self._cloud = kw.get("cloud", os.getenv(f"{self._env_base}_CLOUD", self._env_cloud)) + self._url = kw.get("override_url", os.getenv(f"{self._env_base}_OVERRIDE_URL")) or f"https://api.{self._cloud}.net/v1" + self.conv_box = True + super(ZDX, self).__init__(**kw) + + def _build_session(self, **kwargs) -> Box: + """Creates a ZCC API session.""" + super(ZDX, self)._build_session(**kwargs) + self._auth_token = self.session.create_token(client_id=self._client_id, client_secret=self._client_secret).token + return self._session.headers.update({"Authorization": f"Bearer {self._auth_token}"}) + + @property + def session(self): + """The interface object for the :ref:`ZDX Session interface `.""" + return SessionAPI(self) + + @property + def admin(self): + """The interface object for the :ref:`ZDX Admin interface `.""" + return AdminAPI(self) + + @property + def apps(self): + """The interface object for the :ref:`ZDX Apps interface `.""" + return AppsAPI(self) + + @property + def devices(self): + """The interface object for the :ref:`ZDX Devices interface `.""" + return DevicesAPI(self) + + @property + def users(self): + """The interface object for the :ref:`ZDX Users interface `.""" + return UsersAPI(self) diff --git a/pyzscaler/zdx/admin.py b/pyzscaler/zdx/admin.py new file mode 100644 index 0000000..a58aab0 --- /dev/null +++ b/pyzscaler/zdx/admin.py @@ -0,0 +1,72 @@ +from box import BoxList +from restfly.endpoint import APIEndpoint + +from pyzscaler.utils import zdx_params + + +class AdminAPI(APIEndpoint): + @zdx_params + def list_departments(self, **kwargs) -> BoxList: + """ + Returns a list of departments that are configured within ZDX. + + Keyword Args: + since (int): The number of hours to look back for devices. + search (str): The search string to filter by name or department ID. + + Returns: + :obj:`BoxList`: The list of departments in ZDX. + + Examples: + List all departments in ZDX for the past 2 hours: + + >>> for department in zdx.admin.list_departments(): + ... print(department) + + """ + + return self._get("administration/departments", params=kwargs) + + @zdx_params + def list_locations(self, **kwargs) -> BoxList: + """ + Returns a list of locations that are configured within ZDX. + + Keyword Args: + since (int): The number of hours to look back for devices. + search (str): The search string to filter by name or location ID. + + Returns: + :obj:`BoxList`: The list of locations in ZDX. + + Examples: + List all locations in ZDX for the past 2 hours: + + >>> for location in zdx.admin.list_locations(): + ... print(location) + + """ + return self._get("administration/locations", params=kwargs) + + @zdx_params + def list_geolocations(self, **kwargs) -> BoxList: + """ + Returns a list of all active geolocations configured within the ZDX tenant. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + parent_geo_id (str): The unique ID for the parent geolocation. + search (str): The search string to filter by name. + + Returns: + :obj:`BoxList`: The list of geolocations in ZDX. + + Examples: + List all geolocations in ZDX for the past 2 hours: + + >>> for geolocation in zdx.admin.list_geolocations(): + ... print(geolocation) + + """ + return self._get("active_geo", params=kwargs) diff --git a/pyzscaler/zdx/apps.py b/pyzscaler/zdx/apps.py new file mode 100644 index 0000000..2c967f2 --- /dev/null +++ b/pyzscaler/zdx/apps.py @@ -0,0 +1,177 @@ +from box import BoxList +from restfly.endpoint import APIEndpoint + +from pyzscaler.utils import ZDXIterator, zdx_params + + +class AppsAPI(APIEndpoint): + @zdx_params + def list_apps(self, **kwargs) -> BoxList: + """ + Returns a list of all active applications configured within the ZDX tenant. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + + Returns: + :obj:`BoxList`: The list of applications in ZDX. + + Examples: + List all applications in ZDX for the past 2 hours: + + >>> for app in zdx.apps.list_apps(): + ... print(app) + + """ + return self._get("apps", params=kwargs) + + @zdx_params + def get_app(self, app_id: str, **kwargs): + """ + Returns information on the specified application configured within the ZDX tenant. + + Args: + app_id (str): The unique ID for the ZDX application. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + + Returns: + :obj:`Box`: The application information. + + Examples: + Return information on the application with the ID of 999999999: + + >>> zia.apps.get_app(app_id='999999999') + + """ + return self._get(f"apps/{app_id}", params=kwargs) + + @zdx_params + def get_app_score(self, app_id: str, **kwargs): + """ + Returns the ZDX score trend for the specified application configured within the ZDX tenant. + + Args: + app_id (str): The unique ID for the ZDX application. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + + Returns: + :obj:`Box`: The application's ZDX score trend. + + Examples: + Return the ZDX score trend for the application with the ID of 999999999: + + >>> zia.apps.get_app_score(app_id='999999999') + + """ + return self._get(f"apps/{app_id}/score", params=kwargs) + + @zdx_params + def get_app_metrics(self, app_id: str, **kwargs): + """ + Returns the ZDX metrics for the specified application configured within the ZDX tenant. + + Args: + app_id (str): The unique ID for the ZDX application. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + metric_name (str): The name of the metric to return. Available values are: + * `pft` - Page Fetch Time + * `dns` - DNS Time + * `availability` + + Returns: + :obj:`Box`: The application's ZDX metrics. + + Examples: + Return the ZDX metrics for the application with the ID of 999999999: + + >>> zia.apps.get_app_metrics(app_id='999999999') + + Return the ZDX metrics for the app with an ID of 999999999 for the last 24 hours, including dns matrics, + geolocation, department and location IDs: + + >>> zia.apps.get_app_metrics(app_id='999999999', since=24, metric_name='dns', location_id='888888888', + ... geo_id='777777777', department_id='666666666') + + """ + return self._get(f"apps/{app_id}/metrics", params=kwargs) + + @zdx_params + def list_app_users(self, app_id: str, **kwargs): + """ + Returns a list of users and devices that were used to access the specified application configured within + the ZDX tenant. + + Args: + app_id (str): The unique ID for the ZDX application. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + score_bucket (str): The ZDX score bucket to filter by. Available values are: + * `poor` - 0-33 + * `okay` - 34-65 + * `good` - 66-100 + + Returns: + :obj:`BoxList`: The list of users and devices used to access the application. + + Examples: + Return a list of users and devices who have accessed the application with the ID of 999999999: + + >>> for user in zia.apps.get_app_users(app_id='999999999'): + ... print(user) + + """ + return BoxList( + ZDXIterator( + self._api, + f"apps/{app_id}/users", + pagination="offset_limit", + **kwargs, + ) + ) + + @zdx_params + def get_app_user(self, app_id: str, user_id: str, **kwargs): + """ + Returns information on the specified user and device that was used to access the specified application + configured within the ZDX tenant. + + Args: + app_id (str): The unique ID for the ZDX application. + user_id (str): The unique ID for the ZDX user. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`Box`: The user and device information. + + Examples: + Return information on the user with the ID of 999999999 who has accessed the application with the ID of + 888888888: + + >>> zia.apps.get_app_user(app_id='888888888', user_id='999999999') + + """ + return self._get(f"apps/{app_id}/users/{user_id}", params=kwargs) diff --git a/pyzscaler/zdx/devices.py b/pyzscaler/zdx/devices.py new file mode 100644 index 0000000..6f05d7d --- /dev/null +++ b/pyzscaler/zdx/devices.py @@ -0,0 +1,500 @@ +from restfly.endpoint import APIEndpoint + +from pyzscaler.utils import zdx_params + + +class DevicesAPI(APIEndpoint): + @zdx_params + def list_devices(self, **kwargs): + """ + Returns a list of all devices in ZDX. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + + Returns: + :obj:`BoxList`: The list of devices in ZDX. + + Examples: + List all devices in ZDX for the past 2 hours: + + >>> for device in zdx.devices.list_devices(): + + List all devices in ZDX for the past 24 hours: + + >>> for device in zdx.devices.list_devices(since=24): + + """ + return self._get("devices", params=kwargs) + + @zdx_params + def get_device(self, device_id: str, **kwargs): + """ + Returns a single device in ZDX. + + Args: + device_id (str): The unique ID for the device. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`Box`: The ZDX device resource record. + + Examples: + Get information for the device with an ID of 123456789. + >>> device = zdx.devices.get_device('123456789') + + Get information for the device with an ID of 123456789 for the last 24 hours. + >>> device = zdx.devices.get_device('123456789', since=24) + + """ + return self._get(f"devices/{device_id}", params=kwargs) + + @zdx_params + def get_device_apps(self, device_id: str, **kwargs): + """ + Returns a list of all active applications for a device. + + Args: + device_id (str): The unique ID for the device. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`BoxList`: The list of active applications for the device. + + Examples: + Print a list of active applications for a device. + + >>> for app in zdx.devices.get_device_apps('123456789'): + ... print(app) + + Print a list of active applications for a device for the last 24 hours. + + >>> for app in zdx.devices.get_device_apps('123456789', since=24): + ... print(app) + + """ + return self._get(f"devices/{device_id}/apps", params=kwargs) + + def get_device_app(self, device_id: str, app_id: str): + """ + Returns a single application for a device. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + + Returns: + :obj:`Box`: The application resource record. + + Examples: + Print a single application for a device. + + >>> app = zdx.devices.get_device_app('123456789', '987654321') + ... print(app) + + """ + return self._get(f"devices/{device_id}/apps/{app_id}") + + def get_web_probes(self, device_id: str, app_id: str): + """ + Returns a list of all active web probes for a specific application being used by a device. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + + Returns: + :obj:`BoxList`: The list of web probes for the application. + + Examples: + Print a list of web probes for an application. + + >>> for probe in zdx.devices.get_device_app_webprobes('123456789', '987654321'): + ... print(probe) + + """ + return self._get(f"devices/{device_id}/apps/{app_id}/web-probes") + + @zdx_params + def get_web_probe(self, device_id: str, app_id: str, probe_id: str, **kwargs): + """ + Returns a single web probe for a specific application being used by a device. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + probe_id (str): The unique ID for the web probe. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`Box`: The web probe resource record. + + Examples: + Print a single web probe for an application. + + >>> probe = zdx.devices.get_web_probe('123456789', '987654321', '123987456') + ... print(probe) + + """ + return self._get(f"devices/{device_id}/apps/{app_id}/web-probes/{probe_id}", params=kwargs) + + @zdx_params + def list_cloudpath_probes(self, device_id: str, app_id: str, **kwargs): + """ + Returns a list of all active cloudpath probes for a specific application being used by a device. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`BoxList`: The list of cloudpath probes for the application. + + Examples: + Print a list of cloudpath probes for an application. + + >>> for probe in zdx.devices.list_cloudpath_probes('123456789', '987654321'): + ... print(probe) + + """ + return self._get(f"devices/{device_id}/apps/{app_id}/cloudpath-probes", params=kwargs) + + @zdx_params + def get_cloudpath_probe(self, device_id: str, app_id: str, probe_id: str, **kwargs): + """ + Returns a single cloudpath probe for a specific application being used by a device. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + probe_id (str): The unique ID for the cloudpath probe. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`Box`: The cloudpath probe resource record. + + Examples: + Print a single cloudpath probe for an application. + + >>> probe = zdx.devices.get_cloudpath_probe('123456789', '987654321', '123987456') + ... print(probe) + + """ + return self._get(f"devices/{device_id}/apps/{app_id}/cloudpath-probes/{probe_id}", params=kwargs) + + @zdx_params + def get_cloudpath(self, device_id: str, app_id: str, probe_id: str, **kwargs): + """ + Returns a single cloudpath for a specific application being used by a device. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + probe_id (str): The unique ID for the cloudpath probe. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`Box`: The cloudpath resource record. + + Examples: + Print a single cloudpath for an application. + + >>> cloudpath = zdx.devices.get_cloudpath('123456789', '987654321', '123987456') + ... print(cloudpath) + + """ + return self._get(f"devices/{device_id}/apps/{app_id}/cloudpath-probes/{probe_id}/cloudpath", params=kwargs) + + @zdx_params + def get_call_quality_metrics(self, device_id: str, app_id: str, **kwargs): + """ + Returns a single call quality metrics for a specific application being used by a device. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`Box`: The call quality metrics resource record. + + Examples: + Print call quality metrics for an application. + + >>> metrics = zdx.devices.get_call_quality_metrics('123456789', '987654321') + ... print(metrics) + + """ + return self._get(f"devices/{device_id}/apps/{app_id}/call-quality-metrics", params=kwargs) + + @zdx_params + def get_health_metrics(self, device_id: str, **kwargs): + """ + Returns health metrics trend for a specific device. + + Args: + device_id (str): The unique ID for the device. + + Keyword Args: + since (int): The number of hours to look back for devices. + + Returns: + :obj:`Box`: The health metrics resource record. + + Examples: + Print health metrics for an application. + + >>> metrics = zdx.devices.get_health_metrics('123456789') + ... print(metrics) + + """ + return self._get(f"devices/{device_id}/health-metrics", params=kwargs) + + def get_events(self, device_id: str): + """ + Returns a list of all events for a specific device. + + Args: + device_id (str): The unique ID for the device. + + Returns: + :obj:`BoxList`: The list of events for the device. + + Examples: + Print a list of events for a device. + + >>> for event in zdx.devices.get_events('123456789'): + ... print(event) + + """ + return self._get(f"devices/{device_id}/events") + + def list_deeptraces(self, device_id: str): + """ + Returns a list of all deep traces for a specific device. + + Args: + device_id (str): The unique ID for the device. + + Returns: + :obj:`BoxList`: The list of deep traces for the device. + + Examples: + Print a list of deep traces for a device. + + >>> for trace in zdx.devices.list_deep_traces('123456789'): + ... print(trace) + + """ + return self._get(f"devices/{device_id}/deeptraces") + + def start_deeptrace(self, device_id: str, app_id: str, session_name: str, **kwargs): + """ + Starts a deep trace for a specific device and application. + + Args: + device_id (str): The unique ID for the device. + app_id (str): The unique ID for the application. + session_name (str): The name of the deeptrace session. + + Keyword Args: + web_probe_id (str): The unique ID for the Web probe. + cloudpath_probe_id (str): The unique ID for the Cloudpath probe. + session_length_minutes (int): The duration of the deeptrace session in minutes. Defaults to 5. + probe_device (bool): Whether to probe the device. + + Returns: + :obj:`Box`: The deeptrace resource record. + + Examples: + Start a deeptrace for a device. + + >>> trace = zdx.devices.start_deeptrace(device_id='123456789', app_id='1', session_name='My Deeptrace') + ... print(trace) + + """ + payload = { + "session_name": session_name, + "app_id": app_id, + } + payload.update(kwargs) + + return self._post(f"devices/{device_id}/deeptraces", json=payload) + + def get_deeptrace(self, device_id: str, trace_id: str): + """ + Returns information on a single deeptrace for a specific device. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`Box`: The deeptrace resource record. + + Examples: + Print a single deeptrace for a device. + + >>> trace = zdx.devices.get_deeptrace('123456789', '987654321') + ... print(trace) + + """ + return self._get(f"devices/{device_id}/deeptraces/{trace_id}") + + def delete_deeptrace(self, device_id: str, trace_id: str): + """ + Deletes a single deeptrace session and associated data for a specific device. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`str`: The trace ID that was deleted. + + Examples: + Delete a single deeptrace for a device. + + >>> trace = zdx.devices.delete_deeptrace('123456789', '987654321') + ... print(trace) + + """ + return self._delete(f"devices/{device_id}/deeptraces/{trace_id}") + + def get_deeptrace_webprobe_metrics(self, device_id: str, trace_id: str): + """ + Returns web probe metrics for a specific deeptrace. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`Box`: The deeptrace web probe metrics. + + Examples: + Print web probe metrics for a deeptrace. + + >>> metrics = zdx.devices.get_deeptrace_webprobe_metrics('123456789', '987654321') + ... print(metrics) + + """ + return self._get(f"devices/{device_id}/deeptraces/{trace_id}/webprobe-metrics") + + def get_deeptrace_cloudpath_metrics(self, device_id: str, trace_id: str): + """ + Returns cloudpath metrics for a specific deeptrace. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`Box`: The deeptrace cloudpath metrics. + + Examples: + Print cloudpath metrics for a deeptrace. + + >>> metrics = zdx.devices.get_deeptrace_cloudpath_metrics('123456789', '987654321') + ... print(metrics) + + """ + return self._get(f"devices/{device_id}/deeptraces/{trace_id}/cloudpath-metrics") + + def get_deeptrace_cloudpath(self, device_id: str, trace_id: str): + """ + Returns cloudpath for a specific deeptrace. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`Box`: The deeptrace cloudpath. + + Examples: + Print cloudpath for a deeptrace. + + >>> metrics = zdx.devices.get_deeptrace_cloudpath('123456789', '987654321') + ... print(metrics) + + """ + return self._get(f"devices/{device_id}/deeptraces/{trace_id}/cloudpath") + + def get_deeptrace_health_metrics(self, device_id: str, trace_id: str): + """ + Returns health metrics for a specific deeptrace. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`Box`: The deeptrace health metrics. + + Examples: + Print health metrics for a deeptrace. + + >>> metrics = zdx.devices.get_deeptrace_health_metrics('123456789', '987654321') + ... print(metrics) + + """ + return self._get(f"devices/{device_id}/deeptraces/{trace_id}/health-metrics") + + def get_deeptrace_events(self, device_id: str, trace_id: str): + """ + Returns events for a specific deeptrace. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`Box`: The deeptrace events. + + Examples: + Print events for a deeptrace. + + >>> events = zdx.devices.get_deeptrace_events('123456789', '987654321') + ... print(events) + + """ + return self._get(f"devices/{device_id}/deeptraces/{trace_id}/events") + + def get_deeptrace_top_processes(self, device_id: str, trace_id: str): + """ + Returns top processes for a specific deeptrace. + + Args: + device_id (str): The unique ID for the device. + trace_id (str): The unique ID for the deeptrace. + + Returns: + :obj:`Box`: The deeptrace top processes. + + Examples: + Print top processes for a deeptrace. + + >>> top_processes = zdx.devices.get_deeptrace_top_processes('123456789', '987654321') + ... print(top_processes) + + """ + return self._get(f"devices/{device_id}/deeptraces/{trace_id}/top-processes") diff --git a/pyzscaler/zdx/session.py b/pyzscaler/zdx/session.py new file mode 100644 index 0000000..dc2ae0f --- /dev/null +++ b/pyzscaler/zdx/session.py @@ -0,0 +1,60 @@ +import time +from hashlib import sha256 + +from box import Box +from restfly.endpoint import APIEndpoint + + +class SessionAPI(APIEndpoint): + def create_token(self, client_id: str, client_secret: str) -> Box: + """ + Creates a ZDX authentication token. + + Args: + client_id (str): The ZDX API Key ID. + client_secret (str): The ZDX API Key Secret. + + Returns: + :obj:`Box`: The authenticated session information. + + Examples: + >>> zia.session.create(client_id='999999999', + ... client_secret='admin@example.com') + + """ + epoch_time = int(time.time()) + + # Zscaler requires the API Secret Key to be appended with the epoch timestamp, separated by a colon. We then + # need to take the SHA256 hash of this string and pass that as the API Secret Key. + api_secret_format = f"{client_secret}:{epoch_time}" + api_secret_hash = sha256(api_secret_format.encode("utf-8")).hexdigest() + + payload = {"key_id": client_id, "key_secret": api_secret_hash, "timestamp": epoch_time} + + return self._post("oauth/token", json=payload) + + def validate_token(self): + """ + Validates the current ZDX JWT token. + + Returns: + :obj:`Box`: The validated session information. + + Examples: + >>> validation = zdx.session.validate() + + """ + return self._get("oauth/validate") + + def get_jwks(self): + """ + Returns a JSON Web Key Set (JWKS) that contains the public keys that can be used to verify the JWT tokens. + + Returns: + :obj:`Box`: The JSON Web Key Set (JWKS). + + Examples: + >>> jwks = zdx.session.get_jwks() + + """ + return self._get("oauth/jwks") diff --git a/pyzscaler/zdx/users.py b/pyzscaler/zdx/users.py new file mode 100644 index 0000000..149a6dc --- /dev/null +++ b/pyzscaler/zdx/users.py @@ -0,0 +1,54 @@ +from box import BoxList +from restfly.endpoint import APIEndpoint + +from pyzscaler.utils import ZDXIterator, zdx_params + + +class UsersAPI(APIEndpoint): + @zdx_params + def list_users(self, **kwargs) -> BoxList: + """ + Returns a list of all active users configured within the ZDX tenant. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + + Returns: + :obj:`BoxList`: The list of users in ZDX. + + Examples: + List all users in ZDX for the past 2 hours: + + >>> for user in zdx.users.list_users(): + ... print(user) + + """ + return BoxList(ZDXIterator(self._api, "users", **kwargs)) + + @zdx_params + def get_user(self, user_id: str, **kwargs): + """ + Returns information on the specified user configured within the ZDX tenant. + + Args: + user_id (str): The unique ID for the ZDX user. + + Keyword Args: + since (int): The number of hours to look back for devices. + location_id (str): The unique ID for the location. + department_id (str): The unique ID for the department. + geo_id (str): The unique ID for the geolocation. + + Returns: + :obj:`Box`: The user information. + + Examples: + Return information on the user with the ID of 999999999: + + >>> zia.users.get_user(user_id='999999999') + + """ + return self._get(f"users/{user_id}") diff --git a/pyzscaler/zia/__init__.py b/pyzscaler/zia/__init__.py index 9be087a..994245f 100644 --- a/pyzscaler/zia/__init__.py +++ b/pyzscaler/zia/__init__.py @@ -21,7 +21,7 @@ from .url_filters import URLFilteringAPI from .users import UserManagementAPI from .vips import DataCenterVIPSAPI -from .web_dlp import WebDLP +from .web_dlp import WebDLPAPI class ZIA(APISession): @@ -215,7 +215,7 @@ def vips(self): @property def web_dlp(self): """ - The interface object for the :ref: `ZIA Data-Loss-Prevention Web DLP Rules`. + The interface object for the :ref:`ZIA Web DLP interface `. """ - return WebDLP(self) + return WebDLPAPI(self) diff --git a/pyzscaler/zia/locations.py b/pyzscaler/zia/locations.py index ddc5a58..c14eef3 100644 --- a/pyzscaler/zia/locations.py +++ b/pyzscaler/zia/locations.py @@ -56,16 +56,91 @@ def add_location(self, name: str, **kwargs) -> Box: Location name. Keyword Args: - ip_addresses (list): + parent_id (int, optional): + Parent Location ID. If this ID does not exist or is 0, it is implied that it is a parent location. + up_bandwidth (int, optional): + Upload bandwidth in kbps. The value 0 implies no Bandwidth Control enforcement. Default: 0. + dn_bandwidth (int, optional): + Download bandwidth in kbps. The value 0 implies no Bandwidth Control enforcement. Default: 0. + country (str, optional): + Country. + tz (str, optional): + Timezone of the location. If not specified, it defaults to GMT. + ip_addresses (list[str], optional): For locations: IP addresses of the egress points that are provisioned in the Zscaler Cloud. Each entry is a single IP address (e.g., 238.10.33.9). For sub-locations: Egress, internal, or GRE tunnel IP addresses. Each entry is either a single IP address, CIDR (e.g., 10.10.33.0/24), or range (e.g., 10.10.33.1-10.10.33.10)). - ports (:obj:`list` of :obj:`str`): - List of whitelisted Proxy ports for the location. - vpn_credentials (dict): - VPN credentials for the location. + ports (list[int], optional): + IP ports that are associated with the location. + vpn_credentials (list, optional): + VPN User Credentials that are associated with the location. + auth_required (bool, optional): + Enforce Authentication. Required when ports are enabled, IP Surrogate is enabled, or Kerberos + Authentication is enabled. Default: False. + ssl_scan_enabled (bool, optional): + Enable SSL Inspection. Set to true in order to apply your SSL Inspection policy to HTTPS traffic in + the location and inspect HTTPS transactions for data leakage, malicious content, and viruses. + Default: False. + zapp_ssl_scan_enabled (bool, optional): + Enable Zscaler App SSL Setting. When set to true, the Zscaler App SSL Scan Setting takes effect, + irrespective of the SSL policy that is configured for the location. Default: False. + xff_forward_enabled (bool, optional): + Enable XFF Forwarding for a location. When set to true, traffic is passed to Zscaler Cloud via the + X-Forwarded-For (XFF) header. Default: False. + other_sub_location (bool, optional): + If set to true, indicates that this is a default sub-location created by the Zscaler service to + accommodate IPv4 addresses that are not part of any user-defined sub-locations. Default: False. + other6_sub_location (bool, optional): + If set to true, indicates that this is a default sub-location created by the Zscaler service to + accommodate IPv6 addresses that are not part of any user-defined sub-locations. Default: False. + surrogate_ip (bool, optional): + Enable Surrogate IP. When set to true, users are mapped to internal device IP addresses. Default: False. + idle_time_in_minutes (int, optional): + Idle Time to Disassociation. The user mapping idle time (in minutes) is required if Surrogate IP is + enabled. + display_time_unit (str, optional): + Display Time Unit. The time unit to display for IP Surrogate idle time to disassociation. + surrogate_ip_enforced_for_known_browsers (bool, optional): + Enforce Surrogate IP for Known Browsers. When set to true, IP Surrogate is enforced for all known + browsers. Default: False. + surrogate_refresh_time_in_minutes (int, optional): + Refresh Time for re-validation of Surrogacy. The surrogate refresh time (in minutes) to re-validate + the IP surrogates. + surrogate_refresh_time_unit (str, optional): + Display Refresh Time Unit. The time unit to display for refresh time for re-validation of surrogacy. + ofw_enabled (bool, optional): + Enable Firewall. When set to true, Firewall is enabled for the location. Default: False. + ips_control (bool, optional): + Enable IPS Control. When set to true, IPS Control is enabled for the location if Firewall is enabled. + Default: False. + aup_enabled (bool, optional): + Enable AUP. When set to true, AUP is enabled for the location. Default: False. + caution_enabled (bool, optional): + Enable Caution. When set to true, a caution notification is enabled for the location. Default: False. + aup_block_internet_until_accepted (bool, optional): + For First Time AUP Behavior, Block Internet Access. When set, all internet access (including non-HTTP + traffic) is disabled until the user accepts the AUP. Default: False. + aup_force_ssl_inspection (bool, optional): + For First Time AUP Behavior, Force SSL Inspection. When set, Zscaler forces SSL Inspection in order + to enforce AUP for HTTPS traffic. Default: False. + ipv6_enabled (bool, optional): + If set to true, IPv6 is enabled for the location and IPv6 traffic from the location can be forwarded + to the Zscaler service to enforce security policies. + ipv6_dns64_prefix (str, optional): + Name-ID pair of the NAT64 prefix configured as the DNS64 prefix for the location. + aup_timeout_in_days (int, optional): + Custom AUP Frequency. Refresh time (in days) to re-validate the AUP. + managed_by (str, optional): + SD-WAN Partner that manages the location. If a partner does not manage the location, this is set to + Self. + profile (str, optional): + Profile tag that specifies the location traffic type. If not specified, this tag defaults to + "Unassigned". + description (str, optional): + Additional notes or information regarding the location or sub-location. The description cannot + exceed 1024 characters. Returns: :obj:`Box`: The newly created location resource record @@ -76,6 +151,11 @@ def add_location(self, name: str, **kwargs) -> Box: >>> zia.locations.add_location(name='new_location', ... ip_addresses=['203.0.113.10']) + Add a location with VPN credentials. + + >>> zia.locations.add_location(name='new_location', + ... vpn_credentials=[{'id': '99999', 'type': 'UFQDN'}]) + """ payload = { "name": name, @@ -203,15 +283,94 @@ def update_location(self, location_id: str, **kwargs) -> Box: location_id (str): The unique identifier for the location you are updating. **kwargs: - Optional keyword args. + Optional keyword arguments. Keyword Args: - ip_addresses (:obj:`list` of :obj:`str`): - List of updated ip addresses. - ports (:obj:`list` of :obj:`str`): - List of whitelisted Proxy ports for the location. - vpn_credentials (dict): - VPN credentials for the location. + name (str, optional): + Location name. + parent_id (int, optional): + Parent Location ID. If this ID does not exist or is 0, it is implied that it is a parent location. + up_bandwidth (int, optional): + Upload bandwidth in kbps. The value 0 implies no Bandwidth Control enforcement. + dn_bandwidth (int, optional): + Download bandwidth in kbps. The value 0 implies no Bandwidth Control enforcement. + country (str, optional): + Country. + tz (str, optional): + Timezone of the location. If not specified, it defaults to GMT. + ip_addresses (list[str], optional): + For locations: IP addresses of the egress points that are provisioned in the Zscaler Cloud. + Each entry is a single IP address (e.g., 238.10.33.9). + + For sub-locations: Egress, internal, or GRE tunnel IP addresses. Each entry is either a single + IP address, CIDR (e.g., 10.10.33.0/24), or range (e.g., 10.10.33.1-10.10.33.10)). + ports (list[int], optional): + IP ports that are associated with the location. + vpn_credentials (list, optional): + VPN User Credentials that are associated with the location. + auth_required (bool, optional): + Enforce Authentication. Required when ports are enabled, IP Surrogate is enabled, or Kerberos + Authentication is enabled. + ssl_scan_enabled (bool, optional): + Enable SSL Inspection. Set to true in order to apply your SSL Inspection policy to HTTPS traffic in the + location and inspect HTTPS transactions for data leakage, malicious content, and viruses. + zapp_ssl_scan_enabled (bool, optional): + Enable Zscaler App SSL Setting. When set to true, the Zscaler App SSL Scan Setting takes effect, + irrespective of the SSL policy that is configured for the location. + xff_forward_enabled (bool, optional): + Enable XFF Forwarding for a location. When set to true, traffic is passed to Zscaler Cloud via the + X-Forwarded-For (XFF) header. + other_sub_location (bool, optional): + If set to true, indicates that this is a default sub-location created by the Zscaler service to + accommodate IPv4 addresses that are not part of any user-defined sub-locations. + other6_sub_location (bool, optional): + If set to true, indicates that this is a default sub-location created by the Zscaler service to + accommodate IPv6 addresses that are not part of any user-defined sub-locations. + surrogate_ip (bool, optional): + Enable Surrogate IP. When set to true, users are mapped to internal device IP addresses. + idle_time_in_minutes (int, optional): + Idle Time to Disassociation. The user mapping idle time (in minutes) is required if a Surrogate IP is + enabled. + display_time_unit (str, optional): + Display Time Unit. The time unit to display for IP Surrogate idle time to disassociation. + surrogate_ip_enforced_for_known_browsers (bool, optional): + Enforce Surrogate IP for Known Browsers. When set to true, IP Surrogate is enforced for all known + browsers. + surrogate_refresh_time_in_minutes (int, optional): + Refresh Time for re-validation of Surrogacy. The surrogate refresh time (in minutes) to re-validate + the IP surrogates. + surrogate_refresh_time_unit (str, optional): + Display Refresh Time Unit. The time unit to display for refresh time for re-validation of surrogacy. + ofw_enabled (bool, optional): + Enable Firewall. When set to true, Firewall is enabled for the location. + ips_control (bool, optional): + Enable IPS Control. When set to true, IPS Control is enabled for the location if Firewall is enabled. + aup_enabled (bool, optional): + Enable AUP. When set to true, AUP is enabled for the location. + caution_enabled (bool, optional): + Enable Caution. When set to true, a caution notification is enabled for the location. + aup_block_internet_until_accepted (bool, optional): + For First Time AUP Behavior, Block Internet Access. When set, all internet access (including non-HTTP + traffic) is disabled until the user accepts the AUP. + aup_force_ssl_inspection (bool, optional): + For First Time AUP Behavior, Force SSL Inspection. When set, Zscaler forces SSL Inspection in order to + enforce AUP for HTTPS traffic. + ipv6_enabled (bool, optional): + If set to true, IPv6 is enabled for the location and IPv6 traffic from the location can be forwarded + to the Zscaler service to enforce security policies. + ipv6_dns64_prefix (str, optional): + Name-ID pair of the NAT64 prefix configured as the DNS64 prefix for the location. + aup_timeout_in_days (int, optional): + Custom AUP Frequency. Refresh time (in days) to re-validate the AUP. + managed_by (str, optional): + SD-WAN Partner that manages the location. If a partner does not manage the location, this is set to + Self. + profile (str, optional): + Profile tag that specifies the location traffic type. If not specified, this tag defaults to + "Unassigned". + description (str, optional): + Additional notes or information regarding the location or sub-location. The description cannot exceed + 1024 characters. Returns: :obj:`Box`: The updated resource record. @@ -219,14 +378,19 @@ def update_location(self, location_id: str, **kwargs) -> Box: Examples: Update the name of a location: - >>> zia.locations.update('97456691', + >>> zia.locations.update_location('99999', ... name='updated_location_name') - Upodate the IP address of a location: + Update the IP address of a location: - >>> zia.locations.update('97456691', + >>> zia.locations.update_location('99999', ... ip_addresses=['203.0.113.20']) + Update the VPN credentials of a location: + + >>> zia.locations.update_location('99999', + ... vpn_credentials=[{'id': '88888', 'type': 'UFQDN'}]) + """ # Set payload to value of existing record payload = {snake_to_camel(k): v for k, v in self.get_location(location_id).items()} @@ -235,6 +399,10 @@ def update_location(self, location_id: str, **kwargs) -> Box: for key, value in kwargs.items(): payload[snake_to_camel(key)] = value + # Fixes edge case where the sublocation object is missing displayTimeUnit, which will result in a 500 error. + if not payload.get("displayTimeUnit"): + payload["displayTimeUnit"] = "MINUTE" + return self._put(f"locations/{location_id}", json=payload) def delete_location(self, location_id: str) -> int: diff --git a/pyzscaler/zia/web_dlp.py b/pyzscaler/zia/web_dlp.py index c269bac..32c8395 100644 --- a/pyzscaler/zia/web_dlp.py +++ b/pyzscaler/zia/web_dlp.py @@ -4,7 +4,7 @@ from restfly.endpoint import APIEndpoint -class WebDLP(APIEndpoint): +class WebDLPAPI(APIEndpoint): def list_rules(self, **kwargs) -> BoxList: """ Returns a list of DLP policy rules, excluding SaaS Security API DLP policy rules. diff --git a/pyzscaler/zpa/__init__.py b/pyzscaler/zpa/__init__.py index 8466e83..0c58c7b 100644 --- a/pyzscaler/zpa/__init__.py +++ b/pyzscaler/zpa/__init__.py @@ -6,7 +6,6 @@ from pyzscaler.zpa.app_segments import AppSegmentsAPI from pyzscaler.zpa.certificates import CertificatesAPI from pyzscaler.zpa.cloud_connector_groups import CloudConnectorGroupsAPI -from pyzscaler.zpa.connector_groups import ConnectorGroupsAPI from pyzscaler.zpa.connectors import ConnectorsAPI from pyzscaler.zpa.idp import IDPControllerAPI from pyzscaler.zpa.inspection import InspectionControllerAPI @@ -114,14 +113,6 @@ def cloud_connector_groups(self): """ return CloudConnectorGroupsAPI(self) - @property - def connector_groups(self): - """ - The interface object for the :ref:`ZPA Connector Groups interface `. - - """ - return ConnectorGroupsAPI(self) - @property def connectors(self): """ diff --git a/pyzscaler/zpa/connector_groups.py b/pyzscaler/zpa/connector_groups.py deleted file mode 100644 index 345240c..0000000 --- a/pyzscaler/zpa/connector_groups.py +++ /dev/null @@ -1,56 +0,0 @@ -from warnings import warn - -from box import Box, BoxList -from restfly.endpoint import APIEndpoint - -from pyzscaler.utils import Iterator - - -class ConnectorGroupsAPI(APIEndpoint): - def list_groups(self, **kwargs) -> BoxList: - """ - Returns a list of all connector groups. - - Warnings: - .. deprecated:: 0.13.0 - Use :func:`zpa.connectors.list_connector_groups` instead. - - Returns: - :obj:`BoxList`: List of all configured connector groups. - - Examples: - >>> connector_groups = zpa.connector_groups.list_groups() - - """ - warn( - "This endpoint is deprecated and will eventually be removed. " - "Use zpa.connectors.list_connector_groups() instead." - ) - - return BoxList(Iterator(self._api, "appConnectorGroup", **kwargs)) - - def get_group(self, group_id: str) -> Box: - """ - Get information for a specified connector group. - - Warnings: - .. deprecated:: 0.13.0 - Use :func:`zpa.connectors.get_connector_group` instead. - - Args: - group_id (str): - The unique identifier for the connector group. - - Returns: - :obj:`Box`: - The connector group resource record. - - Examples: - >>> connector_group = zpa.connector_groups.get_group('2342342354545455') - - """ - warn( - "This endpoint is deprecated and will eventually be removed. " "Use zpa.connectors.get_connector_group() instead." - ) - - return self._get(f"appConnectorGroup/{group_id}") diff --git a/pyzscaler/zpa/connectors.py b/pyzscaler/zpa/connectors.py index 919f36e..034fbda 100644 --- a/pyzscaler/zpa/connectors.py +++ b/pyzscaler/zpa/connectors.py @@ -150,7 +150,7 @@ def list_connector_groups(self, **kwargs) -> BoxList: :obj:`BoxList`: List of all configured connector groups. Examples: - >>> connector_groups = zpa.connector_groups.list_groups() + >>> connector_groups = zpa.connectors.list_connector_groups() """ return BoxList(Iterator(self._api, "appConnectorGroup", **kwargs)) @@ -168,7 +168,7 @@ def get_connector_group(self, group_id: str) -> Box: The connector group resource record. Examples: - >>> connector_group = zpa.connector_groups.get_group('99999') + >>> connector_group = zpa.connectors.get_connector_group('99999') """ return self._get(f"appConnectorGroup/{group_id}") diff --git a/pyzscaler/zpa/policies.py b/pyzscaler/zpa/policies.py index 9faac35..d8f5136 100644 --- a/pyzscaler/zpa/policies.py +++ b/pyzscaler/zpa/policies.py @@ -12,27 +12,69 @@ class PolicySetsAPI(APIEndpoint): "siem": "SIEM_POLICY", } - @staticmethod - def _create_conditions(conditions: list): + def _create_conditions(self, conditions: list) -> list: + """ + Creates a list template for feeding conditions into the ZPA Policies API when adding or updating a policy. + + Args: + conditions (list): List of condition tuples or lists (containing more complex logic statements). + + Returns: + list: The conditions template. + + """ + final_template = [] + for condition in conditions: + template, operator = self._parse_condition(condition) + expression = {"operands": template} + if operator: + expression["operator"] = operator.upper() + final_template.append(expression) + return final_template + + def _parse_condition(self, condition): """ - Creates a dict template for feeding conditions into the ZPA Policies API when adding or updating a policy. + Transforms a single statement with operand into a format for a condition template. Args: - conditions (list): List of condition tuples. + condition: A single list of condition statements or a tuple. Returns: - :obj:`dict`: The conditions template. + tuple: A tuple containing the formatted condition template (list) and the operator (str, "AND" or "OR") + or None. """ + # If this is a tuple condition on its own, process it now + if isinstance(condition, tuple) and len(condition) == 3: + return [self._format_condition_tuple(condition)], None + # Otherwise we'd expect a list of tuples and an optional operator at the end template = [] + operator = None + for parameter in condition: + if isinstance(parameter, str) and parameter.upper() in ["OR", "AND"]: + operator = parameter + elif isinstance(parameter, tuple) and len(parameter) == 3: + template.append(self._format_condition_tuple(parameter)) + return template, operator - for condition in conditions: - if isinstance(condition, tuple) and len(condition) == 3: - operand = {"operands": [{"objectType": condition[0].upper(), "lhs": condition[1], "rhs": condition[2]}]} - template.append(operand) + @staticmethod + def _format_condition_tuple(condition: tuple): + """ + Formats a simple tuple condition. - return template + Args: + condition (tuple): A condition tuple (objectType, lhs, rhs). + + Returns: + dict: Formatted dict structure for ZIA Policies API. + + """ + return { + "objectType": condition[0].upper(), + "lhs": condition[1], + "rhs": condition[2], + } def get_policy(self, policy_type: str) -> Box: """ @@ -198,7 +240,11 @@ def add_access_rule(self, name: str, action: str, **kwargs) -> Box: """ # Initialise the payload - payload = {"name": name, "action": action.upper(), "conditions": self._create_conditions(kwargs.pop("conditions", []))} + payload = { + "name": name, + "action": action.upper(), + "conditions": self._create_conditions(kwargs.pop("conditions", [])), + } # Get the policy id of the provided policy type for the URL. policy_id = self.get_policy("access").id diff --git a/pyzscaler/zpa/session.py b/pyzscaler/zpa/session.py index cea6921..0687768 100644 --- a/pyzscaler/zpa/session.py +++ b/pyzscaler/zpa/session.py @@ -20,7 +20,7 @@ def create_token(self, client_id: str, client_secret: str): :obj:`dict`: The authenticated session information. Examples: - >>> zpa.session.create(client_id='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx==', + >>> zpa.session.create_token(client_id='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx==', ... client_secret='yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy') """ diff --git a/requirements.txt b/requirements.txt index 5415bb4..4cb07a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ restfly==1.4.7 -python-box==7.0.0 \ No newline at end of file +python-box==7.0.1 \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..120542c --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,18 @@ +from pyzscaler.utils import zdx_params + + +def test_zdx_params(): + @zdx_params + def dummy_function(self, **kwargs): + return kwargs + + result = dummy_function( + None, since=10, search="test_search", location_id="test_loc", department_id="test_dept", geo_id="test_geo" + ) + + assert result["to"] is not None + assert result["from"] is not None + assert result["q"] == "test_search" + assert result["loc"] == "test_loc" + assert result["dept"] == "test_dept" + assert result["geo"] == "test_geo" diff --git a/tests/zdx/conftest.py b/tests/zdx/conftest.py new file mode 100644 index 0000000..da31311 --- /dev/null +++ b/tests/zdx/conftest.py @@ -0,0 +1,28 @@ +import pytest +import responses + +from pyzscaler.zdx import ZDX + + +@pytest.fixture(name="session") +def fixture_session(): + return { + "token": "ADMIN_LOGIN", + } + + +@pytest.fixture(name="zdx") +@responses.activate +def zdx(session): + responses.add( + responses.POST, + url="https://api.zdxcloud.net/v1/oauth/token", + content_type="application/json", + json=session, + status=200, + ) + + return ZDX( + client_id="abc123", + client_secret="999999", + ) diff --git a/tests/zdx/test_zdx_admin.py b/tests/zdx/test_zdx_admin.py new file mode 100644 index 0000000..f3e5bdc --- /dev/null +++ b/tests/zdx/test_zdx_admin.py @@ -0,0 +1,83 @@ +import responses +from box import BoxList + +from pyzscaler.utils import calculate_epoch + + +@responses.activate +def test_list_geolocations(zdx): + # set up the mock response + + mock_response = [ + { + "id": "1", + "name": "geolocation1", + "geo_type": "region", + "children": [{"id": "11", "description": "child geolocation1", "geo_type": "country"}], + }, + { + "id": "2", + "name": "geolocation2", + "geo_type": "region", + "children": [{"id": "21", "description": "child geolocation2", "geo_type": "country"}], + }, + ] + current, past = calculate_epoch(2) + url = "https://api.zdxcloud.net/v1/active_geo" + responses.add(responses.GET, url, json=mock_response, status=200) + + # call the method + result = zdx.admin.list_geolocations() + + # assert the response is correct + assert isinstance(result, BoxList) + assert len(result) == 2 + assert result[0]["id"] == "1" + assert result[1]["id"] == "2" + + # assert the request is correct + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" + + +@responses.activate +def test_list_departments(zdx): + url = "https://api.zdxcloud.net/v1/administration/departments" + mock_response = [{"id": "1", "name": "department1"}, {"id": "2", "name": "department2"}] + responses.add(responses.GET, url, json=mock_response, status=200) + + # call the method + result = zdx.admin.list_departments() + + # assert the response is correct + assert isinstance(result, BoxList) + assert len(result) == 2 + assert result[0]["id"] == "1" + assert result[1]["id"] == "2" + + # assert the request is correct + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" + + +@responses.activate +def test_list_locations(zdx): + url = "https://api.zdxcloud.net/v1/administration/locations" + mock_response = [{"id": "1", "name": "location1"}, {"id": "2", "name": "location2"}] + responses.add(responses.GET, url, json=mock_response, status=200) + + # call the method + result = zdx.admin.list_locations() + + # assert the response is correct + assert isinstance(result, BoxList) + assert len(result) == 2 + assert result[0]["id"] == "1" + assert result[1]["id"] == "2" + + # assert the request is correct + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" diff --git a/tests/zdx/test_zdx_apps.py b/tests/zdx/test_zdx_apps.py new file mode 100644 index 0000000..205abb5 --- /dev/null +++ b/tests/zdx/test_zdx_apps.py @@ -0,0 +1,186 @@ +import responses +from box import Box, BoxList + +from tests.conftest import stub_sleep + + +@responses.activate +def test_list_apps(zdx): + url = "https://api.zdxcloud.net/v1/apps" + mock_response = [{"id": "1", "name": "app1"}, {"id": "2", "name": "app2"}] + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.apps.list_apps() + + assert isinstance(result, BoxList) + assert len(result) == 2 + assert result[0]["id"] == "1" + assert result[1]["id"] == "2" + + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" + + +@responses.activate +def test_get_app(zdx): + app_id = "1" + url = f"https://api.zdxcloud.net/v1/apps/{app_id}" + mock_response = {"id": "1", "name": "app1"} + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.apps.get_app(app_id) + + assert isinstance(result, Box) + assert result["id"] == "1" + assert result["name"] == "app1" + + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" + + +@responses.activate +def test_get_app_score(zdx): + app_id = "1" + url = f"https://api.zdxcloud.net/v1/apps/{app_id}/score" + mock_response = { + "metric": "score", + "datapoints": [ + {"timestamp": 1644163200, "value": 80}, + {"timestamp": 1644163500, "value": 75}, + ], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.apps.get_app_score(app_id) + + assert isinstance(result, Box) + assert result["metric"] == "score" + assert len(result["datapoints"]) == 2 + + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" + + +@responses.activate +def test_get_app_metrics(zdx): + app_id = "1" + url = f"https://api.zdxcloud.net/v1/apps/{app_id}/metrics" + mock_response = { + "metric": "metricName", + "unit": "metricUnit", + "datapoints": [ + {"timestamp": 1644163200, "value": 100}, + {"timestamp": 1644163500, "value": 90}, + ], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.apps.get_app_metrics(app_id) + + assert isinstance(result, Box) + assert result["metric"] == "metricName" + assert result["unit"] == "metricUnit" + assert len(result["datapoints"]) == 2 + + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" + + +@responses.activate +@stub_sleep +def test_list_app_users(zdx): + app_id = "1" + url = f"https://api.zdxcloud.net/v1/apps/{app_id}/users" + mock_response = { + "users": [ + {"id": "1", "name": "user1", "email": "user1@example.com", "score": 80}, + {"id": "2", "name": "user2", "email": "user2@example.com", "score": 90}, + ], + "next_offset": None, + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.apps.list_app_users(app_id) + + assert isinstance(result, BoxList) + assert len(result) == 2 + assert result[0]["id"] == "1" + assert result[1]["id"] == "2" + + request = responses.calls[0].request + assert request.url == url + assert request.method == "GET" + + +@responses.activate +@stub_sleep +def test_list_app_users_multipage(zdx): + app_id = "1" + url = f"https://api.zdxcloud.net/v1/apps/{app_id}/users" + url_with_offset = f"https://api.zdxcloud.net/v1/apps/{app_id}/users?offset=2" + + # First page response + mock_response_1 = { + "users": [ + {"id": "1", "name": "user1", "email": "user1@example.com", "score": 80}, + {"id": "2", "name": "user2", "email": "user2@example.com", "score": 90}, + ], + "next_offset": "2", + } + responses.add(responses.GET, url, json=mock_response_1, status=200) + + # Second page response + mock_response_2 = { + "users": [ + {"id": "3", "name": "user3", "email": "user3@example.com", "score": 70}, + {"id": "4", "name": "user4", "email": "user4@example.com", "score": 60}, + ], + "next_offset": None, # Signifying no more pages + } + responses.add(responses.GET, url_with_offset, json=mock_response_2, status=200) + + result = zdx.apps.list_app_users(app_id) + + assert isinstance(result, BoxList) + assert len(result) == 4 # Total of 4 users across 2 pages + assert result[0]["id"] == "1" + assert result[1]["id"] == "2" + assert result[2]["id"] == "3" + assert result[3]["id"] == "4" + + # Check the first API request + request1 = responses.calls[0].request + assert request1.url == url + assert request1.method == "GET" + + # Check the second API request + request2 = responses.calls[1].request + assert request2.url == url_with_offset + + +@responses.activate +def test_get_app_user(zdx): + app_id = "1" + user_id = "1" + url = f"https://api.zdxcloud.net/v1/apps/{app_id}/users/{user_id}" + since = 10 + mock_response = { + "user": {"id": "1", "name": "user1", "email": "user1@example.com", "score": 80}, + "device": {"id": "device1", "model": "iPhone", "os": "iOS"}, + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.apps.get_app_user(app_id, user_id, since=since) + + assert isinstance(result, Box) + assert result.user.id == mock_response["user"]["id"] + assert result.user.name == mock_response["user"]["name"] + assert result.user.email == mock_response["user"]["email"] + assert result.user.score == mock_response["user"]["score"] + assert result.device.id == mock_response["device"]["id"] + assert result.device.model == mock_response["device"]["model"] + assert result.device.os == mock_response["device"]["os"] diff --git a/tests/zdx/test_zdx_devices.py b/tests/zdx/test_zdx_devices.py new file mode 100644 index 0000000..11be732 --- /dev/null +++ b/tests/zdx/test_zdx_devices.py @@ -0,0 +1,469 @@ +import responses +from box import Box, BoxList + + +@responses.activate +def test_list_devices(zdx): + url = "https://api.zdxcloud.net/v1/devices" + mock_response = { + "devices": [ + {"id": 40176154, "name": "LAPTOP-DQG97O6G (LENOVO 20WNS1HM01 Microsoft Windows 10 Pro;64 bit)", "userid": 76676623} + ], + "next_offset": "67677666", + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.list_devices() + + assert isinstance(result, Box) + assert result.devices[0].id == mock_response["devices"][0]["id"] + + +@responses.activate +def test_get_device(zdx): + device_id = "30989301" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}" + mock_response = { # your large mock response here, truncated for brevity + "id": 30989301, + "name": "LAPTOP-S1IN4SIH (LENOVO 20T1S0Q303 Microsoft Windows 10 Pro;64 bit)", + # More fields... + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_device(device_id) + + assert isinstance(result, Box) + assert result.id == mock_response["id"] + assert result.name == mock_response["name"] + + +@responses.activate +def test_get_device_apps(zdx): + device_id = "123456789" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps" + mock_response = [{"id": 1, "name": "Sharepoint", "score": 81}, {"id": 4, "name": "Salesforce", "score": 90}] + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_device_apps(device_id) + + assert isinstance(result, BoxList) + assert len(result) == len(mock_response) + for res, expected in zip(result, mock_response): + assert res == Box(expected) + + +@responses.activate +def test_get_device_app(zdx): + device_id = "123456789" + app_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps/{app_id}" + mock_response = { + "metric": "score", + "datapoints": [ + {"timestamp": 1644163200, "value": 80}, + ], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_device_app(device_id, app_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_web_probes(zdx): + device_id = "123456789" + app_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps/{app_id}/web-probes" + mock_response = [{"id": 4, "name": "Outlook Online Login Page Probe", "num_probes": 24, "avg_score": 85, "avg_pft": 2340}] + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_web_probes(device_id, app_id) + + assert isinstance(result, BoxList) + assert len(result) == len(mock_response) + for res, expected in zip(result, mock_response): + assert res == Box(expected) + + +@responses.activate +def test_get_web_probe(zdx): + device_id = "123456789" + app_id = "987654321" + probe_id = "123987456" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps/{app_id}/web-probes/{probe_id}" + mock_response = {"metric": "string", "unit": "string", "datapoints": [{"timestamp": 0, "value": 0}]} + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_web_probe(device_id, app_id, probe_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_list_cloudpath_probes(zdx): + device_id = "123456789" + app_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps/{app_id}/cloudpath-probes" + mock_response = [ + { + "id": 4, + "name": "Outlook Online CloudPath Probe", + "num_probes": 12, + "avg_latencies": [ + {"leg_src": "client", "leg_dst": "egress", "latency": 15}, + {"leg_src": "egress", "leg_dst": "zen", "latency": 34}, + ], + } + ] + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.list_cloudpath_probes(device_id, app_id) + + assert isinstance(result, BoxList) + assert len(result) == len(mock_response) + for res, expected in zip(result, mock_response): + assert res == Box(expected) + + +@responses.activate +def test_get_cloudpath_probe(zdx): + device_id = "123456789" + app_id = "987654321" + probe_id = "123987456" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps/{app_id}/cloudpath-probes/{probe_id}" + mock_response = { + "leg_src": "string", + "leg_dst": "string", + "stats": [{"metric": "string", "unit": "string", "datapoints": [{"timestamp": 0, "value": 0}]}], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_cloudpath_probe(device_id, app_id, probe_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_cloudpath(zdx): + device_id = "123456789" + app_id = "987654321" + probe_id = "123987456" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps/{app_id}/cloudpath-probes/{probe_id}/cloudpath" + mock_response = { + "timestamp": 0, + "cloudpath": { + "src": "string", + "dst": "string", + "num_hops": 0, + "latency": 0, + }, + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_cloudpath(device_id, app_id, probe_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_call_quality_metrics(zdx): + device_id = "123456789" + app_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/apps/{app_id}/call-quality-metrics" + mock_response = { + "meet_id": "string", + "meet_session_id": "string", + "meet_subject": "string", + "metrics": [{"metric": "string", "unit": "string", "datapoints": [{"timestamp": 0, "value": 0}]}], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_call_quality_metrics(device_id, app_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_health_metrics(zdx): + device_id = "123456789" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/health-metrics" + mock_response = { + "category": "string", + "instances": [ + { + "name": "string", + "metrics": [{"metric": "string", "unit": "string", "datapoints": [{"timestamp": 0, "value": 0}]}], + } + ], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_health_metrics(device_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_events(zdx): + device_id = "123456789" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/events" + mock_response = [ + { + "timestamp": 1643525900, + "events": [ + {"category": "Zscaler", "name": "tunType", "display_name": "Tunnel Change", "prev": "0", "curr": "3"}, + ], + }, + ] + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_events(device_id) + + assert isinstance(result, BoxList) + assert len(result) == len(mock_response) + for res, expected in zip(result, mock_response): + assert res == Box(expected) + + +@responses.activate +def test_list_deeptraces(zdx): + device_id = "123456789" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces" + mock_response = [ + { + "trace_id": 0, + "trace_details": { + "session_name": "string", + "user_id": 0, + "username": "string", + "device_id": 0, + "device_name": "string", + "web_probe_id": 0, + "web_probe_name": "string", + }, + "status": "not_started", + "created_at": 0, + "started_at": 0, + "ended_at": 0, + }, + ] + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.list_deeptraces(device_id) + + assert isinstance(result, BoxList) + assert len(result) == len(mock_response) + for res, expected in zip(result, mock_response): + assert res == Box(expected) + + +@responses.activate +def test_start_deeptrace(zdx): + device_id = "123456789" + app_id = "987654321" + session_name = "My Deeptrace" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces" + mock_response = {"trace_id": 0, "status": "not_started", "expected_time": 0} + responses.add(responses.POST, url, json=mock_response, status=200) + + result = zdx.devices.start_deeptrace(device_id, app_id, session_name) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_deeptrace(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}" + mock_response = { + "trace_id": 0, + "trace_details": { + "session_name": "string", + "user_id": 0, + "username": "string", + "device_id": 0, + "device_name": "string", + "web_probe_id": 0, + "web_probe_name": "string", + }, + "status": "not_started", + "created_at": 0, + "started_at": 0, + "ended_at": 0, + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_deeptrace(device_id, trace_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_delete_deeptrace(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}" + mock_response = {"trace_id": 0} + responses.add(responses.DELETE, url, json=mock_response, status=200) + + result = zdx.devices.delete_deeptrace(device_id, trace_id) + + assert isinstance(result.trace_id, int) + assert result == (mock_response) + + +@responses.activate +def test_get_deeptrace_webprobe_metrics(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}/webprobe-metrics" + mock_response = {"metric": "string", "unit": "string", "datapoints": [{"timestamp": 0, "value": 0}]} + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_deeptrace_webprobe_metrics(device_id, trace_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_deeptrace_cloudpath_metrics(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}/cloudpath-metrics" + mock_response = { + "leg_src": "string", + "leg_dst": "string", + "stats": [{"metric": "string", "unit": "string", "datapoints": [{"timestamp": 0, "value": 0}]}], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_deeptrace_cloudpath_metrics(device_id, trace_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_deeptrace_cloudpath(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}/cloudpath" + mock_response = { + "timestamp": 0, + "cloudpath": { + "src": "string", + "dst": "string", + "num_hops": 0, + "latency": 0, + "loss": 0, + "num_unresp_hops": 0, + "tunnel_type": 0, + "hops": [ + { + "ip": "string", + "gw_mac": "string", + "gw_mac_vendor": "string", + "pkt_sent": 0, + "pkt_rcvd": 0, + "latency_min": 0, + "latency_max": 0, + "latency_avg": 0, + "latency_diff": 0, + } + ], + }, + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_deeptrace_cloudpath(device_id, trace_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_deeptrace_health_metrics(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}/health-metrics" + mock_response = { + "category": "string", + "instances": [ + { + "name": "string", + "metrics": [{"metric": "string", "unit": "string", "datapoints": [{"timestamp": 0, "value": 0}]}], + } + ], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_deeptrace_health_metrics(device_id, trace_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) + + +@responses.activate +def test_get_deeptrace_events(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}/events" + mock_response = [ + { + "timestamp": 1643525900, + "events": [ + { + "category": "Zscaler", + "name": "tunType", + }, + { + "category": "Zscaler", + "name": "ziaState", + }, + ], + }, + { + "timestamp": 1643526200, + "events": [ + { + "category": "Zscaler", + "name": "tunType", + }, + { + "category": "Zscaler", + "name": "ziaState", + }, + ], + }, + ] + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_deeptrace_events(device_id, trace_id) + + assert isinstance(result, list) + assert result == BoxList(mock_response) + + +@responses.activate +def test_get_deeptrace_top_processes(zdx): + device_id = "123456789" + trace_id = "987654321" + url = f"https://api.zdxcloud.net/v1/devices/{device_id}/deeptraces/{trace_id}/top-processes" + mock_response = {"timestamp": 0, "top_processes": [{"category": "string", "processes": [{"name": "string", "id": 0}]}]} + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.devices.get_deeptrace_top_processes(device_id, trace_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) diff --git a/tests/zdx/test_zdx_session.py b/tests/zdx/test_zdx_session.py new file mode 100644 index 0000000..5d9a756 --- /dev/null +++ b/tests/zdx/test_zdx_session.py @@ -0,0 +1,47 @@ +import responses +from box import Box + +from tests.conftest import stub_sleep + + +@responses.activate +def test_create_token(zdx): + client_id = "999999999" + client_secret = "admin@example.com" + url = "https://api.zdxcloud.net/v1/oauth/token" + mock_response = {"token": "test_token", "token_type": "Bearer", "expires_in": 3600} + responses.add(responses.POST, url, json=mock_response, status=200) + + result = zdx.session.create_token(client_id, client_secret) + + assert isinstance(result, Box) + assert result.token == mock_response["token"] + assert result.token_type == mock_response["token_type"] + assert result.expires_in == mock_response["expires_in"] + + +@responses.activate +@stub_sleep +def test_validate_token(zdx): + url = "https://api.zdxcloud.net/v1/oauth/validate" + mock_response = {"valid": True} + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.session.validate_token() + + assert isinstance(result, Box) + assert result.valid == mock_response["valid"] + + +@responses.activate +def test_get_jwks(zdx): + url = "https://api.zdxcloud.net/v1/oauth/jwks" + mock_response = { + "keys": [{"alg": "RS256", "kty": "RSA", "use": "sig", "x5c": ["test_string"], "kid": "test_kid", "x5t": "test_x5t"}] + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.session.get_jwks() + + assert isinstance(result, Box) + assert result.get("keys")[0].get("alg") == mock_response["keys"][0]["alg"] diff --git a/tests/zdx/test_zdx_users.py b/tests/zdx/test_zdx_users.py new file mode 100644 index 0000000..143b134 --- /dev/null +++ b/tests/zdx/test_zdx_users.py @@ -0,0 +1,50 @@ +import responses +from box import Box, BoxList + + +@responses.activate +def test_list_users(zdx): + url = "https://api.zdxcloud.net/v1/users" + mock_response = {"users": [{"id": 0, "name": "string", "email": "string"}], "next_offset": None} + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.users.list_users() + + assert isinstance(result, BoxList) + assert result == BoxList(mock_response["users"]) + + +@responses.activate +def test_get_user(zdx): + user_id = "999999999" + url = f"https://api.zdxcloud.net/v1/users/{user_id}" + mock_response = { + "id": 0, + "name": "string", + "email": "string", + "devices": [ + { + "id": 0, + "name": "string", + "geo_loc": [ + { + "id": "string", + "city": "string", + "state": "string", + "country": "string", + "geo_type": "string", + "geo_lat": "string", + "geo_long": "string", + "geo_detection": "string", + } + ], + "zs_loc": [{"id": 0, "name": "string"}], + } + ], + } + responses.add(responses.GET, url, json=mock_response, status=200) + + result = zdx.users.get_user(user_id) + + assert isinstance(result, Box) + assert result == Box(mock_response) diff --git a/tests/zia/test_locations.py b/tests/zia/test_locations.py index be2bfc2..a3415bb 100644 --- a/tests/zia/test_locations.py +++ b/tests/zia/test_locations.py @@ -248,6 +248,7 @@ def test_add_location(zia, locations): def test_update_location(zia, locations): updated_location = locations[0] updated_location["name"] = "Updated Test" + updated_location["displayTimeUnit"] = "MINUTE" responses.add( responses.GET, diff --git a/tests/zpa/test_zpa_connector_groups.py b/tests/zpa/test_zpa_connector_groups.py deleted file mode 100644 index b3a43f6..0000000 --- a/tests/zpa/test_zpa_connector_groups.py +++ /dev/null @@ -1,46 +0,0 @@ -import pytest -import responses -from box import Box, BoxList - -from tests.conftest import stub_sleep - - -# Don't need to test the data structure as we just have list and get -# methods available. id will suffice until add/update endpoints are available. -@pytest.fixture(name="connector_groups") -def fixture_connector_groups(): - return {"totalPages": 1, "list": [{"id": "1"}, {"id": "2"}]} - - -@responses.activate -@stub_sleep -def test_depr_list_connector_groups(zpa, connector_groups): - responses.add( - responses.GET, - url="https://config.private.zscaler.com/mgmtconfig/v1/admin/customers/1/appConnectorGroup?page=1", - json=connector_groups, - status=200, - ) - responses.add( - responses.GET, - url="https://config.private.zscaler.com/mgmtconfig/v1/admin/customers/1/appConnectorGroup?page=2", - json=[], - status=200, - ) - resp = zpa.connector_groups.list_groups() - assert isinstance(resp, BoxList) - assert len(resp) == 2 - assert resp[0].id == "1" - - -@responses.activate -def test_depr_get_connector_groups(zpa, connector_groups): - responses.add( - responses.GET, - url="https://config.private.zscaler.com/mgmtconfig/v1/admin/customers/1/appConnectorGroup/1", - json=connector_groups["list"][0], - status=200, - ) - resp = zpa.connector_groups.get_group("1") - assert isinstance(resp, Box) - assert resp.id == "1" diff --git a/tests/zpa/test_zpa_policies.py b/tests/zpa/test_zpa_policies.py index f05f2ba..d728ef7 100644 --- a/tests/zpa/test_zpa_policies.py +++ b/tests/zpa/test_zpa_policies.py @@ -13,6 +13,28 @@ def fixture_policies(): return {"totalPages": 1, "list": [{"id": "1"}, {"id": "2"}, {"id": "3"}]} +@pytest.fixture(name="policy_conditions") +def fixture_policy_conditions(): + return [ + [ + ("app", "id", "216197915188658453"), + ("app", "id", "216197915188658455"), + "OR", + ], + [ + ("scim", "216197915188658304", "john.doe@foo.bar"), + ("scim", "216197915188658304", "foo.bar"), + "OR", + ], + ("scim_group", "216197915188658303", "241874"), # check backward compatibility + [ + ("posture", "fc92ead2-4046-428d-bf3f-6e534a53194b", "TRUE"), + ("posture", "490db9b4-96d8-4035-9b5e-935daa697f45", "TRUE"), + "AND", + ], + ] + + @pytest.fixture(name="policy_rules") def fixture_policy_rules(): return { @@ -116,6 +138,34 @@ def test_list_policy_rules_error(zpa, policy_rules): resp = zpa.policies.list_rules("test") +def test_create_conditions(zpa, policy_conditions): + conditions = zpa.policies._create_conditions(policy_conditions) + assert conditions == [ + { + "operands": [ + {"objectType": "APP", "lhs": "id", "rhs": "216197915188658453"}, + {"objectType": "APP", "lhs": "id", "rhs": "216197915188658455"}, + ], + "operator": "OR", + }, + { + "operands": [ + {"objectType": "SCIM", "lhs": "216197915188658304", "rhs": "john.doe@foo.bar"}, + {"objectType": "SCIM", "lhs": "216197915188658304", "rhs": "foo.bar"}, + ], + "operator": "OR", + }, + {"operands": [{"objectType": "SCIM_GROUP", "lhs": "216197915188658303", "rhs": "241874"}]}, + { + "operands": [ + {"objectType": "POSTURE", "lhs": "fc92ead2-4046-428d-bf3f-6e534a53194b", "rhs": "TRUE"}, + {"objectType": "POSTURE", "lhs": "490db9b4-96d8-4035-9b5e-935daa697f45", "rhs": "TRUE"}, + ], + "operator": "AND", + }, + ] + + @responses.activate def test_get_access_policy(zpa, policies): responses.add(