-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathinteractive_event_bridge_flow.py
193 lines (159 loc) · 8.06 KB
/
interactive_event_bridge_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
Isolates interactive init prompt flow for event bridge.
"""
import click
from samcli.lib.schemas.cli_paginator import do_paginate_cli
from samcli.lib.schemas.schemas_api_caller import SchemasApiCaller
from samcli.lib.schemas.schemas_aws_config import get_aws_configuration_choice, get_schemas_client
from samcli.lib.schemas.schemas_cli_message_generator import (
construct_cli_display_message_for_registries,
construct_cli_display_message_for_schemas,
)
from samcli.lib.schemas.schemas_constants import (
EVENT_BRIDGE_SOURCE,
EVENT_BRIDGE_SOURCE_DETAIL_TYPE,
PAGE_LIMIT,
SCHEMA_NAME,
SCHEMA_ROOT,
SCHEMAS_REGISTRY,
)
def get_schema_template_details(schemas_api_caller):
"""
Calls schemas APIs to fetch available selection and returns schema details based on user selection.
:param schemas_api_caller:
:return:
"""
registry_name = _get_registry_cli_choice(schemas_api_caller)
schema_full_name = _get_schema_cli_choice(schemas_api_caller, registry_name)
schema_latest_version = schemas_api_caller.get_latest_schema_version(registry_name, schema_full_name)
get_schema_metadata_response = schemas_api_caller.get_schema_metadata(registry_name, schema_full_name)
return {
"registry_name": registry_name,
"schema_full_name": schema_full_name,
"schema_version": schema_latest_version,
"event_source": get_schema_metadata_response["event_source"],
"event_source_detail_type": get_schema_metadata_response["event_source_detail_type"],
"schema_root_name": get_schema_metadata_response["schema_root_name"],
"schemas_package_hierarchy": get_schema_metadata_response["schemas_package_hierarchy"],
}
def _get_registry_cli_choice(schemas_api_caller):
"""Returns registry choice if one registry is present otherwise prompt for selection"""
registries = _fetch_available_registries(schemas_api_caller, dict(), None)
registry_pages = registries["registry_pages"]
# If only one registry don't prompt for choice
if len(registry_pages) == 1 and len(registry_pages.get(0)) == 1:
return registry_pages.get(0)[0]
# more than one registries
click.echo("Which Schema Registry would you like to use?")
next_token = registries.get("next_token")
is_last_page = next_token is None
return _prompt_for_registry_choice(
schemas_api_caller, registry_pages, 0, next_token, is_last_page, last_page_number=None
)
def _prompt_for_registry_choice(
schemas_api_caller, registry_pages, page_to_render, next_token, is_last_page, last_page_number
):
# construct CLI message
cli_display_message = construct_cli_display_message_for_registries(page_to_render + 1, last_page_number)
# get customer decision
cli_response = do_paginate_cli(registry_pages, page_to_render, PAGE_LIMIT, is_last_page, cli_display_message)
# user selected item
if cli_response.get("choice") is not None:
return cli_response.get("choice")
# user decided to paginate
page_to_render = cli_response.get("page_to_render")
if registry_pages.get(page_to_render) is None:
registries = _fetch_available_registries(schemas_api_caller, registry_pages, next_token)
registry_pages = registries["registry_pages"]
next_token = registries.get("next_token")
is_last_page = next_token is None
if is_last_page and last_page_number is None:
last_page_number = page_to_render + 1
return _prompt_for_registry_choice(
schemas_api_caller, registry_pages, page_to_render, next_token, is_last_page, last_page_number
)
def _get_schema_cli_choice(schemas_api_caller, registry_name):
"""Returns registry registry choice if one registry is present otherwise prompt for selection"""
schemas = _fetch_available_schemas(schemas_api_caller, registry_name, dict(), None)
schema_pages = schemas["schema_pages"]
# If only one schema don't prompt for choice
if len(schema_pages) == 1 and len(schema_pages.get(0)) == 1:
return schema_pages.get(0)[0]
# more than one schema
click.echo("\nWhich Schema would you like to use?")
next_token = schemas.get("next_token")
is_last_page = next_token is None
return _prompt_for_schemas_choice(
schemas_api_caller, registry_name, schema_pages, 0, next_token, is_last_page, last_page_number=None
)
def _prompt_for_schemas_choice(
schemas_api_caller, registry_name, schema_pages, page_to_render, next_token, is_last_page, last_page_number
):
# construct CLI message
cli_display_message = construct_cli_display_message_for_schemas(page_to_render + 1, last_page_number)
# get customer decision
cli_response = do_paginate_cli(schema_pages, page_to_render, PAGE_LIMIT, is_last_page, cli_display_message)
# user selected item
if cli_response.get("choice") is not None:
return cli_response["choice"]
# user decided to paginate
page_to_render = cli_response.get("page_to_render")
if schema_pages.get(page_to_render) is None:
schemas = _fetch_available_schemas(schemas_api_caller, registry_name, schema_pages, next_token)
schema_pages = schemas["schema_pages"]
next_token = schemas.get("next_token")
is_last_page = next_token is None
if is_last_page and last_page_number is None:
last_page_number = page_to_render + 1
return _prompt_for_schemas_choice(
schemas_api_caller, registry_name, schema_pages, page_to_render, next_token, is_last_page, last_page_number
)
def _fetch_available_schemas(schemas_api_caller, registry_name, schema_pages, next_token):
"""calls schemas api fetch schemas for given registry. Two CLI pages are fetched at a time."""
list_schemas_response = schemas_api_caller.list_schemas(registry_name, next_token, PAGE_LIMIT)
schemas = list_schemas_response["schemas"]
# divided response into pages
pages = _construct_cli_page(schemas, PAGE_LIMIT)
for page in range(0, len(pages)):
schema_pages.update({len(schema_pages): pages.get(page)})
next_token = list_schemas_response.get("next_token")
return {"schema_pages": schema_pages, "next_token": next_token}
def _fetch_available_registries(schemas_api_caller, registry_pages, next_token):
"""calls schemas api to fetch registries. Two CLI pages are fetched at a time."""
list_registries_response = schemas_api_caller.list_registries(next_token, PAGE_LIMIT)
registries = list_registries_response["registries"]
# sort registries alphabetically by name
registries.sort()
# divided response into pages
pages = _construct_cli_page(registries, PAGE_LIMIT)
for page in range(0, len(pages)):
registry_pages.update({len(registry_pages): pages.get(page)})
next_token = list_registries_response.get("next_token")
return {"registry_pages": registry_pages, "next_token": next_token}
def _construct_cli_page(items, item_per_page):
"""Responsible for splitting items into CLI pages.
Currently CLI pages are list of dictionary [0:{0:s1, 1:s2: 3:s3}, 1: {4:s4, 5:s5: 6:s6}]
We maintain the page detail and item index details."""
pages = [
items[i * item_per_page : (i + 1) * item_per_page]
for i in range((len(items) + item_per_page - 1) // item_per_page)
]
index = 0
schema_dict = dict()
for page in pages:
schema_dict.update({index: page})
index = index + 1
return schema_dict
def get_schemas_template_parameter(schema_template_details):
"""Schemas cookiecutter template parameter mapping"""
return {
SCHEMAS_REGISTRY: schema_template_details["registry_name"],
SCHEMA_NAME: schema_template_details["schema_root_name"],
EVENT_BRIDGE_SOURCE: schema_template_details["event_source"],
EVENT_BRIDGE_SOURCE_DETAIL_TYPE: schema_template_details["event_source_detail_type"],
SCHEMA_ROOT: schema_template_details["schemas_package_hierarchy"],
}
def get_schemas_api_caller():
aws_configuration = get_aws_configuration_choice()
schemas_client = get_schemas_client(aws_configuration["profile"], aws_configuration["region"])
return SchemasApiCaller(schemas_client)