Skip to content

Commit

Permalink
Fix copy from directory table. (apache#416)
Browse files Browse the repository at this point in the history
We use temporary tupledesction in copy from directory table, which
will have no side effect on relcache.
  • Loading branch information
wenchaozhang-123 authored and zhangwenchao committed Apr 28, 2024
1 parent 2c61f05 commit 9781717
Showing 1 changed file with 55 additions and 35 deletions.
90 changes: 55 additions & 35 deletions src/backend/commands/copyfrom.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,14 @@ SendCopyFromForwardedTuple(CopyFromState cstate,
char *line,
int line_len,
Datum *values,
bool *nulls);
bool *nulls,
bool is_directory_table);
static void SendCopyFromForwardedHeader(CopyFromState cstate, CdbCopy *cdbCopy);
static void SendCopyFromForwardedError(CopyFromState cstate, CdbCopy *cdbCopy, char *errmsg);

static bool NextCopyFromDispatch(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls);
static bool NextCopyFromExecute(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
static bool NextCopyFromExecute(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls, bool is_directory_table);
static bool NextCopyFromRawFieldsX(CopyFromState cstate, char ***fields, int *nfields,
int stop_processing_at_field);
static bool NextCopyFromX(CopyFromState cstate, ExprContext *econtext,
Expand Down Expand Up @@ -1026,7 +1027,8 @@ CopyFromDirectoryTable(CopyFromState cstate)
cstate->line_buf.data,
cstate->line_buf.len,
myslot->tts_values,
myslot->tts_isnull);
myslot->tts_isnull,
true);
{
int64 total_completed_from_qes;
int64 total_rejected_from_qes;
Expand Down Expand Up @@ -1054,34 +1056,16 @@ CopyFromDirectoryTable(CopyFromState cstate)

econtext = GetPerTupleExprContext(estate);

if (NextCopyFromExecute(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
if (NextCopyFromExecute(cstate, econtext, myslot->tts_values, myslot->tts_isnull, true))
{
if (tupdesc)
pfree(tupdesc);

tupdesc = CreateTemplateTupleDesc(DIRECTORY_TABLE_COLUMNS);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relative_path",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "size",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_modified",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "md5",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "tag",
TEXTOID, -1 ,0);

tmpslot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
tmpslot = MakeSingleTupleTableSlot(cstate->rel->rd_att, &TTSOpsVirtual);

for (i = 0; i < DIRECTORY_TABLE_COLUMNS; i++)
{
tmpslot->tts_values[i] = myslot->tts_values[i];
tmpslot->tts_isnull[i] = myslot->tts_isnull[i];
}

cstate->rel->rd_att = CreateTupleDescCopy(tupdesc);
cstate->rel->rd_att->tdrefcount = 1; /* mark as refcounted */

/*
* Reset the per-tuple exprcontext. We do this after every tuple, to
* clean-up after expression evaluations etc.
Expand Down Expand Up @@ -1129,9 +1113,6 @@ CopyFromDirectoryTable(CopyFromState cstate)

list_free(recheckIndexes);

if (tupdesc)
pfree(tupdesc);

if (UFileExists(dirTable->spcId, orgiFileName))
{
UFileUnlink(dirTable->spcId, orgiFileName);
Expand Down Expand Up @@ -1174,7 +1155,8 @@ CopyFromDirectoryTable(CopyFromState cstate)
errmsg("unable to sync file \"%s\": %s", glob_copystmt->dirfilename, UFileGetLastError(file))));

UFileClose(file);


ReleaseTupleDesc(cstate->rel->rd_att);
ExecClearTuple(myslot);
ExecClearTuple(tmpslot);

Expand Down Expand Up @@ -1345,8 +1327,6 @@ BeginCopyFromDirectoryTable(ParseState *pstate,

num_phys_attrs = tupDesc->natts;

cstate->rel->rd_att = CreateTupleDescCopy(tupDesc);
cstate->rel->rd_att->tdrefcount = 1; /* mark as refcounted */
cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, NIL);

/*
Expand Down Expand Up @@ -1929,7 +1909,7 @@ CopyFrom(CopyFromState cstate)

if (cstate->dispatch_mode == COPY_EXECUTOR)
{
if (!NextCopyFromExecute(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
if (!NextCopyFromExecute(cstate, econtext, myslot->tts_values, myslot->tts_isnull, false))
break;

/*
Expand Down Expand Up @@ -2161,7 +2141,8 @@ CopyFrom(CopyFromState cstate)
cstate->line_buf.data,
cstate->line_buf.len,
myslot->tts_values,
myslot->tts_isnull);
myslot->tts_isnull,
false);
skip_tuple = true;
processed++;
}
Expand Down Expand Up @@ -3409,7 +3390,7 @@ NextCopyFromDispatch(CopyFromState cstate, ExprContext *econtext,
* rows from the QD.
*/
static bool
NextCopyFromExecute(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
NextCopyFromExecute(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls, bool is_directory_table)
{
TupleDesc tupDesc;
AttrNumber num_phys_attrs,
Expand All @@ -3420,7 +3401,26 @@ NextCopyFromExecute(CopyFromState cstate, ExprContext *econtext, Datum *values,
int r;
bool got_error;

tupDesc = RelationGetDescr(cstate->rel);
if (!is_directory_table)
{
tupDesc = RelationGetDescr(cstate->rel);
}
else
{
tupDesc = CreateTemplateTupleDesc(6);
TupleDescInitEntry(tupDesc, (AttrNumber) 1, "relative_path",
TEXTOID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 2, "size",
INT8OID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 3, "last_modified",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 4, "md5",
TEXTOID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 5, "tag",
TEXTOID, -1 ,0);
TupleDescInitEntry(tupDesc, (AttrNumber) 6, "file",
TEXTOID, -1, 0);
}
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);

Expand Down Expand Up @@ -3727,7 +3727,8 @@ SendCopyFromForwardedTuple(CopyFromState cstate,
char *line,
int line_len,
Datum *values,
bool *nulls)
bool *nulls,
bool is_directory_table)
{
TupleDesc tupDesc;
FormData_pg_attribute *attr;
Expand All @@ -3740,7 +3741,26 @@ SendCopyFromForwardedTuple(CopyFromState cstate,
if (!OidIsValid(RelationGetRelid(rel)))
elog(ERROR, "invalid target table OID in COPY");

tupDesc = RelationGetDescr(rel);
if (!is_directory_table)
{
tupDesc = RelationGetDescr(rel);
}
else
{
tupDesc = CreateTemplateTupleDesc(6);
TupleDescInitEntry(tupDesc, (AttrNumber) 1, "relative_path",
TEXTOID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 2, "size",
INT8OID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 3, "last_modified",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 4, "md5",
TEXTOID, -1, 0);
TupleDescInitEntry(tupDesc, (AttrNumber) 5, "tag",
TEXTOID, -1 ,0);
TupleDescInitEntry(tupDesc, (AttrNumber) 6, "file",
TEXTOID, -1, 0);
}
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;

Expand Down

0 comments on commit 9781717

Please sign in to comment.