Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AUTO-3289] General improvements, add DBConnection adapter #32

Merged
merged 10 commits into from
Jan 18, 2022
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,54 @@ def deps do
]
end
```

## DBConnection Support

[DBConnection](https://github.com/elixir-ecto/db_connection) support is currently in experimental phase, setting it up is very similar to current implementation with the expection of configuration options and obtaining the same results will require an extra step:

### Configuration:

Setting a Module to hold the connection is very similar, but instead you'll use `Snowflex.DBConnection`:

Example:

```elixir
defmodule MyApp.SnowflakeConnection do
use Snowflex.DBConnection,
otp_app: :my_app,
timeout: :timer.minutes(5)
end
```

```elixir
config :my_app, MyApp.SnowflakeConnection,
pool_size: 5, # the connection pool size
worker: MyApp.CustomWorker, # defaults to Snowflex.DBConnection.Server
connection: [
role: "PROD",
warehouse: System.get_env("SNOWFLAKE_POS_WH"),
uid: System.get_env("SNOWFLAKE_POS_UID"),
pwd: System.get_env("SNOWFLAKE_POS_PWD")
]
```

### Usage:

After setup, you can use your connection to query:

```elixir
alias Snowflex.DBConnection.Result

{:ok, %Result{} = result} = MyApp.SnowflakeConnection.execute("my query")
{:ok, %Result{} = result} = MyApp.SnowflakeConnection.execute("my query", ["my params"])
```

As you can see we now receive an `{:ok, result}` tuple, to get results as expected with current implementation, we need to call `process_result/1`:

```elixir
alias Snowflex.DBConnection.Result

{:ok, %Result{} = result} = MyApp.SnowflakeConnection.execute("my query")

