Skip to content

Commit

Permalink
add basic unit test for file writing
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Aug 15, 2024
1 parent 7556e29 commit f214889
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 17 deletions.
16 changes: 8 additions & 8 deletions src/nanoarrow/common/inline_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ struct ArrowArrayStream {
NANOARROW_RETURN_NOT_OK((x_ <= max_) ? NANOARROW_OK : EINVAL)

#if defined(NANOARROW_DEBUG)
#define _NANOARROW_RETURN_NOT_OK_WITH_ERROR_IMPL(NAME, EXPR, ERROR_PTR_EXPR, EXPR_STR) \
do { \
const int NAME = (EXPR); \
if (NAME) { \
ArrowErrorSet((ERROR_PTR_EXPR), "%s failed with errno %d\n* %s:%d", EXPR_STR, \
NAME, __FILE__, __LINE__); \
return NAME; \
} \
#define _NANOARROW_RETURN_NOT_OK_WITH_ERROR_IMPL(NAME, EXPR, ERROR_PTR_EXPR, EXPR_STR) \
do { \
const int NAME = (EXPR); \
if (NAME) { \
ArrowErrorSet((ERROR_PTR_EXPR), "%s failed with errno %d(%s)\n* %s:%d", EXPR_STR, \
NAME, strerror(NAME), __FILE__, __LINE__); \
return NAME; \
} \
} while (0)
#else
#define _NANOARROW_RETURN_NOT_OK_WITH_ERROR_IMPL(NAME, EXPR, ERROR_PTR_EXPR, EXPR_STR) \
Expand Down
2 changes: 2 additions & 0 deletions src/nanoarrow/integration/ipc_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <nanoarrow/nanoarrow_ipc.hpp>
#include <nanoarrow/nanoarrow_testing.hpp>

#define NANOARROW_IPC_FILE_PADDED_MAGIC "ARROW1\0"

std::string GetEnv(char const* name) {
char const* val = std::getenv(name);
return val ? val : "";
Expand Down
1 change: 1 addition & 0 deletions src/nanoarrow/ipc/encoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct ArrowIpcEncoderPrivate {
};
}

#define NANOARROW_IPC_FILE_PADDED_MAGIC "ARROW1\0"
static_assert(sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC) == 8);

TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
Expand Down
5 changes: 4 additions & 1 deletion src/nanoarrow/ipc/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ static void ArrowIpcInputStreamBufferRelease(struct ArrowIpcInputStream* stream)

ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream,
struct ArrowBuffer* input) {
NANOARROW_DCHECK(stream != NULL);

struct ArrowIpcInputStreamBufferPrivate* private_data =
(struct ArrowIpcInputStreamBufferPrivate*)ArrowMalloc(
sizeof(struct ArrowIpcInputStreamBufferPrivate));
Expand Down Expand Up @@ -154,8 +156,9 @@ static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* st

ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
void* file_ptr, int close_on_release) {
NANOARROW_DCHECK(stream != NULL);
if (file_ptr == NULL) {
return EINVAL;
return errno ? errno : EINVAL;
}

struct ArrowIpcInputStreamFilePrivate* private_data =
Expand Down
5 changes: 4 additions & 1 deletion src/nanoarrow/ipc/writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
void* file_ptr, int close_on_release) {
NANOARROW_DCHECK(stream != NULL);
if (file_ptr == NULL) {
return EINVAL;
return errno ? errno : EINVAL;
}

struct ArrowIpcOutputStreamFilePrivate* private_data =
Expand Down Expand Up @@ -281,6 +281,7 @@ ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer,

if (in == NULL) {
int32_t eos[] = {-1, 0};
private->bytes_written += sizeof(eos);
struct ArrowBufferView eos_view = {.data.as_int32 = eos, .size_bytes = sizeof(eos)};
return ArrowIpcOutputStreamWrite(&private->output_stream, eos_view, error);
}
Expand Down Expand Up @@ -364,6 +365,8 @@ ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer,
return NANOARROW_OK;
}

#define NANOARROW_IPC_FILE_PADDED_MAGIC "ARROW1\0"

