From f9b3a74ed94de3e9ae41ca42b1d89ace71cb3963 Mon Sep 17 00:00:00 2001 From: Jeremy Evans Date: Mon, 20 Jan 2025 14:08:32 -0800 Subject: [PATCH] Add query_blocker extension, for blocking queries inside a block This is useful for ensuring a given block of code does not execute any queries. For use in concurrent programs, the query_blocker takes a :scope option for the scope of the block. --- CHANGELOG | 2 + lib/sequel/extensions/query_blocker.rb | 126 +++++++++++++++++++++++++ spec/adapters/spec_helper.rb | 2 +- spec/extensions/query_blocker_spec.rb | 124 ++++++++++++++++++++++++ spec/integration/plugin_test.rb | 34 +++++++ www/pages/plugins.html.erb | 4 + 6 files changed, 291 insertions(+), 1 deletion(-) create mode 100644 lib/sequel/extensions/query_blocker.rb create mode 100644 spec/extensions/query_blocker_spec.rb diff --git a/CHANGELOG b/CHANGELOG index 9f309c521..c7d4f6970 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,7 @@ === master +* Add query_blocker extension, for blocking queries inside a block (jeremyevans) + * Support alter_table add_primary_key/add_unique_constraint :using_index option on PostgreSQL 9.1+ (jeremyevans) * Add temporary names for internally defined anonymous classes and modules on Ruby 3.3+ (jeremyevans) diff --git a/lib/sequel/extensions/query_blocker.rb b/lib/sequel/extensions/query_blocker.rb new file mode 100644 index 000000000..2caacaaa0 --- /dev/null +++ b/lib/sequel/extensions/query_blocker.rb @@ -0,0 +1,126 @@ +# frozen-string-literal: true +# +# The query_blocker extension adds Database#block_queries. +# Inside the block passed to #block_queries, any attempts to +# execute a query/statement on the database will raise a +# Sequel::QueryBlocker::BlockedQuery exception. +# +# DB.extension :query_blocker +# DB.block_queries do +# ds = DB[:table] # No exception +# ds = ds.where(column: 1) # No exception +# ds.all # Attempts query, exception raised +# end +# +# To handle concurrency, you can pass a :scope option: +# +# # Current Thread +# DB.block_queries(scope: :thread){} +# +# # Current Fiber +# DB.block_queries(scope: :fiber){} +# +# # Specific Thread +# DB.block_queries(scope: Thread.current){} +# +# # Specific Fiber +# DB.block_queries(scope: Fiber.current){} +# +# Note that this should catch all queries executed through the +# Database instance. Whether it catches queries executed directly +# on a connection object depends on the adapter in use. +# +# Related module: Sequel::QueryBlocker + +# :nocov: +require "fiber" if RUBY_VERSION <= "2.7" +# :nocov: + +# +module Sequel + module QueryBlocker + # Exception class raised if there is an attempt to execute a + # query/statement on the database inside a block passed to + # block_queries. + class BlockedQuery < Sequel::Error + end + + def self.extended(db) + db.instance_exec do + @blocked_query_scopes ||= {} + end + end + + # Check whether queries are blocked before executing them. + def log_connection_yield(sql, conn, args=nil) + # All database adapters should be calling this method around + # query execution (otherwise the queries would not get logged), + # ensuring the blocking is checked. Any database adapter issuing + # a query without calling this method is considered buggy. + check_blocked_queries! + super + end + + # Whether queries are currently blocked. + def block_queries? + b = @blocked_query_scopes + Sequel.synchronize{b[:global] || b[Thread.current] || b[Fiber.current]} || false + end + + # Reject (raise an BlockedQuery exception) if there is an attempt to execute + # a query/statement inside the block. + # + # The :scope option indicates which queries are rejected inside the block: + # + # :global :: This is the default, and rejects all queries. + # :thread :: Reject all queries in the current thread. + # :fiber :: Reject all queries in the current fiber. + # Thread :: Reject all queries in the given thread. + # Fiber :: Reject all queries in the given fiber. + def block_queries(opts=OPTS) + case scope = opts[:scope] + when nil + scope = :global + when :global + # nothing + when :thread + scope = Thread.current + when :fiber + scope = Fiber.current + when Thread, Fiber + # nothing + else + raise Sequel::Error, "invalid scope given to block_queries: #{scope.inspect}" + end + + prev_value = nil + scopes = @blocked_query_scopes + + begin + Sequel.synchronize do + prev_value = scopes[scope] + scopes[scope] = true + end + + yield + ensure + Sequel.synchronize do + if prev_value + scopes[scope] = prev_value + else + scopes.delete(scope) + end + end + end + end + + private + + # Raise a BlockQuery exception if queries are currently blocked. + def check_blocked_queries! + raise BlockedQuery, "cannot execute query inside a block_queries block" if block_queries? + end + end + + Database.register_extension(:query_blocker, QueryBlocker) +end diff --git a/spec/adapters/spec_helper.rb b/spec/adapters/spec_helper.rb index f327d4130..3f48fa0fe 100644 --- a/spec/adapters/spec_helper.rb +++ b/spec/adapters/spec_helper.rb @@ -76,7 +76,7 @@ def DB.drop_table(*tables) if ENV['SEQUEL_FREEZE_DATABASE'] raise "cannot freeze database when running specs for specific adapters" if adapter_test_type - DB.extension(:constraint_validations, :string_agg, :date_arithmetic) + DB.extension(:constraint_validations, :string_agg, :date_arithmetic, :query_blocker) DB.extension(:pg_array) if DB.database_type == :postgres DB.freeze end diff --git a/spec/extensions/query_blocker_spec.rb b/spec/extensions/query_blocker_spec.rb new file mode 100644 index 000000000..70895aa74 --- /dev/null +++ b/spec/extensions/query_blocker_spec.rb @@ -0,0 +1,124 @@ +require_relative "spec_helper" + +describe "query_blocker extension" do + fiber_is_thread = RUBY_ENGINE == 'jruby' && Fiber.new{Thread.current}.resume != Thread.current + + before do + @db = Sequel.mock(:extensions=>[:query_blocker]) + @ds = @db[:items] + end + + it "#block_queries should block queries globally inside the block when called without options" do + @ds.all.must_equal [] + proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join} + @ds.all.must_equal [] + end + + it "#block_queries should block queries globally inside the block when called with scope: :global" do + @ds.all.must_equal [] + proc{@db.block_queries(:scope=>:global){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:global){Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join} + @ds.all.must_equal [] + end + + it "#block_queries should block queries inside the current thread when called with scope: :thread" do + @ds.all.must_equal [] + proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread + @ds.all.must_equal [] + end + + it "#block_queries should block queries inside the current fiber when called with scope: :fiber" do + @ds.all.must_equal [] + proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal [] + @ds.all.must_equal [] + end + + it "#block_queries should block queries inside the given thread when called with scope: Thread" do + @ds.all.must_equal [] + proc{@db.block_queries(:scope=>Thread.current){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>Thread.current){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>Thread.current){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread + @ds.all.must_equal [] + end + + it "#block_queries should block queries inside the given fiber when called with scope: Fiber" do + @ds.all.must_equal [] + proc{@db.block_queries(:scope=>Fiber.current){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>Fiber.current){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>Fiber.current){Fiber.new{@ds.all}.resume}.must_equal [] + @ds.all.must_equal [] + end + + it "#block_queries should raise Error if called with unsupported :scope option" do + proc{@db.block_queries(:scope=>Object.new){}}.must_raise Sequel::Error + end + + it "#block_queries should handle nested usage" do + @ds.all.must_equal [] + Thread.new{@ds.all}.value.must_equal [] + Fiber.new{@ds.all}.resume.must_equal [] + + @db.block_queries(scope: :fiber) do + proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal [] + + @db.block_queries(scope: :fiber) do + proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal [] + end + + @db.block_queries(scope: :thread) do + proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread + + @db.block_queries(scope: :thread) do + proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread + end + + @db.block_queries do + proc{@ds.all}.must_raise Sequel::QueryBlocker::BlockedQuery + proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join} + + @db.block_queries do + proc{@ds.all}.must_raise Sequel::QueryBlocker::BlockedQuery + proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join} + end + + proc{@ds.all}.must_raise Sequel::QueryBlocker::BlockedQuery + proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join} + end + + proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread + end + + proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery + @db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal [] + @db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal [] + end + + @ds.all.must_equal [] + Thread.new{@ds.all}.value.must_equal [] + Fiber.new{@ds.all}.resume.must_equal [] + end + + it "#block_queries? should check whether queries are currently blocked" do + @db.block_queries?.must_equal false + @db.block_queries{@db.block_queries?}.must_equal true + @db.block_queries?.must_equal false + end +end diff --git a/spec/integration/plugin_test.rb b/spec/integration/plugin_test.rb index 40b8635ec..0425d3487 100644 --- a/spec/integration/plugin_test.rb +++ b/spec/integration/plugin_test.rb @@ -3222,3 +3222,37 @@ def set(k, v, ttl) self[k] = v end counts.must_equal [49] end end unless DB.database_type == :db2 && DB.offset_strategy == :emulate + +describe "query_blocker extension" do + before(:all) do + @db = DB + @db.create_table!(:query_blocker_test) do + Integer :i + end + @db.extension :query_blocker + @ds = @db[:query_blocker_test] + end + before do + @ds.delete + end + after(:all) do + @db.drop_table?(:query_blocker_test) + end + + types = { + "SELECT" => proc{@ds.all}, + "INSERT" => proc{@ds.insert(1)}, + "UPDATE" => proc{@ds.update(:i => 1)}, + "DELETE" => proc{@ds.delete}, + "TRUNCATE" => proc{@ds.truncate}, + "bound variable" => proc{@ds.call(:select)}, + "prepared statement" => proc{@ds.prepare(:select, :select_query_blocker_test).call}, + "arbitrary SQL" => proc{@db.run(@ds.select(1).sql)}, + }.each do |type, block| + it "should block #{type} queries" do + @db.block_queries do + proc{instance_exec(&block)}.must_raise Sequel::QueryBlocker::BlockedQuery + end + end + end +end diff --git a/www/pages/plugins.html.erb b/www/pages/plugins.html.erb index a0bfef93f..4321d24d8 100644 --- a/www/pages/plugins.html.erb +++ b/www/pages/plugins.html.erb @@ -755,6 +755,10 @@ Uses timestamptz (timestamp with time zone) as the generic timestamp type used for Time and DateTime classes.
  • +query_blocker +Supports raising an exception if queries are executed inside a given block. +
  • +
  • run_transaction_hooks Support running after_commit and after_rollback transaction hooks before transaction commit/rollback, designed for transactional testing.