Skip to content

Commit

Permalink
Implement incremental upserts for MySQL and Postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Mar 26, 2022
1 parent 17f7747 commit b47f53b
Showing 1 changed file with 59 additions and 35 deletions.
94 changes: 59 additions & 35 deletions src/rdbms/rdbms_queries.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

-export([join/2,
prepare_upsert/6,
prepare_upsert/7,
execute_upsert/5,
request_upsert/5]).

Expand Down Expand Up @@ -114,11 +115,22 @@ request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
QueryName :: atom(),
TableName :: atom(),
InsertFields :: [binary()],
Updates :: [binary() | {binary(), binary()}],
Updates :: [binary() | {assignment | expression, binary(), binary()}],
UniqueKeyFields :: [binary()]) ->
{ok, QueryName :: atom()} | {error, already_exists}.
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) ->
SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields),
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, none).

-spec prepare_upsert(Host :: mongoose_rdbms:server(),
QueryName :: atom(),
TableName :: atom(),
InsertFields :: [ColumnName :: binary()],
Updates :: [binary() | {binary(), binary()}],
UniqueKeyFields :: [binary()],
IncrementalField :: none | binary()) ->
{ok, QueryName :: atom()} | {error, already_exists}.
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField),
Query = iolist_to_binary(SQL),
?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}),
Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields),
Expand All @@ -132,12 +144,12 @@ prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) ->
_ -> InsertFields ++ UpdateFields
end.

upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields) ->
upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of
{mysql, _} ->
upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields);
upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
{pgsql, _} ->
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields);
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
{odbc, mssql} ->
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields);
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
Expand All @@ -152,22 +164,39 @@ mysql_and_pgsql_insert(Table, Columns) ->
join(Placeholders, ", "),
")"].

upsert_mysql_query(Table, InsertFields, Updates, [Key | _]) ->
upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) ->
Insert = mysql_and_pgsql_insert(Table, InsertFields),
OnConflict = mysql_on_conflict(Updates, Key),
OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField),
[Insert, OnConflict].

upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
Insert = mysql_and_pgsql_insert(Table, InsertFields),
OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields),
[Insert, OnConflict].
WhereIncrements = pgsql_ensure_increments(Table, IncrementalField),
[Insert, OnConflict, WhereIncrements].

mysql_on_conflict([], Key) ->
mysql_on_conflict(_Table, [], Key, _) ->
%% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL
[" ON DUPLICATE KEY UPDATE ", Key, " = ", Key];
mysql_on_conflict(UpdateFields, _) ->
mysql_on_conflict(_Table, UpdateFields, _, none) ->
[" ON DUPLICATE KEY UPDATE ",
update_fields_on_conflict(UpdateFields)].
update_fields_on_conflict(UpdateFields)];
mysql_on_conflict(Table, UpdateFields, _, IncrementalField) ->
TableName = atom_to_list(Table),
FieldsWithPlaceHolders = [mysql_fields_with_placeholders(TableName, Update, IncrementalField)
|| Update <- UpdateFields],
IncrUpdates = join(FieldsWithPlaceHolders, ", "),
[" AS alias ON DUPLICATE KEY UPDATE ", IncrUpdates].

mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) ->
Alternatives = case UpdateField of
{Op, Column, Expression} when Op =:= assignment; Op =:= expression ->
[Expression, ", ", TableName, ".", Column, ")"];
Column ->
["? , ", TableName, ".", Column, ")"]
end,
[ Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", "
| Alternatives].

pgsql_on_conflict([], UniqueKeyFields) ->
JoinedKeys = join(UniqueKeyFields, ", "),
Expand All @@ -179,9 +208,15 @@ pgsql_on_conflict(UpdateFields, UniqueKeyFields) ->
update_fields_on_conflict(UpdateFields)].

update_fields_on_conflict(Updates) ->
FieldsWithPlaceHolders = [update_field_expression(Update) || Update <- Updates],
FieldsWithPlaceHolders = [get_field_expression(Update) || Update <- Updates],
join(FieldsWithPlaceHolders, ", ").

pgsql_ensure_increments(_Table, none) ->
[];
pgsql_ensure_increments(Table, IncrementalField) ->
TableName = atom_to_list(Table),
[" WHERE ", TableName, ".", IncrementalField, " < EXCLUDED.", IncrementalField].

upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields],
BinTab = atom_to_binary(Table, utf8),
Expand All @@ -200,28 +235,17 @@ mssql_on_conflict([]) -> ";";
mssql_on_conflict(Updates) ->
[" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"].

update_field_expression(Update) ->
case get_field_expression(Update) of
{true, Expr} -> Expr;
true -> [Update, " = ?"];
false -> Update
end.

get_field_expression({_, FieldExpr}) ->
case binary:match(FieldExpr, <<"=">>) of
nomatch -> false;
_ -> {true, FieldExpr}
end;
get_field_expression(FieldExpr) ->
binary:match(FieldExpr, <<"=">>) =:= nomatch.

get_field_name({Field, _}) ->
case binary:match(Field, <<"=">>) of
nomatch -> {true, Field};
_ -> false
end;
get_field_name(FieldExpr) ->
binary:match(FieldExpr, <<"=">>) =:= nomatch.
get_field_expression({Op, ColumnName, Expr}) when Op =:= assignment; Op =:= expression ->
[ColumnName, " = ", Expr];
get_field_expression(Field) when is_binary(Field) ->
[Field, " = ?"].

get_field_name({assignment, Field, _}) when is_binary(Field) ->
false;
get_field_name({expression, Field, _}) when is_binary(Field) ->
{true, Field};
get_field_name(Field) when is_binary(Field) ->
true.

%% F can be either a fun or a list of queries
%% TODO: We should probably move the list of queries transaction
Expand Down

0 comments on commit b47f53b

Please sign in to comment.