Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidation BNLC : détecte les erreurs lors du décodage d'un CSV #3586

Merged
merged 10 commits into from
Nov 10, 2023
103 changes: 84 additions & 19 deletions apps/transport/lib/jobs/consolidate_bnlc_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
@s3_bucket :on_demand_validation

# Custom types
@type consolidation_errors :: %{dataset_errors: list(), validation_errors: list(), download_errors: list()}
@type decode_error :: {:decode, {map(), map()}}
@type download_error :: {:download, {map(), map()}}
@type consolidation_errors :: %{
dataset_errors: list(),
validation_errors: list(),
download_errors: [download_error()],
decode_errors: [decode_error()]
}
@type dataset_without_appropriate_resource_error :: %{
error: :not_at_least_one_appropriate_resource,
dataset_details: map()
Expand Down Expand Up @@ -57,11 +64,11 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
Logger.info("Finding valid resources…")
%{ok: resources_details, errors: validation_errors} = valid_datagouv_resources(datasets_details)

if validation_errors |> Enum.filter(&match?({:validation_error, _, _}, &1)) |> Enum.any?() do
if validator_unavailable?(validation_errors) do
{:discard, "Cannot consolidate the BNLC because the TableSchema validator is not available"}
else
Logger.info("Downloading resources…")
%{ok: download_details, errors: download_errors} = download_resources(resources_details)
%{ok: download_details, errors: download_or_decode_errors} = download_resources(resources_details)
Logger.info("Creating a single file")
consolidate_resources(download_details)

Expand All @@ -72,13 +79,20 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
|> send_email_recap(%{
dataset_errors: dataset_errors,
validation_errors: validation_errors,
download_errors: download_errors
download_errors: Enum.filter(download_or_decode_errors, &match?({:download, _}, &1)),
decode_errors: Enum.filter(download_or_decode_errors, &match?({:decode, _}, &1))
})

:ok
end
end

defp validator_unavailable?(validation_errors) do
validation_errors
|> Enum.filter(&match?({:validator_unavailable_error, _, _}, &1))
|> Enum.any?()
end
Comment on lines +90 to +94
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C'est du boy scout ceci et le renommage de :validation_error vers :validator_unavailable_error


defp upload_temporary_file do
content = File.read!(@bnlc_path)
now = DateTime.utc_now() |> DateTime.truncate(:microsecond) |> DateTime.to_string() |> String.replace(" ", "_")
Expand Down Expand Up @@ -128,8 +142,8 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
end

@spec format_errors(consolidation_errors()) :: binary() | nil
def format_errors(%{dataset_errors: _, validation_errors: _, download_errors: _} = errors) do
[&format_dataset_errors/1, &format_validation_errors/1, &format_download_errors/1]
def format_errors(%{dataset_errors: _, validation_errors: _, download_errors: _, decode_errors: _} = errors) do
[&format_dataset_errors/1, &format_validation_errors/1, &format_download_errors/1, &format_decode_errors/1]
|> Enum.map_join("\n\n", fn fmt_fn -> fmt_fn.(errors) end)
|> String.trim()
|> case do
Expand Down Expand Up @@ -169,12 +183,33 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
"""
end

@spec format_download_errors(%{download_errors: [download_error()]}) :: binary()
def format_download_errors(%{download_errors: []}), do: ""

def format_download_errors(%{download_errors: download_errors}) do
errors =
Enum.map(download_errors, fn {:download, {dataset, resource}} ->
{dataset, resource}
end)

"""
<h2>Impossible de télécharger les ressources suivantes</h2>
#{Enum.map_join(download_errors, "\n", &link_to_resource/1)}
#{Enum.map_join(errors, "\n", &link_to_resource/1)}
"""
end

@spec format_decode_errors(%{decode_errors: [decode_error()]}) :: binary()
def format_decode_errors(%{decode_errors: []}), do: ""

def format_decode_errors(%{decode_errors: decode_errors}) do
errors =
Enum.map(decode_errors, fn {:decode, {dataset, resource}} ->
{dataset, resource}
end)

"""
<h2>Impossible de décoder les fichiers CSV suivants</h2>
#{Enum.map_join(errors, "\n", &link_to_resource/1)}
"""
end

Expand Down Expand Up @@ -348,22 +383,22 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do

@spec valid_datagouv_resources([map()]) :: %{
ok: [],
errors: [{:error, map(), map()} | {:validation_error, map(), map()}]
errors: [{:error, map(), map()} | {:validator_unavailable_error, map(), map()}]
}
@doc """
Identifies valid resources according to the target schema.
For each resource, call the TableSchemaValidator to make sure the resource is valid.

Possible errors:
- the resource is not valid according to the schema ({`:error`, _, _})
- the validator is not available ({`:validation_error`, _, _})
- the validator is not available ({`:validator_unavailable_error`, _, _})
"""
def valid_datagouv_resources(datasets_details) do
analyze_resource = fn dataset_details, %{"url" => resource_url} = resource ->
case TableSchemaValidator.validate(@schema_name, resource_url) do
%{"has_errors" => true} -> {:errors, {:error, dataset_details, resource}}
%{"has_errors" => false} -> {:ok, {dataset_details, resource}}
nil -> {:errors, {:validation_error, dataset_details, resource}}
nil -> {:errors, {:validator_unavailable_error, dataset_details, resource}}
end
end

Expand All @@ -378,31 +413,61 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do

@doc """
From a list of resource object coming from the data.gouv.fr's API, download these (valid)
CSV files locally and guess the CSV separator.
CSV files locally, guess the CSV separator and try to decode the file.

The temporary download path and the guessed CSV separator are added to the resource's payload.

Possible errors:
- cannot download the resource
- cannot decode the CSV file
"""
@spec download_resources([map()]) :: %{ok: [], errors: []}
@spec download_resources([map()]) :: %{ok: [], errors: [decode_error() | download_error()]}
def download_resources(resources_details) do
resources_details
|> Enum.map(fn {dataset_details, %{"id" => resource_id, "url" => resource_url} = resource} ->
|> Enum.map(fn {dataset_details, %{"url" => resource_url} = resource} ->
case http_client().get(resource_url, [], follow_redirect: true) do
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
path = System.tmp_dir!() |> Path.join("consolidate_bnlc_#{resource_id}")
File.write!(path, body)
resource = Map.merge(resource, %{@download_path_key => path, @separator_key => guess_csv_separator(body)})
{:ok, {dataset_details, resource}}
{:ok, %HTTPoison.Response{status_code: 200} = response} ->
guess_separator_and_decode({dataset_details, resource}, response)

_ ->
{:errors, {dataset_details, resource}}
{:errors, {:download, {dataset_details, resource}}}
end
end)
|> normalize_ok_errors()
end

@doc """
For a remote resource we successfully downloaded, we try to:
- guess the CSV separator of the file (using the header line)
- decode the CSV file with the guessed separator
"""
def guess_separator_and_decode({dataset_details, %{"id" => resource_id} = resource}, %HTTPoison.Response{
status_code: 200,
body: body
}) do
path = System.tmp_dir!() |> Path.join("consolidate_bnlc_#{resource_id}")
File.write!(path, body)
resource = Map.merge(resource, %{@download_path_key => path, @separator_key => guess_csv_separator(body)})
check_can_decode_csv(body, {dataset_details, resource})
end

defp check_can_decode_csv(
body,
{dataset_details, %{@separator_key => separator, @download_path_key => path} = resource}
) do
errors = [body] |> CSV.decode(separator: separator) |> Enum.filter(&(elem(&1, 0) == :error))

if Enum.empty?(errors) do
{:ok, {dataset_details, resource}}
else
# Could not decode the CSV:
# - we delete the temporary file since we will not include it in the consolidation
# - we return an error
File.rm!(path)
{:errors, {:decode, {dataset_details, resource}}}
end
end

@doc """
Make sure we always have `:ok` and `:errors` keys.

Expand Down
75 changes: 64 additions & 11 deletions apps/transport/test/transport/jobs/consolidate_bnlc_job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,14 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
%{"title" => "Ressource avec erreurs", "schema" => %{"name" => @target_schema}}}
],
download_errors: [
{%{"page" => "https://example.com/jdd_download_error", "title" => "JDD avec erreur de téléchargement"},
%{"title" => "Ressource indisponible", "schema" => %{"name" => @target_schema}}}
{:download,
{%{"page" => "https://example.com/jdd_download_error", "title" => "JDD avec erreur de téléchargement"},
%{"title" => "Ressource indisponible", "schema" => %{"name" => @target_schema}}}}
],
decode_errors: [
{:decode,
{%{"page" => "https://example.com/jdd_decode_error", "title" => "JDD impossible à décoder"},
%{"title" => "Ressource mal formatée", "schema" => %{"name" => @target_schema}}}}
]
}

