-
Notifications
You must be signed in to change notification settings - Fork 838
/
Copy pathrotator.py
135 lines (110 loc) · 4.97 KB
/
rotator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from time import time
from typing import Optional
from slack_sdk.errors import SlackApiError, SlackTokenRotationError
from slack_sdk.web import WebClient
from slack_sdk.oauth.installation_store import Installation, Bot
class TokenRotator:
client: WebClient
client_id: str
client_secret: str
def __init__(self, *, client_id: str, client_secret: str, client: Optional[WebClient] = None):
self.client = client if client is not None else WebClient(token=None)
self.client_id = client_id
self.client_secret = client_secret
def perform_token_rotation(
self,
*,
installation: Installation,
minutes_before_expiration: int = 120, # 2 hours by default
) -> Optional[Installation]:
"""Performs token rotation if the underlying tokens (bot / user) are expired / expiring.
Args:
installation: the current installation data
minutes_before_expiration: the minutes before the token expiration
Returns:
None if no rotation is necessary for now.
"""
# TODO: make the following two calls in parallel for better performance
# bot
rotated_bot: Optional[Bot] = self.perform_bot_token_rotation(
bot=installation.to_bot(),
minutes_before_expiration=minutes_before_expiration,
)
# user
rotated_installation: Optional[Installation] = self.perform_user_token_rotation(
installation=installation,
minutes_before_expiration=minutes_before_expiration,
)
if rotated_bot is not None:
if rotated_installation is None:
rotated_installation = Installation(**installation.to_dict_for_copying())
rotated_installation.bot_token = rotated_bot.bot_token
rotated_installation.bot_refresh_token = rotated_bot.bot_refresh_token
rotated_installation.bot_token_expires_at = rotated_bot.bot_token_expires_at
return rotated_installation
def perform_bot_token_rotation(
self,
*,
bot: Bot,
minutes_before_expiration: int = 120, # 2 hours by default
) -> Optional[Bot]:
"""Performs bot token rotation if the underlying bot token is expired / expiring.
Args:
bot: the current bot installation data
minutes_before_expiration: the minutes before the token expiration
Returns:
None if no rotation is necessary for now.
"""
if bot.bot_token_expires_at is None:
return None
if bot.bot_token_expires_at > time() + minutes_before_expiration * 60:
return None
try:
refresh_response = self.client.oauth_v2_access(
client_id=self.client_id,
client_secret=self.client_secret,
grant_type="refresh_token",
refresh_token=bot.bot_refresh_token,
)
if refresh_response.get("token_type") != "bot":
return None
refreshed_bot = Bot(**bot.to_dict_for_copying())
refreshed_bot.bot_token = refresh_response["access_token"]
refreshed_bot.bot_refresh_token = refresh_response.get("refresh_token")
refreshed_bot.bot_token_expires_at = int(time()) + int(refresh_response["expires_in"])
return refreshed_bot
except SlackApiError as e:
raise SlackTokenRotationError(e)
def perform_user_token_rotation(
self,
*,
installation: Installation,
minutes_before_expiration: int = 120, # 2 hours by default
) -> Optional[Installation]:
"""Performs user token rotation if the underlying user token is expired / expiring.
Args:
installation: the current installation data
minutes_before_expiration: the minutes before the token expiration
Returns:
None if no rotation is necessary for now.
"""
if installation.user_token_expires_at is None:
return None
if installation.user_token_expires_at > time() + minutes_before_expiration * 60:
return None
try:
refresh_response = self.client.oauth_v2_access(
client_id=self.client_id,
client_secret=self.client_secret,
grant_type="refresh_token",
refresh_token=installation.user_refresh_token,
)
if refresh_response.get("token_type") != "user":
return None
refreshed_installation = Installation(**installation.to_dict_for_copying())
refreshed_installation.user_token = refresh_response.get("access_token")
refreshed_installation.user_refresh_token = refresh_response.get("refresh_token")
refreshed_installation.user_token_expires_at = int(time()) + int(refresh_response["expires_in"])
return refreshed_installation
except SlackApiError as e:
raise SlackTokenRotationError(e)