diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b52e8b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.vscode +venv +log +data +.history +src/config/*.toml +!src/config/main.sample.toml \ No newline at end of file diff --git a/favicon.ico b/favicon.ico new file mode 100644 index 0000000..2240363 Binary files /dev/null and b/favicon.ico differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7f10982 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,29 @@ +astroid==2.13.5 +certifi==2024.2.2 +charset-normalizer==3.3.2 +confluent-kafka==2.2.0 +dill==0.3.8 +idna==3.7 +isort==5.13.2 +lazy-object-proxy==1.10.0 +mccabe==0.7.0 +numpy==1.22.4 +pandas==1.3.5 +platformdirs==4.2.2 +protobuf==4.23.4 +psycopg2-binary==2.9.9 +pylint==2.15.2 +python-dateutil==2.8.2 +pytz==2024.1 +PyYAML==6.0 +requests==2.32.2 +six==1.16.0 +supervisor==4.2.5 +toml==0.10.2 +tomli==2.0.1 +tomlkit==0.12.5 +tornado==6.3.3 +typing_extensions==4.11.0 +urllib3==2.2.1 +uuid==1.30 +wrapt==1.16.0 diff --git a/src/config/main.sample.toml b/src/config/main.sample.toml new file mode 100644 index 0000000..a521287 --- /dev/null +++ b/src/config/main.sample.toml @@ -0,0 +1,14 @@ +[server] +server_name = "id_allocation" +ip = "127.0.0.1" +port = 9001 +work_path = "/app" +thread_count = 8 +process_count = 6 +log_path = "/app/log" +log_level = 1 +log_max_size = 1600 + +[id_allocation] +write = "postgresql://postgres:password@host/dbname" +read = "postgresql://postgres:password@host/dbname" diff --git a/src/controller/__init__.py b/src/controller/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/controller/allocation_controller.py b/src/controller/allocation_controller.py new file mode 100644 index 0000000..040ef5e --- /dev/null +++ b/src/controller/allocation_controller.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +''' +Author: Zella Zhong +Date: 2024-05-23 22:47:04 +LastEditors: Zella Zhong +LastEditTime: 2024-05-24 04:52:22 +FilePath: /id_allocation/src/controller/allocation_controller.py +Description: allocation controller +''' +import json +import logging +import time +import setting + +from httpsvr import httpsvr +import psycopg2 +from setting import get_write_conn + + +def dict_factory(cursor, row): + """ + Convert query result to a dictionary. 
+ """ + col_names = [col_desc[0] for col_desc in cursor.description] + return dict(zip(col_names, row)) + + +class AllocationController(httpsvr.BaseController): + '''AllocationController''' + def __init__(self, obj, param=None): + super(AllocationController, self).__init__(obj) + + def allocation(self): + ''' + description: + requestbody: { + "graph_id": "string", + "updated_nanosecond": "int64", + "vids": ["string"], + } + return: { + "return_graph_id": "string", + "return_updated_nanosecond": "int64", + } + ''' + post_data = self.inout.request.body + data = json.loads(post_data) + graph_id = data.get("graph_id", "") + updated_nanosecond = data.get("updated_nanosecond", 0) + vids = data.get("vids", []) + if graph_id == "" or updated_nanosecond == 0: + return httpsvr.Resp(msg="Invalid input body", data={}, code=-1) + if len(vids) == 0: + return httpsvr.Resp(msg="Invalid input body", data={}, code=-1) + + + data = {} + rows = [] + code = 0 + msg = "" + try: + pg_conn = get_write_conn() + cursor = pg_conn.cursor() + + process_vids = "ARRAY[" + ",".join(["'" + x + "'" for x in vids]) + "]" + ssql = "SELECT * FROM process_id_allocation(%s, '%s', %d);" % (process_vids, graph_id, updated_nanosecond) + cursor.execute(ssql) + rows = [dict_factory(cursor, row) for row in cursor.fetchall()] + logging.debug("allocation vids: {}, result: {}".format(process_vids, rows)) + if len(rows) == 0: + cursor.close() + pg_conn.close() + return httpsvr.Resp(msg="allocation ID=null", data={}, code=-1) + + data = rows[0] + cursor.close() + pg_conn.close() + except Exception as e: + code = -1 + msg = repr(e) + logging.exception(e) + + return httpsvr.Resp(msg=msg, data=data, code=code) diff --git a/src/data_server.py b/src/data_server.py new file mode 100644 index 0000000..a64e7db --- /dev/null +++ b/src/data_server.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +''' +Author: Zella Zhong +Date: 2024-05-23 22:34:52 +LastEditors: Zella Zhong +LastEditTime: 2024-05-23 22:54:59 +FilePath: /id_allocation/src/data_server.py +Description: main entry point for allocating +''' +import os +import logging + +import setting +import setting.filelogger as logger + +from controller.allocation_controller import AllocationController + + +if __name__ == "__main__": + config = setting.load_settings(env="development") + # config = setting.load_settings(env="production") + if not os.path.exists(config["server"]["log_path"]): + os.makedirs(config["server"]["log_path"]) + logger.InitLogger(config) + logger.SetLoggerName("id_allocation") + + try: + from httpsvr import httpsvr + # [path, controller class, method] + ctrl_info = [ + ["/id_allocation/allocation", AllocationController, "allocation"], + ] + svr = httpsvr.HttpSvr(config, ctrl_info) + svr.Start() + + except Exception as e: + logging.exception(e) diff --git a/src/httpsvr/__init__.py b/src/httpsvr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/httpsvr/config.py b/src/httpsvr/config.py new file mode 100644 index 0000000..32a703b --- /dev/null +++ b/src/httpsvr/config.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import os +import logging + +class HttpSvrConfig(): + """config is an toml object""" + def __init__(self, config): + self.ip = "" + self.port = config["server"]["port"] + self.work_path = config["server"]["work_path"] + self.server_name = config["server"]["server_name"] + try: + self.thread_count = config["server"]["thread_count"] + except: + self.thread_count = 8 + + try: + self.process_count = 
config["server"]["process_count"] + except: + self.process_count = 0 + + self.log_path = config["server"]["log_path"] + self.log_level = config["server"]["log_level"] + self.log_max_size = config["server"]["log_max_size"] + + if self.log_path == "": + self.log_path = "./log" + + if self.ip == "": + self.ip = "127.0.0.1" + + try: + self.max_buffer_size = config["server"]["max_buffer_size"] + except: + self.max_buffer_size = 0 + diff --git a/src/httpsvr/httpsvr.py b/src/httpsvr/httpsvr.py new file mode 100644 index 0000000..b7d3884 --- /dev/null +++ b/src/httpsvr/httpsvr.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import time +import logging +import traceback +import json +import tornado.httpserver +import tornado.ioloop +import tornado.web +import tornado.gen +import tornado.process +import tornado.netutil +from tornado.concurrent import run_on_executor +from concurrent.futures import ThreadPoolExecutor + +from httpsvr.config import HttpSvrConfig +from setting.filelogger import InitLogger + + +class BaseController(): + def __init__(self, obj, param=None): + """ + Base class for controllers + { + "inout": tornado.web.RequestHandler object, + "path": url path, + "cmdid": cgi cmdid, + } + """ + if obj != None: + self.inout = obj.inout + self.path = obj.path + self.method = obj.method + else: + self.inout = param["inout"] + self.path = param["path"] + self.method = param["method"] + + def get_json_body(self): + """把body用json解码,返回dict""" + try: + args = json.loads(self.request.body) + except Exception as e: + args = {} + return args + + +class BaseHandler(tornado.web.RequestHandler): + """BaseHandler""" + executor = None + ctrl_map = {} + port = 0 + + @staticmethod + def SetExecutor(threadCount): + BaseHandler.executor = ThreadPoolExecutor(threadCount) + + + @staticmethod + def SetCtrlInfo(ctrl_info): + d = {} + for r in ctrl_info: + d[ r[0] ] = { + "class": r[1], + "method": r[2], + } + BaseHandler.ctrl_map = d + + @staticmethod + def SetPort(port): + BaseHandler.port = port + + @tornado.gen.coroutine + def get(self, path=None): + self.start_time = time.time() + self.method = "GET" + resp = yield self.__do(path, "GET") + self.__formatResp(path, resp) + + @tornado.gen.coroutine + def post(self, path=None): + self.start_time = time.time() + self.method = "POST" + resp = yield self.__do(path, "POST") + self.__formatResp(path, resp) + + @run_on_executor + def __do(self, path, method): + self.__logReq() + self.__status_code = 200 + self.__reason = "" + self.__auth_ok = True + + try: + ctrlInfo = self.ctrl_map.get(path, None) + if ctrlInfo == None: + raise Exception("no ctrl match " + path) + + param = { + "inout": self, + "path": path, + "method": method, + } + baseCtrlObj = BaseController(None, param) + ctrlObj = ctrlInfo["class"](baseCtrlObj) + + method = getattr(ctrlObj, ctrlInfo["method"], None) + if method is None: + raise Exception("no class.method match " + path) + + return method() + except Exception as e: + self.__reason = traceback.format_exc(None) + self.__status_code = 404 + logging.error(self.__reason) + return None + + def __logReq(self): + logging.debug(self.request) + # logging.debug(self.request.headers) + + def __formatResp(self, path, resp): + time_used = int((time.time() - self.start_time) * 1000) + if self.__status_code != 200: + self.clear() + self.set_status(self.__status_code) + if self.__reason != "": + self.finish(self.__reason) + else: + self.finish("%d" % self.__status_code) + return + + if resp is not None: + if isinstance(resp, RenderResp): + 
+                self.render(resp.template_name, **resp.kwargs)
+                return
+
+            if isinstance(resp, CommResp):
+                if resp.code != 200:
+                    self.clear()
+                    self.set_status(resp.code)
+                    if resp.data != "":
+                        self.finish(resp.data)
+                    else:
+                        self.finish("%d" % resp.code)
+                else:
+                    if resp.contentType != "":
+                        self.set_header("Content-Type", resp.contentType)
+                    self.set_header("Access-Control-Allow-Origin", "*")
+                    self.write(resp.data)
+                return
+
+            if isinstance(resp, Resp):
+                resp = resp.resp
+
+            if isinstance(resp, dict):
+                tpl = self.get_argument("tpl", None)
+                if tpl is not None:
+                    r = path.split("/")
+                    t = "%s_%s_%s.html" % (r[-2], r[-1], tpl)
+                    try:
+                        self.render(t, **resp)
+                    except Exception:
+                        logging.error("render template error")
+                        raise
+                else:
+                    self.set_header("Content-Type", "application/json; charset=UTF-8")
+                    self.set_header("Access-Control-Allow-Origin", "*")
+                    self.write(json.dumps(resp))
+            else:
+                self.write(resp)
+
+
+class HttpSvr:
+    """HttpSvr implementation"""
+    def __init__(self, config, ctrl_info):
+        """HTTP server: multi-threaded, and multi-process when process_count > 0."""
+        self.settings = HttpSvrConfig(config)
+        InitLogger(config)
+
+        BaseHandler.SetExecutor(self.settings.thread_count)
+        logging.info("ctrl_info {}".format(ctrl_info))
+        BaseHandler.SetCtrlInfo(ctrl_info)
+
+    def Start(self):
+        """
+        Start HttpSvr.
+        """
+        handlers = [
+            (r"(.*)", BaseHandler)
+        ]
+        work_path = self.settings.work_path
+        if not work_path.endswith("/"):
+            work_path = work_path + "/"
+
+        logging.info("start websvr on %s %d", self.settings.ip, self.settings.port)
+        logging.info("module_name:%s, work_path:%s, thread_count %s", self.settings.server_name, work_path, self.settings.thread_count)
+        logging.info("Ctrl: %s", str(BaseHandler.ctrl_map))
+        app = tornado.web.Application(
+            handlers=handlers,
+            debug=False,
+        )
+        if self.settings.max_buffer_size > 0:
+            http_server = tornado.httpserver.HTTPServer(app, max_buffer_size=self.settings.max_buffer_size)
+        else:
+            http_server = tornado.httpserver.HTTPServer(app)
+
+        if self.settings.process_count > 0:  # multi-process mode is incompatible with app.debug=True
+            sockets = tornado.netutil.bind_sockets(self.settings.port, address=self.settings.ip)
+            tornado.process.fork_processes(self.settings.process_count)
+            http_server.add_sockets(sockets)
+        else:
+            http_server.bind(self.settings.port, address=self.settings.ip)
+            http_server.start()
+
+        tornado.ioloop.IOLoop.instance().start()
+
+
+class Resp(object):
+    """
+    When you need to write JSON data, return this instance.
+    """
+
+    def __init__(self, data=None, code=0, msg=""):
+        self.resp = {"code": code, "msg": msg, "data": data}
+
+
+class RenderResp(object):
+    """
+    When you need to render a template, return this instance.
+    """
+
+    def __init__(self, template_name, **kwargs):
+        self.template_name = template_name
+        self.kwargs = kwargs
+
+
+class CommResp(object):
+    """
+    When you need to write non-JSON data, return this instance.
+    If code != 200, data carries the error reason and contentType is ignored.
+ """ + + def __init__(self, data=None, code=200, contentType=""): + self.data = data + self.code = code + self.contentType = contentType diff --git a/src/script/create_id_allocation.sql b/src/script/create_id_allocation.sql new file mode 100644 index 0000000..ca32006 --- /dev/null +++ b/src/script/create_id_allocation.sql @@ -0,0 +1,73 @@ +CREATE TABLE id_allocation ( + id SERIAL PRIMARY KEY, + unique_id VARCHAR(265) NOT NULL, + graph_id VARCHAR(255) NOT NULL, + platform VARCHAR(255) NOT NULL, + identity VARCHAR(255) NOT NULL, + updated_nanosecond BIGINT DEFAULT 0, + picked_time TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT unique_index UNIQUE (unique_id) +); + +CREATE INDEX idx_platform ON id_allocation (platform); +CREATE INDEX idx_graph_id_updated_time ON id_allocation (graph_id, updated_nanosecond); + + +CREATE OR REPLACE FUNCTION process_id_allocation( + vids text[], + new_graph_id varchar(255), + new_updated_nanosecond bigint +) +RETURNS TABLE(return_graph_id varchar(255), return_updated_nanosecond bigint) AS $$ +DECLARE + existing_record RECORD; + min_updated_record RECORD; +BEGIN + -- Find the record with the minimum updated_nanosecond + SELECT graph_id, updated_nanosecond INTO min_updated_record + FROM id_allocation + WHERE unique_id = ANY(vids) + ORDER BY updated_nanosecond + LIMIT 1; + + IF min_updated_record IS NOT NULL THEN + -- Return the graph_id and updated_nanosecond with the smallest updated_nanosecond + return_graph_id := min_updated_record.graph_id; + return_updated_nanosecond := min_updated_record.updated_nanosecond; + + -- Update the table with the smallest updated_nanosecond + UPDATE id_allocation + SET graph_id = return_graph_id, updated_nanosecond = return_updated_nanosecond, picked_time = CURRENT_TIMESTAMP + WHERE unique_id = ANY(vids); + ELSE + -- Insert new records + FOR existing_record IN + SELECT unnest(vids) AS unique_id + LOOP + INSERT INTO id_allocation (unique_id, graph_id, platform, identity, updated_nanosecond) + VALUES ( + existing_record.unique_id, + new_graph_id, + split_part(existing_record.unique_id, ',', 1), + split_part(existing_record.unique_id, ',', 2), + new_updated_nanosecond, + picked_time = CURRENT_TIMESTAMP + ); + END LOOP; + + -- Return the input graph_id and updated_nanosecond since no existing records were found + return_graph_id := new_graph_id; + return_updated_nanosecond := new_updated_nanosecond; + END IF; + + RETURN NEXT; +END; +$$ LANGUAGE plpgsql; + + + +SELECT * FROM process_id_allocation( + ARRAY['twitter,chan_izaki65652', 'farcaster,gytred', 'ethereum,0x61ae970ac67ff4164ebf2fd6f38f630df522e5ef'], + 'aec0c81c-7ab2-42e6-bb74-e7ea8d2cf903', + 1716471514174958 +); \ No newline at end of file diff --git a/src/script/test_api.sh b/src/script/test_api.sh new file mode 100644 index 0000000..c2e2003 --- /dev/null +++ b/src/script/test_api.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +curl -w "\nTime: %{time_total}s\n" -X POST http://127.0.0.1:9001/id_allocation/allocation -d '{"graph_id":"cba9bbf5-ee77-48f6-919c-302fcd1c2a23","updated_nanosecond":1716482473640540,"vids":["twitter,chan_izaki65652","farcaster,gytred","ethereum,0x61ae970ac67ff4164ebf2fd6f38f630df522e5ef"]}' \ No newline at end of file diff --git a/src/setting/__init__.py b/src/setting/__init__.py new file mode 100644 index 0000000..1f0a474 --- /dev/null +++ b/src/setting/__init__.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +''' +Author: Zella Zhong +Date: 2024-05-23 22:36:57 +LastEditors: Zella Zhong +LastEditTime: 2024-05-23 22:46:16 
+FilePath: /id_allocation/src/setting/__init__.py
+Description: load toml config and global settings
+'''
+import logging
+import os
+import toml
+
+import psycopg2
+from psycopg2 import pool
+
+Settings = {
+    "env": "development",
+    "datapath": "./data",
+}
+
+ID_ALLOCATION = {
+    "write": "",
+    "read": "",
+}
+
+def load_settings(env="production"):
+    """
+    @description: load configurations from file
+    """
+    global Settings
+    global ID_ALLOCATION
+
+    config_file = "/app/config/production.toml"
+    if env == "testing":
+        config_file = "/app/config/testing.toml"
+    elif env == "development":
+        config_file = "./config/development.toml"
+    elif env == "production":
+        config_file = "/app/config/production.toml"
+    else:
+        raise ValueError("Unknown environment")
+
+    config = toml.load(config_file)
+    Settings["env"] = env
+    Settings["datapath"] = os.path.join(config["server"]["work_path"], "data")
+    ID_ALLOCATION = load_dsn(config_file)
+    return config
+
+
+def load_dsn(config_file):
+    """
+    @description: load pg dsn
+    @params: config_file
+    @return dsn_settings
+    """
+    try:
+        config = toml.load(config_file)
+        pg_dsn_settings = {
+            "write": config["id_allocation"]["write"],
+            "read": config["id_allocation"]["read"],
+        }
+        return pg_dsn_settings
+    except Exception as ex:
+        logging.exception(ex)
+        raise
+
+
+def get_write_conn():
+    try:
+        pg_conn = psycopg2.connect(ID_ALLOCATION["write"])
+    except Exception as e:
+        logging.exception(e)
+        raise
+
+    return pg_conn
+
+
+conn_pool = None
+
+def initialize_connection_pool(minconn=1, maxconn=10):
+    """
+    Initialize the connection pool.
+    """
+    global conn_pool
+    db_params = {
+        "dbname": "xx",
+        "user": "xx",
+        "password": "xx",
+        "host": "xx"
+    }
+    try:
+        # conn_pool = pool.ThreadedConnectionPool(minconn=minconn, maxconn=maxconn, **db_params)
+        conn_pool = pool.SimpleConnectionPool(minconn=minconn, maxconn=maxconn, **db_params)
+        logging.info("Database connection pool created.")
+    except Exception as e:
+        logging.error("Error creating the database connection pool: {}".format(e))
+        conn_pool = None
+
+
+def get_connection():
+    global conn_pool
+    if conn_pool is None or conn_pool.closed:
+        logging.info("Connection pool does not exist or has been closed, initializing a new one.")
+        initialize_connection_pool()
+
+    if conn_pool:
+        try:
+            conn = conn_pool.getconn()
+            if conn:
+                logging.info("Retrieved a connection from the pool. Used connections: {}".format(len(conn_pool._used)))
+                return conn
+            else:
+                logging.error("Failed to retrieve a connection from the pool.")
+        except Exception as e:
+            logging.error("Error getting a connection from the pool: {}".format(e))
+    else:
+        logging.error("Connection pool is not available.")
+    return None
+
+
+# Release a connection back to the pool.
+def put_connection(conn):
+    global conn_pool
+    if conn_pool:
+        logging.info("conn_pool used connection {}".format(len(conn_pool._used)))
+        conn_pool.putconn(conn)
\ No newline at end of file
diff --git a/src/setting/filelogger.py b/src/setting/filelogger.py
new file mode 100644
index 0000000..4638341
--- /dev/null
+++ b/src/setting/filelogger.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+import logging
+import datetime
+import codecs
+import time
+import sys
+import traceback
+
+from logging import Handler
+
+# sys.version > '3'
+import io as cStringIO
+try:
+    unicode
+    _unicode = True
+except NameError:
+    _unicode = False
+
+
+class FileHandler(Handler):
+    """
+    Writes formatted logging records
+    """
+    def __init__(self, config):
+        logging.Handler.__init__(self)
+
+        self.log_path = "./log"
+        self.log_level = 5
+        self.log_max_size = 1600 * 1024 * 1024
+        self.encoding = "utf8"
+        self.server_name = None
+        self.printstd = False
+
+        if config is not None:
+            try:
+                self.log_path = config["server"]["log_path"]
+            except KeyError:
+                pass
+
+            try:
+                self.log_level = config["server"]["log_level"]
+            except KeyError:
+                pass
+
+            try:
+                self.log_max_size = config["server"]["log_max_size"]
+            except KeyError:
+                pass
+
+            try:
+                self.server_name = config["server"]["server_name"]
+            except KeyError:
+                pass
+
+        # log_level: 5=DEBUG, 4=WARN, 3=INFO, 0=NOTSET, anything else=ERROR
+        if self.log_level == 5:
+            self.setLevel(logging.DEBUG)
+        elif self.log_level == 4:
+            self.setLevel(logging.WARNING)
+        elif self.log_level == 3:
+            self.setLevel(logging.INFO)
+        elif self.log_level == 0:
+            self.setLevel(logging.NOTSET)
+        else:
+            self.setLevel(logging.ERROR)
+
+        self.last_file_time = ""
+        self.stream = None
+        self.log_size = 0
+
+        self._open()
+
+    def setPrintStd(self):
+        """
+        Also mirror log records to stderr.
+        """
+        self.printstd = True
+
+    def flush(self):
+        """
+        Flushes the stream.
+        """
+        self.acquire()
+        try:
+            if self.stream and hasattr(self.stream, "flush"):
+                self.stream.flush()
+        finally:
+            self.release()
+
+    def close(self):
+        """
+        Closes the stream.
+        """
+        self.acquire()
+        try:
+            try:
+                if self.stream:
+                    try:
+                        self.flush()
+                    finally:
+                        stream = self.stream
+                        self.stream = None
+                        if hasattr(stream, "close"):
+                            stream.close()
+            finally:
+                # Issue #19523: call unconditionally to
+                # prevent a handler leak when delay is set
+                logging.Handler.close(self)
+        finally:
+            self.release()
+
+    def _open(self):
+        """
+        Open the hourly log file and attach it to self.stream.
+        A new file is opened whenever the hour changes.
+ """ + if self.server_name is None: + return + file_time = datetime.datetime.now().strftime("%Y%m%d%H") + + if file_time != self.last_file_time: + self.acquire() + try: + file_time = datetime.datetime.now().strftime("%Y%m%d%H") + if file_time != self.last_file_time: + if self.stream: + if hasattr(self.stream, "flush"): + self.stream.flush() + if hasattr(self.stream, "close"): + self.stream.close() + self.stream = None + + if len(self.server_name) > 0 and self.log_path.find(self.server_name) > 0: + fn = "%s/%s.log" % (self.log_path, file_time) + else: + fn = "%s/%s_%s.log" % (self.log_path, self.server_name, file_time) + + self.stream = codecs.open(fn, "a", self.encoding) + finally: + self.release() + + def emit(self, record): + """ + Emit a record. + + If the stream was not opened because 'delay' was specified in the + constructor, open it before calling the superclass's emit. + """ + if self.log_level == 0: + return + self._open() + + if not self.printstd and self.log_size > self.log_max_size: + return + + try: + msg = self.format(record) + stream = self.stream + fs = "%s\n" + if not _unicode: + msg = fs % msg + else: + if (isinstance(msg, unicode) and getattr(stream, 'encoding', None)): + ufs = u'%s\n' + msg = ufs % msg + else: + msg = fs % msg + if getattr(stream, 'encoding', None): + msg = msg.decode(stream.encodin) + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + return + + if self.printstd: + try: + sys.stderr.write(msg) + except: + pass + + if self.stream and self.log_size + len(msg) < self.log_max_size: + try: + if msg[-1:] != "\n": + msg = msg + "\n" + except: + pass + + self.stream.write(msg) + self.flush() + self.log_size += len(msg) + + +class Formatter(): + """Formatter logging format defines""" + def __init__(self, log_name="root"): + self.__log_name = log_name + + def set_log_name(self, log_name): + self.__log_name = log_name + + def format_exception(self, ei): + """ + Format and return the specified exception information as a string. + + This default implementation just uses + traceback.print_exception() + """ + sio = cStringIO.StringIO() + traceback.print_exception(ei[0], ei[1], ei[2], None, sio) + s = sio.getvalue() + sio.close() + if s[-1:] == "\n": + s = s[:-1] + return s + + def format(self, record): + """ + Format log record. + """ + ct = time.localtime(record.created) + t = time.strftime("%Y-%m-%d %H:%M:%S", ct) + + record.message = record.getMessage() + + if record.exc_info: + # Cache the traceback text to avoid converting it multiple times + # (it's constant anyway) + if not record.exc_text: + record.exc_text = self.format_exception(record.exc_info) + + s = "%s [%s] [%d:%d] [%s:%d] " % (t, self.__log_name, record.process, record.thread, record.filename, record.lineno) + e = "" + + if record.levelno == logging.DEBUG: + s += "\033[32m[DEBUG] " + e = " \033[0m" + elif record.levelno == logging.INFO: + s += "\033[1;35m[INFO] " + e = " \033[0m" + elif record.levelno == logging.WARNING: + s += "\033[1;33m[WARN] " + e = " \033[0m" + elif record.levelno == logging.ERROR or record.levelno == logging.CRITICAL: + s += "\033[1;31m[ERROR] " + e = " \033[0m" + + s += record.message + + if record.exc_text: + try: + s = s + record.exc_text + except UnicodeError: + # Sometimes filenames have non-ASCII chars, which can lead + # to errors when s is Unicode and record.exc_text is str + # See issue 8924. + # We also use replace for when there are multiple + # encodings, e.g. UTF-8 for the filesystem and latin-1 + # for a script. See issue 13232. 
+                s = s + record.exc_text.decode(sys.getfilesystemencoding(),
+                                               'replace')
+        return s + e
+
+
+# Importing this module installs a handler that prints to stderr.
+static_file_handler = FileHandler(None)
+static_file_handler.setFormatter(Formatter())
+static_file_handler.setPrintStd()
+
+logger = logging.getLogger()
+logger.addHandler(static_file_handler)
+logger.setLevel(static_file_handler.level)
+
+
+def InitLogger(config):
+    """initialize logger
+    """
+    global static_file_handler, logger
+    if static_file_handler is not None:
+        logger.removeHandler(static_file_handler)
+        static_file_handler.close()
+        static_file_handler = None
+
+    # The development and production setups are currently identical.
+    static_file_handler = FileHandler(config)
+    static_file_handler.setLevel(logging.DEBUG)
+    static_file_handler.setPrintStd()
+    static_file_handler.setFormatter(Formatter())
+
+    logger.addHandler(static_file_handler)
+    logger.setLevel(static_file_handler.level)
+
+
+def SetLoggerName(name):
+    fmt = Formatter()
+    fmt.set_log_name(name)
+    static_file_handler.setFormatter(fmt)
\ No newline at end of file
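
For reference, a minimal Python client for the new endpoint, equivalent to src/script/test_api.sh. This is a sketch, assuming the server is listening on 127.0.0.1:9001 as in src/config/main.sample.toml and using the requests package already pinned in requirements.txt; the payload values are copied from test_api.sh.

#!/usr/bin/env python3
# Example caller for POST /id_allocation/allocation (illustration only).
import requests

payload = {
    "graph_id": "cba9bbf5-ee77-48f6-919c-302fcd1c2a23",
    "updated_nanosecond": 1716482473640540,
    "vids": [
        "twitter,chan_izaki65652",
        "farcaster,gytred",
        "ethereum,0x61ae970ac67ff4164ebf2fd6f38f630df522e5ef",
    ],
}
resp = requests.post("http://127.0.0.1:9001/id_allocation/allocation", json=payload)
resp.raise_for_status()
body = resp.json()
# The handler wraps results as {"code": ..., "msg": ..., "data": {...}}, where
# data carries return_graph_id and return_updated_nanosecond on success.
print(body["code"], body["data"])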