Skip to content

Commit

Permalink
event aggregate time periods api
Browse files Browse the repository at this point in the history
  • Loading branch information
samparsky committed May 23, 2019
1 parent cd598ba commit 5d8f939
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 4 deletions.
94 changes: 94 additions & 0 deletions routes/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ router.get('/:id/validator-messages/:uid/:type?', channelIfExists, getValidatorM

// event aggregates information
router.get('/:id/events-aggregates', authRequired, channelLoad, getEventAggregates)
// event aggregates with timeframe information
router.get('/:id/events-aggregates/:publisher', channelLoad, getEventsTimeFrameAggregate)

// Submitting events/messages: requires auth
router.post(
Expand All @@ -35,6 +37,61 @@ function getStatus(req, res) {
res.send({ channel: req.channel })
}

function getEventsTimeFrameAggregate(req, res, next) {
const { publisher } = req.params
const {
eventType = 'IMPRESSION',
metric = 'eventCounts',
timeframe = 'year',
limit = 100
} = req.query
const eventsCol = db.getMongo().collection('eventAggregates')
const channel = req.channel
const group = getGroup(timeframe)

const pipeline = [
{
$addFields: {
value: {
$toInt: `$events.${eventType}.${metric}.${publisher}`
}
}
},
{
$match: {
channelId: channel.id,
[`events.${eventType}.${metric}.${publisher}`]: { $exists: true, $ne: null }
}
},
{
$project: {
value: 1,
created: 1,
year: { $year: '$created' },
month: { $month: '$created' },
week: { $week: '$created' },
day: { $dayOfMonth: '$created' },
hour: { $hour: '$created' },
minutes: { $minute: '$created' }
}
},
{ $sort: { created: 1 } },
{
$group: {
_id: { ...group },
value: { $sum: '$value' }
}
},
{ $limit: limit }
]

return eventsCol
.aggregate(pipeline)
.toArray()
.then(aggr => res.send({ channel, aggr }))
.catch(next)
}

function getEventAggregates(req, res, next) {
const eventsCol = db.getMongo().collection('eventAggregates')
const { uid } = req.session
Expand Down Expand Up @@ -207,5 +264,42 @@ function authRequired(req, res, next) {
next()
}

function getGroup(timeframe) {
let group = {}

if (timeframe === 'year') {
group = { year: '$year' }
}

if (timeframe === 'month') {
group = { year: '$year', month: '$month' }
}

if (timeframe === 'week') {
group = { year: '$year', week: '$week' }
}

if (timeframe === 'day') {
group = { year: '$year', month: '$month', day: '$day' }
}

if (timeframe === 'hour') {
group = { year: '$year', month: '$month', day: '$day', hour: '$hour' }
}

if (timeframe === 'minute') {
group = {
year: '$year',
week: '$week',
month: '$month',
day: '$day',
hour: '$hour',
minutes: '$minutes'
}
}

return group
}

// Export it
module.exports = router
34 changes: 30 additions & 4 deletions test/integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ tape('new states are not produced when there are no new aggregates', async funct
t.end()
})

tape('/channel/{id}/events-aggregates', async function(t) {
tape('/channel/{id}/events-aggregates/{publisher}', async function(t) {
const id = 'eventAggregateCountTest'
const channel = { ...dummyVals.channel, id }

Expand All @@ -143,8 +143,8 @@ tape('/channel/{id}/events-aggregates', async function(t) {

// post events for that channel for multiple publishers
const publishers = [
[dummyVals.auth.publisher, genEvents(1, dummyVals.ids.publisher)],
[dummyVals.auth.publisher2, genEvents(1, dummyVals.ids.publisher2)]
[dummyVals.auth.creator, genEvents(3, dummyVals.ids.publisher)],
[dummyVals.auth.creator, genEvents(3, dummyVals.ids.publisher2)]
]

await Promise.all(
Expand All @@ -153,14 +153,14 @@ tape('/channel/{id}/events-aggregates', async function(t) {
)
)
await aggrAndTick()

const eventAggrFilterfixtures = [
// if we're a non superuser (validator) returns our event
[dummyVals.auth.publisher, 1],
[dummyVals.auth.publisher2, 1],
// if we're a superuser (validator) returns all events
[dummyVals.auth.leader, 2]
]

const url = `${leaderUrl}/channel/${id}/events-aggregates`

await Promise.all(
Expand All @@ -180,6 +180,32 @@ tape('/channel/{id}/events-aggregates', async function(t) {
})
)

const timeAggrFilterFixtures = [
['?metric=eventPayouts'],
['?metric=eventCounts'],
['?timeframe=minute'],
['?timeframe=year'],
['?timeframe=hour'],
['?timeframe=day'],
['?timeframe=week'],
['?timeframe=month']
]

await Promise.all(
timeAggrFilterFixtures.map(async fixture => {
const [query] = fixture
const resp = await fetch(`${url}/${dummyVals.ids.publisher}${query}`, {
method: 'GET',
headers: {
'content-type': 'application/json'
}
}).then(res => res.json())
t.ok(resp.channel, 'has resp.channel')
// 3 is number of events submitted by publisher in url param
t.ok(resp.aggr[0].value === 3, 'has correct aggr value')
})
)

t.end()
})

Expand Down

0 comments on commit 5d8f939

Please sign in to comment.