Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a query struct for managing query data #19

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Breaking

- add `Snowflex.Query` as the way to create queries and pass data.
- remove `Snowflex.sql_query` and `Snowflex.param_query` in favor of using a single `Snowflex.do_query` function that uses the new `Snowflex.Query`.

## Added

- `Snowflex.Connection.execute` now accepts a keyword list of options to override `Snowflex.Connection`-level options on a per query basis.
- Updated deps

## [0.3.2] - 2021-06-02

### Added

- Added `map_nulls_to_nil?` variable to connection configuration to allow conversion of `:null` values to `:nil` in snowflake query response

## [0.3.1] - 2021-03-10
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ def MyApp.Application do
end
```

## Usage

In order to execute queries, you will need to create a `Snowflex.Query` struct and pass it to your execute function. It would look something like this for a static query.

```elixir
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM my_table"})
MyApp.SnowflakeConnection.execute(query)
```

The `Query.create/1` function will also attempt to cast your SQL params.

```elixir
# This will cast the params to [{:sql_integer, 1}]
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM my_table WHERE id = ?", params: [1]})

MyApp.SnowflakeConnection.execute(query)
```

## Caveats

If you are planning to connect to the Snowflake warehouse, your local Erlang instance
Expand Down
23 changes: 9 additions & 14 deletions lib/snowflex.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
defmodule Snowflex do
@moduledoc """
The client interface for connecting to the Snowflake data warehouse.
The client interface for connecting to the Snowflake data warehouse. This module should not be used directly except for the types. The preferred method is to use a `Snowflex.Connection` to manage all query executions.

The main entry point to this module is `Snowflex.sql_query`. This function takes a string containing
The main entry point to this module is `Snowflex.do_query`. This function takes a `Snowflex.Query` struct containing
a SQL query and returns a list of maps (one per row). NOTE: due to the way the Erlang ODBC works, all values comeback
as strings. You will need to cast values appropriately.
"""
alias Ecto.Changeset
alias Snowflex.Worker
alias Snowflex.{Worker, Query}

# Shamelessly copied from http://erlang.org/doc/man/odbc.html#common-data-types-
@type precision :: integer()
Expand All @@ -34,31 +34,29 @@ defmodule Snowflex do

@type query_param :: {odbc_data_type(), [value()]}
@type sql_data :: list(%{optional(String.t()) => String.t()})
@type query_opts :: [timeout: timeout(), map_null_to_nil?: boolean()]
@type connection_opts :: [timeout: timeout(), map_null_to_nil?: boolean()]

@spec sql_query(atom(), String.t(), query_opts()) ::
@spec do_query(pool_name :: atom(), query :: Query.t(), connection_opts()) ::
sql_data() | {:error, term()}
def sql_query(pool_name, query, opts) do
def do_query(pool_name, query = %Query{params: nil}, opts) do
zbarnes757 marked this conversation as resolved.
Show resolved Hide resolved
timeout = Keyword.get(opts, :timeout)

case :poolboy.transaction(
pool_name,
&Worker.sql_query(&1, query, timeout),
&Worker.sql_query(&1, query.query_string, timeout),
timeout
) do
{:ok, results} -> process_results(results, opts)
err -> err
end
end

@spec param_query(atom(), String.t(), list(query_param()), query_opts()) ::
sql_data() | {:error, term()}
def param_query(pool_name, query, params, opts) do
def do_query(pool_name, query = %Query{}, opts) do
timeout = Keyword.get(opts, :timeout)

case :poolboy.transaction(
pool_name,
&Worker.param_query(&1, query, params, timeout),
&Worker.param_query(&1, query.query_string, query.params, timeout),
timeout
) do
{:ok, results} -> process_results(results, opts)
Expand All @@ -70,9 +68,6 @@ defmodule Snowflex do
Enum.map(data, &cast_row(&1, schema))
end

def int_param(val), do: {:sql_integer, val}
def string_param(val, length \\ 250), do: {{:sql_varchar, length}, val}

# Helpers

defp process_results(data, opts) when is_list(data) do
Expand Down
44 changes: 22 additions & 22 deletions lib/snowflex/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Snowflex.Connection do
use Snowflex.Connection,
otp_app: :my_app,
timeout: :timer.seconds(60),
keep_alive?: true
keep_alive?: true,
map_nulls_to_nil?: true
end
```

Expand Down Expand Up @@ -67,19 +68,27 @@ defmodule Snowflex.Connection do

`execute/1`
```
query = "SELECT * FROM foo"
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM foo"})

SnowflakeConnection.execute(query)
```

`execute/2`
```
query = \"""
# or with parameters

query_string = \"""
SELECT * FROM foo
WHERE bar = ?
\"""

SnowflakeConnection.execute(query, [Snowflex.string_param("baz")])
{:ok, query} = Snowflex.Query.create(%{query_string: query_string, params: ["baz"]})
SnowflakeConnection.execute(query)
```

You may also override any Connection-level options on each execute.

