Skip to content

Commit

Permalink
Fix merge GPDB (resgroup related)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreyes committed Jun 17, 2024
1 parent 82967b4 commit 9df5076
Show file tree
Hide file tree
Showing 15 changed files with 142 additions and 127 deletions.
4 changes: 2 additions & 2 deletions src/backend/cdb/cdbutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -1965,7 +1965,7 @@ gp_get_suboverflowed_backends(PG_FUNCTION_ARGS)
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (i = 0; i < ProcGlobal->allProcCount; i++)
{
if (ProcGlobal->allPgXact[i].overflowed)
if (ProcGlobal->subxidStates[i].overflowed)
astate = accumArrayResult(astate,
Int32GetDatum(ProcGlobal->allProcs[i].pid),
false, INT4OID, CurrentMemoryContext);
Expand Down Expand Up @@ -4181,7 +4181,7 @@ gp_get_suboverflowed_backends(PG_FUNCTION_ARGS)
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (i = 0; i < ProcGlobal->allProcCount; i++)
{
if (ProcGlobal->allPgXact[i].overflowed)
if (ProcGlobal->subxidStates[i].overflowed)
astate = accumArrayResult(astate,
Int32GetDatum(ProcGlobal->allProcs[i].pid),
false, INT4OID, CurrentMemoryContext);
Expand Down
12 changes: 6 additions & 6 deletions src/backend/cdb/dispatcher/cdbgang.c
Original file line number Diff line number Diff line change
Expand Up @@ -972,13 +972,13 @@ backend_type(SegmentDatabaseDescriptor *segdb)
}

/*
* qsort comparator for SegmentDatabaseDescriptors. Sorts by descriptor ID.
* sort comparator for SegmentDatabaseDescriptors. Sorts by descriptor ID.
*/
static int
compare_segdb_id(const void *v1, const void *v2)
compare_segdb_id(const ListCell *a, const ListCell *b)
{
SegmentDatabaseDescriptor *d1 = (SegmentDatabaseDescriptor *) lfirst(*(ListCell **) v1);
SegmentDatabaseDescriptor *d2 = (SegmentDatabaseDescriptor *) lfirst(*(ListCell **) v2);
SegmentDatabaseDescriptor *d1 = (SegmentDatabaseDescriptor *) lfirst(a);
SegmentDatabaseDescriptor *d2 = (SegmentDatabaseDescriptor *) lfirst(b);

return d1->identifier - d2->identifier;
}
Expand Down Expand Up @@ -1060,7 +1060,7 @@ gp_backend_info(PG_FUNCTION_ARGS)
* For a slightly better default user experience, sort by descriptor ID.
* Users may of course specify their own ORDER BY if they don't like it.
*/
user_fctx->segdbs = list_qsort(user_fctx->segdbs, compare_segdb_id);
list_sort(user_fctx->segdbs, compare_segdb_id);
user_fctx->curpos = list_head(user_fctx->segdbs);

/* Create a descriptor for the records we'll be returning. */
Expand Down Expand Up @@ -1095,7 +1095,7 @@ gp_backend_info(PG_FUNCTION_ARGS)

/* Get the next descriptor. */
dbdesc = lfirst(user_fctx->curpos);
user_fctx->curpos = lnext(user_fctx->curpos);
user_fctx->curpos = lnext(user_fctx->segdbs, user_fctx->curpos);

/* Fill in the row attributes. */
dbinfo = dbdesc->segment_database_info;
Expand Down
55 changes: 42 additions & 13 deletions src/bin/gpfts/ftsprobe.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <sys/param.h> /* for MAXHOSTNAMELEN */
#include "fts_etcd.h"
#include <assert.h>
#include "postmaster/postmaster.h"

static struct pollfd *PollFds;

Expand Down Expand Up @@ -284,8 +285,15 @@ checkIfConnFailedDueToRecoveryInProgress(PGconn *conn)
&& strstr(PQerrorMessage(conn),POSTMASTER_AFTER_PROMOTE_STANDBY_IN_RECOVERY_DETAIL_MSG);
}

