-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpasso2_aggregation.py
More file actions
64 lines (49 loc) · 2.27 KB
/
Copy pathpasso2_aggregation.py
File metadata and controls
64 lines (49 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""
PASSO 2 - Aggregation (o "motor do SailPoint ISC").
Aqui simulamos o que o ISC faz por baixo dos panos: ele vai ate a
Authoritative Source (a RH API do PASSO 1), puxa os dados das pessoas
e monta/atualiza as "Identities".
IMPORTANTE: a API do PASSO 1 precisa estar NO AR para este script funcionar.
Em um terminal, deixe rodando:
uvicorn passo1_authoritative_source_api:app --reload
E em OUTRO terminal, rode este script:
python passo2_aggregation.py
"""
import requests
# Endereco da nossa Authoritative Source (a RH API do PASSO 1).
# E o mesmo link que voce abriu no navegador.
RH_API_URL = "http://127.0.0.1:8000/funcionarios"
def fazer_aggregation():
"""Puxa os funcionarios da RH API e simula a criacao das Identities."""
print("Iniciando Aggregation: conectando na Authoritative Source...")
# 1) O ISC faz o GET na fonte da verdade. O try/except protege contra
# o caso da API estar fora do ar (o erro mais comum no inicio).
try:
resposta = requests.get(RH_API_URL, timeout=5)
except requests.exceptions.ConnectionError:
print("ERRO: nao consegui conectar na RH API.")
print(" Verifique se o PASSO 1 esta rodando (uvicorn ...).")
return
# 2) Conferimos o status HTTP. 200 = deu certo. Qualquer outra coisa,
# abortamos avisando o que veio.
if resposta.status_code != 200:
print(f"ERRO: a RH API respondeu com status {resposta.status_code}.")
return
# 3) Convertemos o corpo JSON em uma lista de dicionarios Python.
funcionarios = resposta.json()
print(f"Aggregation recebeu {len(funcionarios)} registros da fonte.\n")
# 4) Para cada pessoa, o ISC decide o que fazer com a Identity.
# A regra aqui e simples e didatica: o campo 'status' manda.
for f in funcionarios:
if f["status"] == "ativo":
acao = "CRIAR/ATUALIZAR Identity"
else:
acao = "DESATIVAR Identity"
print(f"[{acao}] {f['nome']} (id {f['id']})")
print(f" email......: {f['email']}")
print(f" departamento: {f['departamento']}")
print(f" cargo.......: {f['cargo']}")
print(f" status......: {f['status']}\n")
print("Aggregation concluida.")
if __name__ == "__main__":
fazer_aggregation()