[%{"col" => 1}, %{"col" => 2}] = SnowflakeDBConnection.process_result(result)
```
9 changes: 9 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ config :snowflex, Snowflex.ConnectionTest.SnowflakeConnection,
min: 1,
max: 1
]

config :snowflex, Snowflex.DBConnectionTest.SnowflakeDBConnection,
worker: Snowflex.DBConnectionTest.MockWorker,
pool_size: 3,
connection: [
server: "snowflex.us-east-8.snowflakecomputing.com",
role: "DEV",
warehouse: "CUSTOMER_DEV_WH"
]
65 changes: 65 additions & 0 deletions lib/snowflex/db_connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule Snowflex.DBConnection do
@moduledoc """
Defines a Snowflake connection with DBConnection adapter.

## Definition

When used, the connection expects the `:otp_app` option. You may also define a standard timeout.
This will default to 60 seconds.

```
defmodule SnowflakeDBConnection do
use Snowflex.DBConnection,
otp_app: :my_app,
timeout: :timer.seconds(60)
end
```
"""

alias Snowflex.DBConnection.{
Protocol,
Query,
Result
}

@doc false
defmacro __using__(opts) do
quote bind_quoted: [opts: opts] do
# setup compile time config
otp_app = Keyword.fetch!(opts, :otp_app)
timeout = Keyword.get(opts, :timeout, :timer.seconds(60))

@otp_app otp_app
@timeout timeout
@name __MODULE__

def child_spec(_) do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we ignore child spec input options?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opts should be provided statically when configuring your connection module with use Snowflake.DBConnection, opt1: _, opt2: _, .... Ignoring the opts in child_spec/1 ensures that the static config in the use macro is always correct, and not accidentally overridden in the supervision tree. I like this approach 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, thanks

config = Application.get_env(@otp_app, __MODULE__, [])
connection = Keyword.get(config, :connection, [])

opts =
Keyword.merge(config,
name: @name,
timeout: @timeout,
connection: connection
)

DBConnection.child_spec(Protocol, opts)
end

def execute(statement, params \\ []) when is_binary(statement) and is_list(params) do
case prepare_execute("", statement, params) do
{:ok, _query, result} -> {:ok, result}
{:error, error} -> {:error, error}
end
end

defdelegate process_result(result, opts \\ [map_nulls_to_nil?: true]), to: Result

defp prepare_execute(name, statement, params, opts \\ []) do
query = %Query{name: name, statement: statement}
DBConnection.prepare_execute(@name, query, params, opts)
end
end
end
end
30 changes: 30 additions & 0 deletions lib/snowflex/db_connection/error.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Snowflex.DBConnection.Error do
@moduledoc """
Defines an error returned from the ODBC adapter.
"""

defexception [:message]

@type t :: %__MODULE__{
message: String.t()
}

@spec exception(term()) :: t()
def exception({odbc_code, native_code, reason}) do
message =
to_string(reason) <>
" - ODBC_CODE: " <>
to_string(odbc_code) <>
" - SNOWFLAKE_CODE: " <> to_string(native_code)

%__MODULE__{
message: message
}
end

def exception(message) do
%__MODULE__{
message: to_string(message)
}
end
end
149 changes: 149 additions & 0 deletions lib/snowflex/db_connection/protocol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
defmodule Snowflex.DBConnection.Protocol do
use DBConnection

require Logger

alias Snowflex.DBConnection.{
Query,
Result,
Server
}

defstruct pid: nil, status: :idle, conn_opts: [], worker: Server

@type state :: %__MODULE__{
pid: pid(),
status: :idle,
conn_opts: Keyword.t(),
worker: Server | any()
}

## DBConnection Callbacks

@impl DBConnection
def connect(opts) do
connection_args = Keyword.fetch!(opts, :connection)
worker = Keyword.get(opts, :worker, Server)

{:ok, pid} = worker.start_link(opts)

state = %__MODULE__{
pid: pid,
status: :idle,
conn_opts: connection_args,
worker: worker
}

{:ok, state}
end

@impl DBConnection
def disconnect(_err, %{pid: pid}), do: Server.disconnect(pid)

@impl DBConnection
def checkout(state), do: {:ok, state}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there should be any sort of limit on the maximum number of connections that can be checked out at once? Maybe it doesn't matter 🤷


@impl DBConnection
def ping(state) do
query = %Query{name: "ping", statement: "SELECT /* snowflex:heartbeat */ 1;"}

case do_query(query, [], [], state) do
{:ok, _, _, new_state} -> {:ok, new_state}
{:error, reason, new_state} -> {:disconnect, reason, new_state}
end
end

@impl DBConnection
def handle_prepare(query, _opts, state) do
{:ok, query, state}
end

@impl DBConnection
def handle_execute(query, params, opts, state) do
do_query(query, params, opts, state)
end

@impl DBConnection
def handle_status(_, %{status: {status, _}} = state), do: {status, state}
def handle_status(_, %{status: status} = state), do: {status, state}

@impl DBConnection
def handle_close(_query, _opts, state) do
{:ok, %Result{}, state}
end

## Not implemented Callbacks

@impl DBConnection
def handle_begin(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_commit(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_rollback(_opts, _state) do
throw("not implemented")
end

@impl DBConnection
def handle_declare(_query, _params, _opts, _state) do
throw("not implemeted")
end

@impl DBConnection
def handle_deallocate(_query, _cursor, _opts, _state) do
throw("not implemeted")
end

@impl DBConnection
def handle_fetch(_query, _cursor, _opts, _state) do
throw("not implemeted")
end

## Helpers

defp do_query(%Query{} = query, [], opts, %{worker: worker} = state) do
case worker.sql_query(state.pid, query.statement, opts) do
{:ok, result} ->
result = parse_result(result, query)
{:ok, query, result, state}

{:error, reason} ->
{:error, reason, state}
end
end

defp do_query(%Query{} = query, params, opts, %{worker: worker} = state) do
case worker.param_query(state.pid, query.statement, params, opts) do
{:ok, result} ->
result = parse_result(result, query)
{:ok, query, result, state}

{:error, reason} ->
{:error, reason, state}
end
end

defp parse_result({:selected, columns, rows, _}, query),
do: parse_result({:selected, columns, rows}, query)

defp parse_result({:selected, columns, rows}, query) do
parse_result(columns, rows, query)
end

defp parse_result(result, _query), do: result

defp parse_result(columns, rows, query) do
%Result{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should %Result structs always include statement? If not, it'd be nice to add something to the docs describing when it will/won't be set.

columns: Enum.map(columns, &to_string(&1)),
rows: rows,
num_rows: Enum.count(rows),
success: true,
statement: query.statement
}
end
end
29 changes: 29 additions & 0 deletions lib/snowflex/db_connection/query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Snowflex.DBConnection.Query do
defstruct [
:ref,
:name,
:statement,
:columns,
:result_oids,
cache: :reference
]

defimpl DBConnection.Query do
def parse(query, _opts), do: query
def describe(query, _opts), do: query
def encode(_query, params, _opts), do: params
def decode(_query, result, _opts), do: result
end

defimpl String.Chars do
alias Snowflex.DBConnection.Query

def to_string(%{statement: statement}) do
case statement do
statement when is_binary(statement) -> IO.iodata_to_binary(statement)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's already a binary, it should be OK to pass through directly.

Suggested change
statement when is_binary(statement) -> IO.iodata_to_binary(statement)
statement when is_binary(statement) -> statement

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Result.t(), statement is typed as String.t() | nil. If it can be iodata(), I would update the typespec to indicate that. A Query.t() typespec would also be a nice addition here.

statement when is_list(statement) -> IO.iodata_to_binary(statement)
%{statement: %Query{} = q} -> String.Chars.to_string(q)
end
end
end
end
Loading