Skip to content

Commit

Permalink
fwrite parallelized. #580
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdowle committed Apr 15, 2016
1 parent 5d44456 commit 9694b99
Showing 1 changed file with 115 additions and 121 deletions.
236 changes: 115 additions & 121 deletions src/fwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <unistd.h> // for access()
#include <fcntl.h>
#include <time.h>
#include <omp.h>
#ifdef WIN32
#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -121,46 +122,25 @@ SEXP writefile(SEXP list_of_columns,
}
lineLenMax++; // column separator
}

int bufSize = 4*1024*1024; // 4MB buffer. Large enough to fit many lines. Small enough to fit in cache.
if (lineLenMax > bufSize) bufSize = lineLenMax;
int writeTrigger = bufSize-lineLenMax-1; // When to write.
// TODO: int linesPerBuf = bufSize/lineLenMax
if (verbose) Rprintf("lineLenMax: %d\nbufSize: %d\nwriteTrigger: %d\n", lineLenMax, bufSize, writeTrigger);

char *buffer = Calloc(bufSize, char);
if (buffer == NULL) error("Unable to allocate %dMB buffer", bufSize/(1024*1024));

clock_t tlineLenMax=clock()-t0;
if (verbose) Rprintf("Maximum line length is %d calculated in %.3fs\n", lineLenMax, 1.0*tlineLenMax/CLOCKS_PER_SEC);
// TODO: could parallelize by column, but currently no need as insignificant time
t0=clock();
clock_t t1,tformat=0,twrite=0;
// clock_t tt0,tSTR=0,tNUM=0;
SEXP str;
char *ch = buffer;
int numWrite=0;

// Write the column names

