-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_indexer.rb
88 lines (71 loc) · 2.27 KB
/
event_indexer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class EventIndexer
def initialize
@client = ElasticsearchService.client
@migrator = EventMigrator.new
end
def index(event)
data = event.parsed_data
period = data['period']
application = event.installation.application || data['application']
counters = data['counters'] || []
sets = data['sets'] || []
timespans = data['timespans'] || []
counters_data = index_counters event, application, period, counters
sets_data = index_sets event, application, period, sets
timespans_data = index_timespans event, application, period, timespans
bulk_index counters_data.concat(sets_data).concat(timespans_data)
end
private
def index_counters(event, application, period, counters)
bulk_data = []
counters.each_with_index do |counter, i|
@migrator.migrate_counter counter
bulk_data.push({index: {_type: 'counter', _id: "#{event.id}-#{i}"}})
bulk_data.push({
installation_uuid: event.installation.uuid,
application: application,
metric: counter['metric'],
key: counter['key'],
value: counter['value'],
beginning: period['beginning'],
end: period['end']
})
end
bulk_data
end
def index_sets(event, application, period, sets)
bulk_data = []
sets.each_with_index do |set, i|
bulk_data.push({index: {_type: 'set', _id: "#{event.id}-#{i}"}})
bulk_data.push({
installation_uuid: event.installation.uuid,
application: application,
metric: set['metric'],
key: set['key'],
elements: set['elements'],
beginning: period['beginning'],
end: period['end']
})
end
bulk_data
end
def index_timespans(event, application, period, timespans)
bulk_data = []
timespans.each_with_index do |set, i|
bulk_data.push({index: {_type: 'timespan', _id: "#{event.id}-#{i}"}})
bulk_data.push({
installation_uuid: event.installation.uuid,
application: application,
metric: set['metric'],
key: set['key'],
days: set['days'],
beginning: period['beginning'],
end: period['end']
})
end
bulk_data
end
def bulk_index(data)
@client.bulk index: ElasticsearchService.index_name, body: data unless data.empty?
end
end