Skip to content

Commit

Permalink
Use a unicast IP address for interconnection (#9696)
Browse files Browse the repository at this point in the history
* Use a unicast IP address for interconnection on the primary

Currently, interconnect/UDP always binds the wildcard address to
the socket, which makes all QEs on the same node share the same
port space(up to 64k). For dense deployment, the UDP port could run
out, even if there are multiple IP address.
To increase the total number of available ports for QEs on a node,
we bind a single/unicast IP address to the socket for interconnect/UDP,
instead of the wildcard address. So segments with different IP address
have different port space.
To fully utilize this patch to alleviate running out of port, it's
better to assign different ADDRESS(gp_segment_configuration.address) to
different segment, although it's not mandatory.

Note: QD/mirror uses the primary's address value in
gp_segment_configuration as the destination IP to connect to the
primary.  So the primary returns the ADDRESS as its local address
by calling `getsockname()`.

* Fix the origin of the source IP address for backends

The destination IP address uses the listenerAddr of the parent slice.
But the source IP address to bind is difficult. Because it's not
stored on the segment, and the slice table is sent to the QEs after
they had bound the address and port. The origin of the source
IP address for different roles is different:
1. QD : by calling `cdbcomponent_getComponentInfo()`
2. QE on master: by qdHostname dispatched by QD
3. QE on segment: by the local address for QE of the TCP connection
  • Loading branch information
gfphoenix78 authored Apr 20, 2020
1 parent 24f1641 commit 790c7ba
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 58 deletions.
49 changes: 49 additions & 0 deletions src/backend/cdb/cdbutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
#include "libpq-fe.h"
#include "libpq-int.h"
#include "libpq/ip.h"
#include "miscadmin.h" /* MyProcPort */
#include "cdb/cdbconn.h"
#include "cdb/cdbfts.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "postmaster/fts.h"
#include "postmaster/postmaster.h"
#include "catalog/namespace.h"
#include "utils/gpexpand.h"
#include "access/xact.h"
Expand Down Expand Up @@ -1002,6 +1004,52 @@ cdbcomponent_getComponentInfo(int contentId)
return cdbInfo;
}

static void
ensureInterconnectAddress(void)
{
if (interconnect_address)
return;

if (GpIdentity.segindex >= 0)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
Assert(MyProcPort != NULL);
Assert(MyProcPort->laddr.addr.ss_family == AF_INET
|| MyProcPort->laddr.addr.ss_family == AF_INET6);
/*
* We assume that the QD, using the address in gp_segment_configuration
* as its destination IP address, connects to the segment/QE.
* So, the local address in the PORT can be used for interconnect.
*/
char local_addr[NI_MAXHOST];
getnameinfo((const struct sockaddr *)&MyProcPort->laddr.addr,
MyProcPort->laddr.salen,
local_addr, sizeof(local_addr),
NULL, 0, NI_NUMERICHOST);
interconnect_address = MemoryContextStrdup(TopMemoryContext, local_addr);
}
else if (Gp_role == GP_ROLE_DISPATCH)
{
/*
* Here, we can only retrieve the ADDRESS in gp_segment_configuration
* from `cdbcomponent*`. We couldn't get it in a way as the QEs.
*/
CdbComponentDatabaseInfo *qdInfo;
qdInfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
interconnect_address = MemoryContextStrdup(TopMemoryContext, qdInfo->config->hostip);
}
else if (qdHostname && qdHostname[0] != '\0')
{
Assert(Gp_role == GP_ROLE_EXECUTE);
/*
* QE on the master can't get its interconnect address like that on the primary.
* The QD connects to its postmaster via the unix domain socket.
*/
interconnect_address = qdHostname;
}
else
Assert(false);
}
/*
* performs all necessary setup required for Greenplum Database mode.
*
Expand All @@ -1015,6 +1063,7 @@ cdb_setup(void)
/* If gp_role is UTILITY, skip this call. */
if (Gp_role != GP_ROLE_UTILITY)
{
ensureInterconnectAddress();
/* Initialize the Motion Layer IPC subsystem. */
InitMotionLayerIPC();
}
Expand Down
12 changes: 9 additions & 3 deletions src/backend/cdb/dispatcher/cdbgang.c
Original file line number Diff line number Diff line change
Expand Up @@ -613,10 +613,16 @@ getCdbProcessesForQD(int isPrimary)
proc = makeNode(CdbProcess);