```
{:ok, query} = Snowflex.Query.create(%{query_string: "SELECT * FROM foo"})

SnowflakeConnection.execute(query, timeout: :timer.minutes(1))
```
"""

Expand All @@ -102,7 +111,7 @@ defmodule Snowflex.Connection do
]
@keep_alive? keep_alive?
@heartbeat_interval :timer.hours(3)
@query_opts [
@default_connection_opts [
timeout: timeout,
map_nulls_to_nil?: map_nulls_to_nil?
]
Expand Down Expand Up @@ -133,27 +142,18 @@ defmodule Snowflex.Connection do
end

@impl Snowflex.Connection
def execute(query) when is_binary(query) do
Snowflex.sql_query(@name, query, @query_opts)
end

@impl Snowflex.Connection
def execute(query, params) when is_binary(query) and is_list(params) do
Snowflex.param_query(@name, query, params, @query_opts)
def execute(query = %Snowflex.Query{}, connection_opts \\ []) do
connection_opts = Keyword.merge(@default_connection_opts, connection_opts)
Snowflex.do_query(@name, query, connection_opts)
end
end
end

## Callbacks

@doc """
Wraps `Snowflex.sql_query/3` and injects the relevant information from the connection
"""
@callback execute(query :: String.t()) :: Snowflex.sql_data() | {:error, any}

@doc """
Wraps `Snowflex.param_query/4` and injects the relevant information from the connection
Wraps `Snowflex.do_query/3` and injects the relevant information from the connection
"""
@callback execute(query :: String.t(), params :: list(Snowflex.query_param())) ::
@callback execute(query :: Snowflex.Query.t(), connection_opts :: Snowflex.connection_opts()) ::
Snowflex.sql_data() | {:error, any}
end
49 changes: 49 additions & 0 deletions lib/snowflex/query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
defmodule Snowflex.Query do
@moduledoc """
The module creates a structured data type for queries for easier execution.
"""

use Ecto.Schema
import Ecto.Changeset

alias Snowflex.Types.SQLParam

@type query_attrs :: %{
query_string: String.t(),
params: nil | list()
}

@required_fields ~w(query_string)a
@fields @required_fields ++ ~w(params)a

@primary_key false
embedded_schema do
field(:query_string, :string)
field(:params, {:array, SQLParam})
end

@doc """
Build a new Snowflex.Query struct
"""
@spec create(attrs :: query_attrs()) :: {:ok, %__MODULE__{}} | {:error, Ecto.Changeset.t()}
def create(attrs) do
%__MODULE__{}
|> cast(attrs, @fields)
|> validate_required(@required_fields)
|> case do
changeset = %Ecto.Changeset{valid?: true} ->
query = apply_changes(changeset)
params = dump_params(query.params)

{:ok, %__MODULE__{query | params: params}}

changeset ->
{:error, changeset}
end
end

# HELPERS

defp dump_params(nil), do: nil
defp dump_params(params), do: Enum.flat_map(params, &Map.to_list/1)
end
21 changes: 21 additions & 0 deletions lib/snowflex/types/sql_param.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Snowflex.Types.SQLParam do
@moduledoc """
Ecto.Type to cast params into odbc acceptable params
"""
use Ecto.Type

@impl true
def type, do: :map

@impl true
def cast(term) when is_binary(term), do: {:ok, %{{:sql_varchar, 250} => term}}
def cast(term) when is_integer(term), do: {:ok, %{sql_integer: term}}
def cast(term = %Date{}), do: {:ok, %{{:sql_varchar, 250} => Date.to_iso8601(term)}}
def cast(_term), do: :error

@impl true
def load(term), do: {:ok, term}

@impl true
def dump(term), do: {:ok, term}
end
11 changes: 6 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"},
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"},
"ecto": {:hex, :ecto, "3.5.5", "48219a991bb86daba6e38a1e64f8cea540cded58950ff38fbc8163e062281a07", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "98dd0e5e1de7f45beca6130d13116eae675db59adfa055fb79612406acf6f6f1"},
"earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"},
"ecto": {:hex, :ecto, "3.6.2", "efdf52acfc4ce29249bab5417415bd50abd62db7b0603b8bab0d7b996548c2bc", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "efad6dfb04e6f986b8a3047822b0f826d9affe8e4ebdd2aeedbfcb14fd48884e"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
"ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}
10 changes: 8 additions & 2 deletions test/snowflex/connection_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Snowflex.ConnectionTest do
use ExUnit.Case, async: true

alias Snowflex.Query

defmodule SnowflakeConnection do
use Snowflex.Connection,
otp_app: :snowflex
Expand Down Expand Up @@ -34,15 +36,19 @@ defmodule Snowflex.ConnectionTest do
test "should execute a sql query" do
start_supervised!(SnowflakeConnection)

assert [%{"col" => 1}, %{"col" => 2}] == SnowflakeConnection.execute("my query")
{:ok, query} = Query.create(%{query_string: "my query"})

assert [%{"col" => 1}, %{"col" => 2}] == SnowflakeConnection.execute(query)
end
end

describe "execute/2" do
test "should execute a param query" do
start_supervised!(SnowflakeConnection)

assert [%{"col" => 1}, %{"col" => 2}] == SnowflakeConnection.execute("my query", [])
{:ok, query} = Query.create(%{query_string: "my query", params: []})

assert [%{"col" => 1}, %{"col" => 2}] == SnowflakeConnection.execute(query)
end
end
end
21 changes: 21 additions & 0 deletions test/snowflex/query_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Snowflex.QueryTest do
use ExUnit.Case

alias Snowflex.Query

describe "create/1" do
test "should create a new Snowflex.Query with no params" do
assert {:ok, %Query{query_string: "my string", params: nil}} =
Query.create(%{query_string: "my string"})
end

test "should create a Snowflex.Query with params" do
assert {:ok, %Query{query_string: "my string", params: [{{:sql_varchar, 250}, "hi"}]}} =
Query.create(%{query_string: "my string", params: ["hi"]})
end

test "should return an error when missing params" do
assert {:error, %Ecto.Changeset{}} = Query.create(%{params: ["hi"]})
end
end
end