Skip to content

Commit

Permalink
Merge pull request #499 from larskanis/improve_copy_data
Browse files Browse the repository at this point in the history
Improve copy_data error handling
  • Loading branch information
larskanis authored Feb 25, 2023
2 parents 8c91814 + 1563e73 commit 73733ac
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 35 deletions.
6 changes: 0 additions & 6 deletions lib/pg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ module PG
end
end


class NotAllCopyDataRetrieved < PG::Error
end
class NotInBlockingMode < PG::Error
end

# Get the PG library version.
#
# +include_buildnum+ is no longer used and any value passed will be ignored.
Expand Down
39 changes: 20 additions & 19 deletions lib/pg/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,19 @@ def copy_data( sql, coder=nil )
yield res
rescue Exception => err
errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
put_copy_end( errmsg )
get_result
raise
begin
put_copy_end( errmsg )
rescue PG::Error
# Ignore error in cleanup to avoid losing original exception
end
discard_results
raise err
else
put_copy_end
begin
put_copy_end
rescue PG::Error => err
raise PG::LostCopyState.new("#{err} (probably by executing another SQL query while running a COPY command)", connection: self)
end
get_last_result
ensure
self.encoder_for_put_copy_data = old_coder if coder
Expand All @@ -213,24 +221,17 @@ def copy_data( sql, coder=nil )
self.decoder_for_get_copy_data = coder
end
yield res
rescue Exception => err
rescue Exception
cancel
begin
while get_copy_data
end
rescue PG::Error
# Ignore error in cleanup to avoid losing original exception
end
while get_result
end
raise err
discard_results
raise
else
res = get_last_result
if !res || res.result_status != PGRES_COMMAND_OK
while get_copy_data
end
while get_result
end
if !res
discard_results
raise PG::LostCopyState.new("Lost COPY state (probably by executing another SQL query while running a COPY command)", connection: self)
elsif res.result_status != PGRES_COMMAND_OK
discard_results
raise PG::NotAllCopyDataRetrieved.new("Not all COPY data retrieved", connection: self)
end
res
Expand Down
7 changes: 7 additions & 0 deletions lib/pg/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,12 @@ def initialize(msg=nil, connection: nil, result: nil)
end
end

class NotAllCopyDataRetrieved < PG::Error
end
class LostCopyState < PG::Error
end
class NotInBlockingMode < PG::Error
end

end # module PG

37 changes: 27 additions & 10 deletions spec/pg/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,7 @@
@conn.sync_put_copy_end
res = @conn.get_last_result
expect( res.result_status ).to eq( PG::PGRES_COMMAND_OK )
@conn.exec( "DROP TABLE IF EXISTS copytable2" )
end

describe "#copy_data" do
Expand Down Expand Up @@ -1247,6 +1248,7 @@

res = @conn.exec( "SELECT * FROM copytable ORDER BY col1" )
expect( res.values ).to eq( [["1"], ["2"]] )
@conn.exec( "DROP TABLE IF EXISTS copytable" )
end

it "can process #copy_data input queries with lots of data" do
Expand All @@ -1263,6 +1265,7 @@
expect( res.values ).to eq( [["1000"]] )
res = @conn.exec( "SELECT * FROM copytable2 LIMIT 1" )
expect( res.values ).to eq( [[str.chomp]] )
@conn.exec( "DROP TABLE IF EXISTS copytable2" )
end

it "can handle client errors in #copy_data for input" do
Expand All @@ -1277,6 +1280,7 @@
end

expect( @conn ).to still_be_usable
@conn.exec( "DROP TABLE IF EXISTS copytable" )
end

it "can handle server errors in #copy_data for input" do
Expand All @@ -1290,30 +1294,43 @@
}.to raise_error(PG::Error, /invalid input syntax for .*integer/){|err| expect(err).to have_attributes(connection: @conn) }
end
expect( @conn ).to still_be_usable
@conn.exec( "DROP TABLE IF EXISTS copytable" )
end

it "gracefully handle SQL statements while in #copy_data for input" do
it "doesn't lose client error when #copy_data can not be finished" do
@conn.exec "ROLLBACK"
@conn.transaction do
@conn.exec( "CREATE TEMP TABLE copytable (col1 INT)" )
expect {
@conn.copy_data( "COPY copytable FROM STDOUT" ) do |res|
@conn.exec "SELECT 1"
@conn.discard_results # end copy state so that put_copy_end fails in copy_data
raise "boom"
end
}.to raise_error(PG::Error, /no COPY in progress/){|err| expect(err).to have_attributes(connection: @conn) }
}.to raise_error(RuntimeError, "boom")
end
expect( @conn ).to still_be_usable
@conn.exec( "DROP TABLE IF EXISTS copytable" )
end

it "gracefully handle SQL statements while in #copy_data for input" do
@conn.exec "ROLLBACK"
@conn.exec( "CREATE TEMP TABLE copytable (col1 INT)" )
expect {
@conn.copy_data( "COPY copytable FROM STDOUT" ) do |res|
@conn.exec "SELECT 1"
end
}.to raise_error(PG::LostCopyState, /another SQL query/){|err| expect(err).to have_attributes(connection: @conn) }
expect( @conn ).to still_be_usable
@conn.exec( "DROP TABLE copytable" )
end

it "gracefully handle SQL statements while in #copy_data for output" do
@conn.exec "ROLLBACK"
@conn.transaction do
expect {
@conn.copy_data( "COPY (VALUES(1), (2)) TO STDOUT" ) do |res|
@conn.exec "SELECT 3"
end
}.to raise_error(PG::Error, /no COPY in progress/){|err| expect(err).to have_attributes(connection: @conn) }
end
expect {
@conn.copy_data( "COPY (VALUES(1), (2)) TO STDOUT" ) do |res|
@conn.exec "SELECT 3"
end
}.to raise_error(PG::LostCopyState, /another SQL query/){|err| expect(err).to have_attributes(connection: @conn) }
expect( @conn ).to still_be_usable
end

Expand Down

0 comments on commit 73733ac

Please sign in to comment.