Import dumped Prometheus metric data to an influxdb instance (#32)
This feature re-imports dumped Prometheus metric data into an influxdb instance, so that the influxdb can then be used as a remote_read storage backend to analyze the data offline.
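A minimal invocation of the new subcommand might look like this (subcommand and flags are taken from the diff below; the exact interpreter prefix is an assumption):

    python insight.py metric load

With no flags, the importer reads dumped files from the `data` directory and writes to a local influxdb at `localhost:8086`, creating a uniquely named database. InfluxDB 1.x exposes a Prometheus remote-read endpoint (`/api/v1/prom/read?db=<name>`), which is presumably how the imported database is meant to be wired into Prometheus as a remote_read backend.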
AstroProfundis authored Aug 16, 2018
1 parent e31b848 commit 8e631b3
Showing 5 changed files with 137 additions and 5 deletions.
7 changes: 7 additions & 0 deletions insight.py
@@ -26,6 +26,7 @@
from file import configfiles
from file import logfiles
from metric import prometheus
from metric.importer import prometheus as import_prom
from runtime import perf
from tidb import pdctl
from utils import fileopt
@@ -291,6 +292,12 @@ def dump_metrics(self, args):
you find certain data missing or empty in result, please try
to run this script again with root.""")

# re-import dumped data
if args.subcmd == 'metric' and args.subcmd_metric == "load":
insight_importer = import_prom.PromDump(args)
insight_importer.run_importing()
exit(0)

insight = Insight(args)

try:
1 change: 1 addition & 0 deletions metric/importer/__init__.py
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
109 changes: 109 additions & 0 deletions metric/importer/prometheus.py
@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
# Re-import dumped Prometheus metric data (in plain or compressed
# JSON format) to a local influxdb instance.
# Imported data will be put into a dedicated (or specified) database
# for further analysis.

import datetime
import influxdb
import json
import logging
import os
import random
import string
import zlib

from utils import fileopt
from utils import util


class PromDump():
def __init__(self, args):
# use the specified database name, or generate a unique one
self.host = args.host if args.host else 'localhost'
self.port = args.port if args.port else 8086
self.datadir = args.dir if args.dir else 'data'
self.db_name = args.db if args.db else self.unique_dbname()
self.user = args.user
self.passwd = args.passwd

# unique_dbname() generates a unique database name for importing, to prevent
# overwriting previously imported data
def unique_dbname(self):
dbname = []
# universal prefix
dbname.append('tidb_insight_prom')
# current time
dbname.append(datetime.datetime.now().strftime("%Y%m%d%H%M"))
# a 4-character random suffix (lowercase letters and digits)
dbname.append(''.join(random.choice(
string.ascii_lowercase + string.digits) for _ in range(4)))

return '_'.join(dbname)
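# e.g. a generated name (hypothetical timestamp and random suffix, for
# illustration only): 'tidb_insight_prom_201808161530_k3x9'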

def load_dump(self):
def file_list(dir=None):
f_list = []
for file in fileopt.list_dir(dir):
if os.path.isdir(file):
f_list += file_list(file)
else:
f_list.append(file)
return f_list

for file in file_list(self.datadir):
if file.endswith('.json'):
raw = fileopt.read_file(file)
elif file.endswith('.dat'):
raw = zlib.decompress(fileopt.read_file(file, 'rb'))
else:
logging.debug("Skipped unrecorgnized file '%s'" % file)
continue
yield json.loads(raw)
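# Each yielded document is expected to be the "result" list of a
# Prometheus query_range matrix response, e.g. (hypothetical sample):
#   [{"metric": {"__name__": "up", "instance": "10.0.1.2:9100", "job": "node"},
#     "values": [[1534400000, "1"], [1534400015, "1"]]}]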

def build_series(self):
def format_prom_metric(key=None):
points = []
point = {'fields': {}}
# build point header
for metric in key:
point['measurement'] = metric['metric']['__name__']
point['tags'] = {
'cluster': self.db_name,
'monitor': 'prometheus',
}
for k, v in metric['metric'].items():
point['tags'][k] = v
# build point values
for value in metric['values']:
point['time'] = datetime.datetime.utcfromtimestamp(
value[0]).strftime('%Y-%m-%dT%H:%M:%SZ')
try:
val = float(value[1])
except ValueError:
val = value[1]
# assign a fresh 'fields' dict for every point; mutating one shared
# dict and appending shallow copies would leave all points holding
# the last value
point['fields'] = {'value': val}
points.append(point.copy())
return points

for key in self.load_dump():
yield format_prom_metric(key)
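# A single resulting point, as written to influxdb (hypothetical sample):
#   {'measurement': 'up',
#    'tags': {'cluster': '<db_name>', 'monitor': 'prometheus',
#             '__name__': 'up', 'instance': '10.0.1.2:9100', 'job': 'node'},
#    'time': '2018-08-16T06:13:20Z',
#    'fields': {'value': 1.0}}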

def write2influxdb(self):
client = influxdb.InfluxDBClient(
host=self.host, port=self.port, username=self.user, password=self.passwd,
database=self.db_name, timeout=30)
# create_database has no effect if the database already exists
client.create_database(self.db_name)
logging.info("Metrics will be imported to database '%s'." %
self.db_name)

for series in self.build_series():
try:
client.write_points(series, batch_size=2000)
except influxdb.exceptions.InfluxDBClientError as e:
logging.warning(
"Write error for key '%s', data may be empty." % series[0]['measurement'])
logging.debug(e)

def run_importing(self):
self.write2influxdb()
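After an import finishes, the result can be sanity-checked with the same influxdb client; a minimal sketch (host, port, and database name are illustrative assumptions):

    import influxdb

    client = influxdb.InfluxDBClient(host='localhost', port=8086,
                                     database='tidb_insight_prom_201808161530_k3x9')
    # one measurement per Prometheus metric name
    print(client.query('SHOW MEASUREMENTS'))
    # sample a few imported points of one metric
    print(client.query('SELECT * FROM "up" LIMIT 5'))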
10 changes: 5 additions & 5 deletions utils/fileopt.py
@@ -10,17 +10,17 @@
# read data from file


def read_file(filename):
def read_file(filename, mode='r'):
data = None
with open(filename, 'r') as f:
with open(filename, mode) as f:
data = f.read()
f.close()
return data


# write data to file, in plain text
def write_file(filename, data):
with open(filename, 'w') as f:
# write data to file
def write_file(filename, data, mode='w'):
with open(filename, mode) as f:
try:
f.write(str(data, 'utf-8'))
except TypeError:
15 changes: 15 additions & 0 deletions utils/util.py
@@ -197,6 +197,21 @@ def parse_insight_opts():
help="Query resolution step width of Prometheus in seconds, 15.0 by default.")
parser_prom.add_argument("--compress", action="store_true", default=False,
help="Compress dumped JSON file, disabled by default. If compressed, the dumped file won't be able to read directly.")

parser_load = subparser_metric.add_parser(
"load", help="Load dumped metrics to local influxdb.")
parser_load.add_argument("--host", action="store", default=None,
help="The host of local influxdb, `localhost` by default.")
parser_load.add_argument("--port", type=int, action="store",
default=None, help="The port of local TSDB, `8086` by default.")
parser_load.add_argument("--dir", action="store", default=None,
help="The directory of dumped data, `data` by default.")
parser_load.add_argument("--db", action="store", default=None,
help="The database of imported metrics, if not set, a unique name will be auto generated by default.")
parser_load.add_argument("--user", action="store", default=None,
help="The user with priviledge to create database, empty (no authentication needed) by default.")
parser_load.add_argument("--passwd", action="store", default=None,
help="The password of user, empty (no authentication needed) by default.")
####

return parser.parse_args()