/*
* Check if the primary segment is restarting normally by examining the PQ error message.
* It could be in RESET (waiting for the children to exit) or making
* progress in RECOVERY. Note there is no good source of RESET progress indications
* that we could check, so we simply always allow it. Normally RESET should be fast
* and there's a timeout in postmaster to guard against a long wait.
*/
static void
checkIfFailedDueToRecoveryInProgress(fts_segment_info *fts_info)
checkIfFailedDueToNormalRestart(fts_segment_info *fts_info)
{
if (strstr(PQerrorMessage(fts_info->conn), _(POSTMASTER_IN_RECOVERY_MSG)) ||
strstr(PQerrorMessage(fts_info->conn), _(POSTMASTER_IN_STARTUP_MSG)))
Expand Down Expand Up @@ -322,6 +330,7 @@ checkIfFailedDueToRecoveryInProgress(fts_segment_info *fts_info)
*/
if (tmpptr <= fts_info->xlogrecptr)
{
fts_info->restart_state = PM_IN_RECOVERY_NOT_MAKING_PROGRESS;
cbdb_log_debug("detected segment is in recovery mode and not making progress (content=%d) "
"primary dbid=%d, mirror dbid=%d",
PRIMARY_CONFIG(fts_info)->segindex,
Expand All @@ -330,7 +339,7 @@ checkIfFailedDueToRecoveryInProgress(fts_segment_info *fts_info)
}
else
{
fts_info->recovery_making_progress = true;
fts_info->restart_state = PM_IN_RECOVERY_MAKING_PROGRESS;
fts_info->xlogrecptr = tmpptr;

cbdb_log_debug("detected segment is in recovery mode replayed (%X/%X) (content=%d) "
Expand All @@ -342,6 +351,15 @@ checkIfFailedDueToRecoveryInProgress(fts_segment_info *fts_info)
fts_info->has_mirror_configured ? MIRROR_CONFIG(fts_info)->dbid : -1);
}
}
else if (strstr(PQerrorMessage(fts_info->conn), _(POSTMASTER_IN_RESET_MSG)))
{
fts_info->restart_state = PM_IN_RESETTING;
cbdb_log_debug("FTS: detected segment is in RESET state (content=%d) "
"primary dbid=%d, mirror dbid=%d",
fts_info->primary_cdbinfo->config->segindex,
fts_info->primary_cdbinfo->config->dbid,
fts_info->mirror_cdbinfo->config->dbid);
}
}

