diff --git a/.github/workflows/push-rockspec.yml b/.github/workflows/push-rockspec.yml new file mode 100644 index 0000000..4f86518 --- /dev/null +++ b/.github/workflows/push-rockspec.yml @@ -0,0 +1,52 @@ +name: Create and push rockspec for moonlibs/xqueue + +on: + push: + tags: + - '*' + +env: + ROCK_NAME: xqueue + +jobs: + pack-and-push-tagged-rockspec: + runs-on: ubuntu-latest + if: startsWith(github.ref, 'refs/tags') + steps: + - uses: actions/checkout@master + - uses: tarantool/setup-tarantool@v3 + with: + tarantool-version: '2.6' + + # https://stackoverflow.com/questions/58177786/get-the-current-pushed-tag-in-github-actions + - name: Set env + run: echo "TAG=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV + + - run: tarantoolctl rocks new_version --tag=${{ env.TAG }} rockspecs/xqueue-scm-5.rockspec ${{ env.TAG }} "git+https://github.com/${{ github.repository }}.git" + - run: tarantoolctl rocks --server https://moonlibs.github.io/rocks install ${{ env.ROCK_NAME }}-${{ env.TAG }}-1.rockspec + - run: tarantoolctl rocks pack ${{ env.ROCK_NAME }}-${{ env.TAG }}-1.rockspec + # Install native lua with luarocks + - uses: leafo/gh-actions-lua@v9 + with: + luaVersion: "luajit-2.1.0-beta3" + - uses: leafo/gh-actions-luarocks@v4 + with: + luarocksVersion: "3.8.0" + - uses: unfor19/install-aws-cli-action@v1.0.3 + - run: mkdir .build && cp ${{env.ROCK_NAME}}-dev-1.rockspec ${{env.ROCK_NAME}}-${{env.TAG}}-1.rockspec .build/ && cp *.src.rock .build/ + - name: rebuild and publish s3 luarocks server + env: + AWS_ACCESS_KEY_ID: ${{ secrets.MOONLIBS_S3_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.MOONLIBS_S3_SECRET_KEY}} + AWS_EC2_METADATA_DISABLED: true + run: | + cd .build && aws s3 sync s3://moonlibs/ ./ && luarocks-admin make_manifest . && aws s3 sync --acl public-read ./ s3://moonlibs/; + - uses: "marvinpinto/action-automatic-releases@latest" + with: + repo_token: "${{ secrets.GITHUB_TOKEN }}" + prerelease: false + files: | + README.md + ${{env.ROCK_NAME}}-dev-1.rockspec + ${{env.ROCK_NAME}}-${{env.TAG}}-1.rockspec + ${{env.ROCK_NAME}}-${{env.TAG}}-1.src.rock diff --git a/README.md b/README.md index 1460888..a042b22 100644 --- a/README.md +++ b/README.md @@ -7,32 +7,31 @@ * restart (renew, requeue?) - take task from W/D/Z states and put it into R/W * it seems, that ttl/ttr/bury requires attrs field - # Status * **R** - ready - task is ready to be taken - - created by put without delay - - turned on by release/kick without delay - - turned on from W when delay passed - - deleted after ttl if ttl is enabled + * created by put without delay + * turned on by release/kick without delay + * turned on from W when delay passed + * deleted after ttl if ttl is enabled * **T** - taken - task is taken by consumer. may not be taken by other. - - turned into R after ttr if ttr is enabled + * turned into R after ttr if ttr is enabled * **W** - waiting - task is not ready to be taken. waiting for its `delay` - - requires `runat` - - `delay` may be set during put, release, kick - - turned into R after delay + * requires `runat` + * `delay` may be set during put, release, kick + * turned into R after delay * **B** - buried - task was temporary discarded from queue by consumer - - may be revived using kick by administrator - - use it in unpredicted conditions, when man intervention is required - - Without mandatory monitoring (stats.buried) usage of buried is useless and awry + * may be revived using kick by administrator + * use it in unpredicted conditions, when man intervention is required + * Without mandatory monitoring (stats.buried) usage of buried is useless and awry * **Z** - zombie - task was processed and ack'ed and **temporary** kept for delay * **D** - done - task was processed and ack'ed and **permanently** left in database - - enabled when keep feature is set + * enabled when keep feature is set # Interface @@ -44,64 +43,64 @@ Imbue space with power of queue ```lua M.upgrade(space, { - format = { - -- space format. applied to space.format() if passed - }, - fields = { - -- id is always taken from pk - status = 'status_field_name' | status_field_no, - runat = 'runat_field_name' | runat_field_no, - priority = 'priority_field_name' | priority_field_no, - }, - features = { - id = 'auto_increment' | 'time64' | 'uuid' | 'required' | function - -- auto_increment - if pk is number, then use it for auto_increment - -- uuid - if pk is string, then use uuid for id - -- required - primary key MUST be present in tuple during put - -- function - funciton will be called to aquire id for task - retval = 'table' | 'tuple' - -- table requires space format. default if applicable. a bit slower - - buried = true, -- if true, support bury/kick - delayed = true, -- if true, support delayed tasks, requires `runat` - - keep = true, -- if true, keep ack'ed tasks in [D]one state, instead of deleting - -- mutually exclusive with zombie - - zombie = true|number, -- requires `runat` field - -- if number, then with default zombie delay, otherwise only if set delay during ack - -- mutually exclusive with keep - - ttl = true|number, -- requires `runat` field - -- Time To Live. Task is expired unless taken within time - -- if number, then with default ttl, otherwise only if set during put/release - ttr = true|number, -- requires `runat` field - -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time - -- if number, then with default ttl, otherwise only if set during take - }, - -- Set tubes for which statistics collector will be enabled - tube_stats = { 'tube-1', 'tube-2' }, + format = { + -- space format. applied to space.format() if passed + }, + fields = { + -- id is always taken from pk + status = 'status_field_name' | status_field_no, + runat = 'runat_field_name' | runat_field_no, + priority = 'priority_field_name' | priority_field_no, + }, + features = { + id = 'auto_increment' | 'time64' | 'uuid' | 'required' | function() ... return task_id end + -- auto_increment - if pk is number, then use it for auto_increment + -- uuid - if pk is string, then use uuid for id + -- required - primary key MUST be present in tuple during put + -- function - funciton will be called to aquire id for task + retval = 'table' | 'tuple' + -- table requires space format. default if applicable. a bit slower + + buried = true, -- if true, support bury/kick + delayed = true, -- if true, support delayed tasks, requires `runat` + + keep = true, -- if true, keep ack'ed tasks in [D]one state, instead of deleting + -- mutually exclusive with zombie + + zombie = true|number, -- requires `runat` field + -- if number, then with default zombie delay, otherwise only if set delay during ack + -- mutually exclusive with keep + + ttl = true|number, -- requires `runat` field + -- Time To Live. Task is expired unless taken within time + -- if number, then with default ttl, otherwise only if set during put/release + ttr = true|number, -- requires `runat` field + -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time + -- if number, then with default ttl, otherwise only if set during take + }, + -- Set tubes for which statistics collector will be enabled + tube_stats = { 'tube-1', 'tube-2' }, }) ``` ## Producer methods * `space:put` - - `task` - table or array or tuple - + `table` - * **requires** space format - * suitable for id generation - + `array` - * ignores space format - * for id generation use `NULL` (**not** `nil`) - + `tuple` - * ignores space format - * **can't** be used with id generation - - `attr` - table of attributes - + `delay` - number of seconds - * if set, task will become `W` instead of `R` for `delay` seconds - + `ttl` - number of seconds - * if set, task will be discarded after ttl seconds unless was taken + * `task` - table or array or tuple + * `table` + * **requires** space format + * suitable for id generation + * `array` + * ignores space format + * for id generation use `NULL` (**not** `nil`) + * `tuple` + * ignores space format + * **can't** be used with id generation + * `attr` - table of attributes + * `delay` - number of seconds + * if set, task will become `W` instead of `R` for `delay` seconds + * `ttl` - number of seconds + * if set, task will be discarded after ttl seconds unless was taken ```lua box.space.myqueue:put{ name="xxx"; data="yyy"; } @@ -115,54 +114,123 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 }) ## Consumer methods * `space:take(timeout)` - - `timeout` - number of seconds to wait for new task - + choose reasonable time - + beware of **readahead** size (see tarantool docs) - - returns task tuple or table (see retval) or nothing on timeout - - *TODO*: ttr must be there + * `timeout` - number of seconds to wait for new task + * choose reasonable time + * beware of **readahead** size (see tarantool docs) + * returns task tuple or table (see retval) or nothing on timeout + * *TODO*: ttr must be there * `space:ack(id, [attr])` - - `id`: - + `string` | `number` - primary key - + `tuple` - key will be extracted using index - + *TODO*: composite pk - - `attr`: - + `update` - table for update, like in space:update - + `delay` - number of seconds - * applicable only when `zombie` enabled - * if `zombie` is enabled, delay deletion for delay time - - returns task tuple or table (see retval) + * `id`: + * `string` | `number` - primary key + * `tuple` - key will be extracted using index + * *TODO*: composite pk + * `attr`: + * `update` - table for update, like in space:update + * `delay` - number of seconds + * applicable only when `zombie` enabled + * if `zombie` is enabled, delay deletion for delay time + * returns task tuple or table (see retval) * `space:release(id, [attr])` - - `id` - as in `:ack` - - `attr` - + `update` - table for update, like in space:update - + `ttl` - timeout for time to live - + `delay` - number of seconds - * if set, task will become `W` instead of `R` for `delay` seconds + * `id` - as in `:ack` + * `attr` + * `update` - table for update, like in space:update + * `ttl` - timeout for time to live + * `delay` - number of seconds + * if set, task will become `W` instead of `R` for `delay` seconds * `space:bury(id, [attr])` - - `id` - as in `:ack` - - `attr` - + `update` - table for update, like in space:update - + `ttl` - timeout for time to live (?) + * `id` - as in `:ack` + * `attr` + * `update` - table for update, like in space:update + * `ttl` - timeout for time to live (?) + +* `space:wait(id, [timeout])` + * `id` - as in `:ack` + * `timeout` - number of seconds to wait for the task processing + * returns task tuple or table (see retval) and boolean `was_processed` flag ## Admin methods * `space:dig(id, [attr])` - dig out task from buried state - - `id` - as in `:ack` - - `attr` - + `update` - table for update, like in space:update - + `ttl` - timeout for time to live - + `delay` - number of seconds - * if set, task will become `W` instead of `R` for `delay` seconds - - returns task tuple or table (see retval) - -* `space:plow(N, [attr])` - dig out N oldest tasks from buried state - - `N` - number. To dig out all buried tasks set infinity or nil - - `attr` - attrs for every task, like in dig - -* `space:queue_stats()` - - returns table - + *TODO* - + * `id` - as in `:ack` + * `attr` + * `update` - table for update, like in space:update + * `ttl` - timeout for time to live + * `delay` - number of seconds + * if set, task will become `W` instead of `R` for `delay` seconds + * returns task tuple or table (see retval) + +* `space:kill(id)` - kill the task when it was taken + * `id` - as in `:ack` + * returns task tuple or table (see retval) + +* `space:dig(N, [attr])` - dig out N oldest tasks from buried state + * `N` - number. To dig out all buried tasks set infinity or nil + * `attr` - attrs for every task, like in dig + +* `space:stats()` + * returns table + + ```yaml + tarantool> box.space.queue:stats() + --- + - tube: + B: + counts: {'T': 0, 'W': 0, 'R': 0, 'D': 0, 'Z': 0, 'B': 0} + transition: + X-R: 6 + R-T: 7 + T-S: 3 + T-X: 4 + A: + counts: {'B': 0, 'W': 0, 'D': 0, 'T': 0, 'Z': 0, 'R': 0} + transition: [] + C: + counts: {'T': 0, 'W': 0, 'R': 0, 'D': 0, 'Z': 0, 'B': 0} + transition: + T-R: 16 + R-T: 19 + S-R: 3 + T-X: 3 + transition: + X-R: 6 + R-T: 26 + T-R: 19 + T-X: 7 + counts: {'T': 0, 'W': 0, 'R': 0, 'D': 0, 'Z': 0, 'B': 0} + ... + ``` + +## Explanation of Statuses + +| Status | Description | +|--------|-----------------------------------------------------------------------------| +| X | Special status, means task went to void, or appeared from void (makes sense in transitions) | +| R | (Ready) Task is Ready to be Taken | +| W | (Waiting) Task processing was delayed by producer or consumer | +| T | (Taken) Task has been taken by consumer | +| B | (Buried) Fatal error happened during task processing (and consumer buried it) | +| Z | (Zombie) Task was processed successfully and left in the Queue for zombie_delay | +| D | (Done) Task was processed successfully and left in the Queue | +| S | Extra status for tube transition | + +## Status Transitions + +| Transition | Description | +|------------|---------------------------------------------------------------------------------------------| +| X -> R | Task was put into the Queue into Ready state (method :put) | +| X -> W | Task was put into the Queue into Waiting state (method :put with specified delay) | +| W -> R | Task was scheduled by runat mechanism of the Queue and became ready to be Taken by consumer | +| R -> T | Task was taken by consumer | +| T -> W | Consumer returned task into the Queue and delayed its execution | +| T -> R | Consumer returned task into the Queue for immediate execution | +| T -> X | Consumer acknowledged successful execution of the Task, and task was removed from the Queue | +| T -> Z | Consumer acknowledged successful execution of the Task, and task was moved to Zombie state for zombie_delay timeout | +| T -> D | Consumer acknowledged successful execution of the Task, and task was moved to Done status and will be left in the Queue forever | +| T -> S | Task was moved into another tube | +| S -> R | Task was moved from another tube to be Ready | +| S -> W | Task was moved from another tube to be Waiting | +| S -> D | Task was moved from another tube to be Done | +| S -> B | Task was moved from another tube to be Buried | diff --git a/xqueue-dev-1.rockspec b/xqueue-dev-1.rockspec new file mode 100644 index 0000000..e688e21 --- /dev/null +++ b/xqueue-dev-1.rockspec @@ -0,0 +1,21 @@ +rockspec_format = "3.0" +package = "xqueue" +version = "dev-1" +source = { + url = "git+https://github.com/moonlibs/xqueue.git", + branch = "master" +} +description = { + summary = "Package for loading external lua config", + homepage = "https://github.com/moonlibs/xqueue.git", + license = "BSD" +} +dependencies = { + "lua ~> 5.1" +} +build = { + type = "builtin", + modules = { + xqueue = "xqueue.lua" + } +} diff --git a/xqueue.lua b/xqueue.lua index df07ccc..864f9a0 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -581,7 +581,6 @@ function M.upgrade(space,opts,depth) transition = {}; tube = stat_tube; } - -- TODO: benchmark index:count() if self.fields.tube then for _, t in space:pairs(nil, { iterator = box.index.ALL }) do local s = t[self.fields.status] @@ -598,6 +597,8 @@ function M.upgrade(space,opts,depth) self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1 end end + else + self._stat = { counts = {}, transition = {}, tube = stat_tube } end -- 3. features check @@ -1740,6 +1741,8 @@ local pretty_st = { D = "Done", } +local shortmap = { __serialize = 'map' } + ---@param pretty? boolean function methods:stats(pretty) local stats = table.deepcopy(self.xq._stat) @@ -1754,10 +1757,12 @@ function methods:stats(pretty) else stats.counts[s] = stats.counts[s] or 0LL for _, tube_stat in pairs(stats.tube) do + setmetatable(tube_stat.counts, shortmap) tube_stat.counts[s] = tube_stat.counts[s] or 0LL end end end + setmetatable(stats.counts, shortmap) return stats end