Skip to content

Commit

Permalink
Merge branch 'main' into python-bigquery-migration-migration
Browse files Browse the repository at this point in the history
  • Loading branch information
dizcology authored Jun 21, 2023
2 parents ab77a0a + 2b0fd48 commit bdeaf5a
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 0 deletions.
162 changes: 162 additions & 0 deletions dlp/snippets/deid.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,137 @@ def write_data(data: types.storage.Value) -> str:
# [END dlp_deidentify_date_shift]


# [START dlp_deidentify_time_extract]
import csv # noqa: F811, E402, I100
from datetime import datetime # noqa: F811, E402, I100
from typing import List # noqa: F811, E402

import google.cloud.dlp # noqa: F811, E402


def deidentify_with_time_extract(
    project: str,
    date_fields: List[str],
    input_csv_file: str,
    output_csv_file: str,
) -> None:
    """Uses the Data Loss Prevention API to deidentify dates in a CSV file
    through time part extraction.

    Args:
        project: The Google Cloud project id to use as a parent resource.
        date_fields: A list of (date) fields in CSV file to de-identify
            through time extraction. Example: ['birth_date', 'register_date'].
            Date values in format: mm/DD/YYYY are considered as part of this
            sample.
        input_csv_file: The path to the CSV file to deidentify. The first row
            of the file must specify column names, and all other rows must
            contain valid values.
        output_csv_file: The output file path to save the time extracted data.

    Raises:
        ValueError: If the input CSV file contains no rows (missing header).
    """

    # Instantiate a client.
    dlp = google.cloud.dlp_v2.DlpServiceClient()

    # Convert the date field list to Protobuf-compatible dicts. Build a
    # concrete list (not a one-shot `map` iterator) so the value can be
    # iterated safely however many times the client library needs.
    date_fields = [{"name": field} for field in (date_fields or [])]

    # Read the whole input CSV into memory.
    with open(input_csv_file) as csvfile:
        csv_lines = list(csv.reader(csvfile))

    # The first row is required as the header; fail with a clear message
    # instead of an opaque IndexError on an empty file.
    if not csv_lines:
        raise ValueError(f"Input CSV file is empty: {input_csv_file}")

    def map_data(value: str) -> dict:
        # Cells matching mm/DD/YYYY become structured date values; anything
        # else is passed through as a plain string.
        try:
            date = datetime.strptime(value, "%m/%d/%Y")
            return {
                "date_value": {
                    "year": date.year, "month": date.month, "day": date.day
                }
            }
        except ValueError:
            return {"string_value": value}

    # Convert CSV rows to protobuf-compatible dictionaries, again as
    # concrete lists rather than lazy iterators.
    csv_headers = [{"name": header} for header in csv_lines[0]]
    csv_rows = [
        {"values": [map_data(value) for value in row]}
        for row in csv_lines[1:]
    ]

    # Construct the `item` (a table) to de-identify.
    item = {"table": {"headers": csv_headers, "rows": csv_rows}}

    # Construct deidentify configuration dictionary: keep only the YEAR
    # part of every value in the requested date fields.
    deidentify_config = {
        "record_transformations": {
            "field_transformations": [
                {
                    "primitive_transformation": {
                        "time_part_config": {
                            "part_to_extract": "YEAR"
                        }
                    },
                    "fields": date_fields,
                }
            ]
        }
    }

    # Write-to-CSV helper methods for serializing the API response.
    def write_header(header) -> str:
        return header.name

    def write_data(data) -> str:
        # Untouched cells come back in `string_value`; transformed date
        # cells are re-serialized in the same mm/DD/YYYY-style layout.
        return data.string_value or "{}/{}/{}".format(
            data.date_value.month,
            data.date_value.day,
            data.date_value.year,
        )

    # Convert the project id into a full resource id.
    parent = f"projects/{project}"

    # Call the API.
    response = dlp.deidentify_content(
        request={
            "parent": parent,
            "deidentify_config": deidentify_config,
            "item": item,
        }
    )

    # Print the result.
    print(f"Table after de-identification: {response.item.table}")

    # Write results to the output CSV file.
    with open(output_csv_file, "w") as csvfile:
        write_file = csv.writer(csvfile, delimiter=",")
        write_file.writerow(map(write_header, response.item.table.headers))
        for row in response.item.table.rows:
            write_file.writerow(map(write_data, row.values))

    # Print status.
    print(f"Successfully saved date-extracted output to {output_csv_file}")


# [END dlp_deidentify_time_extract]


# [START dlp_deidentify_replace_infotype]
from typing import List # noqa: F811, E402, I100

Expand Down Expand Up @@ -2124,6 +2255,30 @@ def deidentify_table_with_multiple_crypto_hash(
"key_name.",
)

# Sub-command wiring for deidentify_with_time_extract: de-identify dates
# in a CSV file by extracting a single date part (the year).
time_extract_parser = subparsers.add_parser(
    "deid_time_extract",
    help="Deidentify dates in a CSV file by extracting a date part.",
)
# Positional: Google Cloud project id used as the parent resource.
time_extract_parser.add_argument(
    "project",
    help="The Google Cloud project id to use as a parent resource.",
)
# Positional: path to the input CSV (first row must be column names).
time_extract_parser.add_argument(
    "input_csv_file",
    help="The path to the CSV file to deidentify. The first row of the "
    "file must specify column names, and all other rows must contain "
    "valid values.",
)
# Positional, one or more values: the date columns to transform.
time_extract_parser.add_argument(
    "date_fields",
    nargs="+",
    help="The list of date fields in the CSV file to de-identify. Example: "
    "['birth_date', 'register_date']",
)
# Positional: where to write the time-extracted output CSV.
time_extract_parser.add_argument(
    "output_csv_file", help="The path to save the time-extracted data."
)

replace_with_infotype_parser = subparsers.add_parser(
"replace_with_infotype",
help="Deidentify sensitive data in a string by replacing it with the "
Expand Down Expand Up @@ -2485,6 +2640,13 @@ def deidentify_table_with_multiple_crypto_hash(
wrapped_key=args.wrapped_key,
key_name=args.key_name,
)
elif args.content == "deid_time_extract":
deidentify_with_time_extract(
args.project,
date_fields=args.date_fields,
input_csv_file=args.input_csv_file,
output_csv_file=args.output_csv_file,
)
elif args.content == "replace_with_infotype":
deidentify_with_replace_infotype(
args.project,
Expand Down
15 changes: 15 additions & 0 deletions dlp/snippets/deid_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,21 @@ def test_deidentify_with_date_shift_using_context_field(
assert "Successful" in out


def test_deidentify_with_time_extract(tempdir: TextIO, capsys: pytest.CaptureFixture) -> None:
    """Runs time-part extraction against the sample CSV and checks that the
    snippet reports a successful save."""
    destination = os.path.join(str(tempdir), "year-extracted.csv")

    deid.deidentify_with_time_extract(
        GCLOUD_PROJECT,
        date_fields=DATE_FIELDS,
        input_csv_file=CSV_FILE,
        output_csv_file=destination,
    )

    captured, _ = capsys.readouterr()
    assert "Successful" in captured


def test_reidentify_with_fpe(capsys: pytest.CaptureFixture) -> None:
labeled_fpe_string = "My SSN is SSN_TOKEN(9):731997681"

Expand Down

0 comments on commit bdeaf5a

Please sign in to comment.