-- schema.lua

local fiber = require('fiber')
local msgpack = require('msgpack')
local digest = require('digest')
local errors = require('errors')
local log = require('log')

local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack = false})

local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')

local schema = {}

local function table_len(t)
    local len = 0
    for _ in pairs(t) do
        len = len + 1
    end
    return len
end
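
-- Schema reload is fanned out to the master of every replicaset: each request
-- runs in its own fiber and reports completion through a shared fiber channel,
-- so the whole operation is bounded by const.RELOAD_SCHEMA_TIMEOUT.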
local function call_reload_schema_on_replicaset(replicaset, channel)
    replicaset.master.conn:reload_schema()
    channel:put(true)
end

local function call_reload_schema(replicasets)
    local replicasets_num = table_len(replicasets)
    local channel = fiber.channel(replicasets_num)

    local fibers = {}
    for _, replicaset in pairs(replicasets) do
        local f = fiber.new(call_reload_schema_on_replicaset, replicaset, channel)
        table.insert(fibers, f)
    end

    for _ = 1, replicasets_num do
        if channel:get(const.RELOAD_SCHEMA_TIMEOUT) == nil then
            for _, f in ipairs(fibers) do
                if f:status() ~= 'dead' then
                    f:cancel()
                end
            end
            return nil, ReloadSchemaError:new("Reloading schema timed out")
        end
    end

    return true
end
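
-- Per-router deduplication state: while a reload is in progress for a router,
-- concurrent schema.reload_schema() calls wait on its fiber.cond instead of
-- issuing duplicate reload requests.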
local reload_in_progress = {}
local reload_schema_cond = {}

function schema.reload_schema(vshard_router)
    local replicasets = vshard_router:routeall()
    local vshard_router_name = vshard_router.name

    if reload_in_progress[vshard_router_name] == true then
        if not reload_schema_cond[vshard_router_name]:wait(const.RELOAD_SCHEMA_TIMEOUT) then
            return nil, ReloadSchemaError:new('Waiting for the schema to be reloaded timed out')
        end
    else
        reload_in_progress[vshard_router_name] = true

        if reload_schema_cond[vshard_router_name] == nil then
            reload_schema_cond[vshard_router_name] = fiber.cond()
        end

        local ok, err = call_reload_schema(replicasets)
        if not ok then
            -- Drop the flag so that the next reload attempt is not stuck
            -- waiting for a reload that will never complete.
            reload_in_progress[vshard_router_name] = false
            return nil, err
        end

        reload_schema_cond[vshard_router_name]:broadcast()
        reload_in_progress[vshard_router_name] = false
    end

    return true
end

-- schema.wrap_func_reload calls func with the specified arguments.
-- func should return `res, err, need_reload`.
-- If func returns an error and `need_reload` is const.NEED_SCHEMA_RELOAD,
-- the schema is reloaded and one more attempt is performed
-- (but no more than RELOAD_RETRIES_NUM times).
-- The wrapper is intended for functions that can fail when the router uses
-- an outdated space schema: such functions return `need_reload` for
-- schema-dependent errors.
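--
-- Usage sketch (illustrative, not part of this module; the helper below and
-- the space name 'customers' are assumptions):
--
--     local function check_space(vshard_router, space_name)
--         for _, replicaset in pairs(vshard_router:routeall()) do
--             if replicaset.master.conn.space[space_name] == nil then
--                 return nil, 'space is unknown on the router',
--                     const.NEED_SCHEMA_RELOAD
--             end
--         end
--         return true
--     end
--
--     local res, err = schema.wrap_func_reload(vshard_router, check_space, 'customers')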
function schema.wrap_func_reload(vshard_router, func, ...)
    local i = 0

    local res, err, need_reload
    while true do
        res, err, need_reload = func(vshard_router, ...)

        if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then
            break
        end

        local ok, reload_schema_err = schema.reload_schema(vshard_router)
        if not ok then
            log.warn("Failed to reload schema: %s", reload_schema_err)
            break
        end

        i = i + 1

        if i > const.RELOAD_RETRIES_NUM then
            local warn_msg = "Schema reload retries (%s) are exhausted"
            log.warn(warn_msg, const.RELOAD_RETRIES_NUM)
            break
        end
    end

    return res, err
end
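
-- Returns a murmur hash of the space format and its index definitions
-- (or an empty string when the space is nil). Router and storage hashes
-- are compared to detect a schema mismatch.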
local function get_space_schema_hash(space)
    if space == nil then
        return ''
    end

    local indexes_info = {}
    for i = 0, table.maxn(space.index) do
        local index = space.index[i]
        if index ~= nil then
            indexes_info[i] = {
                unique = index.unique,
                parts = index.parts,
                id = index.id,
                type = index.type,
                name = index.name,
                path = index.path,
            }
        end
    end

    local space_info = {
        format = space:format(),
        indexes = indexes_info,
    }

    return digest.murmur(msgpack.encode(space_info))
end
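
-- Returns a copy of the object that keeps only the listed fields.
-- Example (illustrative): filter_obj_fields({id = 1, name = 'Elizabeth', age = 24}, {'id', 'name'})
-- returns {id = 1, name = 'Elizabeth'}.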
function schema.filter_obj_fields(obj, field_names)
    if field_names == nil or obj == nil then
        return obj
    end

    local result = {}

    for _, field_name in ipairs(field_names) do
        result[field_name] = obj[field_name]
    end

    return result
end
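
-- Builds a positional array from the named fields of a single tuple;
-- the tuple is indexed by field name, so it is expected to be a box tuple
-- of a space with a format (or a table with named keys).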
local function filter_tuple_fields(tuple, field_names)
    if field_names == nil or tuple == nil then
        return tuple
    end

    local result = {}

    for i, field_name in ipairs(field_names) do
        result[i] = tuple[field_name]
    end

    return result
end
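
-- Applies filter_tuple_fields to every tuple in the list;
-- with field_names == nil the tuples are returned unchanged.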
function schema.filter_tuples_fields(tuples, field_names)
    dev_checks('?table', '?table')

    if field_names == nil then
        return tuples
    end

    local result = {}

    for _, tuple in ipairs(tuples) do
        local filtered_tuple = filter_tuple_fields(tuple, field_names)
        table.insert(result, filtered_tuple)
    end

    return result
end
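
-- Drops every field after #field_names: box tuples are truncated via
-- tuple:transform(), plain tables are truncated in place.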
function schema.truncate_row_trailing_fields(tuple, field_names)
    dev_checks('table|tuple', 'table')

    local count_names = #field_names
    local index = count_names + 1
    local len_tuple = #tuple

    if box.tuple.is(tuple) then
        return tuple:transform(index, len_tuple - count_names)
    end

    for i = index, len_tuple do
        tuple[i] = nil
    end

    return tuple
end
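
-- schema.wrap_func_result pcalls func(unpack(args)) and packs the outcome into
-- a table `{res = ..., err = ..., space_schema_hash = ..., storage_info = ...}`:
-- `res` is filtered by opts.field_names (unless opts.noreturn is set),
-- `space_schema_hash` is added on failure when opts.add_space_schema_hash is set,
-- and `storage_info` (replica uuid and schema version) is added when
-- opts.fetch_latest_metadata is set; box.internal.schema_version() is used
-- where box.info.schema_version is not available.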
function schema.wrap_func_result(space, func, args, opts)
    dev_checks('table', 'function', 'table', 'table')

    local result = {}

    opts = opts or {}

    local ok, func_res = pcall(func, unpack(args))
    if not ok then
        result.err = func_res
        if opts.add_space_schema_hash then
            result.space_schema_hash = get_space_schema_hash(space)
        end
    else
        if opts.noreturn ~= true then
            result.res = filter_tuple_fields(func_res, opts.field_names)
        end
    end

    if opts.fetch_latest_metadata == true then
        local replica_schema_version
        if box.info.schema_version ~= nil then
            replica_schema_version = box.info.schema_version
        else
            replica_schema_version = box.internal.schema_version()
        end
        result.storage_info = {
            replica_uuid = box.info().uuid,
            replica_schema_version = replica_schema_version,
        }
    end

    return result
end

-- schema.wrap_box_space_func_result pcalls the given box.space function
-- and returns its result as a table
-- `{res = ..., err = ..., space_schema_hash = ...}`.
-- space_schema_hash is computed only if the function failed and
-- `add_space_schema_hash` is set in opts.
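--
-- Usage sketch (illustrative; the space, the tuple and the options are assumptions):
--
--     local result = schema.wrap_box_space_func_result(
--         box.space.customers, 'insert', {{1, 'Elizabeth'}},
--         {add_space_schema_hash = true}
--     )
--     -- result.res is the inserted tuple on success; result.err (and
--     -- result.space_schema_hash) are set if the insert failed.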
function schema.wrap_box_space_func_result(space, box_space_func_name, box_space_func_args, opts)
    dev_checks('table', 'string', 'table', 'table')

    local function func(space, box_space_func_name, box_space_func_args)
        return space[box_space_func_name](space, unpack(box_space_func_args))
    end

    return schema.wrap_func_result(space, func, {space, box_space_func_name, box_space_func_args}, opts)
end

-- schema.result_needs_reload checks whether a schema reload could help
-- to avoid the storage error.
-- It compares the space_schema_hash returned by the storage with the hash
-- of the space used on the router.
-- Note that the storage returns `space_schema_hash = nil`
-- if reloading the space format cannot avoid the error.
function schema.result_needs_reload(space, result)
    if result.space_schema_hash == nil then
        return false
    end

    return result.space_schema_hash ~= get_space_schema_hash(space)
end
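
-- Batching counterpart of result_needs_reload: a reload is considered useful
-- only when every one of the tuples_count results carries a space schema hash
-- that differs from the router's one.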
function schema.batching_result_needs_reload(space, results, tuples_count)
    local storage_errs_count = 0
    local space_schema_hash = get_space_schema_hash(space)

    for _, result in ipairs(results) do
        if result.space_schema_hash ~= nil and result.space_schema_hash ~= space_schema_hash then
            storage_errs_count = storage_errs_count + 1
        end
    end

    return storage_errs_count == tuples_count
end

return schema