ArrowErrorCode ArrowIpcWriterStartFile(struct ArrowIpcWriter* writer,
struct ArrowError* error) {
NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL);
Expand Down
103 changes: 102 additions & 1 deletion src/nanoarrow/ipc/writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ TEST(NanoarrowIpcWriter, OutputStreamFile) {
fseek(file_ptr, 6, SEEK_SET);

nanoarrow::ipc::UniqueOutputStream stream;
ASSERT_EQ(ArrowIpcOutputStreamInitFile(stream.get(), file_ptr, 1), NANOARROW_OK);
ASSERT_EQ(ArrowIpcOutputStreamInitFile(stream.get(), file_ptr, /*close_on_release=*/1),
NANOARROW_OK);

struct ArrowError error;

Expand All @@ -88,3 +89,103 @@ TEST(NanoarrowIpcWriter, OutputStreamFile) {
EXPECT_EQ(std::string(buffer.data(), buffer.size()),
"HELLO " + message + message + message + message);
}

TEST(NanoarrowIpcWriter, OutputStreamFileError) {
nanoarrow::ipc::UniqueOutputStream stream;
EXPECT_EQ(ArrowIpcOutputStreamInitFile(stream.get(), nullptr, /*close_on_release=*/1),
EINVAL);

auto phony_path = __FILE__ + std::string(".phony");
FILE* file_ptr = fopen(phony_path.c_str(), "rb");
ASSERT_EQ(file_ptr, nullptr);
EXPECT_EQ(ArrowIpcOutputStreamInitFile(stream.get(), file_ptr, /*close_on_release=*/1),
ENOENT);
}

struct ArrowIpcWriterPrivate {
struct ArrowIpcEncoder encoder;
struct ArrowIpcOutputStream output_stream;
struct ArrowBuffer buffer;
struct ArrowBuffer body_buffer;

int writing_file;
int64_t bytes_written;
struct ArrowIpcFooter footer;
};

#define NANOARROW_IPC_FILE_PADDED_MAGIC "ARROW1\0"

TEST(NanoarrowIpcWriter, FileWriting) {
struct ArrowError error;

nanoarrow::UniqueBuffer output;
nanoarrow::ipc::UniqueOutputStream stream;
ASSERT_EQ(ArrowIpcOutputStreamInitBuffer(stream.get(), output.get()), NANOARROW_OK);

nanoarrow::ipc::UniqueWriter writer;
ASSERT_EQ(ArrowIpcWriterInit(writer.get(), stream.get()), NANOARROW_OK);

// the writer starts out in stream mode
auto* p = static_cast<struct ArrowIpcWriterPrivate*>(writer->private_data);
EXPECT_FALSE(p->writing_file);
EXPECT_EQ(p->bytes_written, 0);
EXPECT_EQ(p->footer.schema.release, nullptr);
EXPECT_EQ(p->footer.record_batch_blocks.size_bytes, 0);

// now it switches to file mode
EXPECT_EQ(ArrowIpcWriterStartFile(writer.get(), &error), NANOARROW_OK) << error.message;
EXPECT_TRUE(p->writing_file);
// and has written the leading magic
EXPECT_EQ(p->bytes_written, sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC));
// but not a schema or any record batches
EXPECT_EQ(p->footer.schema.release, nullptr);
EXPECT_EQ(p->footer.record_batch_blocks.size_bytes, 0);

// write a schema
nanoarrow::UniqueSchema schema;
ASSERT_EQ(ArrowSchemaInitFromType(schema.get(), NANOARROW_TYPE_STRUCT), NANOARROW_OK);
EXPECT_EQ(ArrowIpcWriterWriteSchema(writer.get(), schema.get(), &error), NANOARROW_OK)
<< error.message;
// more has been written
auto after_schema = p->bytes_written;
EXPECT_GT(after_schema, sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC));
// the schema is cached in the writer's footer for later finalization
EXPECT_NE(p->footer.schema.release, nullptr);
// still no record batches
EXPECT_EQ(p->footer.record_batch_blocks.size_bytes, 0);

// write a batch
nanoarrow::UniqueArray array;
nanoarrow::UniqueArrayView array_view;
ASSERT_EQ(ArrowArrayInitFromSchema(array.get(), schema.get(), &error), NANOARROW_OK)
<< error.message;
ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), &error),
NANOARROW_OK)
<< error.message;
ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array.get(), &error), NANOARROW_OK)
<< error.message;
EXPECT_EQ(ArrowIpcWriterWriteArrayView(writer.get(), array_view.get(), &error),
NANOARROW_OK)
<< error.message;
// more has been written
auto after_batch = p->bytes_written;
EXPECT_GT(after_batch, after_schema);
// one record batch's block is stored
EXPECT_EQ(p->footer.record_batch_blocks.size_bytes, sizeof(struct ArrowIpcFileBlock));

// end the stream
EXPECT_EQ(ArrowIpcWriterWriteArrayView(writer.get(), nullptr, &error), NANOARROW_OK)
<< error.message;
// more has been written
auto after_eos = p->bytes_written;
EXPECT_GT(after_eos, after_batch);
// EOS isn't stored in the blocks
EXPECT_EQ(p->footer.record_batch_blocks.size_bytes, sizeof(struct ArrowIpcFileBlock));

// finalize the file
EXPECT_EQ(ArrowIpcWriterFinalizeFile(writer.get(), &error), NANOARROW_OK)
<< error.message;
// more has been written
auto after_footer = p->bytes_written;
EXPECT_GT(after_footer, after_eos);
}
14 changes: 8 additions & 6 deletions src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteArrayView)
#define ArrowIpcWriterWriteArrayStream \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteArrayStream)
#define ArrowIpcWriterStartFile \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterStartFile)
#define ArrowIpcWriterFinalizeFile \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterFinalizeFile)
#define ArrowIpcFooterInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcFooterInit)
#define ArrowIpcFooterReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcFooterReset)
#define ArrowIpcEncoderEncodeFooter \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeFooter)

#endif

Expand Down Expand Up @@ -568,12 +576,6 @@ ArrowErrorCode ArrowIpcWriterFinalizeFile(struct ArrowIpcWriter* writer,

// Internal APIs:

/// \brief The magic which appears at the beginning and end of an IPC file, 0-padded.
///
/// \warning This API is currently only public for use in integration testing;
/// use at your own risk.
#define NANOARROW_IPC_FILE_PADDED_MAGIC "ARROW1\0"

/// \brief Represents a byte range in an IPC file.
///
/// \warning This API is currently only public for use in integration testing;
Expand Down

0 comments on commit f214889

Please sign in to comment.