-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpasso5_access_request_api.py
More file actions
178 lines (137 loc) · 5.98 KB
/
Copy pathpasso5_access_request_api.py
File metadata and controls
178 lines (137 loc) · 5.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""
PASSO 5 - IGA: Access Request + Approval Workflow.
Aqui adicionamos a metade de GOVERNANCE do IGA: o acesso so e provisionado
DEPOIS de ser aprovado, e tudo fica registrado (trilha de auditoria).
Fluxo:
1. Alguem cria uma solicitacao de acesso -> status "pendente"
2. O gestor aprova ou rejeita
3. Se aprovado -> reaproveita a ponte do PASSO 4 (PowerShell -> AD) e provisiona
4. Se rejeitado -> nada e provisionado
Esta API roda na porta 8002 (RH=8000, Webhook=8001, IGA=8002).
Como rodar (com o .venv ativado):
uvicorn passo5_access_request_api:app --reload --port 8002
Como testar:
- Abra http://127.0.0.1:8002/docs e use os endpoints, OU
- Rode em outro terminal: python passo5_access_request_demo.py
"""
from datetime import datetime
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Reaproveitamos a ponte do PASSO 4: o provisionamento ja existe e funciona.
from passo4_iqservice_bridge import acionar_powershell
# PASSO 6: politica de Segregation of Duties (SoD).
from passo6_sod_policy import verificar_sod, registrar_acesso
# 1) O que o solicitante envia para PEDIR um acesso.
class NovaSolicitacao(BaseModel):
solicitante_nome: str
solicitante_email: str
access_profile: str # o acesso desejado (ex.: "AD-Grupo-Financeiro")
justificativa: str # POR QUE precisa do acesso (exigencia de governanca)
# 2) O que o gestor envia ao decidir.
class Decisao(BaseModel):
aprovador: str
motivo: str = ""
# 3) "Banco" em memoria das solicitacoes e um contador de ids.
SOLICITACOES: dict[int, dict] = {}
_proximo_id = 1
app = FastAPI(
title="IGA - Access Request & Approval (SailPoint ISC Lab)",
description="Solicitacao de acesso com aprovacao e trilha de auditoria.",
version="1.0.0",
)
def _agora() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@app.get("/")
def raiz():
return {"servico": "IGA - Access Request & Approval", "status": "online"}
# 4) CRIAR uma solicitacao de acesso (entra como "pendente").
@app.post("/solicitacoes")
def criar_solicitacao(pedido: NovaSolicitacao):
global _proximo_id
solic_id = _proximo_id
_proximo_id += 1
registro = {
"id": solic_id,
"solicitante_nome": pedido.solicitante_nome,
"solicitante_email": pedido.solicitante_email,
"access_profile": pedido.access_profile,
"justificativa": pedido.justificativa,
"status": "pendente",
"criado_em": _agora(),
"aprovador": None,
"motivo_decisao": None,
"decidido_em": None,
"provisionamento_ad": None,
}
SOLICITACOES[solic_id] = registro
print(f"[{registro['criado_em']}] NOVA SOLICITACAO #{solic_id}: "
f"{pedido.solicitante_nome} pede '{pedido.access_profile}' (pendente)")
return registro
# 5) LISTAR todas (visibilidade / auditoria).
@app.get("/solicitacoes")
def listar_solicitacoes():
return list(SOLICITACOES.values())
# 6) VER uma solicitacao especifica.
@app.get("/solicitacoes/{solic_id}")
def obter_solicitacao(solic_id: int):
registro = SOLICITACOES.get(solic_id)
if registro is None:
raise HTTPException(404, f"Solicitacao #{solic_id} nao encontrada.")
return registro
def _exige_pendente(solic_id: int) -> dict:
registro = SOLICITACOES.get(solic_id)
if registro is None:
raise HTTPException(404, f"Solicitacao #{solic_id} nao encontrada.")
if registro["status"] != "pendente":
raise HTTPException(
409,
f"Solicitacao #{solic_id} ja foi decidida (status: {registro['status']}).",
)
return registro
# 7) APROVAR -> aqui esta o portao de governanca: so apos aprovar provisionamos.
@app.post("/solicitacoes/{solic_id}/aprovar")
def aprovar(solic_id: int, decisao: Decisao):
registro = _exige_pendente(solic_id)
# PASSO 6: portao de SoD. A politica e verificada ANTES de provisionar e
# prevalece ate sobre a decisao do gestor (controle preventivo).
ok, conflito = verificar_sod(registro["solicitante_email"], registro["access_profile"])
if not ok:
registro["status"] = "bloqueada_sod"
registro["aprovador"] = decisao.aprovador
registro["motivo_decisao"] = f"Bloqueada por SoD: {conflito['motivo']}"
registro["decidido_em"] = _agora()
registro["violacao_sod"] = conflito
print(f"[{registro['decidido_em']}] BLOQUEADA #{solic_id} (SoD '{conflito['politica']}'): "
f"{conflito['novo_acesso']} x {conflito['conflita_com']} -> NADA provisionado")
raise HTTPException(
status_code=409,
detail={"mensagem": "Solicitacao viola politica de SoD.", "violacao": conflito},
)
# Provisiona de verdade reaproveitando a ponte do PASSO 4.
resultado_ad = acionar_powershell(
nome=registro["solicitante_nome"],
email=registro["solicitante_email"],
access_profile=registro["access_profile"],
acao="provisionar",
)
# Registra o acesso concedido (passa a contar para futuras checagens de SoD).
registrar_acesso(registro["solicitante_email"], registro["access_profile"])
registro["status"] = "aprovada"
registro["aprovador"] = decisao.aprovador
registro["motivo_decisao"] = decisao.motivo
registro["decidido_em"] = _agora()
registro["provisionamento_ad"] = resultado_ad
print(f"[{registro['decidido_em']}] APROVADA #{solic_id} por {decisao.aprovador} "
f"-> provisionado: {resultado_ad['saida']}")
return registro
# 8) REJEITAR -> nada e provisionado, mas fica registrado.
@app.post("/solicitacoes/{solic_id}/rejeitar")
def rejeitar(solic_id: int, decisao: Decisao):
registro = _exige_pendente(solic_id)
registro["status"] = "rejeitada"
registro["aprovador"] = decisao.aprovador
registro["motivo_decisao"] = decisao.motivo
registro["decidido_em"] = _agora()
print(f"[{registro['decidido_em']}] REJEITADA #{solic_id} por {decisao.aprovador} "
f"(motivo: {decisao.motivo or 'nao informado'}) -> nada provisionado")
return registro