Expand All @@ -98,12 +104,22 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do


<h2>Impossible de télécharger les ressources suivantes</h2>
Ressource `Ressource indisponible` (<a href="https://example.com/jdd_download_error">JDD avec erreur de téléchargement</a>)\
Ressource `Ressource indisponible` (<a href="https://example.com/jdd_download_error">JDD avec erreur de téléchargement</a>)


<h2>Impossible de décoder les fichiers CSV suivants</h2>
Ressource `Ressource mal formatée` (<a href="https://example.com/jdd_decode_error">JDD impossible à décoder</a>)\
""" == ConsolidateBNLCJob.format_errors(errors)
end

test "nil when there are no errors" do
assert nil == ConsolidateBNLCJob.format_errors(%{dataset_errors: [], validation_errors: [], download_errors: []})
assert nil ==
ConsolidateBNLCJob.format_errors(%{
dataset_errors: [],
validation_errors: [],
download_errors: [],
decode_errors: []
})
end
end

Expand Down Expand Up @@ -154,7 +170,7 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
assert %{
errors: [
{:error, dataset_details, other_resource},
{:validation_error, other_dataset_details, validation_error_resource}
{:validator_unavailable_error, other_dataset_details, validation_error_resource}
],
ok: [{dataset_details, resource}]
} == ConsolidateBNLCJob.valid_datagouv_resources(datasets_details)
Expand All @@ -172,17 +188,28 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
"slug" => "foo"
}

dataset_error_detail = %{
dataset_download_error_detail = %{
"resources" => [
resource_error = %{
resource_download_error = %{
"id" => Ecto.UUID.generate(),
"schema" => %{"name" => @target_schema},
"url" => error_url = "https://example.com/other_file.csv"
"url" => download_error_url = "https://example.com/download_error.csv"
}
],
"slug" => "bar"
}

dataset_decode_error_detail = %{
"resources" => [
resource_decode_error = %{
"id" => resource_decode_error_id = Ecto.UUID.generate(),
"schema" => %{"name" => @target_schema},
"url" => decode_error_url = "https://example.com/decode_error.csv"
}
],
"slug" => "baz"
}

Transport.HTTPoison.Mock
|> expect(:get, fn ^url, [], [follow_redirect: true] ->
body = """
Expand All @@ -194,14 +221,39 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
end)

