Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 61 additions & 50 deletions SZtoCPDPSK/SZtoCPDPSKv10r.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,72 @@
#!/usr/bin/env python3

import csv
import requests
import json
import sys
import uuid
from datetime import datetime
from typing import Dict, Any

CPFQDN = "cloudpath.my.domain" # Cloudpath FQDN or ip address
CPUSER = "your Cloudpath admin username/email" # Cloudpath user name/email
CPPASSWORD = "password" # Cloudpath password
SZKEYFILE = "nameofyourdpskexportfile.csv" # .csv file with DPSKs exported from SZ (Clients/Dynamic PSK/Export All)
CPDPSKGUID = "yourDPSKpoolGuid" # Cloudpath DPSK Pool Guid (Configuration/DPSK Pools)
# Configuration
CONFIG = {
"CPFQDN": "cloudpath.my.domain", # Cloudpath FQDN or IP address
"CPUSER": "your Cloudpath admin username/email",
"CPPASSWORD": "password",
"SZKEYFILE": "nameofyourdpskexportfile.csv", # CSV file with DPSKs exported from SZ
"CPDPSKGUID": "yourDPSKpoolGuid" # Cloudpath DPSK Pool Guid
}

# run using python3 SZtoCPDPSKv9r.py
class CloudpathDPSKMigration:
def __init__(self, config: Dict[str, str]):
self.config = config
self.timestamp = datetime.now().strftime("%m-%d-%Y")

def readfile():
dateTimeObj = datetime.now()
timestampStr = dateTimeObj.strftime("%m-%d-%Y")
print ("Reading CSV file", end = "")
with open(SZKEYFILE, mode="r", encoding="utf-8-sig") as csv_file:
reader = csv.DictReader(csv_file)
def read_csv_file(self) -> Dict[str, Dict[str, str]]:
"""Read the CSV file and process its contents."""
print("Reading CSV file", end="")
result = {}
for row in reader:
key = row.pop("Passphrase")
result[key] = row
row["uuid"] = "SZ2CP-"+timestampStr+"-"+str(uuid.uuid4())
print (".", end = "")
print (" ")
return result

def getcptoken(username, password):
#print ("Getting API token")
url = "https://"+CPFQDN+"/admin/publicApi/token"
body = {"userName":username, "password":password}
response = requests.post(url, json=body)
token = response.json()['token']
return token

def createdpsks(olddpsk):
url = "https://"+CPFQDN+"/admin/publicApi/dpskPools/"+CPDPSKGUID+"/dpsks"
for key in olddpsk:
token = getcptoken(CPUSER, CPPASSWORD)
print ('Creating EDPSK '+key+" ", end = "")
uuid = olddpsk[key]["uuid"]
VLAN = olddpsk[key]["VLAN ID"]
name = olddpsk[key]["User Name"]
cpheaders = {"Content-Type":"application/json", "Authorization":token}
body = {"name":uuid, "passphrase":key, "vlanid":VLAN, "thirdPartyId":name}
response = requests.post(url, headers=cpheaders, json=body)
print (response)
print (" ")
return

def main(argv):
szkeys = readfile()
createdpsks (szkeys)
return
with open(self.config["SZKEYFILE"], mode="r", encoding="utf-8-sig") as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
key = row.pop("Passphrase")
row["uuid"] = f"SZ2CP-{self.timestamp}-{uuid.uuid4()}"
result[key] = row
print(".", end="")
print()
return result

def get_cp_token(self) -> str:
"""Get API token from Cloudpath."""
url = f"https://{self.config['CPFQDN']}/admin/publicApi/token"
body = {"userName": self.config["CPUSER"], "password": self.config["CPPASSWORD"]}
response = requests.post(url, json=body)
return response.json()['token']

def create_dpsks(self, old_dpsks: Dict[str, Dict[str, str]]):
"""Create DPSKs in Cloudpath."""
url = f"https://{self.config['CPFQDN']}/admin/publicApi/dpskPools/{self.config['CPDPSKGUID']}/dpsks"
for key, value in old_dpsks.items():
token = self.get_cp_token()
print(f'Creating EDPSK {key} ', end="")

headers = {"Content-Type": "application/json", "Authorization": token}
body = {
"name": value["uuid"],
"passphrase": key,
"vlanid": value["VLAN ID"],
"thirdPartyId": value["User Name"]
}
response = requests.post(url, headers=headers, json=body)
print(response)
print()

def run(self):
"""Main execution method."""
sz_keys = self.read_csv_file()
self.create_dpsks(sz_keys)

def main():
migration = CloudpathDPSKMigration(CONFIG)
migration.run()

if __name__ == "__main__":
main(sys.argv[1:])
main()