
Error when used in transactions #129

Closed
arnodirlam opened this issue Aug 11, 2021 · 16 comments

Comments

@arnodirlam

Hi there,

we're using Dataloader without Absinthe and came across this error:

** (DBConnection.ConnectionError) could not checkout the connection owned by #PID<0.3039.0>. When using the sandbox, connections are shared, so this may imply another process is using a connection. Reason: connection not available and request was dropped from queue after 1562ms.

when running the following code in a test:

Ecto.Multi.new()
|> Ecto.Multi.run(:whatever, fn _, _ ->
  Dataloader.new()
  |> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
  |> Dataloader.load(:repo, :roles, user)
  |> Dataloader.run()
end)
|> MyApp.Repo.transaction()

Outside of an Ecto.Multi, it works:

Dataloader.new()
|> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
|> Dataloader.load(:repo, :roles, user)
|> Dataloader.run()

Is this something that can and should be fixed in Dataloader?

Thanks a lot for this great project! 🙌

@benwilson512
Contributor

What version of Dataloader / Ecto are you using?

@arnodirlam
Author

Tried with

  • Dataloader 1.0.8 and master
  • Ecto 3.6.2 and master

@benwilson512
Contributor

Interesting. Dataloader.Ecto explicitly sets the caller value to the pid that calls load in order to avoid this issue. A test case that reproduces this would be very helpful; if you have time, please open a PR with one.
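Background sketch (plain Elixir, not Dataloader code): since Elixir 1.8, processes started with `Task` record their ancestors under the `:"$callers"` key of their process dictionary, and, as far as we understand, Ecto's SQL sandbox consults this list so that tasks can use the test process's connection. `CallersDemo` is a made-up module for illustration; it only shows how the caller chain grows through nested tasks like Dataloader's.

```elixir
defmodule CallersDemo do
  # Hypothetical helper: returns the :"$callers" list seen by a task
  # nested `depth` levels below the calling process.
  def nested_callers(0), do: Process.get(:"$callers", [])

  def nested_callers(depth) when depth > 0 do
    # Each Task.async/1 prepends the spawning pid to the child's :"$callers".
    Task.async(fn -> nested_callers(depth - 1) end)
    |> Task.await()
  end
end

# The innermost task sees [intermediate_task_pid, caller_pid | ...].
CallersDemo.nested_callers(2)
```

Knowing who the caller is only helps while the owner's connection is free to lend out; it does not help once that connection is held by an open transaction.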

@arnodirlam
Author

It turns out Ecto.Multi.run/3 just expects the function to return an {:ok, value} tuple, which ours did not. So this works:

Ecto.Multi.new()
|> Ecto.Multi.run(:whatever, fn _, _ ->
  loader =
    Dataloader.new()
    |> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
    |> Dataloader.load(:repo, :roles, user)
    |> Dataloader.run()

  {:ok, loader}
end)
|> MyApp.Repo.transaction()

Thanks a lot for your time and help! 🙏 Closing this issue.

@arnodirlam
Author

We ran into the issue again, this time with a correct return value from Multi.run/3. Please see my updated PR branch.

[error] Task #PID<0.938.0> started from #PID<0.936.0> terminating
** (DBConnection.ConnectionError) could not checkout the connection owned by #PID<0.931.0>. When using the sandbox, connections are shared, so this may imply another process is using a connection. Reason: connection not available and request was dropped from queue after 999ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information
    (ecto_sql 3.4.4) lib/ecto/adapters/sql.ex:593: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql 3.4.4) lib/ecto/adapters/sql.ex:526: Ecto.Adapters.SQL.execute/5
    (ecto 3.4.5) lib/ecto/repo/queryable.ex:192: Ecto.Repo.Queryable.execute/4
    (ecto 3.4.5) lib/ecto/repo/queryable.ex:17: Ecto.Repo.Queryable.all/3
    (dataloader 1.0.8) lib/dataloader/ecto.ex:328: Dataloader.Ecto.run_batch/6
    (dataloader 1.0.8) lib/dataloader/ecto.ex:656: Dataloader.Source.Dataloader.Ecto.run_batch/2
    (dataloader 1.0.8) lib/dataloader/ecto.ex:614: anonymous fn/2 in Dataloader.Source.Dataloader.Ecto.run_batches/1
    (elixir 1.12.1) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir 1.12.1) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib 3.15.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Function: &:erlang.apply/2
    Args: [#Function<14.77029692/1 in Dataloader.Source.Dataloader.Ecto.run_batches/1>, [{{:queryable, #PID<0.931.0>, Dataloader.User, :one, :id, %{}}, #MapSet<[{490, 490}]>}]]

Grateful for any hints or pointers! 🙏

@arnodirlam
Author

@benwilson512 not sure if you've seen that I reopened this. I tried to fix it, but didn't get very far.

Can you confirm that it's an issue in dataloader, or am I overlooking something again? 🙂

@benwilson512
Contributor

@arnodirlam are you able to create an example I can run?

@arnodirlam
Author

@benwilson512 Yes, the tests in the linked PR should fail: #130

@giddie

giddie commented Oct 25, 2021

I'm seeing something similar when using a transaction. As far as I can tell, the issue is:

  • Test process calls a function that starts a Repo.transaction
  • Inside transaction a Dataloader is created and run
  • Dataloader.Ecto.run_batches spawns async tasks to run the batches
  • The run_batch function (now in a separate process) calls a Repo function
  • The Repo function attempts to checkout a connection
  • The only connection is being held by Repo.transaction in the first step, which is in a different process
  • The run_batch function times out and fails
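The sequence above can be modelled with a toy single-connection pool. This is a minimal sketch under loud assumptions: `ToyPool` is hypothetical and not DBConnection; it returns an error immediately, whereas the real pool queues the request and raises DBConnection.ConnectionError after the queue timeout.

```elixir
defmodule ToyPool do
  # Toy model of a pool with exactly one connection.
  # State: nil when the connection is free, otherwise the owner's pid.
  def start_link, do: Agent.start_link(fn -> nil end)

  def checkout(pool) do
    me = self()

    Agent.get_and_update(pool, fn
      nil -> {:ok, me}
      # Re-entrant for the process that already owns the connection.
      ^me -> {:ok, me}
      owner -> {{:error, {:held_by, owner}}, owner}
    end)
  end

  def checkin(pool), do: Agent.update(pool, fn _ -> nil end)

  # Stand-in for Repo.transaction/1: hold the connection while `fun` runs.
  def transaction(pool, fun) do
    :ok = checkout(pool)

    try do
      fun.()
    after
      checkin(pool)
    end
  end
end

{:ok, pool} = ToyPool.start_link()

# Like Dataloader.Ecto.run_batches/1, the batch runs in a separate process,
# so it cannot get the connection held by the "transaction" process.
ToyPool.transaction(pool, fn ->
  Task.async(fn -> ToyPool.checkout(pool) end) |> Task.await()
end)
```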

@giddie

giddie commented Oct 25, 2021

@arnodirlam I created #134 to solve my issue with Repo.transaction. I suspect this may solve your problem too.

@arnodirlam
Author

arnodirlam commented Nov 10, 2021

Thanks a lot for confirming the issue and for coming up with a PR, @giddie 🙏

I've done more research, and found out the following:

Why does dataloader use async processes?

Partly for error handling, partly for performance optimization.

The hierarchy of processes looks as follows:

  1. A process calls Dataloader
  2. A process is spawned for loading all sources (using Task.async + Task.await) [Dataloader.run/1]
  3. A process is spawned for each source (using Task.async_stream) [Dataloader.run_tasks/3]
  4. For each source, a process is spawned for loading all batches (using Task.async + Task.await) [Dataloader.Ecto.run/1]
  5. For each source, a process is spawned for each batch (using Task.async_stream) [Dataloader.Ecto.run_batches/1]

Levels 2 and 4 are spawned to catch errors raised while loading sources and batches, respectively. They trap exits (docs).
Levels 3 and 5 are spawned to load sources and batches in parallel. They are also used to enforce a timeout on loading each batch.
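A hedged sketch of the level-5 idea in plain Elixir: each batch runs in its own process via Task.async_stream, and a slow batch is killed at the per-batch timeout. `BatchTimeouts` and the `{key, fun}` batch shape are invented for illustration and are not Dataloader's internals.

```elixir
defmodule BatchTimeouts do
  # Loads every batch in its own process with a per-batch timeout.
  # With on_timeout: :kill_task, a timed-out batch is emitted as
  # {:exit, :timeout} instead of crashing the caller. A batch that
  # *raises* still takes down the caller through the task link, which is
  # the kind of error the extra wrapping process (level 4) is there to catch.
  def run_batches(batches, timeout_ms) do
    batches
    |> Task.async_stream(fn {key, fun} -> {key, fun.()} end,
      timeout: timeout_ms,
      on_timeout: :kill_task
    )
    # async_stream is ordered by default, so results line up with inputs.
    |> Enum.zip(batches)
    |> Enum.map(fn
      {{:ok, {key, value}}, _batch} -> {key, {:ok, value}}
      {{:exit, reason}, {key, _fun}} -> {key, {:error, reason}}
    end)
    |> Map.new()
  end
end
```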

Why doesn't that work with transactions?

A transaction is bound to the DB connection held by the process that calls Dataloader, and a DB connection cannot be shared between processes. The batch-loading processes therefore wait for a connection that never becomes available.

I believe it is not about the sandbox. Transactions are per process, you cannot share a transaction across multiple process. If you have work you want to process in parallel, you can do all of the normalization on the side and leave the final step of inserting the data to the main process.

-- José Valim on Ecto mailing list (Mar 13, 2017)
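One way to read the advice quoted above, as a minimal sketch: do the normalization in parallel tasks, then run the final, connection-bound step in the calling process. `NormalizeThenWrite` and the `%{name: ...}` row shape are hypothetical; `write_fun` stands in for something like a Repo call inside Repo.transaction/1.

```elixir
defmodule NormalizeThenWrite do
  # Parallel, connection-free work happens in tasks; only the caller,
  # which owns the transaction, runs `write_fun`.
  def run(raw_rows, write_fun) do
    normalized =
      raw_rows
      |> Task.async_stream(&normalize/1)
      |> Enum.map(fn {:ok, row} -> row end)

    write_fun.(normalized)
  end

  # Hypothetical normalization step: trim and downcase the name.
  defp normalize(%{name: name} = row) do
    %{row | name: name |> String.trim() |> String.downcase()}
  end
end
```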

How does Ecto itself handle this?

Ecto's preloader checks whether the process it's called from already has a checked-out connection. If so, it preloads synchronously instead of in parallel (with Task.async_stream):

    if match?([_,_|_], preloaders) and not checked_out?(repo_name) and
         Keyword.get(opts, :in_parallel, true) do
      preloaders
      |> Task.async_stream(&(&1.(opts)), timeout: :infinity)
      |> Enum.map(fn {:ok, assoc} -> assoc end)
    else
      Enum.map(preloaders, &(&1.(opts)))
    end

-- shortened version of the code in Ecto.Repo.Preloader introduced by José Valim in 78ba871
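The same decision can be written as a standalone sketch. Here `checked_out?` is passed in as a hypothetical option, because Ecto's own check is private to the preloader; `MaybeParallel` is an illustrative name, not part of Ecto or Dataloader.

```elixir
defmodule MaybeParallel do
  # Runs zero-arity loader functions in parallel only when there are at
  # least two of them, no connection is checked out by the caller, and
  # :in_parallel is not disabled; otherwise runs them in the caller.
  def run(loaders, opts \\ []) do
    checked_out? = Keyword.get(opts, :checked_out?, false)
    in_parallel? = Keyword.get(opts, :in_parallel, true)

    if match?([_, _ | _], loaders) and not checked_out? and in_parallel? do
      loaders
      |> Task.async_stream(fn loader -> loader.() end, timeout: :infinity)
      |> Enum.map(fn {:ok, result} -> result end)
    else
      Enum.map(loaders, fn loader -> loader.() end)
    end
  end
end
```

Running loaders that return `self()` shows the difference: with `checked_out?: true` every result is the caller's pid, which is what keeps a transaction's connection usable.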

How to fix the issue?

By restructuring Dataloader so that it does not spawn processes when run from within a transaction. Instead of using Task.async to catch errors, we could wrap the data-loading calls in try/rescue blocks. Would that cover all error cases?
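A minimal sketch of catching errors without a Task boundary, assuming loaders are zero-arity functions (`SafeLoad` is a hypothetical name). As far as we know, this covers raises, throws and exit/1 calls in the current process, but not signals such as a linked process crashing or a `:kill`, which a separate process with trapped exits would still see.

```elixir
defmodule SafeLoad do
  # Runs `fun` in the current process (so it can use the caller's
  # checked-out connection) and turns failures into error tuples.
  def run(fun) do
    {:ok, fun.()}
  rescue
    exception -> {:error, exception}
  catch
    # Covers throw/1 and exit/1 raised inside `fun`.
    kind, reason -> {:error, {kind, reason}}
  end
end
```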

In particular, I suggest the following changes (will update according to feedback):

  • Replace uses of Task.async by counterparts using try/rescue (levels 2. and 4. in process hierarchy)
  • Categorize sources into those that need to be run synchronously (i.e. Ecto sources with a checked out connection) and those that can be run asynchronously. Run the async ones using Task.async, then run the synchronous ones sequentially, then Task.await all async ones.
  • Load batches synchronously if the source needs to be run synchronously
  • For Ecto sources loading batches synchronously, handle timeouts using the :timeout option on Ecto.Repo functions (docs), since timeouts would no longer be enforced by processes spawned with Task.async_stream. The timeout for loading each batch equals the total timeout (for all batches) minus the total time used so far for loading previous batches.
  • Optimization: Don't spawn processes to load only a single source or batch (see Ecto.Repo.Preloader code above)
  • Optional: Add a source option :in_parallel to force running in parallel (similar to @giddie's PR #134, but named as in Ecto); up to you, @giddie
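A rough sketch of two of the proposed changes, under stated assumptions: sources are modelled as hypothetical maps with `:sync?` and `:run` keys (not Dataloader's Source protocol), and batch functions receive their remaining timeout as an argument, which a real Ecto source could forward as the `:timeout` option of a Repo call.

```elixir
defmodule SourceRunner do
  # Async sources start first in tasks, sync sources (e.g. an Ecto source
  # whose repo already has a checked-out connection) run sequentially in
  # the caller, then all tasks are awaited.
  def run_sources(sources, timeout \\ 15_000) do
    {sync, async} = Enum.split_with(sources, fn {_name, src} -> src.sync? end)

    tasks = Enum.map(async, fn {name, src} -> {name, Task.async(src.run)} end)
    sync_results = Enum.map(sync, fn {name, src} -> {name, src.run.()} end)

    async_results =
      Enum.map(tasks, fn {name, task} -> {name, Task.await(task, timeout)} end)

    Map.new(sync_results ++ async_results)
  end

  # Sequential batches sharing one overall budget: each batch gets the
  # total timeout minus the time already spent on previous batches.
  # A fuller version would probably fail fast once the budget reaches 0.
  def run_batches_with_budget(batch_funs, total_ms) do
    started = System.monotonic_time(:millisecond)

    Enum.map(batch_funs, fn batch_fun ->
      elapsed = System.monotonic_time(:millisecond) - started
      batch_fun.(max(total_ms - elapsed, 0))
    end)
  end
end
```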

I know it's quite a bit of refactoring, but I think it's worth it!

Looking forward to feedback and discussion! @giddie @benwilson512 😄

arnodirlam changed the title from "Use in Ecto.Multi.run within tests causes Ecto connection sandbox error" to "Error when used in transactions" on Nov 10, 2021
@arnodirlam
Author

Also @seddy who worked on the async parts a lot 👌

@seddy
Contributor

seddy commented Nov 10, 2021

Don't think you meant me @arnodirlam, though I'm more than happy to take credit for someone else's work obviously 😹

EDIT: Unless you're referring to this thing I did ages ago and had completely forgotten about 😅 #41

@arnodirlam
Author

arnodirlam commented Nov 10, 2021

So this wasn't you? @seddy ce9ac3f 🤔

EDIT: Yea, we're talking ages ago 😄

@giddie

giddie commented Nov 11, 2021

Great work @arnodirlam. Looks like you went a few steps further than me by delving into Ecto to find a better solution and writing it all up :) Your recommendations sound good to me.

@benwilson512
Contributor

This is now handled in 1.0.11. To use it, make sure you call Dataloader.new/1 like this:

loader = Dataloader.new(async?: !MyApp.Repo.in_transaction?())

This will be handled automatically in 2.0.

4 participants