Skip to content

Commit

Permalink
support spatial queries
Browse files Browse the repository at this point in the history
  • Loading branch information
tjwebb committed Sep 12, 2015
1 parent 1f20f80 commit 2a0775b
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 116 deletions.
156 changes: 72 additions & 84 deletions lib/adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import _ from 'lodash'
import camelize from 'camelize'
import WaterlineSequel from 'waterline-sequel'

import KnexPostgis from 'knex-postgis'
import WaterlineError from 'waterline-errors'
import AdapterError from './error'
import Util from './util'
import SpatialUtil from './spatial'
import SQL from './sql'

const Adapter = {
Expand Down Expand Up @@ -71,28 +73,31 @@ const Adapter = {
if (!connection.identity) {
return cb(WaterlineError.adapter.IdentityMissing)
}
if (this.connections.get(connection.identity)) {
if (Adapter.connections.get(connection.identity)) {
return cb(WaterlineError.adapter.IdentityDuplicate)
}

_.defaults(connection, this.defaults)

_.defaults(connection, Adapter.defaults)

let knex = Knex({
client: 'pg',
connection: connection.url || connection.connection,
pool: connection.pool,
debug: process.env.WATERLINE_DEBUG_SQL || connection.debug
})
let cxn = {
identity: connection.identity,
schema: this.buildSchema(connection, collections),
schema: Adapter.buildSchema(connection, collections),
collections: collections,
knex: Knex({
client: 'pg',
connection: connection.url || connection.connection,
pool: connection.pool,
debug: process.env.WATERLINE_DEBUG_SQL || connection.debug
})
config: connection,
knex: knex,
st: KnexPostgis(knex)
}

return this.getVersion(cxn)
return Adapter.getVersion(cxn)
.then(version => {
cxn.version = Util.validateVersion(version)
this.connections.set(connection.identity, cxn)
Adapter.connections.set(connection.identity, cxn)
cb()
})
.catch(AdapterError.wrap(cb))
Expand Down Expand Up @@ -136,15 +141,15 @@ const Adapter = {
* @param tableName
*/
describe (connectionName, tableName, cb) {
let cxn = this.connections.get(connectionName)
let cxn = Adapter.connections.get(connectionName)

return cxn.knex(tableName).columnInfo()
.then(columnInfo => {
if (_.isEmpty(columnInfo)) {
return cb()
}

return this.query(connectionName, tableName, SQL.indexes, [ tableName ])
return Adapter.query(connectionName, tableName, SQL.indexes, [ tableName ])
.then(({ rows }) => {
_.merge(columnInfo, _.indexBy(camelize(rows), 'columnName'))
_.isFunction(cb) && cb(null, columnInfo)
Expand All @@ -161,21 +166,13 @@ const Adapter = {
* @param queryString
* @param data
*/
query (connectionName, tableName, queryString, _args, _cb) {
let cxn = this.connections.get(connectionName)
let args = null
let cb = _cb

if (_.isFunction(_args)) {
cb = _args
}
else {
args = _args
}

query (connectionName, tableName, queryString, _args, cb = _args) {
let args = _.isFunction(_args) ? [ ] : _args
let cxn = Adapter.connections.get(connectionName)
let query = cxn.knex.raw(Util.toKnexRawQuery(queryString), Util.castValues(args))

return query.then((result = { }) => {
return query
.then((result = { }) => {
_.isFunction(cb) && cb(null, result)
return result
})
Expand All @@ -187,11 +184,11 @@ const Adapter = {
*
* @param connectionName
* @param tableName
* @param definition - the waterline schema definition for this model
* @param definition - the waterline schema definition for model
* @param cb
*/
define (connectionName, tableName, definition, cb) {
let cxn = this.connections.get(connectionName)
let cxn = Adapter.connections.get(connectionName)

return cxn.knex.schema
.createTable(tableName, table => {
Expand All @@ -211,9 +208,9 @@ const Adapter = {
* Drop a table
*/
drop (connectionName, tableName, relations = [ ], cb = relations) {
let cxn = this.connections.get(connectionName)
let cxn = Adapter.connections.get(connectionName)

cxn.knex.schema.dropTableIfExists(tableName)
return cxn.knex.schema.dropTableIfExists(tableName)
.then(() => {
return Promise.all(_.map(relations, relation => {
return cxn.knex.schema.dropTable(relation)
Expand All @@ -229,12 +226,12 @@ const Adapter = {
* Add a column to a table
*/
addAttribute (connectionName, tableName, attributeName, definition, cb) {
let cxn = this.connections.get(connectionName)
let cxn = Adapter.connections.get(connectionName)

return cxn.knex.schema
.table(tableName, table => {
let newColumn = Util.toKnexColumn(table, attributeName, definition)
return Util.applyColumnConstraints(newColumn, definition)
Util.applyColumnConstraints(newColumn, definition)
})
.then(() => {
_.isFunction(cb) && cb()
Expand All @@ -246,7 +243,7 @@ const Adapter = {
* Remove a column from a table
*/
removeAttribute (connectionName, tableName, attributeName, cb) {
let cxn = this.connections.get(connectionName)
let cxn = Adapter.connections.get(connectionName)

return cxn.knex.schema
.table(tableName, table => {
Expand All @@ -262,19 +259,21 @@ const Adapter = {
/**
* Create a new record
*/
create (connectionName, tableName, record, cb) {
let cxn = this.connections.get(connectionName)
let data = Util.sanitize(record, cxn.collections[tableName])

//console.log('create txn', cb)

let txn = Util.getTransaction(cb, cxn.knex)
create (connectionName, tableName, data, cb) {
let cxn = Adapter.connections.get(connectionName)
let insertData = Util.sanitize(data, cxn.collections[tableName], cxn)
let schema = cxn.collections[tableName]
let spatialColumns = SpatialUtil.buildSpatialSelect(schema.definition, cxn)

return cxn.knex(tableName)
.insert(insertData)
.returning([ '*', ...spatialColumns ])
.then(rows => {
let casted = Util.castResultRows(rows, schema)
let result = _.isArray(data) ? casted : casted[0]

return txn(tableName)
.insert(data).returning('*')
.then(([row]) => {
cb(null, row)
return row
_.isFunction(cb) && cb(null, result)
return result
})
.catch(AdapterError.wrap(cb))
},
Expand All @@ -283,31 +282,21 @@ const Adapter = {
* Create multiple records
*/
createEach (connectionName, tableName, records, cb) {
let cxn = this.connections.get(connectionName)
let data = _.map(records, record => {
return Util.sanitize(record, cxn.collections[tableName])
})

return cxn.knex(tableName).returning('*').insert(data)
.then(rows => {
cb(null, rows)
return rows
})
.catch(AdapterError.wrap(cb))
return Adapter.create(connectionName, tableName, records, cb)
},

/**
* Update a record
*/
update (connectionName, tableName, options, data, cb) {
let cxn = this.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, this.wlSqlOptions)
let cxn = Adapter.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, Adapter.wlSqlOptions)

return new Promise((resolve, reject) => {
resolve(wlsql.update(tableName, options, data))
})
.then(({ query, values }) => {
return this.query(connectionName, tableName, query, values)
return Adapter.query(connectionName, tableName, query, values)
})
.then(({ rows }) => {
cb && cb(null, rows)
Expand All @@ -319,14 +308,14 @@ const Adapter = {
* Destroy a record
*/
destroy (connectionName, tableName, options, cb) {
let cxn = this.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, this.wlSqlOptions)
let cxn = Adapter.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, Adapter.wlSqlOptions)

return new Promise((resolve, reject) => {
resolve(wlsql.destroy(tableName, options))
})
.then(({ query, values }) => {
return this.query(connectionName, tableName, query, values)
return Adapter.query(connectionName, tableName, query, values)
})
.then(({ rows }) => {
cb(null, rows)
Expand All @@ -338,15 +327,15 @@ const Adapter = {
* Populate record associations
*/
join (connectionName, tableName, options, cb) {
let cxn = this.connections.get(connectionName)
let cxn = Adapter.connections.get(connectionName)

return Util.buildKnexJoinQuery (cxn, tableName, options)
.then(result => {
// return unique records only.
// TODO move to SQL
_.each(_.reject(options.joins, { select: false }), join => {
let alias = Util.getJoinAlias(join)
let pk = this.getPrimaryKey(cxn, join.child)
let pk = Adapter.getPrimaryKey(cxn, join.child)

_.each(result, row => {
row[alias] = _.unique(row[alias], pk)
Expand Down Expand Up @@ -382,18 +371,23 @@ const Adapter = {
* Find records
*/
find (connectionName, tableName, options, cb) {
let cxn = this.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, this.wlSqlOptions)
let cxn = Adapter.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, Adapter.wlSqlOptions)
let schema = cxn.collections[tableName]

return new Promise((resolve, reject) => {
resolve(wlsql.find(tableName, options))
})
.then(({ query: [query], values: [values] }) => {
return this.query(connectionName, tableName, query, values)
let spatialColumns = SpatialUtil.buildSpatialSelect(schema.definition, cxn)
let fullQuery = Util.addSelectColumns(spatialColumns, query)

return Adapter.query(connectionName, tableName, fullQuery, values)
})
.then(({ rows }) => {
_.isFunction(cb) && cb(null, rows)
return rows
let result = Util.castResultRows(rows, schema)
_.isFunction(cb) && cb(null, result)
return result
})
.catch(AdapterError.wrap(cb))
},
Expand All @@ -402,14 +396,14 @@ const Adapter = {
* Count the number of records
*/
count (connectionName, tableName, options, cb) {
let cxn = this.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, this.wlSqlOptions)
let cxn = Adapter.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, Adapter.wlSqlOptions)

return new Promise((resolve, reject) => {
resolve(wlsql.count(tableName, options))
})
.then(({ query: [query], values: [values] }) => {
return this.query(connectionName, tableName, query, values)
return Adapter.query(connectionName, tableName, query, values)
})
.then(({ rows: [row] }) => {
let count = Number(row.count)
Expand All @@ -432,14 +426,10 @@ const Adapter = {
* .catch(txn.rollback)
*/
transaction (connectionName, tableName, cb) {
let cxn = this.connections.get(connectionName)
let cxn = Adapter.connections.get(connectionName)

return new Promise(resolve => {
cxn.knex.transaction(txn => {
/*
console.log('actual txn', txn)
console.log('actual txn keys', _.keys(txn))
*/
_.isFunction(cb) && cb(null, txn)
resolve(txn)
})
Expand All @@ -450,8 +440,8 @@ const Adapter = {
* Stream query results
*/
stream (connectionName, tableName, options, outputStream) {
let cxn = this.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, this.wlSqlOptions)
let cxn = Adapter.connections.get(connectionName)
let wlsql = new WaterlineSequel(cxn.schema, Adapter.wlSqlOptions)

return new Promise((resolve, reject) => {
resolve(wlsql.find(tableName, options))
Expand All @@ -476,18 +466,16 @@ const Adapter = {
* @return {[type]} [description]
*/
teardown (conn, cb = conn) {
let connections = conn ? [ this.connections.get(conn) ] : this.connections.values()
let connections = conn ? [ Adapter.connections.get(conn) ] : Adapter.connections.values()

for (let cxn of connections) {
if (!cxn) continue

cxn.knex.destroy()
this.connections.delete(cxn.identity)
Adapter.connections.delete(cxn.identity)
}
cb()
}
}

_.bindAll(Adapter)

export default Adapter
Loading

0 comments on commit 2a0775b

Please sign in to comment.