diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index 25a2ce27710..2edca04d906 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -42,6 +42,7 @@ -export([join/2, prepare_upsert/6, + prepare_upsert/7, execute_upsert/5, request_upsert/5]). @@ -114,11 +115,22 @@ request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> QueryName :: atom(), TableName :: atom(), InsertFields :: [binary()], - Updates :: [binary() | {binary(), binary()}], + Updates :: [binary() | {assignment | expression, binary(), binary()}], UniqueKeyFields :: [binary()]) -> {ok, QueryName :: atom()} | {error, already_exists}. prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) -> - SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields), + prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, none). + +-spec prepare_upsert(Host :: mongoose_rdbms:server(), + QueryName :: atom(), + TableName :: atom(), + InsertFields :: [ColumnName :: binary()], + Updates :: [binary() | {binary(), binary()}], + UniqueKeyFields :: [binary()], + IncrementalField :: none | binary()) -> + {ok, QueryName :: atom()} | {error, already_exists}. +prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> + SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField), Query = iolist_to_binary(SQL), ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}), Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields), @@ -132,12 +144,12 @@ prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> _ -> InsertFields ++ UpdateFields end. -upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields) -> +upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of {mysql, _} -> - upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields); + upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); {pgsql, _} -> - upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields); + upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); {odbc, mssql} -> upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields); NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) @@ -152,22 +164,39 @@ mysql_and_pgsql_insert(Table, Columns) -> join(Placeholders, ", "), ")"]. -upsert_mysql_query(Table, InsertFields, Updates, [Key | _]) -> +upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) -> Insert = mysql_and_pgsql_insert(Table, InsertFields), - OnConflict = mysql_on_conflict(Updates, Key), + OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField), [Insert, OnConflict]. -upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields) -> +upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> Insert = mysql_and_pgsql_insert(Table, InsertFields), OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), - [Insert, OnConflict]. + WhereIncrements = pgsql_ensure_increments(Table, IncrementalField), + [Insert, OnConflict, WhereIncrements]. -mysql_on_conflict([], Key) -> +mysql_on_conflict(_Table, [], Key, _) -> %% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL [" ON DUPLICATE KEY UPDATE ", Key, " = ", Key]; -mysql_on_conflict(UpdateFields, _) -> +mysql_on_conflict(_Table, UpdateFields, _, none) -> [" ON DUPLICATE KEY UPDATE ", - update_fields_on_conflict(UpdateFields)]. + update_fields_on_conflict(UpdateFields)]; +mysql_on_conflict(Table, UpdateFields, _, IncrementalField) -> + TableName = atom_to_list(Table), + FieldsWithPlaceHolders = [mysql_fields_with_placeholders(TableName, Update, IncrementalField) + || Update <- UpdateFields], + IncrUpdates = join(FieldsWithPlaceHolders, ", "), + [" AS alias ON DUPLICATE KEY UPDATE ", IncrUpdates]. + +mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) -> + Alternatives = case UpdateField of + {Op, Column, Expression} when Op =:= assignment; Op =:= expression -> + [Expression, ", ", TableName, ".", Column, ")"]; + Column -> + ["? , ", TableName, ".", Column, ")"] + end, + [ Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", " + | Alternatives]. pgsql_on_conflict([], UniqueKeyFields) -> JoinedKeys = join(UniqueKeyFields, ", "), @@ -179,9 +208,15 @@ pgsql_on_conflict(UpdateFields, UniqueKeyFields) -> update_fields_on_conflict(UpdateFields)]. update_fields_on_conflict(Updates) -> - FieldsWithPlaceHolders = [update_field_expression(Update) || Update <- Updates], + FieldsWithPlaceHolders = [get_field_expression(Update) || Update <- Updates], join(FieldsWithPlaceHolders, ", "). +pgsql_ensure_increments(_Table, none) -> + []; +pgsql_ensure_increments(Table, IncrementalField) -> + TableName = atom_to_list(Table), + [" WHERE ", TableName, ".", IncrementalField, " < EXCLUDED.", IncrementalField]. + upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) -> UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields], BinTab = atom_to_binary(Table, utf8), @@ -200,28 +235,17 @@ mssql_on_conflict([]) -> ";"; mssql_on_conflict(Updates) -> [" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]. -update_field_expression(Update) -> - case get_field_expression(Update) of - {true, Expr} -> Expr; - true -> [Update, " = ?"]; - false -> Update - end. - -get_field_expression({_, FieldExpr}) -> - case binary:match(FieldExpr, <<"=">>) of - nomatch -> false; - _ -> {true, FieldExpr} - end; -get_field_expression(FieldExpr) -> - binary:match(FieldExpr, <<"=">>) =:= nomatch. - -get_field_name({Field, _}) -> - case binary:match(Field, <<"=">>) of - nomatch -> {true, Field}; - _ -> false - end; -get_field_name(FieldExpr) -> - binary:match(FieldExpr, <<"=">>) =:= nomatch. +get_field_expression({Op, ColumnName, Expr}) when Op =:= assignment; Op =:= expression -> + [ColumnName, " = ", Expr]; +get_field_expression(Field) when is_binary(Field) -> + [Field, " = ?"]. + +get_field_name({assignment, Field, _}) when is_binary(Field) -> + false; +get_field_name({expression, Field, _}) when is_binary(Field) -> + {true, Field}; +get_field_name(Field) when is_binary(Field) -> + true. %% F can be either a fun or a list of queries %% TODO: We should probably move the list of queries transaction