Transport.HTTPoison.Mock
|> expect(:get, fn ^error_url, [], [follow_redirect: true] ->
|> expect(:get, fn ^download_error_url, [], [follow_redirect: true] ->
{:ok, %HTTPoison.Response{status_code: 500, body: ""}}
end)

resources_details = [{dataset_detail, resource}, {dataset_error_detail, resource_error}]
Transport.HTTPoison.Mock
|> expect(:get, fn ^decode_error_url, [], [follow_redirect: true] ->
# Malformed CSV: unescaped double quotes: `""2"`
body = """
"foo","bar"
"1",""2"
"""

{:ok, %HTTPoison.Response{status_code: 200, body: body}}
end)

resources_details = [
{dataset_detail, resource},
{dataset_download_error_detail, resource_download_error},
{dataset_decode_error_detail, resource_decode_error}
]

assert %{
errors: [{^dataset_error_detail, ^resource_error}],
errors: [
{:download, {^dataset_download_error_detail, ^resource_download_error}},
{:decode,
{^dataset_decode_error_detail,
%{
"id" => ^resource_decode_error_id,
"csv_separator" => ?,,
"tmp_download_path" => tmp_decode_error_download_path,
"url" => ^decode_error_url
}}}
],
ok: [
{^dataset_detail,
%{"id" => ^resource_id, "url" => ^url, "csv_separator" => ?,, "tmp_download_path" => tmp_download_path}}
Expand All @@ -210,6 +262,7 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do

assert String.ends_with?(tmp_download_path, "consolidate_bnlc_#{resource_id}")
assert File.exists?(tmp_download_path)
refute File.exists?(tmp_decode_error_download_path)
end

test "guess_csv_separator" do
Expand Down