/*
Expand Down Expand Up @@ -376,10 +394,11 @@ ftsConnect(fts_context *context)
case FTS_SYNCREP_OFF_SEGMENT:
case FTS_PROMOTE_SEGMENT:
/*
* We always default to false. If connect fails due to recovery in progress
* this variable will be set based on LSN value in error message.
* We always default to PM_NOT_IN_RESTART. If connect fails, we then check
* the primary's restarting state, so we can skip promoting mirror if it's in
* PM_IN_RESETTING or PM_IN_RECOVERY_MAKING_PROGRESS.
*/
fts_info->recovery_making_progress = false;
fts_info->restart_state = PM_NOT_IN_RESTART;
if (fts_info->conn == NULL)
{
Assert(fts_info->retry_count <= context->config->probe_retries);
Expand Down Expand Up @@ -426,7 +445,7 @@ ftsConnect(fts_context *context)

case PGRES_POLLING_FAILED:
fts_info->state = nextFailedState(fts_info->state);
checkIfFailedDueToRecoveryInProgress(fts_info);
checkIfFailedDueToNormalRestart(fts_info);
cbdb_log_debug("cannot establish libpq connection "
"(content=%d, dbid=%d): %s, retry_count=%d",
PRIMARY_CONFIG(fts_info)->segindex,
Expand Down Expand Up @@ -1266,11 +1285,21 @@ processResponse(fts_context *context)
}
break;
case FTS_PROBE_FAILED:
/*
* Primary is down
* If primary is in recovery, do not mark it down and promote mirror
*/
if (fts_info->recovery_making_progress)
/* Primary is down */

/* If the primary is resetting or making progress in recovery, do not mark it down or promote the mirror */
if (fts_info->restart_state == PM_IN_RESETTING)
{
Assert(strstr(PQerrorMessage(fts_info->conn), _(POSTMASTER_IN_RESET_MSG)));
cbdb_log_debug(
"FTS: detected segment is in resetting mode "
"(content=%d) primary dbid=%d, mirror dbid=%d",
primary->config->segindex, primary->config->dbid, mirror->config->dbid);

fts_info->state = FTS_RESPONSE_PROCESSED;
break;
}
else if (fts_info->restart_state == PM_IN_RECOVERY_MAKING_PROGRESS)
{
assert(strstr(PQerrorMessage(fts_info->conn), _(POSTMASTER_IN_RECOVERY_MSG)) ||
strstr(PQerrorMessage(fts_info->conn), _(POSTMASTER_IN_STARTUP_MSG)));
Expand Down Expand Up @@ -1467,7 +1496,7 @@ FtsWalRepInitProbeContext(CdbComponentDatabases *cdbs, fts_context *context, fts
fts_info->state = FTS_PROBE_FAILED;
else
fts_info->state = FTS_PROBE_SEGMENT;
fts_info->recovery_making_progress = false;
fts_info->restart_state = PM_NOT_IN_RESTART;
fts_info->xlogrecptr = InvalidXLogRecPtr;

fts_info->primary_cdbinfo = primary;
Expand Down Expand Up @@ -1495,7 +1524,7 @@ FtsWalRepInitProbeContext(CdbComponentDatabases *cdbs, fts_context *context, fts
fts_info->state = FTS_PROBE_FAILED;
else
fts_info->state = FTS_PROBE_SEGMENT;
fts_info->recovery_making_progress = false;
fts_info->restart_state = PM_NOT_IN_RESTART;
fts_info->xlogrecptr = InvalidXLogRecPtr;

fts_info->primary_cdbinfo = master;
Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/catversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@
*/

/* 3yyymmddN */
#define CATALOG_VERSION_NO 302301101
#define CATALOG_VERSION_NO 302406171

#endif
2 changes: 1 addition & 1 deletion src/include/catalog/pg_proc.dat
Original file line number Diff line number Diff line change
Expand Up @@ -11963,7 +11963,7 @@
{ oid => 7182, descr => 'wait until all endpoint of this parallel retrieve cursor has been retrieved finished',
proname => 'gp_wait_parallel_retrieve_cursor', provolatile => 'v', proparallel => 'u', prorettype => 'bool', proargtypes => 'text int4', proallargtypes => '{text,int4,bool}', proargmodes => '{i,i,o}', proargnames => '{cursorname,timeout_sec,finished}', prosrc => 'gp_wait_parallel_retrieve_cursor', proexeclocation => 'c' },

{ oid => 7183, descr => 'debugging information for segment backends',
{ oid => 7146, descr => 'debugging information for segment backends',
proname => 'gp_backend_info', prorettype => 'record', prorows => '1', proretset => 't', proargtypes => '', proallargtypes => '{int4,char,int4,text,int4,int4}', prosrc => 'gp_backend_info', pronargs => 6,
proargnames => '{id,type,content,host,port,pid}', proargmodes => '{o,o,o,o,o,o}', proexeclocation => 'c'}

Expand Down
3 changes: 1 addition & 2 deletions src/test/isolation2/expected/fts_segment_reset.out
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ CREATE
1<: <... completed>
ERROR: fault triggered, fault name:'start_prepare' fault type:'panic'
2<: <... completed>
DETAIL: Segments are in reset/recovery mode.
ERROR: failed to acquire resources on one or more segments
CREATE

-- We shouldn't see failover to mirror
select gp_request_fts_probe_scan();
Expand Down
39 changes: 22 additions & 17 deletions src/test/recovery/t/018_wal_optimize.pl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

use PostgresNode;
use TestLib;
use Test::More tests => 38;

# GPDB: Effectively disable some of these tests. We cannot run
# PREPARE TRANSACTION in utility-mode.
# use Test::More tests => 38;
use Test::More tests => 36;

sub check_orphan_relfilenodes
{
Expand Down Expand Up @@ -115,22 +119,23 @@ sub run_wal_optimize
"SELECT count(*), min(id) FROM trunc_ins;");
is($result, qq(1|2), "wal_level = $wal_level, TRUNCATE INSERT");

# Same for prepared transaction.
# Tuples inserted after the truncation should be seen.
$node->safe_psql(
'postgres', "
BEGIN;
CREATE TABLE twophase (id serial PRIMARY KEY);
INSERT INTO twophase VALUES (DEFAULT);
TRUNCATE twophase;
INSERT INTO twophase VALUES (DEFAULT);
PREPARE TRANSACTION 't';
COMMIT PREPARED 't';");
$node->stop('immediate');
$node->start;
$result = $node->safe_psql('postgres',
"SELECT count(*), min(id) FROM trunc_ins;");
is($result, qq(1|2), "wal_level = $wal_level, TRUNCATE INSERT PREPARE");
# GPDB: Disable this test.
# # Same for prepared transaction.
# # Tuples inserted after the truncation should be seen.
# $node->safe_psql(
# 'postgres', "
# BEGIN;
# CREATE TABLE twophase (id serial PRIMARY KEY);
# INSERT INTO twophase VALUES (DEFAULT);
# TRUNCATE twophase;
# INSERT INTO twophase VALUES (DEFAULT);
# PREPARE TRANSACTION 't';
# COMMIT PREPARED 't';");
# $node->stop('immediate');
# $node->start;
# $result = $node->safe_psql('postgres',
# "SELECT count(*), min(id) FROM trunc_ins;");
# is($result, qq(1|2), "wal_level = $wal_level, TRUNCATE INSERT PREPARE");

# Writing WAL at end of xact, instead of syncing.
$node->safe_psql(
Expand Down
88 changes: 46 additions & 42 deletions src/test/recovery/t/021_row_visibility.pl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

use PostgresNode;
use TestLib;
use Test::More tests => 10;
# GPDB: Effectively disable some of these tests. We cannot run
# PREPARE TRANSACTION in utility-mode.
# use Test::More tests => 10;
use Test::More tests => 6;
use Config;

# Initialize primary node
Expand Down Expand Up @@ -118,47 +121,48 @@
qr/first update\n\(1 row\)$/m),
'committed update visible');

#
# 5. Check that changes in prepared xacts are invisible
#
ok( send_query_and_wait(
\%psql_primary, q[
DELETE from test_visibility; -- delete old data, so we start with clean slate
BEGIN;
INSERT INTO test_visibility VALUES('inserted in prepared will_commit');
PREPARE TRANSACTION 'will_commit';],
qr/^PREPARE TRANSACTION$/m),
'prepared will_commit');

ok( send_query_and_wait(
\%psql_primary, q[
BEGIN;
INSERT INTO test_visibility VALUES('inserted in prepared will_abort');
PREPARE TRANSACTION 'will_abort';
],
qr/^PREPARE TRANSACTION$/m),
'prepared will_abort');

$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('insert'));

ok( send_query_and_wait(
\%psql_standby,
q[SELECT * FROM test_visibility ORDER BY data;],
qr/^\(0 rows\)$/m),
'uncommitted prepared invisible');

# For some variation, finish prepared xacts via separate connections
$node_primary->safe_psql('postgres', "COMMIT PREPARED 'will_commit';");
$node_primary->safe_psql('postgres', "ROLLBACK PREPARED 'will_abort';");
$node_primary->wait_for_catchup($node_standby, 'replay',
$node_primary->lsn('insert'));

ok( send_query_and_wait(
\%psql_standby,
q[SELECT * FROM test_visibility ORDER BY data;],
qr/will_commit.*\n\(1 row\)$/m),
'finished prepared visible');
# GPDB: Disable this test.
# #
# # 5. Check that changes in prepared xacts are invisible
# #
# ok( send_query_and_wait(
# \%psql_primary, q[
# DELETE from test_visibility; -- delete old data, so we start with clean slate
# BEGIN;
# INSERT INTO test_visibility VALUES('inserted in prepared will_commit');
# PREPARE TRANSACTION 'will_commit';],
# qr/^PREPARE TRANSACTION$/m),
# 'prepared will_commit');

# ok( send_query_and_wait(
# \%psql_primary, q[
# BEGIN;
# INSERT INTO test_visibility VALUES('inserted in prepared will_abort');
# PREPARE TRANSACTION 'will_abort';
# ],
# qr/^PREPARE TRANSACTION$/m),
# 'prepared will_abort');

# $node_primary->wait_for_catchup($node_standby, 'replay',
# $node_primary->lsn('insert'));

# ok( send_query_and_wait(
# \%psql_standby,
# q[SELECT * FROM test_visibility ORDER BY data;],
# qr/^\(0 rows\)$/m),
# 'uncommitted prepared invisible');

# # For some variation, finish prepared xacts via separate connections
# $node_primary->safe_psql('postgres', "COMMIT PREPARED 'will_commit';");
# $node_primary->safe_psql('postgres', "ROLLBACK PREPARED 'will_abort';");
# $node_primary->wait_for_catchup($node_standby, 'replay',
# $node_primary->lsn('insert'));

# ok( send_query_and_wait(
# \%psql_standby,
# q[SELECT * FROM test_visibility ORDER BY data;],
# qr/will_commit.*\n\(1 row\)$/m),
# 'finished prepared visible');

# explicitly shut down psql instances gracefully - to avoid hangs
# or worse on windows
Expand Down
6 changes: 6 additions & 0 deletions src/test/recovery/t/023_pitr_prepared_xact.pl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
use warnings;
use PostgresNode;
use TestLib;

# GPDB: Effectively disable this TAP test. We cannot run PREPARE
# TRANSACTION in utility-mode.
use Test::More tests => 1;
is(-1, -1, "Disable this TAP test");
exit;

use File::Compare;

# Initialize and start primary node with WAL archiving
Expand Down
2 changes: 1 addition & 1 deletion src/test/regress/expected/gp_prepared_xacts.out
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- PREPARE TRANSACTION should not work
BEGIN;
PREPARE TRANSACTION 'foo_prep_xact';
ERROR: PREPARE TRANSACTION is not yet supported in Greenplum Database
ERROR: PREPARE TRANSACTION is not yet supported in Cloudberry Database
-- PREPARE TRANSACTION should not work in utility-mode connections either
\! PGOPTIONS='-c gp_role=utility' psql -X regression -c "BEGIN; PREPARE TRANSACTION 'foo_prep_xact';"
ERROR: PREPARE TRANSACTION is not supported in utility mode
4 changes: 0 additions & 4 deletions src/test/regress/input/dispatch.source
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,6 @@ set optimizer=off;
--gang reused
create table t_create_gang_time(tc1 int,tc2 int);

--1-gang reused
select * from t_create_gang_time where tc1=1;
explain analyze select * from t_create_gang_time where tc1=1;

--n-gang reused and 1-gang is created.
select * from t_create_gang_time t1, t_create_gang_time t2 where t1.tc1=2;

Expand Down
Loading

0 comments on commit 9df5076

Please sign in to comment.