/*
* Set QD listener address to NULL. This will be filled during starting up
* outgoing interconnect connection.
* Set QD listener address to the ADDRESS of the master, so the motions that connect to
* the master knows what the interconnect address of the peer is. `adjustMasterRouting()`
* is not necessary, and it could be wrong if the QD/QE on the master binds a single IP
* address for interconnection instead of the wildcard address. Binding the wildcard address
* for interconnection has some flaws:
* 1. All the QD/QE in the same node share the same port space(for a same AF_INET/AF_INET6),
* which contributes to run out of port.
* 2. When the segments have their own ADDRESS, the connection address could be confusing.
*/
proc->listenerAddr = NULL;
proc->listenerAddr = pstrdup(qdinfo->config->hostip);

if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
proc->listenerPort = (Gp_listener_port >> 16) & 0x0ffff;
Expand Down
27 changes: 0 additions & 27 deletions src/backend/cdb/motion/ic_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -722,33 +722,6 @@ removeChunkTransportState(ChunkTransportState *transportStates,
return pEntry;
}

/*
* Set the listener address associated with the slice to
* the master address that is established through libpq
* connection. This guarantees that the outgoing connections
* will connect to an address that is reachable in the event
* when the master can not be reached by segments through
* the network interface recorded in the catalog.
*/
void
adjustMasterRouting(ExecSlice *recvSlice)
{
ListCell *lc = NULL;

Assert(MyProcPort);

foreach(lc, recvSlice->primaryProcesses)
{
CdbProcess *cdbProc = (CdbProcess *) lfirst(lc);

if (cdbProc)
{
if (cdbProc->listenerAddr == NULL)
cdbProc->listenerAddr = pstrdup(MyProcPort->remote_host);
}
}
}

/*
* checkForCancelFromQD
* Check for cancel from QD.
Expand Down
30 changes: 11 additions & 19 deletions src/backend/cdb/motion/ic_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "miscadmin.h"
#include "libpq/libpq-be.h"
#include "libpq/ip.h"
#include "postmaster/postmaster.h"
#include "utils/builtins.h"

#include "cdb/cdbselect.h"
Expand Down Expand Up @@ -141,8 +142,6 @@ setupTCPListeningSocket(int backlog, int *listenerSocketFd, uint16 *listenerPort
*rp;
int s;
char service[32];
char myname[128];
char *localname = NULL;

/*
* we let the system pick the TCP port here so we don't have to manage
Expand All @@ -156,31 +155,26 @@ setupTCPListeningSocket(int backlog, int *listenerSocketFd, uint16 *listenerPort
hints.ai_protocol = 0; /* Any protocol - TCP implied for network use due to SOCK_STREAM */

/*
* We use INADDR_ANY if we don't have a valid address for ourselves (e.g.
* QD local connections tend to be AF_UNIX, or on 127.0.0.1 -- so bind
* everything)
* We set interconnect_address on the primary to the local address of the connection from QD
* to the primary, which is the primary's ADDRESS from gp_segment_configuration,
* used for interconnection.
* However it's wrong on the master. Because the connection from the client to the master may
* have different IP addresses as its destination, which is very likely not the master's
* ADDRESS in gp_segment_configuration.
*/
if (Gp_role == GP_ROLE_DISPATCH || MyProcPort == NULL ||
(MyProcPort->laddr.addr.ss_family != AF_INET &&
MyProcPort->laddr.addr.ss_family != AF_INET6))
localname = NULL; /* We will listen on all network adapters */
else
if (interconnect_address)
{
/*
* Restrict what IP address we will listen on to just the one that was
* used to create this QE session.
*/
getnameinfo((const struct sockaddr *) &(MyProcPort->laddr.addr), MyProcPort->laddr.salen,
myname, sizeof(myname),
NULL, 0, NI_NUMERICHOST);
hints.ai_flags |= AI_NUMERICHOST;
localname = myname;
elog(DEBUG1, "binding to %s only", localname);
ereport(DEBUG1, (errmsg("binding to %s only", interconnect_address)));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG4, (errmsg("binding listener %s", localname)));
ereport(DEBUG4, (errmsg("binding listener %s", interconnect_address)));
}