if (verbose) Rprintf("Writing column names ... ");
if (LOGICAL(col_names)[0]) {
SEXP names = getAttrib(list_of_columns, R_NamesSymbol);
SEXP names = getAttrib(list_of_columns, R_NamesSymbol);
if (names!=NULL) {
if (LENGTH(names) != ncols) error("Internal error: length of column names is not equal to the number of columns. Please report.");
int bufSize = 0;
for (int col_i=0; col_i<ncols; col_i++) bufSize += LENGTH(STRING_ELT(names, col_i));
bufSize *= 1+quote; // in case every colname is filled with quotes to be escaped!
bufSize += ncols*(2*quote + 1) + 3;
char *buffer = malloc(bufSize);
if (buffer == NULL) error("Unable to allocate %dMB buffer for column names", bufSize/(1024*1024));
char *ch = buffer;
for (int col_i=0; col_i<ncols; col_i++) {
str = STRING_ELT(names, col_i);

// The lineLenMax was just for data rows, not the column names.
// Be robust to enormously long column names e.g. 100,000 character single column name which might not fit in the buffer
int maxLen = 2 + (str==NA_STRING ? na_len : LENGTH(str)*(1+quote)+quote*2);
// ^ to cover sep or windows eol
if (maxLen >= bufSize) error("The buffer isn't big enough to hold single column name %d", col_i+1);
if (ch-buffer+maxLen >= bufSize) {
if (ch == buffer) error("Internal error: ch>buffer at this point");
// write out the column names we've written to the buffer already and rewind the buffer
if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
numWrite++;
ch = buffer;
}
// now proceed safely to write the column name to the buffer

SEXP str = STRING_ELT(names, col_i);
if (str==NA_STRING) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
break;
Expand All @@ -176,103 +156,117 @@ SEXP writefile(SEXP list_of_columns,
ch--; // backup onto the last col_sep after the last column
memcpy(ch, row_sep, row_sep_len); // replace it with the newline
ch += row_sep_len;

// For gigantic data row lengths the buffer may only be able to hold one row at a time. So write the column
// names now and clear the buffer, just in case, ready for the first data row.
if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
numWrite++;
ch = buffer;
free(buffer);
}
}
if (verbose) Rprintf("done in %.3fs\n", 1.0*(clock()-t0)/CLOCKS_PER_SEC);
if (nrows == 0) {
if (verbose) Rprintf("No data rows present (nrow==0)\n");
if (CLOSE(f)) error("Error closing file: %s", filename);
return(R_NilValue);
}

// Decide buffer size on each core
// Large enough to fit many lines (to reduce calls to write()). Small enough to fit in each core's cache.
// If the lines turn out smaller, that's ok; we just won't use all the buffer in that case. But we must be
// sure to allow for worst case; i.e. every row in the batch all being the maximum line length.
int bufSize = 1*1024*1024; // 1MB TODO: experiment / fetch cache size
if (lineLenMax > bufSize) bufSize = lineLenMax;
const int rowsPerBatch = bufSize/lineLenMax;
const int numBatches = (nrows-1)/rowsPerBatch + 1;
if (verbose) Rprintf("Writing data rows in %d batches of %d rows (each buffer size %.3fMB) ... ",
numBatches, rowsPerBatch, 1.0*bufSize/(1024*1024));
t0 = clock();

// Write the data rows
for (RLEN row_i = 0; row_i < nrows; row_i++) {
for (int col_i = 0; col_i < ncols; col_i++) {
SEXP column = VECTOR_ELT(list_of_columns, col_i);
switch(TYPEOF(column)) {
case LGLSXP:
true_false = LOGICAL(column)[row_i];
if (true_false == NA_LOGICAL) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (true_false) {
memcpy(ch,"TRUE",4); // Other than strings, field widths are limited which we check elsewhere here to ensure
ch += 4;
} else {
memcpy(ch,"FALSE",5);
ch += 5;
}
break;
case REALSXP:
if (ISNA(REAL(column)[row_i])) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else {
//tt0 = clock();
ch += sprintf(ch, "%.15G", REAL(column)[row_i]);
//tNUM += clock()-tt0;
}
break;
case INTSXP:
if (INTEGER(column)[row_i] == NA_INTEGER) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (levels[col_i] != NULL) { // isFactor(column) == TRUE
str = STRING_ELT(levels[col_i], INTEGER(column)[row_i]-1);
if (quote) {
QUOTE_FIELD;
} else {
memcpy(ch, CHAR(str), LENGTH(str));
ch += LENGTH(str);
int nth;
#pragma omp parallel
{
char *buffer = malloc(bufSize); // one buffer per thread
// TODO Ask Norm how to error() safely ... if (buffer == NULL) error("Unable to allocate %dMB buffer", bufSize/(1024*1024));
char *ch = buffer;
#pragma omp single
{
nth = omp_get_num_threads();
}
#pragma omp for ordered schedule(dynamic)
for(RLEN start_row = 0; start_row < nrows; start_row += rowsPerBatch) {
int upp = start_row + rowsPerBatch;
if (upp > nrows) upp = nrows;
for (RLEN row_i = start_row; row_i < upp; row_i++) {
for (int col_i = 0; col_i < ncols; col_i++) {
SEXP column = VECTOR_ELT(list_of_columns, col_i);
SEXP str;
switch(TYPEOF(column)) {
case LGLSXP:
true_false = LOGICAL(column)[row_i];
if (true_false == NA_LOGICAL) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (true_false) {
memcpy(ch,"TRUE",4); // Other than strings, field widths are limited which we check elsewhere here to ensure
ch += 4;
} else {
memcpy(ch,"FALSE",5);
ch += 5;
}
break;
case REALSXP:
if (ISNA(REAL(column)[row_i])) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else {
//tt0 = clock();
ch += sprintf(ch, "%.15G", REAL(column)[row_i]);
//tNUM += clock()-tt0;
}
break;
case INTSXP:
if (INTEGER(column)[row_i] == NA_INTEGER) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (levels[col_i] != NULL) { // isFactor(column) == TRUE
str = STRING_ELT(levels[col_i], INTEGER(column)[row_i]-1);
if (quote) {
QUOTE_FIELD;
} else {
memcpy(ch, CHAR(str), LENGTH(str));
ch += LENGTH(str);
}
} else {
ch += sprintf(ch, "%d", INTEGER(column)[row_i]);
}
break;
case STRSXP:
str = STRING_ELT(column, row_i);
if (str==NA_STRING) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (quote) {
QUOTE_FIELD;
} else {
//tt0 = clock();
memcpy(ch, CHAR(str), LENGTH(str)); // could have large fields. Doubt call overhead is much of an issue on small fields.
ch += LENGTH(str);
//tSTR += clock()-tt0;
}
break;
// default:
// An uncovered type would already have thrown earlier, when lineLenMax was calculated
}
} else {
ch += sprintf(ch, "%d", INTEGER(column)[row_i]);
}
break;
case STRSXP:
str = STRING_ELT(column, row_i);
if (str==NA_STRING) {
if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; }
} else if (quote) {
QUOTE_FIELD;
} else {
//tt0 = clock();
memcpy(ch, CHAR(str), LENGTH(str)); // could have large fields. Doubt call overhead is much of an issue on small fields.
ch += LENGTH(str);
//tSTR += clock()-tt0;
*ch++ = col_sep;
}
break;
default:
error("Column %d's type is '%s' - not yet implemented.", col_i+1,type2char(TYPEOF(column)) );
ch--; // backup onto the last col_sep after the last column
memcpy(ch, row_sep, row_sep_len); // replace it with the newline. TODO: replace memcpy call with eol1 eol2 --eolLen
ch += row_sep_len;
}
#pragma omp ordered
{
WRITE(f, buffer, (int)(ch-buffer));
// TODO: safe way to throw if ( == -1) { close(f); error("Error writing to file: %s", filename);
ch = buffer;
}
*ch++ = col_sep;
}
ch--; // backup onto the last col_sep after the last column
memcpy(ch, row_sep, row_sep_len); // replace it with the newline. TODO: replace memcpy call with eol1 eol2 --eolLen
ch += row_sep_len;
// Rprintf("Writing a line out length %d %10s\n", (int)(ch-buffer), buffer);
if ((ch-buffer)>writeTrigger) {
t1 = clock(); tformat += t1-t0; t0 = t1;
if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
t1 = clock(); twrite += t1-t0; t0 = t1;
numWrite++;
ch = buffer;
}
free(buffer);
}
if (ch>buffer) {
// write last batch remaining in buffer
t1 = clock(); tformat += t1-t0; t0 = t1;
if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); }
numWrite++;
t1 = clock(); twrite += t1-t0; t0 = t1;
}
if (verbose) Rprintf("all %d threads done\n", nth); // TO DO: report elapsed time since (clock()-t0)/NTH is only estimate
if (CLOSE(f)) error("Error closing file: %s", filename);
Free(buffer);
clock_t total = tlineLenMax + tformat + twrite;
if (verbose) {
Rprintf("%8.3fs (%3.0f%%) calc max line length\n", 1.0*tlineLenMax/CLOCKS_PER_SEC, 100.0*tlineLenMax/total);
Rprintf("%8.3fs (%3.0f%%) format\n", 1.0*tformat/CLOCKS_PER_SEC, 100.0*tformat/total);
Rprintf("%8.3fs (%3.0f%%) write (%d calls)\n", 1.0*twrite/CLOCKS_PER_SEC, 100.0*twrite/total, numWrite);
//Rprintf(" %8.3fs (%3.0f%%) STR\n", 1.0*tSTR/CLOCKS_PER_SEC, 100.0*tSTR/tformat);
//Rprintf(" %8.3fs (%3.0f%%) NUM\n", 1.0*tNUM/CLOCKS_PER_SEC, 100.0*tNUM/tformat);
}
return(R_NilValue); // must always return SEXP from C level otherwise hang on Windows
}

Expand Down

0 comments on commit 9694b99

Please sign in to comment.