diff --git a/src/fwrite.c b/src/fwrite.c index de5f6ba186..523b3ce1d7 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -5,6 +5,7 @@ #include // for access() #include #include +#include #ifdef WIN32 #include #include @@ -121,46 +122,25 @@ SEXP writefile(SEXP list_of_columns, } lineLenMax++; // column separator } - - int bufSize = 4*1024*1024; // 4MB buffer. Large enough to fit many lines. Small enough to fit in cache. - if (lineLenMax > bufSize) bufSize = lineLenMax; - int writeTrigger = bufSize-lineLenMax-1; // When to write. - // TODO: int linesPerBuf = bufSize/lineLenMax - if (verbose) Rprintf("lineLenMax: %d\nbufSize: %d\nwriteTrigger: %d\n", lineLenMax, bufSize, writeTrigger); - - char *buffer = Calloc(bufSize, char); - if (buffer == NULL) error("Unable to allocate %dMB buffer", bufSize/(1024*1024)); - clock_t tlineLenMax=clock()-t0; + if (verbose) Rprintf("Maximum line length is %d calculated in %.3fs\n", lineLenMax, 1.0*tlineLenMax/CLOCKS_PER_SEC); + // TODO: could parallelize by column, but currently no need as insignificant time t0=clock(); - clock_t t1,tformat=0,twrite=0; - // clock_t tt0,tSTR=0,tNUM=0; - SEXP str; - char *ch = buffer; - int numWrite=0; - - // Write the column names + + if (verbose) Rprintf("Writing column names ... "); if (LOGICAL(col_names)[0]) { - SEXP names = getAttrib(list_of_columns, R_NamesSymbol); + SEXP names = getAttrib(list_of_columns, R_NamesSymbol); if (names!=NULL) { if (LENGTH(names) != ncols) error("Internal error: length of column names is not equal to the number of columns. Please report."); + int bufSize = 0; + for (int col_i=0; col_i= bufSize) error("The buffer isn't big enough to hold single column name %d", col_i+1); - if (ch-buffer+maxLen >= bufSize) { - if (ch == buffer) error("Internal error: ch>buffer at this point"); - // write out the column names we've written to the buffer already and rewind the buffer - if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); } - numWrite++; - ch = buffer; - } - // now proceed safely to write the column name to the buffer - + SEXP str = STRING_ELT(names, col_i); if (str==NA_STRING) { if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } break; @@ -176,103 +156,117 @@ SEXP writefile(SEXP list_of_columns, ch--; // backup onto the last col_sep after the last column memcpy(ch, row_sep, row_sep_len); // replace it with the newline ch += row_sep_len; - - // For gigantic data row lengths the buffer may only be able to hold one row at a time. So write the column - // names now and clear the buffer, just in case, ready for the first data row. if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); } - numWrite++; - ch = buffer; + free(buffer); } } + if (verbose) Rprintf("done in %.3fs\n", 1.0*(clock()-t0)/CLOCKS_PER_SEC); + if (nrows == 0) { + if (verbose) Rprintf("No data rows present (nrow==0)\n"); + if (CLOSE(f)) error("Error closing file: %s", filename); + return(R_NilValue); + } + + // Decide buffer size on each core + // Large enough to fit many lines (to reduce calls to write()). Small enough to fit in each core's cache. + // If the lines turn out smaller, that's ok. we just won't use all the buffer in that case. But we must be + // sure to allow for worst case; i.e. every row in the batch all being the maximum line length. + int bufSize = 1*1024*1024; // 1MB TODO: experiment / fetch cache size + if (lineLenMax > bufSize) bufSize = lineLenMax; + const int rowsPerBatch = bufSize/lineLenMax; + const int numBatches = (nrows-1)/rowsPerBatch + 1; + if (verbose) Rprintf("Writing data rows in %d batches of %d rows (each buffer size %.3fMB) ... ", + numBatches, rowsPerBatch, 1.0*bufSize/(1024*1024)); + t0 = clock(); - // Write the data rows - for (RLEN row_i = 0; row_i < nrows; row_i++) { - for (int col_i = 0; col_i < ncols; col_i++) { - SEXP column = VECTOR_ELT(list_of_columns, col_i); - switch(TYPEOF(column)) { - case LGLSXP: - true_false = LOGICAL(column)[row_i]; - if (true_false == NA_LOGICAL) { - if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } - } else if (true_false) { - memcpy(ch,"TRUE",4); // Other than strings, field widths are limited which we check elsewhere here to ensure - ch += 4; - } else { - memcpy(ch,"FALSE",5); - ch += 5; - } - break; - case REALSXP: - if (ISNA(REAL(column)[row_i])) { - if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } - } else { - //tt0 = clock(); - ch += sprintf(ch, "%.15G", REAL(column)[row_i]); - //tNUM += clock()-tt0; - } - break; - case INTSXP: - if (INTEGER(column)[row_i] == NA_INTEGER) { - if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } - } else if (levels[col_i] != NULL) { // isFactor(column) == TRUE - str = STRING_ELT(levels[col_i], INTEGER(column)[row_i]-1); - if (quote) { - QUOTE_FIELD; - } else { - memcpy(ch, CHAR(str), LENGTH(str)); - ch += LENGTH(str); + int nth; + #pragma omp parallel + { + char *buffer = malloc(bufSize); // one buffer per thread + // TODO Ask Norm how to error() safely ... if (buffer == NULL) error("Unable to allocate %dMB buffer", bufSize/(1024*1024)); + char *ch = buffer; + #pragma omp single + { + nth = omp_get_num_threads(); + } + #pragma omp for ordered schedule(dynamic) + for(RLEN start_row = 0; start_row < nrows; start_row += rowsPerBatch) { + int upp = start_row + rowsPerBatch; + if (upp > nrows) upp = nrows; + for (RLEN row_i = start_row; row_i < upp; row_i++) { + for (int col_i = 0; col_i < ncols; col_i++) { + SEXP column = VECTOR_ELT(list_of_columns, col_i); + SEXP str; + switch(TYPEOF(column)) { + case LGLSXP: + true_false = LOGICAL(column)[row_i]; + if (true_false == NA_LOGICAL) { + if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } + } else if (true_false) { + memcpy(ch,"TRUE",4); // Other than strings, field widths are limited which we check elsewhere here to ensure + ch += 4; + } else { + memcpy(ch,"FALSE",5); + ch += 5; + } + break; + case REALSXP: + if (ISNA(REAL(column)[row_i])) { + if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } + } else { + //tt0 = clock(); + ch += sprintf(ch, "%.15G", REAL(column)[row_i]); + //tNUM += clock()-tt0; + } + break; + case INTSXP: + if (INTEGER(column)[row_i] == NA_INTEGER) { + if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } + } else if (levels[col_i] != NULL) { // isFactor(column) == TRUE + str = STRING_ELT(levels[col_i], INTEGER(column)[row_i]-1); + if (quote) { + QUOTE_FIELD; + } else { + memcpy(ch, CHAR(str), LENGTH(str)); + ch += LENGTH(str); + } + } else { + ch += sprintf(ch, "%d", INTEGER(column)[row_i]); + } + break; + case STRSXP: + str = STRING_ELT(column, row_i); + if (str==NA_STRING) { + if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } + } else if (quote) { + QUOTE_FIELD; + } else { + //tt0 = clock(); + memcpy(ch, CHAR(str), LENGTH(str)); // could have large fields. Doubt call overhead is much of an issue on small fields. + ch += LENGTH(str); + //tSTR += clock()-tt0; + } + break; + // default: + // An uncovered type would have already thrown above when calculating maxLineLen earlier } - } else { - ch += sprintf(ch, "%d", INTEGER(column)[row_i]); - } - break; - case STRSXP: - str = STRING_ELT(column, row_i); - if (str==NA_STRING) { - if (na_len) { memcpy(ch, na_str, na_len); ch += na_len; } - } else if (quote) { - QUOTE_FIELD; - } else { - //tt0 = clock(); - memcpy(ch, CHAR(str), LENGTH(str)); // could have large fields. Doubt call overhead is much of an issue on small fields. - ch += LENGTH(str); - //tSTR += clock()-tt0; + *ch++ = col_sep; } - break; - default: - error("Column %d's type is '%s' - not yet implemented.", col_i+1,type2char(TYPEOF(column)) ); + ch--; // backup onto the last col_sep after the last column + memcpy(ch, row_sep, row_sep_len); // replace it with the newline. TODO: replace memcpy call with eol1 eol2 --eolLen + ch += row_sep_len; + } + #pragma omp ordered + { + WRITE(f, buffer, (int)(ch-buffer)); + // TODO: safe way to throw if ( == -1) { close(f); error("Error writing to file: %s", filename); + ch = buffer; } - *ch++ = col_sep; - } - ch--; // backup onto the last col_sep after the last column - memcpy(ch, row_sep, row_sep_len); // replace it with the newline. TODO: replace memcpy call with eol1 eol2 --eolLen - ch += row_sep_len; - // Rprintf("Writing a line out length %d %10s\n", (int)(ch-buffer), buffer); - if ((ch-buffer)>writeTrigger) { - t1 = clock(); tformat += t1-t0; t0 = t1; - if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); } - t1 = clock(); twrite += t1-t0; t0 = t1; - numWrite++; - ch = buffer; } + free(buffer); } - if (ch>buffer) { - // write last batch remaining in buffer - t1 = clock(); tformat += t1-t0; t0 = t1; - if (WRITE(f, buffer, (int)(ch-buffer)) == -1) { close(f); error("Error writing to file: %s", filename); } - numWrite++; - t1 = clock(); twrite += t1-t0; t0 = t1; - } + if (verbose) Rprintf("all %d threads done\n", nth); // TO DO: report elapsed time since (clock()-t0)/NTH is only estimate if (CLOSE(f)) error("Error closing file: %s", filename); - Free(buffer); - clock_t total = tlineLenMax + tformat + twrite; - if (verbose) { - Rprintf("%8.3fs (%3.0f%%) calc max line length\n", 1.0*tlineLenMax/CLOCKS_PER_SEC, 100.0*tlineLenMax/total); - Rprintf("%8.3fs (%3.0f%%) format\n", 1.0*tformat/CLOCKS_PER_SEC, 100.0*tformat/total); - Rprintf("%8.3fs (%3.0f%%) write (%d calls)\n", 1.0*twrite/CLOCKS_PER_SEC, 100.0*twrite/total, numWrite); - //Rprintf(" %8.3fs (%3.0f%%) STR\n", 1.0*tSTR/CLOCKS_PER_SEC, 100.0*tSTR/tformat); - //Rprintf(" %8.3fs (%3.0f%%) NUM\n", 1.0*tNUM/CLOCKS_PER_SEC, 100.0*tNUM/tformat); - } return(R_NilValue); // must always return SEXP from C level otherwise hang on Windows }