Skip to content

Commit

Permalink
feat: Adds ability to bypass cache for pull queries (#6891)
Browse files Browse the repository at this point in the history
* initial draft

* added window range

* added window range checkstyle

* added emptyiterator

* added session store cache bypass for range

* rewrite iterator

* rewrite iterator 2

* fix tests

* clean up iterator code

* refactor fetchUncached et al

* refactor fetch et al

* change structure of windowstorecachebypass class

* clean up

* pass checkstyles

* refactor fetches

* refactor fetches sessionStore

* add some tests

* add some comments

* FIX CHECKSTYLE

* experimental message change

* experimental message change 2

* fix merge conflict

* kick off another build
  • Loading branch information
cprasad1 authored Feb 11, 2021
1 parent ff3171c commit 4b3bc96
Show file tree
Hide file tree
Showing 9 changed files with 483 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,15 @@ public MaterializedWindowedTable windowed() {
final WindowType wndType = wndInfo.getType();
switch (wndType) {
case SESSION:
return new KsMaterializedSessionTable(stateStore, SessionStoreCacheBypass::fetch);
return new KsMaterializedSessionTable(stateStore,
SessionStoreCacheBypass::fetch, SessionStoreCacheBypass::fetchRange);

case HOPPING:
case TUMBLING:
return new KsMaterializedWindowTable(stateStore, wndInfo.getSize().get(),
WindowStoreCacheBypass::fetch);
WindowStoreCacheBypass::fetch,
WindowStoreCacheBypass::fetchAll,
WindowStoreCacheBypass::fetchRange);

default:
throw new UnsupportedOperationException("Unknown window type: " + wndInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcher;
import io.confluent.ksql.execution.streams.materialization.ks.SessionStoreCacheBypass.SessionStoreCacheBypassFetcherRange;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
Expand All @@ -42,11 +43,15 @@ class KsMaterializedSessionTable implements MaterializedWindowedTable {

private final KsStateStore stateStore;
private final SessionStoreCacheBypassFetcher cacheBypassFetcher;
private final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange;

KsMaterializedSessionTable(final KsStateStore store,
final SessionStoreCacheBypassFetcher cacheBypassFetcher) {
final SessionStoreCacheBypassFetcher cacheBypassFetcher,
final SessionStoreCacheBypassFetcherRange cacheBypassFetcherRange) {
this.stateStore = Objects.requireNonNull(store, "store");
this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher");
this.cacheBypassFetcherRange = Objects.requireNonNull(cacheBypassFetcherRange,
"cacheBypassFetcherRange");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.confluent.ksql.execution.streams.materialization.MaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcher;
import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherAll;
import io.confluent.ksql.execution.streams.materialization.ks.WindowStoreCacheBypass.WindowStoreCacheBypassFetcherRange;
import io.confluent.ksql.util.IteratorUtil;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -48,12 +50,20 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable {
private final KsStateStore stateStore;
private final Duration windowSize;
private final WindowStoreCacheBypassFetcher cacheBypassFetcher;
private final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll;
private final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange;

KsMaterializedWindowTable(final KsStateStore store, final Duration windowSize,
final WindowStoreCacheBypassFetcher cacheBypassFetcher) {
final WindowStoreCacheBypassFetcher cacheBypassFetcher,
final WindowStoreCacheBypassFetcherAll cacheBypassFetcherAll,
final WindowStoreCacheBypassFetcherRange cacheBypassFetcherRange) {
this.stateStore = Objects.requireNonNull(store, "store");
this.windowSize = Objects.requireNonNull(windowSize, "windowSize");
this.cacheBypassFetcher = Objects.requireNonNull(cacheBypassFetcher, "cacheBypassFetcher");
this.cacheBypassFetcherAll = Objects.requireNonNull(
cacheBypassFetcherAll, "cacheBypassFetcherAll");
this.cacheBypassFetcherRange = Objects.requireNonNull(
cacheBypassFetcherRange, "cacheBypassFetcherRange");
}

@Override
Expand Down Expand Up @@ -122,7 +132,7 @@ public Iterator<WindowedRow> get(
final Instant upper = calculateUpperBound(windowStartBounds, windowEndBounds);

final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator
= store.fetchAll(lower, upper);
= cacheBypassFetcherAll.fetchAll(store, lower, upper);
return Streams.stream(IteratorUtil.onComplete(iterator, iterator::close)).map(next -> {
final Instant windowStart = next.key.window().startTime();
if (!windowStartBounds.contains(windowStart)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
Expand All @@ -42,6 +43,9 @@ public final class SessionStoreCacheBypass {
private static final Field STORE_NAME_FIELD;
private static final Field STORE_TYPE_FIELD;
static final Field SERDES_FIELD;
private static final String STORE_UNAVAILABLE_MESSAGE = "State store is not available anymore "
+ "and may have been migrated to another instance; "
+ "please re-discover its location from the state metadata.";

static {
try {
Expand Down Expand Up @@ -69,77 +73,154 @@ KeyValueIterator<Windowed<GenericKey>, GenericRow> fetch(
);
}

@SuppressWarnings("unchecked")
interface SessionStoreCacheBypassFetcherRange {

KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRange(
ReadOnlySessionStore<GenericKey, GenericRow> store,
GenericKey keyFrom,
GenericKey keyTo
);
}

/*
This method is used for single key lookups. It calls the fetchUncached method.
*/
public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetch(
final ReadOnlySessionStore<GenericKey, GenericRow> store,
final GenericKey key
) {
Objects.requireNonNull(key, "key can't be null");
final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores = getStores(store);
final Function<ReadOnlySessionStore<GenericKey, GenericRow>,
KeyValueIterator<Windowed<GenericKey>, GenericRow>> fetchFunc
= sessionStore -> fetchUncached(sessionStore, key);
return findFirstNonEmptyIterator(stores, fetchFunc);
}

final StateStoreProvider provider;
final String storeName;
final QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>> storeType;
try {
provider = (StateStoreProvider) PROVIDER_FIELD.get(store);
storeName = (String) STORE_NAME_FIELD.get(store);
storeType = (QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>>)
STORE_TYPE_FIELD.get(store);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
/*
This method is used for single key lookups. It is invoked by the fetch method
*/
private static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchUncached(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore,
final GenericKey key
) {
if (!(sessionStore instanceof MeteredSessionStore)) {
throw new IllegalStateException("Expecting a MeteredSessionStore");
} else {
final StateSerdes<GenericKey, GenericRow> serdes = getSerdes(sessionStore);
final Bytes rawKey = Bytes.wrap(serdes.rawKey(key));
final SessionStore<Bytes, byte[]> wrapped = getInnermostStore(sessionStore);
final KeyValueIterator<Windowed<Bytes>, byte[]> fetch = wrapped.fetch(rawKey);
return new DeserializingIterator(fetch, serdes);
}
}

/*
This method is used for range queries. It calls the fetchRangeUncached method.
*/
public static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRange(
final ReadOnlySessionStore<GenericKey, GenericRow> store,
final GenericKey keyFrom,
final GenericKey keyTo
) {
Objects.requireNonNull(keyFrom, "lower key can't be null");
Objects.requireNonNull(keyTo, "upper key can't be null");

final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores = getStores(store);
final Function<ReadOnlySessionStore<GenericKey, GenericRow>,
KeyValueIterator<Windowed<GenericKey>, GenericRow>> fetchFunc
= sessionStore -> fetchRangeUncached(sessionStore, keyFrom, keyTo);
return findFirstNonEmptyIterator(stores, fetchFunc);
}

/*
This method is used for range queries. It is invoked by the fetchRange method
*/
private static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchRangeUncached(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore,
final GenericKey keyFrom,
final GenericKey keyTo
) {
if (!(sessionStore instanceof MeteredSessionStore)) {
throw new IllegalStateException("Expecting a MeteredSessionStore");
} else {

final StateSerdes<GenericKey, GenericRow> serdes = getSerdes(sessionStore);
final Bytes rawKeyFrom = Bytes.wrap(serdes.rawKey(keyFrom));
final Bytes rawKeyTo = Bytes.wrap(serdes.rawKey(keyTo));
final SessionStore<Bytes, byte[]> wrapped = getInnermostStore(sessionStore);
final KeyValueIterator<Windowed<Bytes>, byte[]> fetch = wrapped.fetch(rawKeyFrom, rawKeyTo);
return new DeserializingIterator(fetch, serdes);
}
final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores
= provider.stores(storeName, storeType);
}

private static KeyValueIterator<Windowed<GenericKey>, GenericRow> findFirstNonEmptyIterator(
final List<ReadOnlySessionStore<GenericKey, GenericRow>> stores,
final Function<ReadOnlySessionStore<GenericKey, GenericRow>,
KeyValueIterator<Windowed<GenericKey>, GenericRow>> fetchFunc
) {
for (final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore : stores) {
try {
final KeyValueIterator<Windowed<GenericKey>, GenericRow> result
= fetchUncached(sessionStore, key);
= fetchFunc.apply(sessionStore);
// returns the first non-empty iterator
if (!result.hasNext()) {
result.close();
} else {
return result;
}
} catch (final InvalidStateStoreException e) {
throw new InvalidStateStoreException(
"State store is not available anymore and may have been migrated to another instance; "
+ "please re-discover its location from the state metadata.", e);
throw new InvalidStateStoreException(STORE_UNAVAILABLE_MESSAGE, e);
}
}
return new EmptyKeyValueIterator();
}

@SuppressWarnings("unchecked")
private static KeyValueIterator<Windowed<GenericKey>, GenericRow> fetchUncached(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore,
final GenericKey key
private static StateSerdes<GenericKey, GenericRow> getSerdes(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore
) throws RuntimeException {
try {
return (StateSerdes<GenericKey, GenericRow>) SERDES_FIELD.get(sessionStore);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
}
}

@SuppressWarnings("unchecked")
private static SessionStore<Bytes, byte[]> getInnermostStore(
final ReadOnlySessionStore<GenericKey, GenericRow> sessionStore
) {
if (sessionStore instanceof MeteredSessionStore) {
final StateSerdes<GenericKey, GenericRow> serdes;
try {
serdes = (StateSerdes<GenericKey, GenericRow>) SERDES_FIELD.get(sessionStore);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
SessionStore<Bytes, byte[]> wrapped
= ((MeteredSessionStore<GenericKey, GenericRow>) sessionStore).wrapped();
// Unwrap state stores until we get to the last SessionStore, which is past the caching
// layer.
while (wrapped instanceof WrappedStateStore) {
final StateStore store = ((WrappedStateStore<?, ?, ?>) wrapped).wrapped();
// A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so
// we just store there.
if (!(store instanceof SessionStore)) {
break;
}
wrapped = (SessionStore<Bytes, byte[]>) store;
}
// now we have the innermost layer of the store.
return wrapped;
}

final Bytes rawKey = Bytes.wrap(serdes.rawKey(key));
SessionStore<Bytes, byte[]> wrapped
= ((MeteredSessionStore<GenericKey, GenericRow>) sessionStore).wrapped();
// Unwrap state stores until we get to the last SessionStore, which is past the caching
// layer.
while (wrapped instanceof WrappedStateStore) {
final StateStore store = ((WrappedStateStore<?, ?, ?>) wrapped).wrapped();
// A RocksDBSessionStore wraps a SegmentedBytesStore, which isn't a SessionStore, so
// we just store there.
if (!(store instanceof SessionStore)) {
break;
}
wrapped = (SessionStore<Bytes, byte[]>) store;
}
// now we have the innermost layer of the store.
final KeyValueIterator<Windowed<Bytes>, byte[]> fetch = wrapped.fetch(rawKey);
return new DeserializingIterator(fetch, serdes);
} else {
throw new IllegalStateException("Expecting a MeteredSessionStore");
@SuppressWarnings("unchecked")
private static List<ReadOnlySessionStore<GenericKey, GenericRow>> getStores(
final ReadOnlySessionStore<GenericKey, GenericRow> store
) {
final QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>> storeType;
try {
final StateStoreProvider provider = (StateStoreProvider) PROVIDER_FIELD.get(store);
final String storeName = (String) STORE_NAME_FIELD.get(store);
storeType = (QueryableStoreType<ReadOnlySessionStore<GenericKey, GenericRow>>)
STORE_TYPE_FIELD.get(store);
return provider.stores(storeName, storeType);
} catch (final IllegalAccessException e) {
throw new RuntimeException("Stream internals changed unexpectedly!", e);
}
}

Expand Down
Loading

0 comments on commit 4b3bc96

Please sign in to comment.