Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a query struct for managing query data #19

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Breaking

- add `Snowflex.Query` as the way to create queries and pass data.
- remove `Snowflex.sql_query` and `Snowflex.param_query` in favor of using a single `Snowflex.run_query` function that uses the new `Snowflex.Query`.

## Added

- `Snowflex.Connection.execute` now accepts a keyword list of options to override `Snowflex.Connection`-level options on a per query basis.
- Updated deps

## [0.3.2] - 2021-06-02

### Added

- Added `map_nulls_to_nil?` variable to connection configuration to allow conversion of `:null` values to `:nil` in snowflake query response

## [0.3.1] - 2021-03-10
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ def MyApp.Application do
end
```

## Usage

In order to execute queries, you will need to create a `Snowflex.Query` struct and pass it to your execute function. It would look something like this for a static query.

```elixir
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM my_table"})
MyApp.SnowflakeConnection.execute(query)
```

The `Query.create/1` function will also attempt to cast your SQL params.

```elixir
# This will cast the params to [{:sql_integer, 1}]
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM my_table WHERE id = ?", params: [1]})

MyApp.SnowflakeConnection.execute(query)
```

## Caveats

If you are planning to connect to the Snowflake warehouse, your local Erlang instance
Expand Down
30 changes: 16 additions & 14 deletions lib/snowflex.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
defmodule Snowflex do
@moduledoc """
The client interface for connecting to the Snowflake data warehouse.
The client interface for connecting to the Snowflake data warehouse. This module should not be used directly except for the types. The preferred method is to use a `Snowflex.Connection` to manage all query executions.

The main entry point to this module is `Snowflex.sql_query`. This function takes a string containing
The main entry point to this module is `Snowflex.run_query`. This function takes a `Snowflex.Query` struct containing
a SQL query and returns a list of maps (one per row). NOTE: due to the way the Erlang ODBC works, all values comeback
as strings. You will need to cast values appropriately.
"""
alias Ecto.Changeset
alias Snowflex.Worker
alias Snowflex.{Worker, Query}

# Shamelessly copied from http://erlang.org/doc/man/odbc.html#common-data-types-
@type precision :: integer()
Expand All @@ -34,45 +34,47 @@ defmodule Snowflex do

@type query_param :: {odbc_data_type(), [value()]}
@type sql_data :: list(%{optional(String.t()) => String.t()})
@type query_opts :: [timeout: timeout(), map_null_to_nil?: boolean()]
@type connection_opts :: [timeout: timeout(), map_null_to_nil?: boolean()]

@spec sql_query(atom(), String.t(), query_opts()) ::
@doc """
Runs a query using the specified pool
"""
@spec run_query(pool_name :: atom(), query :: Query.t(), connection_opts()) ::
sql_data() | {:error, term()}
def sql_query(pool_name, query, opts) do
def run_query(pool_name, query = %Query{params: nil}, opts) do
timeout = Keyword.get(opts, :timeout)

case :poolboy.transaction(
pool_name,
&Worker.sql_query(&1, query, timeout),
&Worker.sql_query(&1, query.query_string, timeout),
timeout
) do
{:ok, results} -> process_results(results, opts)
err -> err
end
end

@spec param_query(atom(), String.t(), list(query_param()), query_opts()) ::
sql_data() | {:error, term()}
def param_query(pool_name, query, params, opts) do
def run_query(pool_name, query = %Query{}, opts) do
timeout = Keyword.get(opts, :timeout)

case :poolboy.transaction(
pool_name,
&Worker.param_query(&1, query, params, timeout),
&Worker.param_query(&1, query.query_string, query.params, timeout),
timeout
) do
{:ok, results} -> process_results(results, opts)
err -> err
end
end

@doc """
Cast data results from a query into a given schema
"""
@spec cast_results(data :: Enum.t(), schema :: Ecto.Schema.t()) :: list
def cast_results(data, schema) do
Enum.map(data, &cast_row(&1, schema))
end

def int_param(val), do: {:sql_integer, val}
def string_param(val, length \\ 250), do: {{:sql_varchar, length}, val}

# Helpers

defp process_results(data, opts) when is_list(data) do
Expand Down
44 changes: 22 additions & 22 deletions lib/snowflex/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Snowflex.Connection do
use Snowflex.Connection,
otp_app: :my_app,
timeout: :timer.seconds(60),
keep_alive?: true
keep_alive?: true,
map_nulls_to_nil?: true
end
```

