From c4c133e463db61d0d763abd288d0044a1dcd7c5b Mon Sep 17 00:00:00 2001 From: Omar Abdel-Wahab Date: Thu, 28 Jan 2016 00:09:19 +0200 Subject: [PATCH 1/2] Feature: define ignored PostgreSQL fields by using `__ignore__` as source --- lib/mosql/schema.rb | 11 +++++++---- lib/mosql/streamer.rb | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 7e0f119..91a07b2 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -12,7 +12,7 @@ def to_array(lst) col = { :source => ent.fetch(:source), :type => ent.fetch(:type), - :name => (ent.keys - [:source, :type]).first, + :name => (ent.keys - [:source, :type]).first } elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String) col = { @@ -217,8 +217,11 @@ def transform(ns, obj, schema=nil) source = col[:source] type = col[:type] - if source.start_with?("$") + if source == '__ignore__' + # Just ignore the field + elsif source.start_with?("$") v = fetch_special_source(obj, source, original) + row << v else v = fetch_and_delete_dotted(obj, source) case v @@ -234,8 +237,8 @@ def transform(ns, obj, schema=nil) else v = transform_primitive(v, type) end + row << v end - row << v end if schema[:meta][:extra_props] @@ -269,7 +272,7 @@ def sanitize(value) end def copy_column?(col) - col[:source] != '$timestamp' + col[:source] != '$timestamp' && col[:source] != '__ignore__' end def all_columns(schema, copy=false) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index d630e96..bd2a94e 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -52,8 +52,8 @@ def bulk_upsert(table, ns, items) begin @schema.copy_data(table.db, ns, items) rescue Sequel::DatabaseError => e - log.debug("Bulk insert error (#{e}), attempting invidual upserts...") - cols = @schema.all_columns(@schema.find_ns(ns)) + log.debug("Bulk insert error (#{e}), attempting individual upserts...") + cols = @schema.all_columns_for_copy(@schema.find_ns(ns)) items.each do |it| h = {} cols.zip(it).each { |k,v| h[k] = v } From bb2f6bfeeeb403bd0589def8064de10e0c75ade6 Mon Sep 17 00:00:00 2001 From: Ramy Khater Date: Mon, 1 Aug 2016 12:28:37 +0200 Subject: [PATCH 2/2] Fixing invalid byte sequence in UTF-8 error by replacing them. --- lib/mosql/schema.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 91a07b2..6b1bdca 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -323,7 +323,7 @@ def quote_copy(val) when Sequel::SQL::Blob "\\\\x" + [val].pack("h*") else - val.to_s.gsub(/([\\\t\n\r])/, '\\\\\\1') + val.to_s.encode!('UTF-8', :undef => :replace, :invalid => :replace).gsub(/([\\\t\n\r])/, '\\\\\\1') end end