Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] protocol v3 #57

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 131 additions & 10 deletions spec/functional_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ local constants = require("cassandra.constants")

describe("cassandra", function()

before_each(function()
setup(function()
session = cassandra.new()
session:set_timeout(1000)

Expand Down Expand Up @@ -167,7 +167,8 @@ describe("cassandra", function()
local stmt, err = session:prepare("SELECT native_protocol_version FROM system.local")
assert.falsy(err)
assert.truthy(stmt)
local rows = session:execute(stmt)
local rows, err = session:execute(stmt)
assert.falsy(err)
assert.same(1, #rows)
assert.truthy(rows[1].native_protocol_version == "2" or rows[1].native_protocol_version == "3")
end)
Expand All @@ -176,7 +177,8 @@ describe("cassandra", function()
local stmt, err = session:prepare("SELECT * FROM system.local WHERE key IN ?")
assert.falsy(err)
assert.truthy(stmt)
local rows = session:execute(stmt, {cassandra.list({"local", "not local"})})
local rows, err = session:execute(stmt, {cassandra.list({"local", "not local"})})
assert.falsy(err)
assert.same(1, #rows)
assert.truthy(rows[1].key == "local")
end)
Expand Down Expand Up @@ -210,6 +212,7 @@ describe("cassandra", function()
end)

describe("Real use-case", function()
local table_created, err

setup(function()
table_created, err = session:execute [[
Expand All @@ -231,9 +234,6 @@ describe("cassandra", function()
end)

describe("DDL statements", function()
it("should be possible to create a table", function()
assert.same("users", table_created.table)
end)

it("should not be possible to create a table twice", function()
local table_created, err = session:execute [[
Expand All @@ -248,6 +248,54 @@ describe("cassandra", function()
assert.same('Cannot add already existing column family "users" to keyspace "lua_tests"', err.raw_message)
assert.same('Cassandra returned error (Already_exists): "Cannot add already existing column family "users" to keyspace "lua_tests""', tostring(err))
end)

it("should parse a TABLE SCHEMA_CHANGE statement result", function()
assert.same({
change_type = "CREATED",
keyspace = "lua_tests",
table = "users",
target = "TABLE",
type = "SCHEMA_CHANGE"
}, table_created)
end)

it("should parse a TYPE SCHEMA_CHANGE statement result", function()
local type_created, err = session:execute [[
CREATE TYPE IF NOT EXISTS new_type (
key uuid,
value text
)
]]
assert.falsy(err)
assert.same({
change_type = "CREATED",
keyspace = "lua_tests",
target = "TYPE",
type = "SCHEMA_CHANGE",
user_type = "new_type"
}, type_created)
end)

it("should parse a KEYSPACE SCHEMA_CHANGE statement result", function()
local keyspace_created, err = session:execute [[
CREATE KEYSPACE lua_tests_keyspace_test
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }
]]
assert.falsy(err)
assert.same({
change_type = "CREATED",
keyspace = "lua_tests_keyspace_test",
target = "KEYSPACE",
type = "SCHEMA_CHANGE"
}, keyspace_created)

finally(function()
local keyspace_created, err = session:execute("DROP KEYSPACE lua_tests_keyspace_test")
if err then
error(err)
end
end)
end)
end)

describe("DML statements", function()
Expand Down Expand Up @@ -314,7 +362,7 @@ describe("cassandra", function()
session:execute([[
CREATE TABLE type_test_table (
key varchar PRIMARY KEY,
value ]] .. type.name .. [[
value ]]..type.name..[[
)
]])
end)
Expand All @@ -324,13 +372,13 @@ describe("cassandra", function()
end)

it("should be possible to insert and get value back", function()
local ok, err = session:execute([[
INSERT INTO type_test_table (key, value)
VALUES (?, ?)
local _, err = session:execute([[
INSERT INTO type_test_table (key, value) VALUES (?, ?)
]], {"key", type.insert_value ~= nil and type.insert_value or type.value})
assert.falsy(err)

local rows, err = session:execute("SELECT value FROM type_test_table WHERE key = 'key'")
assert.falsy(err)
assert.same(1, #rows)
if type.read_test then
assert.truthy(type.read_test(rows[1].value))
Expand All @@ -340,6 +388,53 @@ describe("cassandra", function()
end)
end)
end

describe("User Defined Type", function()

setup(function()
local _, err = session:execute [[
CREATE TYPE address (
street text,
city text,
zip int,
country text
)
]]
assert.falsy(err)

local _, err = session:execute [[
CREATE TABLE user_profiles (
email text PRIMARY KEY,
address frozen<address>
)
]]
assert.falsy(err)
end)

teardown(function()
session:execute("DROP TYPE address")
session:execute("DROP TABLE user_profiles")
end)

it("should be possible to insert and get value back", function()
local _, err = session:execute([[
INSERT INTO user_profiles(email, address) VALUES (?, ?)
]], {"email@domain.com", cassandra.udt({ "montgomery street", "san francisco", 94111, nil })})

assert.falsy(err)

local rows, err = session:execute("SELECT address FROM user_profiles WHERE email = 'email@domain.com'")
assert.falsy(err)
assert.same(1, #rows)
local row = rows[1]
assert.same("montgomery street", row.address.street)
assert.same("san francisco", row.address.city)
assert.same(94111, row.address.zip)
assert.same("", row.address.country)
end)

end)

end)

describe("Pagination #pagination", function()
Expand Down Expand Up @@ -453,6 +548,32 @@ describe("cassandra", function()
end)
end)

describe("Query flags #flags", function()

setup(function()
session:execute [[
CREATE TABLE IF NOT EXISTS flags_test_table(
key int PRIMARY KEY,
value varchar
)
]]
session:execute([[ INSERT INTO flags_test_table(key, value)
VALUES(?,?) ]], { i, "test" })
end)

teardown(function()
session:execute("DROP TABLE flags_test_table")
end)

it("should support the serial_consitency flag", function()
-- serial_consistency only works for conditional update statements but
-- we are here tracking the driver's behaviour when passing the flag
local rows, err = session:execute("SELECT * FROM flags_test_table", nil, {serial_consistency=cassandra.consistency.ANY})
assert.falsy(err)
end)

end)

describe("Counters #counters", function()

setup(function()
Expand Down
3 changes: 2 additions & 1 deletion spec/type_fixtures.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ return {
{name='map<text,text>', insert_value=cassandra.map({k1='v1', k2='v2'}), read_value={k1='v1', k2='v2'}},
{name='map<text,int>', insert_value=cassandra.map({k1=3, k2=4}), read_value={k1=3, k2=4}},
{name='map<text,text>', insert_value=cassandra.map({}), read_value=nil},
{name='set<text>', insert_value=cassandra.set({'abc', 'def'}), read_value={'abc', 'def'}}
{name='set<text>', insert_value=cassandra.set({'abc', 'def'}), read_value={'abc', 'def'}},
{name='tuple<int, text, float>', insert_value=cassandra.tuple({1, 'foo', cassandra.float(2.1)}), read_test=function(value) return value[1] == 1 and value[2] == 'foo' and math.abs(value[3] - 2.1) < 0.0000001 end}
}
23 changes: 11 additions & 12 deletions src/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ math.randomseed(ngx and ngx.time() or os.time())
local _M = {
version="0.5-7",
consistency=constants.consistency,
batch_types=constants.batch_types
batch_types=constants.batch_types,
error_codes=constants.error_codes
}

-- create functions for type annotations
Expand Down Expand Up @@ -115,7 +116,11 @@ function _M:connect(contact_points, port)
end
if not self.initialized then
--todo: not tested
startup(self)
local _
_, err = startup(self)
if err then
return false, err
end
self.initialized = true
end
return true
Expand Down Expand Up @@ -168,9 +173,6 @@ local batch_statement_mt = {
add=function(self, query, args)
table.insert(self.queries, {query=query, args=args})
end,
representation=function(self)
return encoding.batch_representation(self.queries, self.type)
end,
is_batch_statement = true
}
}
Expand Down Expand Up @@ -206,9 +208,7 @@ local default_options = {
}

function _M:execute(query, args, options)
local op_code = protocol.query_op_code(query)
if not options then options = {} end

-- Default options
for k, v in pairs(default_options) do
if options[k] == nil then
Expand All @@ -218,18 +218,18 @@ function _M:execute(query, args, options)

if options.auto_paging then
local page = 0
return function(query, paging_state)
return function(paginated_query, paging_state)
-- Latest fetched rows have been returned for sure, end the iteration
if not paging_state and page > 0 then return nil end

local rows, err = self:execute(query, args, {
local rows, err = self:execute(paginated_query, args, {
page_size=options.page_size,
paging_state=paging_state
})
page = page + 1

-- If we have some results, retrieve the paging_state
local paging_state
paging_state = nil
if rows ~= nil then
paging_state = rows.meta.paging_state
end
Expand All @@ -243,11 +243,10 @@ function _M:execute(query, args, options)
end, query, nil
end

local frame_body = protocol.frame_body(query, args, options)
local op_code, frame_body = protocol.op_code_and_frame_body(query, args, options)

-- Send frame
local response, err = protocol.send_frame_and_get_response(self, op_code, frame_body, options.tracing)

-- Check response errors
if not response then
return nil, err
Expand Down
18 changes: 14 additions & 4 deletions src/cassandra/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ local error_codes = {

return {
version_codes = {
REQUEST=0x02,
RESPONSE=0x82
REQUEST=0x03,
RESPONSE=0x83
},
flags = {
COMPRESSION=0x01, -- not implemented
TRACING=0x02
},
op_codes = {
ERROR=0x00,
Expand Down Expand Up @@ -60,8 +64,12 @@ return {
},
query_flags = {
VALUES=0x01,
SKIP_METADATA=0x02, -- not implemented
PAGE_SIZE=0x04,
PAGING_STATE=0x08
PAGING_STATE=0x08,
SERIAL_CONSISTENCY=0x10,
DEFAULT_TIMESTAMP=0x20, -- not implemented
NAMED_VALUES=0x40 -- not implemented
},
rows_flags = {
GLOBAL_TABLES_SPEC=0x01,
Expand Down Expand Up @@ -114,6 +122,8 @@ return {
inet=0x10,
list=0x20,
map=0x21,
set=0x22
set=0x22,
udt=0x30,
tuple=0x31
}
}
Loading