-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpull_requests_parser.py
143 lines (128 loc) · 5.74 KB
/
pull_requests_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import csv
import pytz
import requests
import json
from time import sleep
from git_logger import get_assignee_story
from github import Github, Repository, GithubException, PullRequest
EMPTY_FIELD = 'Empty field'
TIMEDELTA = 0.05
TIMEZONE = 'Europe/Moscow'
FIELDNAMES = ('repository name', 'title', 'id', 'state', 'commit into', 'commit from', 'created at', 'creator name',
'creator login', 'creator email', 'changed files', 'comment body',
'comment created at', 'comment author name', 'comment author login',
'comment author email', 'merger name', 'merger login', 'merger email', 'source branch',
'target branch', 'assignee story', 'related issues', 'labels', 'milestone')
def log_pr_to_stdout(info):
print(info)
def log_pr_to_csv(info, csv_name):
with open(csv_name, 'a', newline='') as file:
writer = csv.DictWriter(file, fieldnames=FIELDNAMES)
writer.writerow(info)
def get_related_issues(pull_request_number, repo_owner, repo_name, token):
access_token = token
repo_owner = repo_owner.login
# Формирование запроса GraphQL
query = """
{
repository(owner: "%s", name: "%s") {
pullRequest(number: %d) {
id
closingIssuesReferences(first: 50) {
edges {
node {
id
body
number
title
url
}
}
}
}
}
}
""" % (repo_owner, repo_name, pull_request_number)
# Формирование заголовков запроса
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
# Отправка запроса GraphQL
response = requests.post("https://api.github.com/graphql", headers=headers, data=json.dumps({"query": query}))
response_data = response.json()
# Обработка полученных данных
pull_request_data = response_data["data"]["repository"]["pullRequest"]
issues_data = pull_request_data["closingIssuesReferences"]["edges"]
list_issues_url = []
# сохранение информации об issues
for issue in issues_data:
issue_node = issue["node"]
list_issues_url.append(issue_node["url"])
return ';'.join(list_issues_url)
def log_repositories_pr(repository: Repository, csv_name, token, start, finish, log_comments=False):
for pull in repository.get_pulls(state='all'):
if pull.created_at.astimezone(pytz.timezone(TIMEZONE)) < start or pull.created_at.astimezone(
pytz.timezone(TIMEZONE)) > finish:
continue
nvl = lambda val: val or EMPTY_FIELD
get_info = lambda obj, attr: EMPTY_FIELD if obj is None else getattr(obj, attr)
info_tmp = {
'repository name': repository.full_name,
'title': pull.title,
'id': pull.number,
'state': pull.state,
'commit into': pull.base.label,
'commit from': pull.head.label,
'created at': pull.created_at,
'creator name': nvl(pull.user.name),
'creator login': pull.user.login,
'creator email': pull.user.email,
'changed files': '; '.join([file.filename for file in pull.get_files()]),
'comment body': EMPTY_FIELD,
'comment created at': EMPTY_FIELD,
'comment author name': EMPTY_FIELD,
'comment author login': EMPTY_FIELD,
'comment author email': EMPTY_FIELD,
'merger name': get_info(pull.merged_by, 'name'),
'merger login': get_info(pull.merged_by, 'login'),
'merger email': get_info(pull.merged_by, 'email'),
'source branch': pull.head.ref,
'target branch': pull.base.ref,
'assignee story': get_assignee_story(pull),
'related issues': EMPTY_FIELD if pull.issue_url is None else get_related_issues(pull.number, repository.owner, repository.name, token),
'labels': EMPTY_FIELD if pull.labels is None else ';'.join([label.name for label in pull.labels]),
'milestone': get_info(pull.milestone, 'title')
}
if log_comments:
comments = pull.get_comments()
if comments.totalCount > 0:
for comment in comments:
info = info_tmp
info['comment body'] = comment.body
info['comment created at'] = comment.created_at
info['comment author name'] = comment.user.name
info['comment author login'] = comment.user.login
info['comment author email'] = nvl(comment.user.email)
log_pr_to_csv(info, csv_name)
log_pr_to_stdout(info)
else:
log_pr_to_csv(info_tmp, csv_name)
log_pr_to_stdout(info_tmp)
sleep(TIMEDELTA)
def log_pull_requests(client: Github, working_repos, csv_name, token, start, finish, fork_flag, log_comments=False):
with open(csv_name, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(FIELDNAMES)
for repo in working_repos:
try:
print('=' * 20, repo.full_name, '=' * 20)
log_repositories_pr(repo, csv_name, token, start, finish)
if fork_flag:
for forked_repo in repo.get_forks():
print('=' * 20, "FORKED:", forked_repo.full_name, '=' * 20)
log_repositories_pr(forked_repo, csv_name, token, start, finish, log_comments)
sleep(TIMEDELTA)
sleep(TIMEDELTA)
except Exception as e:
print(e)