diff --git a/apps/transport/lib/jobs/consolidate_bnlc_job.ex b/apps/transport/lib/jobs/consolidate_bnlc_job.ex
index 01f4788713..ad918adb9f 100644
--- a/apps/transport/lib/jobs/consolidate_bnlc_job.ex
+++ b/apps/transport/lib/jobs/consolidate_bnlc_job.ex
@@ -23,7 +23,14 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
@s3_bucket :on_demand_validation
# Custom types
- @type consolidation_errors :: %{dataset_errors: list(), validation_errors: list(), download_errors: list()}
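+ # Each error tuple wraps the data.gouv.fr dataset details and the resource payload it relates to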
+ @type decode_error :: {:decode, {map(), map()}}
+ @type download_error :: {:download, {map(), map()}}
+ @type consolidation_errors :: %{
+ dataset_errors: list(),
+ validation_errors: list(),
+ download_errors: [download_error()],
+ decode_errors: [decode_error()]
+ }
@type dataset_without_appropriate_resource_error :: %{
error: :not_at_least_one_appropriate_resource,
dataset_details: map()
@@ -57,11 +64,11 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
Logger.info("Finding valid resources…")
%{ok: resources_details, errors: validation_errors} = valid_datagouv_resources(datasets_details)
- if validation_errors |> Enum.filter(&match?({:validation_error, _, _}, &1)) |> Enum.any?() do
+ if validator_unavailable?(validation_errors) do
{:discard, "Cannot consolidate the BNLC because the TableSchema validator is not available"}
else
Logger.info("Downloading resources…")
- %{ok: download_details, errors: download_errors} = download_resources(resources_details)
+ %{ok: download_details, errors: download_or_decode_errors} = download_resources(resources_details)
Logger.info("Creating a single file")
consolidate_resources(download_details)
@@ -72,13 +79,20 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
|> send_email_recap(%{
dataset_errors: dataset_errors,
validation_errors: validation_errors,
- download_errors: download_errors
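+ # Split the combined download/decode error list so the recap email reports each kind separately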
+ download_errors: Enum.filter(download_or_decode_errors, &match?({:download, _}, &1)),
+ decode_errors: Enum.filter(download_or_decode_errors, &match?({:decode, _}, &1))
})
:ok
end
end
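+ # The whole consolidation is discarded when the TableSchema validator itself could not be reached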
+ defp validator_unavailable?(validation_errors) do
+ validation_errors
+ |> Enum.filter(&match?({:validator_unavailable_error, _, _}, &1))
+ |> Enum.any?()
+ end
+
defp upload_temporary_file do
content = File.read!(@bnlc_path)
now = DateTime.utc_now() |> DateTime.truncate(:microsecond) |> DateTime.to_string() |> String.replace(" ", "_")
@@ -128,8 +142,8 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
end
@spec format_errors(consolidation_errors()) :: binary() | nil
- def format_errors(%{dataset_errors: _, validation_errors: _, download_errors: _} = errors) do
- [&format_dataset_errors/1, &format_validation_errors/1, &format_download_errors/1]
+ def format_errors(%{dataset_errors: _, validation_errors: _, download_errors: _, decode_errors: _} = errors) do
+ [&format_dataset_errors/1, &format_validation_errors/1, &format_download_errors/1, &format_decode_errors/1]
|> Enum.map_join("\n\n", fn fmt_fn -> fmt_fn.(errors) end)
|> String.trim()
|> case do
@@ -169,12 +183,33 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
"""
end
+ @spec format_download_errors(%{download_errors: [download_error()]}) :: binary()
def format_download_errors(%{download_errors: []}), do: ""
def format_download_errors(%{download_errors: download_errors}) do
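+ # Drop the :download tag so link_to_resource/1 receives plain {dataset, resource} tuples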
+ errors =
+ Enum.map(download_errors, fn {:download, {dataset, resource}} ->
+ {dataset, resource}
+ end)
+
"""
Impossible de télécharger les ressources suivantes
- #{Enum.map_join(download_errors, "\n", &link_to_resource/1)}
+ #{Enum.map_join(errors, "\n", &link_to_resource/1)}
+ """
+ end
+
+ @spec format_decode_errors(%{decode_errors: [decode_error()]}) :: binary()
+ def format_decode_errors(%{decode_errors: []}), do: ""
+
+ def format_decode_errors(%{decode_errors: decode_errors}) do
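+ # Drop the :decode tag before building the resource links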
+ errors =
+ Enum.map(decode_errors, fn {:decode, {dataset, resource}} ->
+ {dataset, resource}
+ end)
+
+ """
+ Impossible de décoder les fichiers CSV suivants
+ #{Enum.map_join(errors, "\n", &link_to_resource/1)}
"""
end
@@ -348,7 +383,7 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
@spec valid_datagouv_resources([map()]) :: %{
ok: [],
- errors: [{:error, map(), map()} | {:validation_error, map(), map()}]
+ errors: [{:error, map(), map()} | {:validator_unavailable_error, map(), map()}]
}
@doc """
Identifies valid resources according to the target schema.
@@ -356,14 +391,14 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
Possible errors:
- the resource is not valid according to the schema ({`:error`, _, _})
- - the validator is not available ({`:validation_error`, _, _})
+ - the validator is not available ({`:validator_unavailable_error`, _, _})
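+
+ An illustrative return shape (the test suite exercises a concrete case):
+
+     %{
+       ok: [{dataset_details, resource}],
+       errors: [{:error, dataset_details, resource}, {:validator_unavailable_error, dataset_details, resource}]
+     }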
"""
def valid_datagouv_resources(datasets_details) do
analyze_resource = fn dataset_details, %{"url" => resource_url} = resource ->
case TableSchemaValidator.validate(@schema_name, resource_url) do
%{"has_errors" => true} -> {:errors, {:error, dataset_details, resource}}
%{"has_errors" => false} -> {:ok, {dataset_details, resource}}
- nil -> {:errors, {:validation_error, dataset_details, resource}}
+ nil -> {:errors, {:validator_unavailable_error, dataset_details, resource}}
end
end
@@ -378,31 +413,61 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
@doc """
  From a list of resource objects coming from the data.gouv.fr API, download these (valid)
- CSV files locally and guess the CSV separator.
+ CSV files locally, guess the CSV separator and try to decode each file.
The temporary download path and the guessed CSV separator are added to the resource's payload.
Possible errors:
- cannot download the resource
+ - cannot decode the CSV file
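+
+ An illustrative return shape:
+
+     %{
+       ok: [{dataset_details, resource}],
+       errors: [{:download, {dataset_details, resource}}, {:decode, {dataset_details, resource}}]
+     }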
"""
- @spec download_resources([map()]) :: %{ok: [], errors: []}
+ @spec download_resources([map()]) :: %{ok: [], errors: [decode_error() | download_error()]}
def download_resources(resources_details) do
resources_details
- |> Enum.map(fn {dataset_details, %{"id" => resource_id, "url" => resource_url} = resource} ->
+ |> Enum.map(fn {dataset_details, %{"url" => resource_url} = resource} ->
case http_client().get(resource_url, [], follow_redirect: true) do
- {:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
- path = System.tmp_dir!() |> Path.join("consolidate_bnlc_#{resource_id}")
- File.write!(path, body)
- resource = Map.merge(resource, %{@download_path_key => path, @separator_key => guess_csv_separator(body)})
- {:ok, {dataset_details, resource}}
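+ # A 200 response can still end up in the errors list if the downloaded CSV cannot be decoded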
+ {:ok, %HTTPoison.Response{status_code: 200} = response} ->
+ guess_separator_and_decode({dataset_details, resource}, response)
_ ->
- {:errors, {dataset_details, resource}}
+ {:errors, {:download, {dataset_details, resource}}}
end
end)
|> normalize_ok_errors()
end
+ @doc """
+ For a remote resource we successfully downloaded, we try to:
+ - guess the CSV separator of the file (using the header line)
+ - decode the CSV file with the guessed separator
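+
+ Returns `{:ok, {dataset_details, resource}}` on success, or `{:errors, {:decode, {dataset_details, resource}}}`
+ when the CSV file cannot be decoded.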
+ """
+ def guess_separator_and_decode({dataset_details, %{"id" => resource_id} = resource}, %HTTPoison.Response{
+ status_code: 200,
+ body: body
+ }) do
+ path = System.tmp_dir!() |> Path.join("consolidate_bnlc_#{resource_id}")
+ File.write!(path, body)
+ resource = Map.merge(resource, %{@download_path_key => path, @separator_key => guess_csv_separator(body)})
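+ # The decode check below may delete the temporary file and turn this resource into a :decode error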
+ check_can_decode_csv(body, {dataset_details, resource})
+ end
+
+ defp check_can_decode_csv(
+ body,
+ {dataset_details, %{@separator_key => separator, @download_path_key => path} = resource}
+ ) do
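+ # Decode the downloaded body with the guessed separator and keep only the rows that failed to decode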
+ errors = [body] |> CSV.decode(separator: separator) |> Enum.filter(&(elem(&1, 0) == :error))
+
+ if Enum.empty?(errors) do
+ {:ok, {dataset_details, resource}}
+ else
+ # Could not decode the CSV:
+ # - we delete the temporary file since we will not include it in the consolidation
+ # - we return an error
+ File.rm!(path)
+ {:errors, {:decode, {dataset_details, resource}}}
+ end
+ end
+
@doc """
Make sure we always have `:ok` and `:errors` keys.
diff --git a/apps/transport/test/transport/jobs/consolidate_bnlc_job_test.exs b/apps/transport/test/transport/jobs/consolidate_bnlc_job_test.exs
index 10ca585d9a..a41caa9260 100644
--- a/apps/transport/test/transport/jobs/consolidate_bnlc_job_test.exs
+++ b/apps/transport/test/transport/jobs/consolidate_bnlc_job_test.exs
@@ -82,8 +82,14 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
%{"title" => "Ressource avec erreurs", "schema" => %{"name" => @target_schema}}}
],
download_errors: [
- {%{"page" => "https://example.com/jdd_download_error", "title" => "JDD avec erreur de téléchargement"},
- %{"title" => "Ressource indisponible", "schema" => %{"name" => @target_schema}}}
+ {:download,
+ {%{"page" => "https://example.com/jdd_download_error", "title" => "JDD avec erreur de téléchargement"},
+ %{"title" => "Ressource indisponible", "schema" => %{"name" => @target_schema}}}}
+ ],
+ decode_errors: [
+ {:decode,
+ {%{"page" => "https://example.com/jdd_decode_error", "title" => "JDD impossible à décoder"},
+ %{"title" => "Ressource mal formatée", "schema" => %{"name" => @target_schema}}}}
]
}
@@ -98,12 +104,22 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
Impossible de télécharger les ressources suivantes
- Ressource `Ressource indisponible` (JDD avec erreur de téléchargement)\
+ Ressource `Ressource indisponible` (JDD avec erreur de téléchargement)
+
+
+ Impossible de décoder les fichiers CSV suivants
+ Ressource `Ressource mal formatée` (JDD impossible à décoder)\
""" == ConsolidateBNLCJob.format_errors(errors)
end
test "nil when there are no errors" do
- assert nil == ConsolidateBNLCJob.format_errors(%{dataset_errors: [], validation_errors: [], download_errors: []})
+ assert nil ==
+ ConsolidateBNLCJob.format_errors(%{
+ dataset_errors: [],
+ validation_errors: [],
+ download_errors: [],
+ decode_errors: []
+ })
end
end
@@ -154,7 +170,7 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
assert %{
errors: [
{:error, dataset_details, other_resource},
- {:validation_error, other_dataset_details, validation_error_resource}
+ {:validator_unavailable_error, other_dataset_details, validation_error_resource}
],
ok: [{dataset_details, resource}]
} == ConsolidateBNLCJob.valid_datagouv_resources(datasets_details)
@@ -172,17 +188,28 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
"slug" => "foo"
}
- dataset_error_detail = %{
+ dataset_download_error_detail = %{
"resources" => [
- resource_error = %{
+ resource_download_error = %{
"id" => Ecto.UUID.generate(),
"schema" => %{"name" => @target_schema},
- "url" => error_url = "https://example.com/other_file.csv"
+ "url" => download_error_url = "https://example.com/download_error.csv"
}
],
"slug" => "bar"
}
+ dataset_decode_error_detail = %{
+ "resources" => [
+ resource_decode_error = %{
+ "id" => resource_decode_error_id = Ecto.UUID.generate(),
+ "schema" => %{"name" => @target_schema},
+ "url" => decode_error_url = "https://example.com/decode_error.csv"
+ }
+ ],
+ "slug" => "baz"
+ }
+
Transport.HTTPoison.Mock
|> expect(:get, fn ^url, [], [follow_redirect: true] ->
body = """
@@ -194,14 +221,39 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
end)
Transport.HTTPoison.Mock
- |> expect(:get, fn ^error_url, [], [follow_redirect: true] ->
+ |> expect(:get, fn ^download_error_url, [], [follow_redirect: true] ->
{:ok, %HTTPoison.Response{status_code: 500, body: ""}}
end)
- resources_details = [{dataset_detail, resource}, {dataset_error_detail, resource_error}]
+ Transport.HTTPoison.Mock
+ |> expect(:get, fn ^decode_error_url, [], [follow_redirect: true] ->
+ # Malformed CSV: unescaped double quotes: `""2"`
+ body = """
+ "foo","bar"
+ "1",""2"
+ """
+
+ {:ok, %HTTPoison.Response{status_code: 200, body: body}}
+ end)
+
+ resources_details = [
+ {dataset_detail, resource},
+ {dataset_download_error_detail, resource_download_error},
+ {dataset_decode_error_detail, resource_decode_error}
+ ]
assert %{
- errors: [{^dataset_error_detail, ^resource_error}],
+ errors: [
+ {:download, {^dataset_download_error_detail, ^resource_download_error}},
+ {:decode,
+ {^dataset_decode_error_detail,
+ %{
+ "id" => ^resource_decode_error_id,
+ "csv_separator" => ?,,
+ "tmp_download_path" => tmp_decode_error_download_path,
+ "url" => ^decode_error_url
+ }}}
+ ],
ok: [
{^dataset_detail,
%{"id" => ^resource_id, "url" => ^url, "csv_separator" => ?,, "tmp_download_path" => tmp_download_path}}
@@ -210,6 +262,7 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
assert String.ends_with?(tmp_download_path, "consolidate_bnlc_#{resource_id}")
assert File.exists?(tmp_download_path)
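+ # The temporary file for the resource that could not be decoded has been removed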
+ refute File.exists?(tmp_decode_error_download_path)
end
test "guess_csv_separator" do