forked from tamadalab/Argo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretry_logic.py
64 lines (52 loc) · 1.82 KB
/
retry_logic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import requests
import os
import threading
import datetime
import time
def request_with_rate_limit_handling():
    """Block until the GitHub GraphQL API rate limit resets.

    Asks the GitHub GraphQL API for the ``resetAt`` timestamp of its
    ``rateLimit`` object, stores the number of seconds until that moment in
    the module-level ``wait_time_seconds``, and then waits that long while
    printing a countdown. Pressing Enter ends the countdown early. In both
    cases a further 5-second pause follows before returning.

    Raises:
        Exception: If the ``GITHUB_API_KEY`` environment variable is not
            set, or if the response does not contain the rate-limit data.
        requests.RequestException: If the HTTP request fails or times out.
    """
    global wait_time_seconds
    url = "https://api.github.com/graphql"
    # The payload below is the JSON encoding of this GraphQL document:
    #   query {
    #     rateLimit {
    #       limit
    #       cost
    #       remaining
    #       resetAt
    #     }
    #   }
    query = "{\"query\":\"query{\\n\\trateLimit{\\n\\t\\tlimit\\n\\t\\tcost\\n\\t\\tremaining\\n\\t\\tresetAt\\n\\t}\\n}\"}"
    api_key = os.getenv('GITHUB_API_KEY')
    if api_key is None:
        raise Exception("Couldn't find the GitHub API key. Please set it as an environment variable.")
    headers = {
        "Authorization": "bearer " + api_key,
        "Content-Type": "application/json",
    }
    # Without a timeout a stalled connection would block this call forever.
    response = requests.request("POST", url, data=query, headers=headers, timeout=30)
    try:
        reset_at_str = response.json()["data"]["rateLimit"]["resetAt"]
    except (ValueError, KeyError, TypeError) as err:
        # Error payloads (e.g. bad credentials) carry no "data" section and
        # some failures are not JSON at all; report what was received.
        raise Exception(
            f"Unexpected response from the GitHub API "
            f"(HTTP {response.status_code}): {response.text[:200]}"
        ) from err

    # resetAt is formatted with a trailing "Z", so interpret it as UTC and
    # compare against an aware "now" (datetime.utcnow() is deprecated).
    utc = datetime.timezone.utc
    reset_at = datetime.datetime.strptime(reset_at_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=utc)
    wait_time_seconds = (reset_at - datetime.datetime.now(utc)).total_seconds()

    interrupt_event = threading.Event()
    # Create the thread that watches for key input.
    # NOTE: if the wait finishes without the user pressing Enter, this
    # daemon thread stays blocked reading stdin until the process exits.
    t1 = threading.Thread(target=key_capture_thread, args=(interrupt_event,))
    t1.daemon = True
    t1.start()

    # A monotonic clock keeps the elapsed-time measurement unaffected by
    # system clock adjustments made during the wait.
    start_time = time.monotonic()
    while time.monotonic() - start_time < wait_time_seconds:
        if interrupt_event.is_set():
            print("\rInterrupted by user")
            break
        remaining_time = wait_time_seconds - (time.monotonic() - start_time)
        minutes, seconds = divmod(remaining_time, 60)
        # flush=True: the line has no trailing newline, so a line-buffered
        # terminal would otherwise not show the countdown.
        print(f"\rWaiting for {int(minutes)} minutes and {int(seconds)} seconds...", end="", flush=True)
        # Sleeps for up to one second but wakes immediately on interrupt.
        interrupt_event.wait(1)
    # Extra pause after the wait (also after an interrupt); presumably a
    # safety margin past the reset time -- TODO confirm intent.
    time.sleep(5)
def key_capture_thread(event):
    """Set *event* once the user presses Enter on standard input.

    Intended to run in a background thread: prints a prompt, blocks on
    ``input()``, and sets ``event`` when a line has been read.

    Args:
        event: Object with a ``set()`` method (e.g. ``threading.Event``)
            used to signal that Enter was pressed.
    """
    print("Press Enter to interrupt waiting:", flush=True)
    try:
        input()
    except EOFError:
        # stdin is closed or not interactive: nobody pressed Enter, so leave
        # the event unset instead of dying with a traceback in the thread.
        return
    event.set()