Expand Down Expand Up @@ -67,19 +68,27 @@ defmodule Snowflex.Connection do

`execute/1`
```
query = "SELECT * FROM foo"
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM foo"})

SnowflakeConnection.execute(query)
```

`execute/2`
```
query = \"""
# or with parameters

query_string = \"""
SELECT * FROM foo
WHERE bar = ?
\"""

SnowflakeConnection.execute(query, [Snowflex.string_param("baz")])
{:ok, query} = Snowflex.Query.create(%{query_string: query_string, params: ["baz"]})
SnowflakeConnection.execute(query)
```

You may also override any Connection-level options on each execute.

```
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM foo"})

SnowflakeConnection.execute(query, timeout: :timer.minutes(1))
```
"""

Expand All @@ -102,7 +111,7 @@ defmodule Snowflex.Connection do
]
@keep_alive? keep_alive?
@heartbeat_interval :timer.hours(3)
@query_opts [
@default_connection_opts [
timeout: timeout,
map_nulls_to_nil?: map_nulls_to_nil?
]
Expand Down Expand Up @@ -133,27 +142,18 @@ defmodule Snowflex.Connection do
end

@impl Snowflex.Connection
def execute(query) when is_binary(query) do
Snowflex.sql_query(@name, query, @query_opts)
end

@impl Snowflex.Connection
def execute(query, params) when is_binary(query) and is_list(params) do
Snowflex.param_query(@name, query, params, @query_opts)
def execute(query = %Snowflex.Query{}, connection_opts \\ []) do
connection_opts = Keyword.merge(@default_connection_opts, connection_opts)
Snowflex.run_query(@name, query, connection_opts)
end
end
end

## Callbacks

@doc """
Wraps `Snowflex.sql_query/3` and injects the relevant information from the connection
"""
@callback execute(query :: String.t()) :: Snowflex.sql_data() | {:error, any}

@doc """
Wraps `Snowflex.param_query/4` and injects the relevant information from the connection
Wraps `Snowflex.run_query/3` and injects the relevant information from the connection
"""
@callback execute(query :: String.t(), params :: list(Snowflex.query_param())) ::
@callback execute(query :: Snowflex.Query.t(), connection_opts :: Snowflex.connection_opts()) ::
Snowflex.sql_data() | {:error, any}
end
63 changes: 63 additions & 0 deletions lib/snowflex/query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Snowflex.Query do
@moduledoc """
The module creates a structured data type for queries for easier execution.
"""

@type t :: %__MODULE__{
query_string: String.t(),
params: nil | list(Snowflex.query_param())
}

@type query_attrs :: %{
query_string: String.t(),
params: nil | list(String.t() | integer() | Date.t())
}

defstruct query_string: nil, params: nil

@doc """
Build a new Snowflex.Query struct. Will raise if unable to create.
"""
@spec create!(attrs :: query_attrs()) :: t()
def create!(attrs) do
attrs
|> cast_params()
|> build_struct()
end

@doc """
Build a new Snowflex.Query struct
"""
@spec create(attrs :: query_attrs()) :: {:ok, t()} | {:error, :invalid_query}
def create(attrs) do
try do
query = create!(attrs)

{:ok, query}
rescue
e in ArgumentError ->
{:error, e.message}
end
end

# HELPERS

defp cast_params(attrs = %{params: params}) when not is_nil(params) do
Map.put(attrs, :params, Enum.map(params, &do_param_cast/1))
end

defp cast_params(attrs), do: attrs

defp do_param_cast(param) when is_binary(param), do: {{:sql_varchar, 250}, param}
defp do_param_cast(param) when is_integer(param), do: {:sql_integer, param}
defp do_param_cast(param = %Date{}), do: {{:sql_varchar, 250}, Date.to_iso8601(param)}
defp do_param_cast(_param), do: raise(ArgumentError, "unsupported parameter type given")

defp build_struct(attrs) do
unless Map.has_key?(attrs, :query_string) do
raise(ArgumentError, "must provide :query_string to build Query")
end

struct(__MODULE__, attrs)
end
end
11 changes: 6 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"},
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"},
"ecto": {:hex, :ecto, "3.5.5", "48219a991bb86daba6e38a1e64f8cea540cded58950ff38fbc8163e062281a07", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "98dd0e5e1de7f45beca6130d13116eae675db59adfa055fb79612406acf6f6f1"},
"earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"},
"ecto": {:hex, :ecto, "3.6.2", "efdf52acfc4ce29249bab5417415bd50abd62db7b0603b8bab0d7b996548c2bc", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "efad6dfb04e6f986b8a3047822b0f826d9affe8e4ebdd2aeedbfcb14fd48884e"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
"ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}
12 changes: 10 additions & 2 deletions test/snowflex/connection_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Snowflex.ConnectionTest do
use ExUnit.Case, async: true

alias Snowflex.Query

defmodule SnowflakeConnection do
use Snowflex.Connection,
otp_app: :snowflex
Expand Down Expand Up @@ -34,15 +36,21 @@ defmodule Snowflex.ConnectionTest do
test "should execute a sql query" do
start_supervised!(SnowflakeConnection)

assert [%{"col" => 1}, %{"col" => 2}] == SnowflakeConnection.execute("my query")
assert [%{"col" => 1}, %{"col" => 2}] ==
%{query_string: "my query"}
|> Query.create!()
|> SnowflakeConnection.execute()
end
end

describe "execute/2" do
test "should execute a param query" do
start_supervised!(SnowflakeConnection)

assert [%{"col" => 1}, %{"col" => 2}] == SnowflakeConnection.execute("my query", [])
assert [%{"col" => 1}, %{"col" => 2}] ==
%{query_string: "my query", params: []}
|> Query.create!()
|> SnowflakeConnection.execute()
end
end
end
51 changes: 51 additions & 0 deletions test/snowflex/query_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Snowflex.QueryTest do
use ExUnit.Case

alias Snowflex.Query

describe "create!/1" do
test "should create a new Snowflex.Query with no params" do
assert %Query{query_string: "my string", params: nil} =
Query.create!(%{query_string: "my string"})
end

test "should create a Snowflex.Query with params" do
assert %Query{query_string: "my string", params: [{{:sql_varchar, 250}, "hi"}]} =
Query.create!(%{query_string: "my string", params: ["hi"]})
end

test "should raise an error when missing params" do
assert_raise ArgumentError, "must provide :query_string to build Query", fn ->
Query.create!(%{params: ["hi"]})
end
end

test "should raise an error with an unsupported param type" do
assert_raise ArgumentError, "unsupported parameter type given", fn ->
Query.create!(%{query_string: "my string", params: [nil]})
end
end
end

describe "create/1" do
test "should create a new Snowflex.Query with no params" do
assert {:ok, %Query{query_string: "my string", params: nil}} =
Query.create(%{query_string: "my string"})
end

test "should create a Snowflex.Query with params" do
assert {:ok, %Query{query_string: "my string", params: [{{:sql_varchar, 250}, "hi"}]}} =
Query.create(%{query_string: "my string", params: ["hi"]})
end

test "should return an error when missing params" do
assert {:error, "must provide :query_string to build Query"} =
Query.create(%{params: ["hi"]})
end

test "should return an error with an unsupported param type" do
assert {:error, "unsupported parameter type given"} =
Query.create(%{query_string: "my string", params: [nil]})
end
end
end
2 changes: 1 addition & 1 deletion test/snowflex/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ defmodule Snowflex.WorkerTest do
Process.sleep(7)

Snowflex.Worker.param_query(worker, "SELECT * FROM my_table WHERE name=?", [
Snowflex.string_param("dustin")
{{:sql_varchar, 250}, "dustin"}
])

Process.sleep(7)
Expand Down