SNOW-802910 Remove the use of partialEscapeUnicode buffer for split strings to fix dropped characters (#1398)

* Changes that attempt to get rid of the partial escape buffer and treat the input stream as continuous (minus abstractions)

Leftover data in the buffers (that was previously copied over to the partial buffer) is now carried forward by prepending it to the next buffered read from the input.

At the end, we process any data left over from the final buffered read.
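
In outline, the carry-forward loop looks like the sketch below. This is a minimal, self-contained illustration of the pattern only — the toy parse() and its refusal to split a two-byte token stand in for the real parser's \uXXXX handling, and none of these names come from the driver:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CarryForwardSketch {
  // Toy parser: passes bytes through one at a time, but refuses to split a
  // two-byte "\x" token across buffers, the way the real parser now refuses
  // to split a \uXXXX escape. Unconsumed bytes stay in the buffer.
  static void parse(ByteBuffer in, boolean lastData, StringBuilder out) {
    while (in.hasRemaining()) {
      byte b = in.get();
      if (b == '\\' && !lastData && !in.hasRemaining()) {
        in.position(in.position() - 1); // rewind so the caller carries it forward
        return;
      }
      out.append((char) b);
    }
  }

  public static void main(String[] args) throws IOException {
    InputStream stream = new ByteArrayInputStream("ab\\ncd".getBytes(StandardCharsets.UTF_8));
    byte[] buf = new byte[3];
    byte[] leftover = new byte[0];
    StringBuilder out = new StringBuilder();
    int len;
    while ((len = stream.read(buf)) != -1) {
      byte[] merged = new byte[leftover.length + len]; // prepend the leftover bytes
      System.arraycopy(leftover, 0, merged, 0, leftover.length);
      System.arraycopy(buf, 0, merged, leftover.length, len);
      ByteBuffer bb = ByteBuffer.wrap(merged);
      parse(bb, false, out);
      leftover = new byte[bb.remaining()]; // stash the unconsumed tail
      bb.get(leftover);
    }
    parse(ByteBuffer.wrap(leftover), true, out); // final drain, like endParsing
    System.out.println(out); // prints: ab\ncd
  }
}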

Test changes are mostly there to get it to compile/pass.

The patch is purely POC quality and not intended for the customer; it's just to illustrate
a point about the soundness of the 'partialEscapedUnicode' buffer usage.

* add unit tests, format code and add comments

increase buffer length for streaming test

remove use of partialEscapedUnicode

---------

Co-authored-by: Harsh Chouraria <h.chouraria@snowflake.com>
sfc-gh-ext-simba-lb and sfc-gh-hachouraria authored May 24, 2023
1 parent 02930a3 commit f56bfa9
Showing 3 changed files with 129 additions and 75 deletions.
71 changes: 10 additions & 61 deletions src/main/java/net/snowflake/client/jdbc/ResultJsonParserV2.java
@@ -50,11 +50,6 @@ public void startParsing(JsonResultChunk resultChunk, SFBaseSession session)
state = State.NEXT_ROW;
outputPosition = 0;
outputCurValuePosition = 0;
if (partialEscapedUnicode == null) {
partialEscapedUnicode = ByteBuffer.wrap(new byte[256]);
} else {
((Buffer) partialEscapedUnicode).clear();
}
currentColumn = 0;

// outputDataLength can be smaller as no ',' and '[' are stored
@@ -65,12 +60,8 @@ public void startParsing(JsonResultChunk resultChunk, SFBaseSession session)
* Check if the chunk has been parsed correctly. After calling this it is safe to acquire the
* output data
*/
public void endParsing(SFBaseSession session) throws SnowflakeSQLException {
if (((Buffer) partialEscapedUnicode).position() > 0) {
((Buffer) partialEscapedUnicode).flip();
continueParsingInternal(partialEscapedUnicode, true, session);
((Buffer) partialEscapedUnicode).clear();
}
public void endParsing(ByteBuffer in, SFBaseSession session) throws SnowflakeSQLException {
continueParsingInternal(in, true, session);

if (state != State.ROW_FINISHED) {
throw new SnowflakeSQLLoggedException(
@@ -89,52 +80,16 @@ public void endParsing(SFBaseSession session) throws SnowflakeSQLException {
* @param in readOnly byteBuffer backed by an array (the data to be read is from position to
* limit)
*/
public void continueParsing(ByteBuffer in, SFBaseSession session) throws SnowflakeSQLException {
public int continueParsing(ByteBuffer in, SFBaseSession session) throws SnowflakeSQLException {
if (state == State.UNINITIALIZED) {
throw new SnowflakeSQLLoggedException(
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Json parser hasn't been initialized!");
}

// If stopped during a \\u, continue here
if (((Buffer) partialEscapedUnicode).position() > 0) {
int lenToCopy = Math.min(12 - ((Buffer) partialEscapedUnicode).position(), in.remaining());
if (lenToCopy > partialEscapedUnicode.remaining()) {
resizePartialEscapedUnicode(lenToCopy);
}
partialEscapedUnicode.put(in.array(), in.arrayOffset() + ((Buffer) in).position(), lenToCopy);
((Buffer) in).position(((Buffer) in).position() + lenToCopy);

if (((Buffer) partialEscapedUnicode).position() < 12) {
// Not enough data to parse escaped unicode
return;
}
ByteBuffer toBeParsed = partialEscapedUnicode.duplicate();
((Buffer) toBeParsed).flip();
continueParsingInternal(toBeParsed, false, session);
((Buffer) partialEscapedUnicode).clear();
}
continueParsingInternal(in, false, session);
}

private void resizePartialEscapedUnicode(int lenToCopy) {
int newSize = 2 * partialEscapedUnicode.capacity();
while (newSize < partialEscapedUnicode.capacity() + lenToCopy) {
newSize *= 2;
}
byte[] newArray = new byte[newSize];
System.arraycopy(
partialEscapedUnicode.array(),
partialEscapedUnicode.arrayOffset(),
newArray,
0,
((Buffer) partialEscapedUnicode).position());
ByteBuffer newBuf = ByteBuffer.wrap(newArray);
((Buffer) newBuf).position(((Buffer) partialEscapedUnicode).position());
((Buffer) partialEscapedUnicode).clear();
partialEscapedUnicode = newBuf;
return in.remaining();
}

/**
@@ -369,18 +324,12 @@ private void continueParsingInternal(ByteBuffer in, boolean lastData, SFBaseSess
}
state = State.IN_STRING;
} else {
// store until next data arrives
if (partialEscapedUnicode.remaining() < in.remaining() + 2) {
resizePartialEscapedUnicode(in.remaining() + 2);
}
partialEscapedUnicode.put((byte) 0x5c /* '\\' */);
partialEscapedUnicode.put(
in.array(),
in.arrayOffset() + ((Buffer) in).position() - 1,
in.remaining() + 1);

((Buffer) in).position(((Buffer) in).position() + in.remaining());
state = State.IN_STRING;
// if the number of bytes left un-parsed in the buffer is less than 9 (unless it is
// the last remaining data in the buffer), there are not enough bytes to parse the
// codepoint. Move the position back 1 so we can re-enter parsing at this position
// with the ESCAPE state.
in.position(in.position() - 1);
state = State.ESCAPE;
return;
}
break;
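The crux of the hunk above is the rewind: instead of copying a partial \uXXXX escape into a side buffer, the parser now moves the position back to the backslash and returns, so the unconsumed bytes stay in the caller's buffer. A standalone toy illustrating that rewind (an assumed simplification: the real parser checks for 9 remaining bytes, enough to spot a following surrogate-pair escape, and actually decodes the escape; this toy uses 5 and passes other bytes straight through):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class EscapeRewindSketch {
  // On '\' with too few bytes left to hold a full "\uXXXX" escape, rewind to
  // the '\' and stop; in.remaining() then tells the caller what to carry forward.
  static String parse(ByteBuffer in, boolean lastData) {
    StringBuilder out = new StringBuilder();
    while (in.hasRemaining()) {
      byte b = in.get();
      if (b == '\\' && !lastData && in.remaining() < 5) {
        in.position(in.position() - 1); // leave the whole escape unconsumed
        break;
      }
      out.append((char) b);
    }
    return out.toString();
  }

  public static void main(String[] args) {
    byte[] data = "ab\\u0041cd".getBytes(StandardCharsets.UTF_8);
    ByteBuffer first = ByteBuffer.wrap(data, 0, 5); // cuts "\\u0041" after "\\u0"
    System.out.println(parse(first, false)); // prints: ab
    System.out.println(first.remaining());   // prints: 3 -- bytes to carry forward
  }
}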
@@ -9,10 +9,7 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
@@ -1035,19 +1032,47 @@ private void parseJsonToChunkV2(InputStream jsonInputStream, SnowflakeResultChun
jp.startParsing((JsonResultChunk) resultChunk, session);

byte[] buf = new byte[STREAM_BUFFER_SIZE];

// Holds leftover buffer data to be carried forward when a buffer ends in the escape
// state during parsing.
byte[] prevBuffer = null;
ByteBuffer bBuf = null;
int len;
logger.debug(
"Thread {} start to read inputstream for #chunk{}",
Thread.currentThread().getId(),
chunkIndex);
while ((len = jsonInputStream.read(buf)) != -1) {
jp.continueParsing(ByteBuffer.wrap(buf, 0, len), session);
if (prevBuffer != null) {
// if parsing stopped during an escape sequence in jp.continueParsing() and there is
// leftover data in the buffer, prepend the copied data to the next buffer read from
// the input stream.
ByteArrayOutputStream os = new ByteArrayOutputStream();
os.write(prevBuffer);
os.write(buf);
buf = os.toByteArray();
len += prevBuffer.length;
}
bBuf = ByteBuffer.wrap(buf, 0, len);
jp.continueParsing(bBuf, session);
if (bBuf.remaining() > 0) {
// if there is any data left un-parsed, it will be prepended to the next buffer read.
prevBuffer = new byte[bBuf.remaining()];
bBuf.get(prevBuffer);
} else {
prevBuffer = null;
}
}
logger.debug(
"Thread {} finish reading inputstream for #chunk{}",
Thread.currentThread().getId(),
chunkIndex);
jp.endParsing(session);
if (prevBuffer != null) {
bBuf = ByteBuffer.wrap(prevBuffer);
} else {
bBuf = ByteBuffer.wrap(new byte[0]);
}
jp.endParsing(bBuf, session);
}
};
}
@@ -30,8 +30,11 @@ public void simpleTest() throws SnowflakeSQLException {
JsonResultChunk chunk = new JsonResultChunk("", 8, 2, data.length, session);
ResultJsonParserV2 jp = new ResultJsonParserV2();
jp.startParsing(chunk, session);
jp.continueParsing(ByteBuffer.wrap(data), session);
jp.endParsing(session);
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
jp.continueParsing(byteBuffer, session);
byte[] remaining = new byte[byteBuffer.remaining()];
byteBuffer.get(remaining);
jp.endParsing(ByteBuffer.wrap(remaining), session);
assertEquals("1", chunk.getCell(0, 0).toString());
assertEquals("1.01", chunk.getCell(0, 1).toString());
assertNull(chunk.getCell(1, 0));
@@ -68,15 +71,21 @@ public void simpleStreamingTest() throws SnowflakeSQLException {
JsonResultChunk chunk = new JsonResultChunk("", 8, 2, data.length, session);
ResultJsonParserV2 jp = new ResultJsonParserV2();
jp.startParsing(chunk, session);
int len = 2;
int len = 15;
ByteBuffer byteBuffer = null;
for (int i = 0; i < data.length; i += len) {
if (i + len < data.length) {
jp.continueParsing(ByteBuffer.wrap(data, i, len), session);
byteBuffer = ByteBuffer.wrap(data, i, len);
jp.continueParsing(byteBuffer, session);
} else {
jp.continueParsing(ByteBuffer.wrap(data, i, data.length - i), session);
byteBuffer = ByteBuffer.wrap(data, i, data.length - i);
jp.continueParsing(byteBuffer, session);
}
}
jp.endParsing(session);
byte[] remaining = new byte[byteBuffer.remaining()];
byteBuffer.get(remaining);
jp.endParsing(ByteBuffer.wrap(remaining), session);

assertEquals("1", chunk.getCell(0, 0).toString());
assertEquals("1.01", chunk.getCell(0, 1).toString());
assertNull(chunk.getCell(1, 0));
@@ -136,11 +145,82 @@ public void LargestColumnTest() throws SnowflakeSQLException {
JsonResultChunk chunk = new JsonResultChunk("", 2, 2, data.length, session);
ResultJsonParserV2 jp = new ResultJsonParserV2();
jp.startParsing(chunk, session);
jp.continueParsing(ByteBuffer.wrap(data), session);
jp.endParsing(session);
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
jp.continueParsing(byteBuffer, session);
byte[] remaining = new byte[byteBuffer.remaining()];
byteBuffer.get(remaining);
jp.endParsing(ByteBuffer.wrap(remaining), session);
assertEquals(a.toString(), chunk.getCell(0, 0).toString());
assertEquals(b.toString(), chunk.getCell(0, 1).toString());
assertEquals(c.toString(), chunk.getCell(1, 0).toString());
assertEquals(StringEscapeUtils.unescapeJava(s.toString()), chunk.getCell(1, 1).toString());
}

// SNOW-802910: Test to cover edge case '\u0000\u0000' where null could be dropped.
@Test
public void testAsciiSequential() throws SnowflakeSQLException {
SFSession session = null;
String ascii = "[\"\\u0000\\u0000\\u0000\"]";
byte[] data = ascii.getBytes(StandardCharsets.UTF_8);
JsonResultChunk chunk = new JsonResultChunk("", 1, 1, data.length, session);
ResultJsonParserV2 jp = new ResultJsonParserV2();
jp.startParsing(chunk, session);

// parse the first null
ByteBuffer byteBuffer = ByteBuffer.wrap(data, 0, 14);
jp.continueParsing(byteBuffer, session);
// parse the rest of the string
byteBuffer = ByteBuffer.wrap(data, 9, 13);
jp.continueParsing(byteBuffer, session);
byteBuffer = ByteBuffer.wrap(data, 15, 7);
jp.continueParsing(byteBuffer, session);

// finish parsing
byte[] remaining = new byte[byteBuffer.remaining()];
byteBuffer.get(remaining);
jp.endParsing(ByteBuffer.wrap(remaining), session);

// check null is not dropped
assertEquals("00 00 00 ", stringToHex(chunk.getCell(0, 0).toString()));
}

// SNOW-802910: Test to cover edge case '\u0003ä\u0000' where null could be dropped.
@Test
public void testAsciiCharacter() throws SnowflakeSQLException {
SFSession session = null;
String ascii = "[\"\\u0003ä\\u0000\"]";
byte[] data = ascii.getBytes(StandardCharsets.UTF_8);
JsonResultChunk chunk = new JsonResultChunk("", 1, 1, data.length, session);
ResultJsonParserV2 jp = new ResultJsonParserV2();
jp.startParsing(chunk, session);

// parse ETX and UTF-8 character
ByteBuffer byteBuffer = ByteBuffer.wrap(data, 0, data.length);
jp.continueParsing(byteBuffer, session);

int position = byteBuffer.position();
// try to parse null
byteBuffer = ByteBuffer.wrap(data, position, data.length - position);
jp.continueParsing(byteBuffer, session);

byte[] remaining = new byte[byteBuffer.remaining()];
byteBuffer.get(remaining);
jp.endParsing(ByteBuffer.wrap(remaining), session);

// ä00 is returned
assertEquals("03 C3 A4 00 ", stringToHex(chunk.getCell(0, 0).toString()));
}

public static String stringToHex(String input) {
byte[] byteArray = input.getBytes(StandardCharsets.UTF_8);
StringBuilder sb = new StringBuilder();
char[] hexBytes = new char[2];
for (int i = 0; i < byteArray.length; i++) {
hexBytes[0] = Character.forDigit((byteArray[i] >> 4) & 0xF, 16);
hexBytes[1] = Character.forDigit((byteArray[i] & 0xF), 16);
sb.append(hexBytes);
sb.append(" "); // Space every two characters to make it easier to read visually
}
return sb.toString().toUpperCase();
}
}
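
As a quick sanity check on the two new assertions, the expected hex strings follow directly from UTF-8. Below is a standalone rendition of the helper above (same logic, reformatted; the class name is made up for the example):

import java.nio.charset.StandardCharsets;

public class HexSanityCheck {
  static String stringToHex(String input) {
    StringBuilder sb = new StringBuilder();
    for (byte b : input.getBytes(StandardCharsets.UTF_8)) {
      sb.append(Character.forDigit((b >> 4) & 0xF, 16))
        .append(Character.forDigit(b & 0xF, 16))
        .append(' '); // space-separated for readability, as in the test helper
    }
    return sb.toString().toUpperCase();
  }

  public static void main(String[] args) {
    System.out.println(stringToHex("\u0000\u0000\u0000")); // 00 00 00
    System.out.println(stringToHex("\u0003ä\u0000"));      // 03 C3 A4 00
  }
}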
