-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcontroller.py
79 lines (71 loc) · 3.4 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/4/18 18:08
# @Author : Zhangyp
# @File : handle.py
# @Software: PyCharm
# @license : Copyright(C), eWord Technology Co., Ltd.
# @Contact : yeahcheung213@163.com
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from Common.ReadConfig import PARA
from models import User, Product
import datetime
# Module-level database handles, created once at import time from the
# connection string stored under PARA['connect_string'].
# NOTE(review): `encoding` and `convert_unicode` are legacy create_engine()
# options — confirm they are still accepted by the installed SQLAlchemy version.
engine = create_engine(PARA['connect_string'], encoding='utf-8', echo=True, convert_unicode=True)
DBSession = sessionmaker(bind=engine) # Create the DBSession factory class
# Single session object shared by the ORM queries in this module.
session = DBSession()
# 禅道数据
class Zentao:
    """Read-only accessor for Zentao users, products and bug-quality statistics.

    Constructing an instance immediately queries the user and product lists.
    After ``__init__`` the *instance* attributes ``get_user`` and
    ``get_product`` hold those result lists and shadow the methods of the same
    name, so callers read them as plain attributes (``Zentao().get_user``).
    ``get_QS`` is not prefetched and stays callable on the instance.
    """

    # Timestamp taken once, when the class body is executed. Kept only for
    # backward compatibility; get_QS no longer uses it as its default end
    # date because it goes stale in a long-running process.
    today = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

    # Per (product_name, NAME) counts of all / critical (severity <= 2) /
    # reopened (activatedCount >= 1) rows closed between :dt1 and :dt2.
    # NOTE(review): the sub-queries select a non-aggregated `id` and are
    # joined on it; unless `id` is fixed per (product_name, NAME) group the
    # joined counts may not line up — confirm against the table/view schema.
    _QS_SQL = (
        "SELECT A.product_name,A.NAME,total_count,critical_count,reopen_count "
        "FROM (SELECT product_name, NAME, count( * ) AS total_count, id "
        "FROM quality_statistics WHERE closedDate BETWEEN :dt1 AND :dt2 "
        "GROUP BY product_name, NAME ) AS A "
        "LEFT JOIN ( SELECT product_name, NAME, count( * ) AS critical_count, id "
        "FROM quality_statistics WHERE severity <= 2 and closedDate BETWEEN :dt1 AND :dt2 "
        "GROUP BY product_name, NAME ) AS B ON A.id = B.id "
        "LEFT JOIN ( SELECT product_name, NAME, count( * ) AS reopen_count, id "
        "FROM quality_statistics WHERE activatedCount >= 1 and closedDate BETWEEN :dt1 AND :dt2 "
        "GROUP BY product_name, NAME ) AS C ON A.id = C.id "
        "ORDER BY A.NAME DESC"
    )

    def __init__(self):
        # Deliberately replaces the bound methods with their results on this
        # instance; existing callers rely on reading these as list attributes.
        self.get_user = self.get_user()
        self.get_product = self.get_product()

    def get_user(self):
        """Return every user as a list of ``{'id': ..., 'name': ...}`` dicts."""
        try:
            return [{'id': user.id, 'name': user.name}
                    for user in session.query(User).all()]
        finally:
            # Always release the shared session, even if the query fails.
            session.close()

    def get_product(self):
        """Return every product as a list of ``{'id': ..., 'name': ...}`` dicts.

        The ``name`` value is read from the model's ``p_name`` attribute.
        """
        try:
            return [{'id': product.id, 'name': product.p_name}
                    for product in session.query(Product).all()]
        finally:
            # Always release the shared session, even if the query fails.
            session.close()

    def get_QS(self, begin_date='1990-02-13 00:00:00', end_date=None):
        """Return quality statistics for rows closed within a date range.

        Args:
            begin_date: Lower bound bound to ``:dt1`` and compared against
                ``closedDate``, formatted ``'%Y-%m-%d %H:%M:%S'``.
            end_date: Upper bound bound to ``:dt2`` in the same format.
                ``None`` (the default) means "now", evaluated on every call
                rather than once at import time.

        Returns:
            A list of dicts with the keys ``product_name``, ``user_name``,
            ``total_bug``, ``critical_count``, ``critical_rate``,
            ``reopen_count`` and ``reopen_rate``, in query order.
        """
        if end_date is None:
            end_date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

        # The context manager closes the connection even when execute() raises.
        with engine.connect() as conn:
            # A single parameter dict is accepted by both SQLAlchemy 1.x and 2.x.
            rows = conn.execute(
                text(self._QS_SQL),
                {'dt1': begin_date, 'dt2': end_date},
            ).fetchall()

        return self._serialize_qs_rows(rows)

    @staticmethod
    def _serialize_qs_rows(rows):
        """Convert 5-column statistic rows into dicts, mapping None values to 0."""
        records = []
        for row in rows:
            # LEFT JOIN misses arrive as None; every None column becomes 0.
            product, user, total, critical, reopened = (
                0 if value is None else value for value in row
            )
            records.append({
                'product_name': product,
                'user_name': user,
                'total_bug': total,
                'critical_count': critical,
                # Guard the division so a zero total yields a rate of 0.0.
                'critical_rate': critical / total if total else 0.0,
                'reopen_count': reopened,
                'reopen_rate': reopened / total if total else 0.0,
            })
        return records