-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathmigrate_connections.py
More file actions
138 lines (125 loc) · 5.46 KB
/
migrate_connections.py
File metadata and controls
138 lines (125 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import ast
import os
import requests
import json
CONNECTION_IDS = os.getenv("CONNECTION_IDS")
CONNECTIONS_CONFIG = os.getenv("CONNECTIONS_CONFIG")
MIGRATE_ALL_CONNECTIONS = os.getenv("MIGRATE_ALL_CONNECTIONS")
SOURCE_VAULT_ID = os.getenv("SOURCE_VAULT_ID")
TARGET_VAULT_ID = os.getenv("TARGET_VAULT_ID")
SOURCE_ACCOUNT_ID = os.getenv("SOURCE_ACCOUNT_ID")
TARGET_ACCOUNT_ID = os.getenv("TARGET_ACCOUNT_ID")
SOURCE_ACCOUNT_AUTH = os.getenv("SOURCE_ACCOUNT_AUTH")
TARGET_ACCOUNT_AUTH = os.getenv("TARGET_ACCOUNT_AUTH")
SOURCE_ENV_URL = os.getenv("SOURCE_ENV_URL")
TARGET_ENV_URL = os.getenv("TARGET_ENV_URL")
SOURCE_ACCOUNT_HEADERS = {
"X-SKYFLOW-ACCOUNT-ID": SOURCE_ACCOUNT_ID,
"Authorization": f"Bearer {SOURCE_ACCOUNT_AUTH}",
"Content-Type": "application/json",
}
TARGET_ACCOUNT_HEADERS = {
"X-SKYFLOW-ACCOUNT-ID": TARGET_ACCOUNT_ID,
"Authorization": f"Bearer {TARGET_ACCOUNT_AUTH}",
"Content-Type": "application/json",
}
def list_connections(vault_id):
"""Lists inbound + outbound connections for a vault."""
connections = []
response = requests.get(
f"{SOURCE_ENV_URL}/v1/gateway/outboundRoutes?vaultID={vault_id}",
headers=SOURCE_ACCOUNT_HEADERS,
)
response.raise_for_status()
connections.extend(response.json()["ConnectionMappings"])
response = requests.get(
f"{SOURCE_ENV_URL}/v1/gateway/inboundRoutes?vaultID={vault_id}",
headers=SOURCE_ACCOUNT_HEADERS,
)
response.raise_for_status()
connections.extend(response.json()["ConnectionMappings"])
return connections
def get_connection(connection_id):
"""Fetches a single connection"""
# /inboundRoutes can also fetch outbound connection details
response = requests.get(
f"{SOURCE_ENV_URL}/v1/gateway/inboundRoutes/{connection_id}",
headers=SOURCE_ACCOUNT_HEADERS,
)
response.raise_for_status()
return response.json()
def create_connection(connection):
"""Creates connection"""
route = "outboundRoutes" if connection["mode"] == "EGRESS" else "inboundRoutes"
response = requests.post(
f"{TARGET_ENV_URL}/v1/gateway/{route}",
json=connection,
headers=TARGET_ACCOUNT_HEADERS,
)
return response
def transform_connection_payload(source_resource):
"""Transforms source connection payload to target payload."""
transformed_resource = source_resource
transformed_resource["vaultID"] = TARGET_VAULT_ID
# drop basic audit and invocation URL
if "BasicAudit" in transformed_resource.keys():
del transformed_resource["BasicAudit"]
for route in transformed_resource["routes"]:
del route["invocationURL"]
return transformed_resource
def main(connection_ids=None):
"""Migrates connections"""
try:
print("-- Initiating Connections migration --")
connections = []
if CONNECTIONS_CONFIG is not None and CONNECTIONS_CONFIG == "config_file":
print(f"-- Fetching connections from the config file --")
with open("configs/connections/connections.json", "r") as file:
content = file.read()
connections = json.loads(content)
elif MIGRATE_ALL_CONNECTIONS is not None and MIGRATE_ALL_CONNECTIONS.lower() == "true":
if SOURCE_VAULT_ID:
print(f"-- Fetching all connections from the source vault --")
connections = list_connections(SOURCE_VAULT_ID)
else:
print(
"-- Please provide valid input. Source vault ID is required to migrate all connections --"
)
return
else:
connection_ids = (
connection_ids
if connection_ids
else ast.literal_eval(CONNECTION_IDS)
)
print(f"-- Fetching connection details for the given connection IDs --")
for connection_id in connection_ids:
connection = get_connection(connection_id)
connections.append(connection)
created_connections = []
for index, connection in enumerate(connections):
print(f"-- Working on connection: {index + 1}. {connection['name']} --")
connection_payload = transform_connection_payload(connection)
create_connection_response = create_connection(connection_payload)
if create_connection_response.status_code == 200:
created_connection = create_connection_response.json()
created_connections.append(created_connection)
# fetch connection roles
# create service account and assign connection invoker role
print(
f"-- Connection migrated successfully: {connection['name']}. Source CONNECTION_ID: {connection['ID']}, Target CONNECTION_ID: {created_connection['ID']} --"
)
else:
print(f"-- Connection migration failed: {create_connection_response.status_code}. {create_connection_response.content}")
print(f"-- {len(created_connections)} out of {len(connections)} connections were created successfully. --")
print("-- Connections migration script executed successfully. --")
except requests.exceptions.HTTPError as http_err:
print(
f"-- migrate_connections HTTP error: {http_err.response.content.decode()} --"
)
exit(1)
except Exception as err:
print(f"-- migrate_connections other error: {err} --")
exit(1)
if __name__ == "__main__":
main()