Skip to content

Commit

Permalink
fix: OP-sqlite write transaction single operation notification (#461)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chriztiaan authored Jan 8, 2025
1 parent 7db9933 commit 181a9db
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 38 deletions.
6 changes: 6 additions & 0 deletions .changeset/metal-fans-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/op-sqlite': minor
---

Fixed single write transaction operations in `ps_crud` not being processed. Batching update notifications per write lock.
This will also fix downstream features such as watched queries and reactive query hooks in cases where the query is fired before the data was committed, and batching will improve performance specifically in cases where a lot of data changes occur.
89 changes: 67 additions & 22 deletions packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,85 @@
import { DB, SQLBatchTuple } from '@op-engineering/op-sqlite';
import { BaseObserver, DBAdapterListener, QueryResult, RowUpdateType } from '@powersync/common';
import { DB, SQLBatchTuple, UpdateHookOperation } from '@op-engineering/op-sqlite';
import {
BaseObserver,
BatchedUpdateNotification,
DBAdapterListener,
QueryResult,
RowUpdateType,
UpdateNotification
} from '@powersync/common';

export type OPSQLiteConnectionOptions = {
baseDB: DB;
};

export type OPSQLiteUpdateNotification = {
table: string;
operation: UpdateHookOperation;
row?: any;
rowId: number;
};

export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
protected DB: DB;
private updateBuffer: UpdateNotification[];

constructor(protected options: OPSQLiteConnectionOptions) {
super();
this.DB = options.baseDB;
this.updateBuffer = [];

this.DB.rollbackHook(() => {
this.updateBuffer = [];
});

// link table update commands
this.DB.updateHook((update) => {
this.iterateListeners((cb) => {
let opType: RowUpdateType;
switch (update.operation) {
case 'INSERT':
opType = RowUpdateType.SQLITE_INSERT;
break;
case 'DELETE':
opType = RowUpdateType.SQLITE_DELETE;
break;
case 'UPDATE':
opType = RowUpdateType.SQLITE_UPDATE;
break;
}
cb.tablesUpdated?.({
table: update.table,
opType,
rowId: update.rowId
});
});
this.addTableUpdate(update);
});
}

addTableUpdate(update: OPSQLiteUpdateNotification) {
let opType: RowUpdateType;
switch (update.operation) {
case 'INSERT':
opType = RowUpdateType.SQLITE_INSERT;
break;
case 'DELETE':
opType = RowUpdateType.SQLITE_DELETE;
break;
case 'UPDATE':
opType = RowUpdateType.SQLITE_UPDATE;
break;
}

this.updateBuffer.push({
table: update.table,
opType,
rowId: update.rowId
});
}

flushUpdates() {
if (!this.updateBuffer.length) {
return;
}

const groupedUpdates = this.updateBuffer.reduce((grouping: Record<string, UpdateNotification[]>, update) => {
const { table } = update;
const updateGroup = grouping[table] || (grouping[table] = []);
updateGroup.push(update);
return grouping;
}, {});

const batchedUpdate: BatchedUpdateNotification = {
groupedUpdates,
rawUpdates: this.updateBuffer,
tables: Object.keys(groupedUpdates)
};

this.updateBuffer = [];
this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate));
}

close() {
return this.DB.close();
}
Expand Down
29 changes: 13 additions & 16 deletions packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
import {
BaseObserver,
DBAdapter,
DBAdapterListener,
DBLockOptions,
QueryResult,
SQLOpenOptions,
Transaction
} from '@powersync/common';
import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common';
import { ANDROID_DATABASE_PATH, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite';
import Lock from 'async-lock';
import { OPSQLiteConnection } from './OPSQLiteConnection';
Expand Down Expand Up @@ -194,13 +186,18 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement

return new Promise(async (resolve, reject) => {
try {
await this.locks.acquire(
LockType.WRITE,
async () => {
resolve(await fn(this.writeConnection!));
},
{ timeout: options?.timeoutMs }
);
await this.locks
.acquire(
LockType.WRITE,
async () => {
resolve(await fn(this.writeConnection!));
},
{ timeout: options?.timeoutMs }
)
.then(() => {
// flush updates once a write lock has been released
this.writeConnection!.flushUpdates();
});
} catch (ex) {
reject(ex);
}
Expand Down

0 comments on commit 181a9db

Please sign in to comment.