s = getaddrinfo(localname, service, &hints, &addrs);
s = getaddrinfo(interconnect_address, service, &hints, &addrs);
if (s != 0)
elog(ERROR, "getaddrinfo says %s", gai_strerror(s));

Expand Down Expand Up @@ -515,8 +509,6 @@ startOutgoingConnections(ChunkTransportState *transportStates,

recvSlice = &transportStates->sliceTable->slices[sendSlice->parentIndex];

adjustMasterRouting(recvSlice);

if (gp_interconnect_aggressive_retry)
{
if ((list_length(recvSlice->children) * list_length(sendSlice->segments)) > listenerBacklog)
Expand Down
30 changes: 22 additions & 8 deletions src/backend/cdb/motion/ic_udpifc.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "libpq/ip.h"
#include "port/atomics.h"
#include "port/pg_crc32c.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "utils/builtins.h"
Expand Down Expand Up @@ -1187,7 +1188,26 @@ setupUDPListeningSocket(int *listenerSocketFd, uint16 *listenerPort, int *txFami
#endif

fun = "getaddrinfo";
s = getaddrinfo(NULL, service, &hints, &addrs);
/*
* We set interconnect_address on the primary to the local address of the connection from QD
* to the primary, which is the primary's ADDRESS from gp_segment_configuration,
* used for interconnection.
* However it's wrong on the master. Because the connection from the client to the master may
* have different IP addresses as its destination, which is very likely not the master's
* ADDRESS in gp_segment_configuration.
*/
if (interconnect_address)
{
/*
* Restrict what IP address we will listen on to just the one that was
* used to create this QE session.
*/
hints.ai_flags |= AI_NUMERICHOST;
ereport(DEBUG1, (errmsg("binding to %s only", interconnect_address)));
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG4, (errmsg("binding address %s", interconnect_address)));
}
s = getaddrinfo(interconnect_address, service, &hints, &addrs);
if (s != 0)
elog(ERROR, "getaddrinfo says %s", gai_strerror(s));

Expand Down Expand Up @@ -2643,12 +2663,6 @@ startOutgoingUDPConnections(ChunkTransportState *transportStates,

recvSlice = &transportStates->sliceTable->slices[sendSlice->parentIndex];

/*
* Potentially introduce a Bug (MPP-17186). The workaround is to turn off
* log_hostname guc.
*/
adjustMasterRouting(recvSlice);

if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "Interconnect seg%d slice%d setting up sending motion node",
GpIdentity.segindex, sendSlice->sliceIndex);
Expand Down Expand Up @@ -6874,7 +6888,7 @@ SendDummyPacket(void)
hint.ai_flags = AI_NUMERICHOST;
#endif

ret = pg_getaddrinfo_all(NULL, port_str, &hint, &addrs);
ret = pg_getaddrinfo_all(interconnect_address, port_str, &hint, &addrs);
if (ret || !addrs)
{
elog(LOG, "send dummy packet failed, pg_getaddrinfo_all(): %m");
Expand Down
6 changes: 6 additions & 0 deletions src/backend/postmaster/postmaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ char *Unix_socket_directories;
/* The TCP listen address(es) */
char *ListenAddresses;

/*
* The interconnect address. We assume the interconnect is the address
* in gp_segment_configuration. And it's never changed at runtime.
*/
char *interconnect_address = NULL;

/*
* ReservedBackends is the number of backends reserved for superuser use.
* This number is taken out of the pool size given by MaxBackends so
Expand Down
1 change: 0 additions & 1 deletion src/include/cdb/ml_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ extern void TeardownUDPIFCInterconnect(ChunkTransportState *transportStates,
bool forceEOS);

extern uint32 getActiveMotionConns(void);
extern void adjustMasterRouting(ExecSlice *recvSlice);

extern char *format_sockaddr(struct sockaddr_storage *sa, char *buf, size_t len);

Expand Down
1 change: 1 addition & 0 deletions src/include/postmaster/postmaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern int Unix_socket_permissions;
extern char *Unix_socket_group;
extern char *Unix_socket_directories;
extern char *ListenAddresses;
extern char *interconnect_address;
extern bool ClientAuthInProgress;
extern int PreAuthDelay;
extern int AuthenticationTimeout;
Expand Down

0 comments on commit 790c7ba

Please sign in to comment.