Skip to content

Commit

Permalink
Do not run parallel preloads if connection is checked out, closes #3584
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Apr 24, 2021
1 parent abed68f commit 78ba871
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 3 deletions.
5 changes: 5 additions & 0 deletions lib/ecto/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ defmodule Ecto.Adapter do
"""
@callback checkout(adapter_meta, config :: Keyword.t(), (() -> result)) :: result when result: var

@doc """
Returns true if a connection has been checked out.
"""
@callback checked_out?(adapter_meta) :: boolean

@doc """
Returns the loaders for a given type.
Expand Down
27 changes: 27 additions & 0 deletions lib/ecto/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ defmodule Ecto.Repo do
adapter.checkout(meta, opts, fun)
end

def checked_out? do
{adapter, meta} = Ecto.Repo.Registry.lookup(get_dynamic_repo())
adapter.checked_out?(meta)
end

@compile {:inline, get_dynamic_repo: 0, with_default_options: 2}

def get_dynamic_repo() do
Expand Down Expand Up @@ -460,6 +465,28 @@ defmodule Ecto.Repo do
"""
@callback checkout((() -> result), opts :: Keyword.t()) :: result when result: var

@doc """
Returns true if a connection has been checked out.
This is true if inside a `c:Ecto.Repo.checkout/2` or
`c:Ecto.Repo.transaction/2`.
## Examples
MyRepo.checked_out?
#=> false
MyRepo.transaction(fn ->
MyRepo.checked_out? #=> true
end)
MyRepo.checkout(fn ->
MyRepo.checked_out? #=> true
end)
"""
@callback checked_out?() :: boolean

@doc """
Loads `data` into a struct or a map.
Expand Down
7 changes: 6 additions & 1 deletion lib/ecto/repo/preloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ defmodule Ecto.Repo.Preloader do

# Then we execute queries in parallel
defp maybe_pmap(preloaders, repo_name, opts) do
if match?([_,_|_], preloaders) and not Ecto.Repo.Transaction.in_transaction?(repo_name) and
if match?([_,_|_], preloaders) and not checked_out?(repo_name) and
Keyword.get(opts, :in_parallel, true) do
# We pass caller: self() so the ownership pool knows where
# to fetch the connection from and set the proper timeouts.
Expand All @@ -138,6 +138,11 @@ defmodule Ecto.Repo.Preloader do
end
end

defp checked_out?(repo_name) do
{adapter, meta} = Ecto.Repo.Registry.lookup(repo_name)
adapter.checked_out?(meta)
end

# Then we unpack the query results, merge them, and preload recursively
defp preload_assocs(
[{assoc, query?, loaded_ids, loaded_structs, preloads} | assocs],
Expand Down
1 change: 1 addition & 0 deletions test/ecto/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,7 @@ defmodule Ecto.RepoTest do
def loaders(_, _), do: raise "not implemented"
def init(_), do: raise "not implemented"
def checkout(_, _, _), do: raise "not implemented"
def checked_out?(_), do: raise "not implemented"
def ensure_all_started(_, _), do: raise "not implemented"
end

Expand Down
2 changes: 2 additions & 0 deletions test/mix/tasks/ecto.create_drop_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Mix.Tasks.Ecto.CreateDropTest do
def loaders(_, _), do: raise "not implemented"
def init(_), do: raise "not implemented"
def checkout(_, _, _), do: raise "not implemented"
def checked_out?(_), do: raise "not implemented"
def ensure_all_started(_, _), do: raise "not implemented"

def storage_up(_), do: Process.get(:storage_up) || raise "no storage_up"
Expand All @@ -28,6 +29,7 @@ defmodule Mix.Tasks.Ecto.CreateDropTest do
def loaders(_, _), do: raise "not implemented"
def init(_), do: raise "not implemented"
def checkout(_, _, _), do: raise "not implemented"
def checked_out?(_), do: raise "not implemented"
def ensure_all_started(_, _), do: raise "not implemented"
end

Expand Down
14 changes: 12 additions & 2 deletions test/support/test_repo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@ defmodule Ecto.TestAdapter do
{:ok, Supervisor.child_spec({Task, fn -> :timer.sleep(:infinity) end}, []), %{meta: :meta}}
end

def checkout(_mod, _opts, fun) do
def checkout(mod, _opts, fun) do
send self(), {:checkout, fun}
fun.()
Process.put({mod, :checked_out?}, true)

try do
fun.()
after
Process.delete({mod, :checked_out?})
end
end

def checked_out?(mod) do
Process.get({mod, :checked_out?}) || false
end

## Types
Expand Down

0 comments on commit 78ba871

Please sign in to comment.