From 04c644ab60a53e7dfcc045fd912b8ecf2920d16c Mon Sep 17 00:00:00 2001 From: JCruiz15 Date: Wed, 26 Jun 2024 11:48:14 +0200 Subject: [PATCH] feat: Find from cadastral registry function implemented --- README.md | 27 ++++++-- pyproject.toml | 2 +- sigpac_tools/__main__.py | 29 ++++++++- sigpac_tools/anotate.py | 13 +++- sigpac_tools/find.py | 83 ++++++++++++++++++++++++ sigpac_tools/locate.py | 4 ++ sigpac_tools/search.py | 4 +- sigpac_tools/utils.py | 132 ++++++++++++++++++++++++++++++++++++++- tests/test_search.py | 18 ++---- tests/test_utils.py | 65 +++++++++++++++++-- 10 files changed, 348 insertions(+), 29 deletions(-) create mode 100644 sigpac_tools/find.py diff --git a/README.md b/README.md index ce5bc22..7f0330d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # [SIGPAC-Tools](https://github.com/KhaosResearch/sigpac-tools) ![Python Version from PEP 621 TOML](https://img.shields.io/python/required-version-toml?tomlFilePath=https%3A%2F%2Fraw.githubusercontent.com%2FKhaosResearch%2Fsigpac-tools%2Fmain%2Fpyproject.toml) -![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg) -![Status: Active](https://img.shields.io/badge/Status-Active-00aa00.svg) + [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) + ![Status: Active](https://img.shields.io/badge/Status-Active-00aa00.svg) ![Code style: Ruff](https://img.shields.io/badge/code%20style-Ruff-aa0000.svg) @@ -22,7 +22,7 @@ python -m pip install ./sigpac-tools or you can install it directly from the repository: ```bash -python -m pip install https://github.com/KhaosResearch/sigpac-tools +python -m pip install git+https://github.com/KhaosResearch/sigpac-tools ``` ## Usage @@ -100,6 +100,25 @@ geometry = geometry_from_coords( ) ``` +### Get information from a specific cadastral registry + +Known a cadastral registry, you can get the polygon and metadata from it using the `find_from_cadastral_registry` from the module `find`. +This function will return a tuple with a GeoJSON object of the polygon and a dictionary with the metadata of the plot. It will only return the metadata of the "parcela" layer. + +Note that urban cadastral registries are not supported yet, due to the lack of information about them in the SIGPAC database. + +If an invalid cadastral registry is provided, the function will raise a `ValueError`. If it detects that the cadastral registry is urban, it will raise a `NotImplementedError`. + +An example of how to use this function is shown below: + +```python +from sigpac_tools.find import find_from_cadastral_registry + +cadastral_registry = "06001A028000380000LH" +geom, metadata = find_from_cadastral_registry(cadastral_registry) +``` + + ## Acknowledgements This inspired by the JavaScript [SIGPAC client](https://github.com/dan96ct/sigpac-client) made by Daniel Cebrián. @@ -108,4 +127,4 @@ This inspired by the JavaScript [SIGPAC client](https://github.com/dan96ct/sigpa This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. -Copyright 2023 Khaos Research, all rights reserved. \ No newline at end of file +Copyright 2024 Khaos Research, all rights reserved. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ba6ab20..e5da945 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "sigpac-tools" -version = "1.0.0" +version = "1.1.0" authors = [ { name="Juan Carlos Ruiz Ruiz", email="juancaruru@uma.es" }, ] diff --git a/sigpac_tools/__main__.py b/sigpac_tools/__main__.py index 51d2e43..23c9a74 100644 --- a/sigpac_tools/__main__.py +++ b/sigpac_tools/__main__.py @@ -144,6 +144,20 @@ def get_parser(): metavar="INT", ) + # Find cadastral registry command + + find_parser = subparsers.add_parser( + "find", help="Find a cadastral registry in the SIGPAC database" + ) + find_parser.add_argument( + "--registry", + "-r", + type=str, + help="Cadastral registry to search for", + required=True, + metavar="STRING" + ) + return parser @@ -170,8 +184,6 @@ def main(): from sigpac_tools.locate import geometry_from_coords import json - print(args) - layer = args.layer lat = args.lat lon = args.lon @@ -199,6 +211,19 @@ def main(): metadata = get_metadata(layer, data) logger.info(f"Metadata:\n{json.dumps(metadata, indent=2)}") return metadata + case "find": + from sigpac_tools.find import find_from_cadastral_registry + import json + + registry = args.registry + geom, metadata = find_from_cadastral_registry(registry) + logger.info( + f"Geometry for cadastral registry {registry}:\n{json.dumps(geom, indent=2)}" + ) + logger.info( + f"Metadata for cadastral registry {registry}:\n{json.dumps(metadata, indent=2)}" + ) + return geom, metadata case _: raise ValueError("Invalid command") diff --git a/sigpac_tools/anotate.py b/sigpac_tools/anotate.py index 9870cff..9a62427 100644 --- a/sigpac_tools/anotate.py +++ b/sigpac_tools/anotate.py @@ -65,7 +65,6 @@ def __query( "id": f"{province},{municipality},{aggregate},{zone},{polygon},{parcel},{enclosure}", } response = requests.get(url, params=params) - print(response.url) return response.json() case _: raise KeyError( @@ -125,7 +124,8 @@ def get_metadata(layer: str, data: dict): logger.info( f"Searching for the metadata of the location (province {prov}, municipality {muni}, polygon {polg}, parcel {parc}) in the SIGPAC database..." ) - return __query( + + res = __query( layer=layer, province=prov, municipality=muni, @@ -135,3 +135,12 @@ def get_metadata(layer: str, data: dict): aggregate=aggr, zone=zone, ) + if res is None: + raise ValueError( + f"The location (province {prov}, municipality {muni}, polygon {polg}, parcel {parc}) does not exist in the SIGPAC database. Please check the data and try again." + ) + else: + logger.info( + f"Metadata of the location (province {prov}, municipality {muni}, polygon {polg}, parcel {parc}{f', enclosure {encl}' if layer == 'recinto' else ''}) found in the SIGPAC database." + ) + return res diff --git a/sigpac_tools/find.py b/sigpac_tools/find.py new file mode 100644 index 0000000..528eac3 --- /dev/null +++ b/sigpac_tools/find.py @@ -0,0 +1,83 @@ +import structlog + +from sigpac_tools.search import search +from sigpac_tools.anotate import get_metadata +from sigpac_tools.locate import geometry_from_coords +from sigpac_tools.utils import read_cadastral_registry + + +logger = structlog.get_logger() + + +def find_from_cadastral_registry(cadastral_reg : str): + """ + Find the geometry and metadata of a cadastral reference in the SIGPAC database. The reference must be rural. Urban references are not supported. + + The expected cadastral registry is a 20 character string with the following format: + + - 2 characters for the province + - 3 characters for the municipality + - 1 character for the section + - 3 characters for the polygon + - 5 characters for the parcel + - 4 characters for the id + - 2 characters for the control + Parameters + ---------- + cadastral_reg : str + Cadastral reference to search for + + Returns + ------- + dict + Geojson geometry of the found reference + dict + Metadata of the found reference + + Raises + ------- + ValueError + If the cadastral reference does not exist in the SIGPAC database + ValueError + If the length of the cadastral reference is not 20 characters + ValueError + If the province of the cadastral reference is not valid + ValueError + If the reference is not valid + NotImplementedError + If the reference is urban + """ + reg = read_cadastral_registry(cadastral_reg) + + # Search for coordinates + + search_data = search( + reg + ) + if search_data["features"] == []: + raise ValueError(f"The cadastral reference {cadastral_reg} does not exist in the SIGPAC database. Please check the if the reference is correct and try again. Urban references are not supported.") + + coords_x = [] + coords_y = [] + for feat in search_data["features"]: + coords_x.append((feat["properties"]["x1"] + feat["properties"]["x2"]) / 2) + coords_y.append((feat["properties"]["y1"] + feat["properties"]["y2"]) / 2) + coords = [sum(coords_x) / len(coords_x), sum(coords_y) / len(coords_y)] + + # Get geometry + + geometry = geometry_from_coords( + layer="parcela", + lat=coords[1], + lon=coords[0], + reference=reg["parcel"] + ) + + # Get metadata + + metadata = get_metadata( + layer="parcela", + data=reg + ) + + return geometry, metadata diff --git a/sigpac_tools/locate.py b/sigpac_tools/locate.py index c4d1719..64d3cce 100644 --- a/sigpac_tools/locate.py +++ b/sigpac_tools/locate.py @@ -84,6 +84,10 @@ def geometry_from_coords(layer: str, lat: float, lon: float, reference: int) -> logger.warning( f"Reference '{reference}' not found in the layer '{layer}' at coordinates ({lat}, {lon})" ) + else: + logger.info( + f"Reference '{reference}' found in the layer '{layer}' at coordinates ({lat}, {lon})" + ) return result else: diff --git a/sigpac_tools/search.py b/sigpac_tools/search.py index edb1b5c..5be5b40 100644 --- a/sigpac_tools/search.py +++ b/sigpac_tools/search.py @@ -2,7 +2,7 @@ import structlog from sigpac_tools._globals import BASE_URL -from sigpac_tools.utils import findCommunity +from sigpac_tools.utils import find_community logger = structlog.get_logger() @@ -39,7 +39,7 @@ def search(data: dict) -> dict: '"Community" has not been specified, neither has been "province" and it is compulsory to find the community associated' ) else: - comm = findCommunity(prov) + comm = find_community(prov) if comm: if prov: diff --git a/sigpac_tools/utils.py b/sigpac_tools/utils.py index 53997d1..756dea5 100644 --- a/sigpac_tools/utils.py +++ b/sigpac_tools/utils.py @@ -1,8 +1,10 @@ import math import pyproj +import structlog from sigpac_tools._globals import PROVINCES_BY_COMMUNITY +logger = structlog.get_logger() def lng_lat_to_tile(lng: float, lat: float, zoom: float) -> tuple[int, int]: """Transforms the given coordinates from longitude and latitude to tile coordinates for the given zoom level @@ -75,7 +77,7 @@ def transform_coords(feature: dict) -> None: coord[1], coord[0] = optimus_prime.transform(coord[0], coord[1]) -def findCommunity(province_id: int) -> int: +def find_community(province_id: int) -> int: """Finds the community of the given province id Parameters @@ -92,3 +94,131 @@ def findCommunity(province_id: int) -> int: if province_id in provincias: return comunidad return None + + +def read_cadastral_registry(registry: str) -> dict: + """Read the cadastral reference, validates it and return its data as a dictionary. + The expected format is: + - 2 characters for the province + - 3 characters for the municipality + - 1 character for the section + - 3 characters for the polygon + - 5 characters for the parcel + - 4 characters for the id + - 2 characters for the control + + 20 characters in total. + + Parameters + ---------- + registry (str): Cadastrial reference to read + + Returns + ------- + dict: Data extracted from the cadastral reference + + Raises + ------- + ValueError: If the length of the cadastral reference is not 20 characters + """ + if len(registry) != 20: + raise ValueError("The cadastral reference must have a length of 20 characters") + registry = registry.upper().replace(" ", "") + + reg_prov = registry[:2] + reg_mun = registry[2:5] + reg_sec = registry[5] + reg_pol = registry[6:9] + reg_par = registry[9:14] + reg_id = registry[14:18] + reg_control = registry[18:] + + if not find_community(int(reg_prov)): + raise ValueError( + "The province of the cadastral reference is not valid. Please check if it is a correct rural reference and try again." + ) + + # Will raise an error if the reference is not valid or if it is urban, in any other case, it will log the result and continue + validate_cadastral_registry(registry) + + return { + "province": int(reg_prov), + "municipality": int(reg_mun), + "section": reg_sec, + "polygon": int(reg_pol), + "parcel": int(reg_par), + "id_inm": int(reg_id), + "control": reg_control, + } + + +def validate_cadastral_registry(reference: str) -> None: + """Validate the cadastral reference + + Given a cadastral reference, it validates if the reference is correct or not by comparing the present control characters with the calculated expected ones. + + Parameters + ---------- + reference (str): Cadastral reference to validate + + Returns + ------- + None + + Raises + ------- + ValueError: If the reference is not valid + NotImplementedError: If the reference is urban + """ + + sum_pd1 = 0 + sum_sd2 = 0 + mixt1 = 0 + reference = reference.upper().replace(" ", "") + pos = [13, 15, 12, 5, 4, 17, 9, 21, 3, 7, 1] + res = "MQWERTYUIOPASDFGHJKLBZX" + + if len(reference) != 20: + raise ValueError("The cadastral reference must have a length of 20 characters") + else: + separated_ref = list(reference) + + for i in range(7): + if separated_ref[i].isdigit(): + sum_pd1 += pos[i] * (ord(separated_ref[i]) - 48) + else: + if ord(separated_ref[i]) > 78: + sum_pd1 += pos[i] * (ord(separated_ref[i]) - 63) + else: + sum_pd1 += pos[i] * (ord(separated_ref[i]) - 64) + + for i in range(7): + if separated_ref[i + 7].isdigit(): + sum_sd2 += pos[i] * (ord(separated_ref[i + 7]) - 48) + else: + if ord(separated_ref[i + 7]) > 78: + sum_sd2 += pos[i] * (ord(separated_ref[i + 7]) - 63) + else: + sum_sd2 += pos[i] * (ord(separated_ref[i + 7]) - 64) + + for i in range(4): + mixt1 += pos[i + 7] * (ord(separated_ref[i + 14]) - 48) + + code_pos1 = (sum_pd1 + mixt1) % 23 + code_pos2 = (sum_sd2 + mixt1) % 23 + code1 = res[code_pos1] + code2 = res[code_pos2] + + typo = "URBAN" if separated_ref[5].isdigit() else "RURAL" + + if typo == "URBAN": + raise NotImplementedError( + "Urban cadastral references are not supported yet. Please check the reference and try again." + ) + + if code1 == separated_ref[18] and code2 == separated_ref[19]: + logger.info(f"Reference {reference} ({typo}) is valid.") + else: + raise ValueError( + f"Reference {reference} ({typo}) is not valid. Expected control characters: {code1}{code2}, but got {separated_ref[18]}{separated_ref[19]}. Please check the reference and try again." + ) diff --git a/tests/test_search.py b/tests/test_search.py index b7d376c..a30eba9 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -13,9 +13,7 @@ class TestSearch: @patch("sigpac_tools.search.requests.get") - @patch("sigpac_tools.search.findCommunity") - def test_search_provinces(self, mock_findCommunity, mock_get): - mock_findCommunity.return_value = 1 + def test_search_provinces(self, mock_get): mock_response = Mock() mock_response.json.return_value = provinces_response mock_get.return_value = mock_response @@ -28,8 +26,7 @@ def test_search_provinces(self, mock_findCommunity, mock_get): ) @patch("sigpac_tools.search.requests.get") - @patch("sigpac_tools.search.findCommunity") - def test_search_municipalities(self, mock_findCommunity, mock_get): + def test_search_municipalities(self, mock_get): mock_response = Mock() mock_response.json.return_value = municipalities_response mock_get.return_value = mock_response @@ -80,8 +77,7 @@ def test_search_specific_parcel(self, mock_get): f"{BASE_URL}/fega/ServiciosVisorSigpac/query/recintos/1/1/0/0/1/1.geojson" ) - @patch("sigpac_tools.search.findCommunity") - def test_missing_community_and_province(self, mock_findCommunity): + def test_missing_community_and_province(self): data = {"municipality": 1} with pytest.raises( ValueError, @@ -89,13 +85,11 @@ def test_missing_community_and_province(self, mock_findCommunity): ): search(data) - @patch("sigpac_tools.search.findCommunity") - def test_missing_community_not_found(self, mock_findCommunity): - mock_findCommunity.return_value = None - data = {"province": 1} + def test_missing_community_not_found(self): + data = {"parcel": 1} with pytest.raises( ValueError, - match='"Community" has not been specified and it could have not been found from the "province" parameter', + match='"Community" has not been specified, neither has been "province" and it is compulsory to find the community associated', ): search(data) diff --git a/tests/test_utils.py b/tests/test_utils.py index b1b8a37..bbd2704 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,7 +1,12 @@ import pytest -from sigpac_tools.utils import lng_lat_to_tile, transform_coords, findCommunity - +from sigpac_tools.utils import ( + lng_lat_to_tile, + transform_coords, + find_community, + read_cadastral_registry, + validate_cadastral_registry +) class TestLngLatToTile: def test_central_coordinates(self): @@ -42,16 +47,66 @@ class TestFindCommunity: def test_existing_province_id(self): province_id = 29 expected = 1 - assert findCommunity(province_id) == expected + assert find_community(province_id) == expected def test_non_existing_province_id(self): province_id = 60 - assert findCommunity(province_id) is None + assert find_community(province_id) is None def test_another_existing_province_id(self): province_id = 2 expected = 8 - assert findCommunity(province_id) == expected + assert find_community(province_id) == expected + + +class TestReadCadastralRegistry: + def test_read_cadastral_registry_valid(self): + registry = "29008A008005720000EQ" + expected_result = { + "province": 29, + "municipality": 8, + "section": "A", + "polygon": 8, + "parcel": 572, + "id_inm": 0, + "control": "EQ", + } + + result = read_cadastral_registry(registry) + assert result == expected_result + + def test_read_cadastral_registry_invalid_length(self): + with pytest.raises(ValueError, match="The cadastral reference must have a length of 20 characters"): + read_cadastral_registry("29008A008005720000EQ000") + with pytest.raises(ValueError, match="The cadastral reference must have a length of 20 characters"): + read_cadastral_registry("29008A00800572") + + def test_read_cadastral_registry_invalid_province(self): + with pytest.raises(ValueError, match="The province of the cadastral reference is not valid"): + read_cadastral_registry("99003000100123456789") + + +class TestValidateCadastralRegistry: + def test_validate_cadastral_registry_valid(self): + registry = "29008A008005720000EQ" + validate_cadastral_registry(registry) # This should pass without exceptions + + def test_validate_cadastral_registry_invalid_length(self): + with pytest.raises(ValueError, match="The cadastral reference must have a length of 20 characters"): + validate_cadastral_registry("29008A008005720000EQ000") + with pytest.raises(ValueError, match="The cadastral reference must have a length of 20 characters"): + validate_cadastral_registry("29008A00800572") + + def test_validate_cadastral_registry_urban_not_supported(self): + registry = "9872023VH5797S0001WX" + with pytest.raises(NotImplementedError): + validate_cadastral_registry(registry) + + def test_validate_cadastral_registry_invalid_control(self): + registry = "29008A008005720000OL" # Incorrect control characters + with pytest.raises(ValueError): + validate_cadastral_registry(registry) + if __name__ == "__main__":