Skip to content

Commit

Permalink
Fix up some potential memory leaks in fwrite (#6757)
Browse files Browse the repository at this point in the history
* More careful about free() before any STOP(), #ifndef location

* Also don't assign mystream unless is_gzip

* missing nocov ends

* zstreams on stack, no more thread_streams on heap

* back to !BUFF style of alloc checking

* Avoid the leak on zero-row fwrite

* Avoid memory leak on error path

This isn't covered by the tests, but manually failing this allocation in
vgdb results in a leak otherwise:

268,096 (5,952 direct, 262,144 indirect) bytes in 1 blocks are definitely lost
in loss record 1,574 of 1,601
   at 0x48407B4: malloc (vg_replace_malloc.c:381)
   by 0x74ACD86: deflateInit2_ (in /lib/x86_64-linux-gnu/libz.so.1.2.13)
   by 0x90EA5232: init_stream (fwrite.c:576)
   by 0x90EA5EB0: fwriteMain (fwrite.c:806)
   by 0x90EA79EE: fwriteR (fwriteR.c:310)

* Typo, translate

Co-authored-by: aitap <krylov.r00t@gmail.com>

* more translations of DTPRINT

* Skip redundant 'else'

* new cite in NEWS

---------

Co-authored-by: Philippe Chataignon <philippe.chataignon@gmail.com>
Co-authored-by: Ivan K <krylov.r00t@gmail.com>
  • Loading branch information
3 people authored Jan 24, 2025
1 parent 4899b39 commit 79aed53
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 61 deletions.
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ rowwiseDT(

6. `fread()` gains `logicalYN` argument to read columns consisting only of strings `Y`, `N` as `logical` (as opposed to character), [#4563](https://github.com/Rdatatable/data.table/issues/4563). The default is controlled by option `datatable.logicalYN`, itself defaulting to `FALSE`, for back-compatibility -- some smaller tables (especially sharded tables) might inadvertently read a "true" string column as `logical` and cause bugs. This is particularly important for tables with a column named `y` or `n` -- automatic header detection under `logicalYN=TRUE` will see these values in the first row as being "data" as opposed to column names. A parallel option was not included for `fwrite()` at this time -- users looking for a compact representation of logical columns can still use `fwrite(logical01=TRUE)`. We also opted for now to check only `Y`, `N` and not `Yes`/`No`/`YES`/`NO`.

7. `fwrite()` with `compress="gzip"` produces compatible gz files when composed of multiple independent chunks owing to parallelization, [#6356](https://github.com/Rdatatable/data.table/issues/6356). Earlier `fwrite()` versions could have issues with HTTP upload using `Content-Encoding: gzip` and `Transfer-Encoding: chunked`. Thanks to @oliverfoster for report and @philippechataignon for the fix.
7. `fwrite()` with `compress="gzip"` produces compatible gz files when composed of multiple independent chunks owing to parallelization, [#6356](https://github.com/Rdatatable/data.table/issues/6356). Earlier `fwrite()` versions could have issues with HTTP upload using `Content-Encoding: gzip` and `Transfer-Encoding: chunked`. Thanks to @oliverfoster for report and @philippechataignon for the fix. Thanks also @aitap for pre-release testing that found some possible memory leaks in the initial fix.

8. `fwrite()` gains a new parameter `compressLevel` to control compression level for gzip, [#5506](https://github.com/Rdatatable/data.table/issues/5506). This parameter balances compression speed and total compression, and corresponds directly to the analogous command-line parameter, e.g. `compressLevel=4` corresponds to passing `-4`; the default, `6`, matches the command-line default, i.e. equivalent to passing `-6`. Thanks @mgarbuzov for the request and @philippechataignon for implementing.

Expand Down
118 changes: 58 additions & 60 deletions src/fwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ void fwriteMain(fwriteMainArgs args)
}

// alloc nth write buffers
errno=0;
errno = 0;
size_t alloc_size = nth * buffSize;
if (verbose) {
DTPRINT(_("Allocate %zu bytes (%zu MiB) for buffPool\n"), alloc_size, alloc_size / MEGA);
Expand All @@ -797,28 +797,20 @@ void fwriteMain(fwriteMainArgs args)

// init compress variables
#ifndef NOZLIB
z_stream *thread_streams = NULL;
z_stream strm;
// NB: fine to free() this even if unallocated
char *zbuffPool = NULL;
size_t zbuffSize = 0;
size_t compress_len = 0;
if (args.is_gzip) {
// alloc zlib streams
thread_streams = (z_stream*) malloc(nth * sizeof(z_stream));
if (verbose) {
DTPRINT(_("Allocate %zu bytes for thread_streams\n"), nth * sizeof(z_stream));
// compute zbuffSize which is the same for each thread
if (init_stream(&strm) != Z_OK) {
// # nocov start
free(buffPool);
STOP(_("Can't init stream structure for deflateBound"));
// # nocov end
}
if (!thread_streams)
STOP(_("Failed to allocated %d bytes for threads_streams."), (int)(nth * sizeof(z_stream))); // #nocov
// VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit
// not declared inside the parallel region because solaris appears to move the struct in
// memory when the #pragma omp for is entered, which causes zlib's internal self reference
// pointer to mismatch, #4099

// compute zbuffSize which is the same for each thread
z_stream *stream = thread_streams;
if (init_stream(stream) != Z_OK)
STOP(_("Can't init stream structure for deflateBound")); // #nocov
zbuffSize = deflateBound(stream, buffSize);
zbuffSize = deflateBound(&strm, buffSize);
if (verbose)
DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize);

Expand All @@ -832,11 +824,13 @@ void fwriteMain(fwriteMainArgs args)
if (!zbuffPool) {
// # nocov start
free(buffPool);
deflateEnd(&strm);
STOP(_("Unable to allocate %zu MiB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."),
zbuffSize / MEGA, nth, errno, strerror(errno));
// # nocov end
}
}

#endif

// write header
Expand Down Expand Up @@ -880,11 +874,8 @@ void fwriteMain(fwriteMainArgs args)
DTPRINT("%s", buff);
} else {
int ret0=0, ret1=0, ret2=0;
if (args.is_gzip) {
#ifndef NOZLIB
z_stream *stream = thread_streams;
if (init_stream(stream) != Z_OK)
STOP(_("Can't init stream structure for writing header")); // #nocov
if (args.is_gzip) {
char* zbuff = zbuffPool;
// write minimal gzip header
char* header = "\037\213\10\0\0\0\0\0\0\3";
Expand All @@ -895,19 +886,26 @@ void fwriteMain(fwriteMainArgs args)
size_t zbuffUsed = zbuffSize;
len = (size_t)(ch - buff);
crc = crc32(crc, (unsigned char*)buff, len);
ret1 = compressbuff(stream, zbuff, &zbuffUsed, buff, len);
ret1 = compressbuff(&strm, zbuff, &zbuffUsed, buff, len);
deflateEnd(&strm);
if (ret1==Z_OK) {
ret2 = WRITE(f, zbuff, (int)zbuffUsed);
compress_len += zbuffUsed;
}
#endif
} else {
#endif
ret2 = WRITE(f, buff, (int)(ch-buff));
#ifndef NOZLIB
}
#endif
if (ret0 == -1 || ret1 || ret2 == -1) {
// # nocov start
int errwrite = errno; // capture write errno now in case close fails with a different errno
CLOSE(f);
free(buffPool);
#ifndef NOZLIB
free(zbuffPool);
#endif
if (ret0 == -1) STOP(_("Can't write gzip header error: %d"), ret0);
else if (ret1) STOP(_("Compress gzip error: %d"), ret1);
else STOP(_("%s: '%s'"), strerror(errwrite), args.filename);
Expand All @@ -922,6 +920,10 @@ void fwriteMain(fwriteMainArgs args)
if (args.nrow == 0) {
if (verbose)
DTPRINT(_("No data rows present (nrow==0)\n"));
free(buffPool);
#ifndef NOZLIB
free(zbuffPool);
#endif
if (f != -1 && CLOSE(f))
STOP(_("%s: '%s'"), strerror(errno), args.filename); // # nocov
return;
Expand All @@ -947,14 +949,14 @@ void fwriteMain(fwriteMainArgs args)
char* ch = myBuff;

#ifndef NOZLIB
z_stream mystream;
size_t mylen = 0;
int mycrc = 0;
z_stream *mystream = &thread_streams[me];
void *myzBuff = NULL;
size_t myzbuffUsed = 0;
if (args.is_gzip) {
myzBuff = zbuffPool + me * zbuffSize;
if (init_stream(mystream) != Z_OK) { // this should be thread safe according to zlib documentation
if (init_stream(&mystream) != Z_OK) { // this should be thread safe according to zlib documentation
failed = true; // # nocov
my_failed_compress = -998; // # nocov
}
Expand Down Expand Up @@ -1002,7 +1004,8 @@ void fwriteMain(fwriteMainArgs args)
myzbuffUsed = zbuffSize;
mylen = (size_t)(ch - myBuff);
mycrc = crc32(0, (unsigned char*)myBuff, mylen);
int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, mylen);
int ret = compressbuff(&mystream, myzBuff, &myzbuffUsed, myBuff, mylen);
deflateEnd(&mystream);
if (ret) {
failed=true;
my_failed_compress=ret;
Expand All @@ -1023,25 +1026,27 @@ void fwriteMain(fwriteMainArgs args)
if (f == -1) {
*ch='\0'; // standard C string end marker so DTPRINT knows where to stop
DTPRINT("%s", myBuff);
} else if (args.is_gzip) {
} else
#ifndef NOZLIB
ret = WRITE(f, myzBuff, (int)myzbuffUsed);
compress_len += myzbuffUsed;
if (args.is_gzip) {
ret = WRITE(f, myzBuff, (int)myzbuffUsed);
compress_len += myzbuffUsed;
} else
#endif
} else {
{
ret = WRITE(f, myBuff, (int)(ch-myBuff));
}
if (ret == -1) {
failed=true; // # nocov
failed_write=errno; // # nocov
}

if (args.is_gzip) {
#ifndef NOZLIB
if (args.is_gzip) {
crc = crc32_combine(crc, mycrc, mylen);
len += mylen;
#endif
}
#endif

int used = 100 * ((double)(ch - myBuff)) / buffSize; // percentage of original buffMB
if (used > maxBuffUsedPC)
Expand All @@ -1067,36 +1072,29 @@ void fwriteMain(fwriteMainArgs args)
}
}
}
if (args.is_gzip) {
#ifndef NOZLIB
deflateEnd(mystream);
#endif
}

} // end of parallel for loop

free(buffPool);

#ifndef NOZLIB
free(zbuffPool);

/* put a 4-byte integer into a byte array in LSB order */
#define PUT4(a,b) ((a)[0]=(b), (a)[1]=(b)>>8, (a)[2]=(b)>>16, (a)[3]=(b)>>24)

// write gzip tailer with crc and len
if (args.is_gzip) {
#ifndef NOZLIB
unsigned char tail[10];
tail[0] = 3;
tail[1] = 0;
PUT4(tail + 2, crc);
PUT4(tail + 6, len);
int ret = WRITE(f, tail, 10);
compress_len += 10;
if (ret == -1)
STOP("Error: can't write gzip tailer"); // # nocov
#endif
}

free(buffPool);
#ifndef NOZLIB
free(thread_streams);
free(zbuffPool);
if (args.is_gzip) {
unsigned char tail[10];
tail[0] = 3;
tail[1] = 0;
PUT4(tail + 2, crc);
PUT4(tail + 6, len);
int ret = WRITE(f, tail, 10);
compress_len += 10;
if (ret == -1)
STOP(_("Failed to write gzip trailer")); // # nocov
}
#endif

// Finished parallel region and can call R API safely now.
Expand All @@ -1112,13 +1110,13 @@ void fwriteMain(fwriteMainArgs args)
}

if (verbose) {
if (args.is_gzip) {
#ifndef NOZLIB
DTPRINT("zlib: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%, crc=%x\n",
if (args.is_gzip) {
DTPRINT(_("zlib: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%, crc=%x\n"),
len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0, crc);
#endif
}
DTPRINT("Written %"PRId64" rows in %.3f secs using %d thread%s. MaxBuffUsed=%d%%\n",
#endif
DTPRINT(_("Written %"PRId64" rows in %.3f secs using %d thread%s. MaxBuffUsed=%d%%\n"),
args.nrow, 1.0*(wallclock()-t0), nth, nth ==1 ? "" : "s", maxBuffUsedPC);
}

Expand Down

0 comments on commit 79aed53

Please sign in to comment.