Skip to content

Checking in Admin Approval Workflow | AWS Agent Registry#1301

Merged
dhawalkp merged 1 commit intoawslabs:mainfrom
manchandakp:admin-approval-workflow
Apr 10, 2026
Merged

Checking in Admin Approval Workflow | AWS Agent Registry#1301
dhawalkp merged 1 commit intoawslabs:mainfrom
manchandakp:admin-approval-workflow

Conversation

@manchandakp
Copy link
Copy Markdown
Contributor

@manchandakp manchandakp commented Apr 10, 2026

Amazon Bedrock AgentCore Samples Pull Request

Important

  1. We strictly follow a issue-first approach, please first open an issue relating to this Pull Request.
  2. Once this Pull Request is ready for review please attach review ready label to it. Only PRs with review ready will be reviewed.

Issue number:

Code sample for Admin Approval Workflow | AWS Agent Registry

Changes to ..., because ...

User experience

Please share what the user experience looks like before and after this change

Checklist

If your change doesn't seem to apply, please leave them unchecked.

  • I have reviewed the contributing guidelines
  • Add your name to CONTRIBUTORS.md
  • Have you checked to ensure there aren't other open Pull Requests for the same update/change?
  • Are you uploading a dataset?
  • Have you documented Introduction, Architecture Diagram, Prerequisites, Usage, Sample Prompts, and Clean Up steps in your example README?
  • I agree to resolve any issues created for this example in the future.
  • I have performed a self-review of this change
  • Changes have been tested
  • Changes are documented

Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of the project license.

@review-notebook-app
Copy link
Copy Markdown

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

@github-actions github-actions bot added the 01-tutorials 01-tutorials label Apr 10, 2026
@github-actions
Copy link
Copy Markdown

Latest scan for commit: 289d6e3 | Updated: 2026-04-10 11:46:26 UTC

Security Scan Results

Scan Metadata

  • Project: ASH
  • Scan executed: 2026-04-10T11:46:14+00:00
  • ASH version: 3.0.0

Summary

Scanner Results

The table below shows findings by scanner, with status based on severity thresholds and dependencies:

Column Explanations:

Severity Levels (S/C/H/M/L/I):

  • Suppressed (S): Security findings that have been explicitly suppressed/ignored and don't affect the scanner's pass/fail status
  • Critical (C): The most severe security vulnerabilities requiring immediate remediation (e.g., SQL injection, remote code execution)
  • High (H): Serious security vulnerabilities that should be addressed promptly (e.g., authentication bypasses, privilege escalation)
  • Medium (M): Moderate security risks that should be addressed in normal development cycles (e.g., weak encryption, input validation issues)
  • Low (L): Minor security concerns with limited impact (e.g., information disclosure, weak recommendations)
  • Info (I): Informational findings for awareness with minimal security risk (e.g., code quality suggestions, best practice recommendations)

Other Columns:

  • Time: Duration taken by each scanner to complete its analysis
  • Action: Total number of actionable findings at or above the configured severity threshold that require attention

Scanner Results:

  • PASSED: Scanner found no security issues at or above the configured severity threshold - code is clean for this scanner
  • FAILED: Scanner found security vulnerabilities at or above the threshold that require attention and remediation
  • MISSING: Scanner could not run because required dependencies/tools are not installed or available
  • SKIPPED: Scanner was intentionally disabled or excluded from this scan
  • ERROR: Scanner encountered an execution error and could not complete successfully

Severity Thresholds (Thresh Column):

  • CRITICAL: Only Critical severity findings cause scanner to fail
  • HIGH: High and Critical severity findings cause scanner to fail
  • MEDIUM (MED): Medium, High, and Critical severity findings cause scanner to fail
  • LOW: Low, Medium, High, and Critical severity findings cause scanner to fail
  • ALL: Any finding of any severity level causes scanner to fail

Threshold Source: Values in parentheses indicate where the threshold is configured:

  • (g) = global: Set in the global_settings section of ASH configuration
  • (c) = config: Set in the individual scanner configuration section
  • (s) = scanner: Default threshold built into the scanner itself

Statistics calculation:

  • All statistics are calculated from the final aggregated SARIF report
  • Suppressed findings are counted separately and do not contribute to actionable findings
  • Scanner status is determined by comparing actionable findings to the threshold
Scanner S C H M L I Time Action Result Thresh
bandit 0 0 0 0 0 0 624ms 0 PASSED MED (g)
cdk-nag 0 5 0 1 0 3 29.1s 6 FAILED MED (g)
cfn-nag 0 0 0 4 0 0 622ms 4 FAILED MED (g)
checkov 0 4 0 0 0 0 5.5s 4 FAILED MED (g)
detect-secrets 0 0 0 0 0 0 821ms 0 PASSED MED (g)
grype 0 0 0 0 0 0 40.3s 0 PASSED MED (g)
npm-audit 0 0 0 0 0 0 216ms 0 PASSED MED (g)
opengrep 0 0 0 0 0 0 <1ms 0 SKIPPED MED (g)
semgrep 0 0 0 0 0 0 <1ms 0 MISSING MED (g)
syft 0 0 0 0 0 0 2.1s 0 PASSED MED (g)

Detailed Findings

Show 14 actionable findings

Finding 1: CFN_NAG_W78

  • Severity: MEDIUM
  • Scanner: cfn-nag
  • Rule ID: CFN_NAG_W78
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:460

Description:
DynamoDB table should have backup enabled, should be set using PointInTimeRecoveryEnabled


Finding 2: CFN_NAG_W89

  • Severity: MEDIUM
  • Scanner: cfn-nag
  • Rule ID: CFN_NAG_W89
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:66

Description:
Lambda functions should be deployed inside a VPC


Finding 3: CFN_NAG_W92

  • Severity: MEDIUM
  • Scanner: cfn-nag
  • Rule ID: CFN_NAG_W92
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:66

Description:
Lambda functions should define ReservedConcurrentExecutions to reserve simultaneous executions


Finding 4: CFN_NAG_W28

  • Severity: MEDIUM
  • Scanner: cfn-nag
  • Rule ID: CFN_NAG_W28
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:460

Description:
Resource found with an explicit name, this disallows updates that require replacement of this resource


Finding 5: CKV_AWS_115

  • Severity: HIGH
  • Scanner: checkov
  • Rule ID: CKV_AWS_115
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:65-344

Description:
Ensure that AWS Lambda function is configured for function-level concurrent execution limit

Code Snippet:

CICDFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${Prefix}-lambda-cicd
      Runtime: python3.14
      Handler: index.lambda_handler
      Role: !GetAtt CICDFunctionRole.Arn
      Timeout: 60
      KmsKeyArn: !GetAtt LambdaEnvEncryptionKey.Arn
      Layers:
        - !Ref CICDDependenciesLayer
      Environment:
        Variables:
          slackIncomingHookUrl: !Ref SlackIncomingHookUrl
          slackChannelName: !Ref SlackChannelName
          reportsBucket: !Ref LambdaLayerBucket
          scanResultsTable: !Ref RegistryRecordMetadataTable
      Code:
        ZipFile: |
          import os
          import json
          import urllib3
          import boto3
          from datetime import datetime, timezone

          REGION = os.environ.get("AWS_REGION", "us-west-2")
          print(f"Boto3 version: {boto3.__version__}")

          # boto3 clients for the two AgentCore Registry planes
          registry_cp_client = boto3.client("bedrock-agentcore-control", region_name=REGION)
          registry_dp_client = boto3.client("bedrock-agentcore", region_name=REGION)

          def get_registry_record(registry_id, record_id):
              resp = registry_cp_client.get_registry_record(registryId=registry_id, recordId=record_id)
              metadata = resp.get("ResponseMetadata", {})
              aws_request_id = metadata['HTTPHeaders']['x-amzn-requestid']
              timestamp = metadata.get("HTTPHeaders", {}).get("date", datetime.now(timezone.utc).isoformat())
              print(f"get_registry_record metadata: requestId={aws_request_id}, timestamp={timestamp}")
              # Return the record fields, excluding boto3 response metadata
              return {k: v for k, v in resp.items() if k != "ResponseMetadata"}

          def search_registry_records(registry_id, query, max_results=2, descriptor_type=None):
              kwargs = dict(
                  searchQuery=query,
                  registryIds=[registry_id],
                  maxResults=max_results,
              )
              if descriptor_type:
                  kwargs['filters'] = {"descriptorType": {"$eq": descriptor_type}}
  
              print(f"Search parameters: {kwargs}")
              
              resp = registry_dp_client.search_registry_records(**kwargs)
              metadata = resp.get("ResponseMetadata", {})
              aws_request_id = metadata['HTTPHeaders']['x-amzn-requestid']
              timestamp = metadata.get("HTTPHeaders", {}).get("date", datetime.now(timezone.utc).isoformat())
              resp_body = {k: v for k, v in resp.items() if k != "ResponseMetadata"}

              if 'registryRecords' not in resp_body:
                  print(f"search_registry_records query: {query}, registry id: {registry_id}, error: {resp_body}")
              else:
                  print(f"search_registry_records query: {query}, registry id: {registry_id}, # records found: {len(resp_body['registryRecords'])}")

              print(f"search_registry_records metadata: requestId={aws_request_id}, timestamp={timestamp}")
              return resp_body

          def run_ai_scan(record):
              """Run Cisco AI A2A scanner on the registry record and return (summary, report_url)."""
              import asyncio
              async def _scan():
                  try:
                      from a2ascanner import Scanner
                      # Extract the A2A agent card from the nested descriptor structure
                      inline = record.get("descriptors", {}).get("a2a", {}).get("agentCard", {}).get("inlineContent", "{}")
                      agent_card = json.loads(inline) if isinstance(inline, str) else inline
                      #print(f"Agent Card: {agent_card}")

                      scanner = Scanner()
                      result = await scanner.scan_agent_card(agent_card)
                      severity_counts = {}

                      for f in result.findings:
                          sev = getattr(f, "severity", "INFO")
                          severity_counts[sev] = severity_counts.get(sev, 0) + 1

                      parts = [f"{v} {k}" for k, v in severity_counts.items()] if severity_counts else ["No issues found"]
                      summary = f":shield: *AI Scan Findings:* {', '.join(parts)}"
                      findings_list = [f.to_dict() for f in result.findings]
                      
                      print(f"AI scan findings: {json.dumps(findings_list, indent=2)}")

                      # Build HTML report
                      record_name = record.get("name", "Unknown Agent")
                      generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
                      rows = ""
                      for f in findings_list:
                          sev = f.get("severity", "INFO")
                          sev_color = {"HIGH": "#c0392b", "MEDIUM": "#e67e22", "LOW": "#27ae60"}.get(sev, "#7f8c8d")
                          details_str = json.dumps(f.get("details", {}), indent=2)
                          rows += f"""<tr>
                              <td style="color:{sev_color};font-weight:bold">{sev}</td>
                              <td>{f.get("analyzer","")}</td>
                              <td>{f.get("threat_name","")}</td>
                              <td>{f.get("scanner_category") or "—"}</td>
                              <td>{f.get("summary","")}</td>
                              <td><pre style="margin:0;font-size:11px">{details_str}</pre></td>
                          </tr>"""
                      html = f"""<!DOCTYPE html><html><head><meta charset="utf-8">
                      <title>A2A Security Scan Report</title>
                      <style>
                        body{{font-family:Arial,sans-serif;margin:20px;color:#333}}
                        h1{{font-size:20px}} .meta{{font-size:13px;color:#666;margin-bottom:16px}}
                        .summary{{display:flex;gap:16px;margin-bottom:20px}}
                        .badge{{padding:10px 20px;border-radius:6px;text-align:center;color:#fff}}
                        .HIGH{{background:#c0392b}} .MEDIUM{{background:#e67e22}} .LOW{{background:#27ae60}} .TOTAL{{background:#2980b9}}
                        .badge .num{{font-size:28px;font-weight:bold}} .badge .lbl{{font-size:12px}}
                        table{{border-collapse:collapse;width:100%}} th,td{{border:1px solid #ddd;padding:8px;text-align:left;vertical-align:top}}
                        th{{background:#f2f2f2;font-size:13px}} td{{font-size:13px}}
                      </style></head><body>
                      <h1>A2A Security Scan Report</h1>
                      <div class="meta">Target: {record_name} | Generated: {generated_at}</div>
                      <h2>Summary</h2>
                      <div class="summary">
                        <div class="badge HIGH"><div class="num">{severity_counts.get("HIGH",0)}</div><div class="lbl">High</div></div>
                        <div class="badge MEDIUM"><div class="num">{severity_counts.get("MEDIUM",0)}</div><div class="lbl">Medium</div></div>
                        <div class="badge LOW"><div class="num">{severity_counts.get("LOW",0)}</div><div class="lbl">Low</div></div>
                        <div class="badge TOTAL"><div class="num">{len(findings_list)}</div><div class="lbl">Total</div></div>
                      </div>
                      <h2>Findings</h2>
                      <table><thead><tr><th>Severity</th><th>Analyzer</th><th>Threat</th><th>Category</th><th>Summary</th><th>Details</th></tr></thead>
                      <tbody>{rows}</tbody></table>
                      <p style="margin-top:20px;font-size:12px;color:#999">Generated by A2A Scanner</p>
                      </body></html>"""

                      # Upload to S3 and generate presigned URL
                      reports_bucket = os.environ.get("reportsBucket", "")
                      report_url = ""
                      if reports_bucket:
                          s3 = boto3.client("s3", region_name=REGION)
                          record_id = record.get("registryRecordId", record.get("id", "unknown"))
                          s3_key = f"ai-scan-reports/{record_id}-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}.html"
                          s3.put_object(Bucket=reports_bucket, Key=s3_key, Body=html.encode("utf-8"), ContentType="text/html")
                          report_url = s3.generate_presigned_url(
                              "get_object",
                              Params={"Bucket": reports_bucket, "Key": s3_key},
                              ExpiresIn=604800  # 7 days
                          )
                          print(f"AI scan report uploaded: s3://{reports_bucket}/{s3_key}")

                      return summary, report_url, findings_list
                  except ImportError:
                      print("Warning: a2ascanner not available")
                      return ":shield: *AI Scan:* Scanner unavailable", "", []
                  except Exception as e:
                      print(f"Warning: AI scan failed: {e}")
                      return f":shield: *AI Scan:* Scan error - {str(e)}", "", []
              return asyncio.run(_scan())

          def lambda_handler(event, context):
              slackIncomingHookUrl = os.getenv("slackIncomingHookUrl")
              slackChannelName = os.getenv("slackChannelName")

              #print(f"Event recd: {event}")

              registryId = event["detail"]["registryId"]
              registryRecordId = event["detail"]["registryRecordId"]

              # Fetch registry record details via REST API
              record = {}
              try:
                  record = get_registry_record(registryId, registryRecordId)
                  #print(f"Registry record: {json.dumps(record)}")
              except Exception as e:
                  print(f"Warning: could not fetch registry record details: {e}")

              record_name = record.get("name", "N/A")
              record_description = record.get("description", "N/A")
              record_protocol = record.get("descriptorType", "N/A")
              record_status = record.get("status", "N/A")

              # Search for top 2 similar records by name and description
              similar_records_text = ""
              try:
                  search_query = f"{record_name} {record_description}".strip()
                  search_resp = search_registry_records(registryId, search_query, max_results=2, descriptor_type=record_protocol)
                  #print(search_resp)
                  matches = [
                      r for r in search_resp.get("registryRecords", [])
                      if r.get("recordId") != registryRecordId
                  ][:2]
                  if matches:
                      lines = ["\n*Similar Records Found in Registry:*"]
                      for i, m in enumerate(matches, 1):
                          lines.append(
                              f"  {i}. *{m.get('name', 'N/A')}* (ID: {m.get('recordId', 'N/A')})\n"
                              f"     Descriptor Type: {m.get('descriptorType', 'N/A')} | Status: {m.get('status', 'N/A')}\n"
                              f"     Description: {m.get('description', 'N/A')}"
                          )
                      similar_records_text = "\n".join(lines)
                  #print(f"Similar records search result: {json.dumps(search_resp)}")
              except Exception as e:
                  print(f"Warning: could not search for similar records: {e}")

              # Run AI scan inline (A2A records only)
              ai_scan_summary, ai_scan_report_url, ai_scan_findings = "", "", []
              if record.get("descriptorType") == "A2A":
                  ai_scan_summary, ai_scan_report_url, ai_scan_findings = run_ai_scan(record)
              ai_scan_text = f"\n{ai_scan_summary}" if ai_scan_summary else ""
              if ai_scan_report_url:
                  ai_scan_text += f"  |  <{ai_scan_report_url}|:page_facing_up: View Full AI Scan Report>"

              # Write AI scan findings to DynamoDB
              try:
                  scan_table = os.environ.get("scanResultsTable", "")
                  if scan_table and record.get("descriptorType") == "A2A":
                      ddb = boto3.resource("dynamodb", region_name=REGION)
                      table = ddb.Table(scan_table)
                      table.put_item(Item={
                          "registryId": registryId,
                          "registryRecordId": registryRecordId,
                          "scanFindings": json.dumps(ai_scan_findings),
                          "scanSummary": ai_scan_summary,
                          "reportUrl": ai_scan_report_url,
                          "scannedAt": datetime.now(timezone.utc).isoformat(),
                      })
                      print(f"AI scan findings written to DynamoDB table: {scan_table}")
              except Exception as e:
                  print(f"Warning: could not write scan findings to DynamoDB: {e}")

              approve_cmd = (
                  f"aws bedrock-agentcore-control update-registry-record-status"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
                  f" --status APPROVED"
                  f" --status-reason 'Admin approved via CLI'"
              )
              reject_cmd = (
                  f"aws bedrock-agentcore-control update-registry-record-status"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
                  f" --status REJECTED"
                  f" --status-reason 'Admin rejected via CLI'"
              )

              getdetails_cmd = (
                  f"aws bedrock-agentcore-control get-registry-record"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
              )

              payload = json.dumps({
                  "channel": slackChannelName,
                  "username": "AWS Agent Registry Bot",
                  "icon_emoji": ":aws:",
                  "text": (
                      f"A new registry record has been submitted for approval\n"
                      f"*Registry Id:* {registryId}\n"
                      f"*Registry Record Id:* {registryRecordId}\n"
                      f"*Name:* {record_name}\n"
                      f"*Description:* {record_description}\n"
                      f"*Descriptor Type:* {record_protocol}\n"
                      f"*Status:* {record_status}\n"
                      f"{similar_records_text}"
                      f"{ai_scan_text}\n\n"
                      f":white_check_mark: *To Approve, run:*\n```{approve_cmd}```\n"
                      f":x: *To Reject, run:*\n```{reject_cmd}```\n"
                      f":mag: *To Get more details, run:*\n```{getdetails_cmd}```"
                  )
              })

              http = urllib3.PoolManager()
              response = http.request(
                  "POST",
                  slackIncomingHookUrl,
                  body=payload,
                  headers={"Content-Type": "application/json"}
              )
              print(f"Slack response — status: {response.status}, body: {response.data.decode('utf-8')}")

              return {"statusCode": 200}

Finding 6: CKV_AWS_116

  • Severity: HIGH
  • Scanner: checkov
  • Rule ID: CKV_AWS_116
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:65-344

Description:
Ensure that AWS Lambda function is configured for a Dead Letter Queue(DLQ)

Code Snippet:

CICDFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${Prefix}-lambda-cicd
      Runtime: python3.14
      Handler: index.lambda_handler
      Role: !GetAtt CICDFunctionRole.Arn
      Timeout: 60
      KmsKeyArn: !GetAtt LambdaEnvEncryptionKey.Arn
      Layers:
        - !Ref CICDDependenciesLayer
      Environment:
        Variables:
          slackIncomingHookUrl: !Ref SlackIncomingHookUrl
          slackChannelName: !Ref SlackChannelName
          reportsBucket: !Ref LambdaLayerBucket
          scanResultsTable: !Ref RegistryRecordMetadataTable
      Code:
        ZipFile: |
          import os
          import json
          import urllib3
          import boto3
          from datetime import datetime, timezone

          REGION = os.environ.get("AWS_REGION", "us-west-2")
          print(f"Boto3 version: {boto3.__version__}")

          # boto3 clients for the two AgentCore Registry planes
          registry_cp_client = boto3.client("bedrock-agentcore-control", region_name=REGION)
          registry_dp_client = boto3.client("bedrock-agentcore", region_name=REGION)

          def get_registry_record(registry_id, record_id):
              resp = registry_cp_client.get_registry_record(registryId=registry_id, recordId=record_id)
              metadata = resp.get("ResponseMetadata", {})
              aws_request_id = metadata['HTTPHeaders']['x-amzn-requestid']
              timestamp = metadata.get("HTTPHeaders", {}).get("date", datetime.now(timezone.utc).isoformat())
              print(f"get_registry_record metadata: requestId={aws_request_id}, timestamp={timestamp}")
              # Return the record fields, excluding boto3 response metadata
              return {k: v for k, v in resp.items() if k != "ResponseMetadata"}

          def search_registry_records(registry_id, query, max_results=2, descriptor_type=None):
              kwargs = dict(
                  searchQuery=query,
                  registryIds=[registry_id],
                  maxResults=max_results,
              )
              if descriptor_type:
                  kwargs['filters'] = {"descriptorType": {"$eq": descriptor_type}}
  
              print(f"Search parameters: {kwargs}")
              
              resp = registry_dp_client.search_registry_records(**kwargs)
              metadata = resp.get("ResponseMetadata", {})
              aws_request_id = metadata['HTTPHeaders']['x-amzn-requestid']
              timestamp = metadata.get("HTTPHeaders", {}).get("date", datetime.now(timezone.utc).isoformat())
              resp_body = {k: v for k, v in resp.items() if k != "ResponseMetadata"}

              if 'registryRecords' not in resp_body:
                  print(f"search_registry_records query: {query}, registry id: {registry_id}, error: {resp_body}")
              else:
                  print(f"search_registry_records query: {query}, registry id: {registry_id}, # records found: {len(resp_body['registryRecords'])}")

              print(f"search_registry_records metadata: requestId={aws_request_id}, timestamp={timestamp}")
              return resp_body

          def run_ai_scan(record):
              """Run Cisco AI A2A scanner on the registry record and return (summary, report_url)."""
              import asyncio
              async def _scan():
                  try:
                      from a2ascanner import Scanner
                      # Extract the A2A agent card from the nested descriptor structure
                      inline = record.get("descriptors", {}).get("a2a", {}).get("agentCard", {}).get("inlineContent", "{}")
                      agent_card = json.loads(inline) if isinstance(inline, str) else inline
                      #print(f"Agent Card: {agent_card}")

                      scanner = Scanner()
                      result = await scanner.scan_agent_card(agent_card)
                      severity_counts = {}

                      for f in result.findings:
                          sev = getattr(f, "severity", "INFO")
                          severity_counts[sev] = severity_counts.get(sev, 0) + 1

                      parts = [f"{v} {k}" for k, v in severity_counts.items()] if severity_counts else ["No issues found"]
                      summary = f":shield: *AI Scan Findings:* {', '.join(parts)}"
                      findings_list = [f.to_dict() for f in result.findings]
                      
                      print(f"AI scan findings: {json.dumps(findings_list, indent=2)}")

                      # Build HTML report
                      record_name = record.get("name", "Unknown Agent")
                      generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
                      rows = ""
                      for f in findings_list:
                          sev = f.get("severity", "INFO")
                          sev_color = {"HIGH": "#c0392b", "MEDIUM": "#e67e22", "LOW": "#27ae60"}.get(sev, "#7f8c8d")
                          details_str = json.dumps(f.get("details", {}), indent=2)
                          rows += f"""<tr>
                              <td style="color:{sev_color};font-weight:bold">{sev}</td>
                              <td>{f.get("analyzer","")}</td>
                              <td>{f.get("threat_name","")}</td>
                              <td>{f.get("scanner_category") or "—"}</td>
                              <td>{f.get("summary","")}</td>
                              <td><pre style="margin:0;font-size:11px">{details_str}</pre></td>
                          </tr>"""
                      html = f"""<!DOCTYPE html><html><head><meta charset="utf-8">
                      <title>A2A Security Scan Report</title>
                      <style>
                        body{{font-family:Arial,sans-serif;margin:20px;color:#333}}
                        h1{{font-size:20px}} .meta{{font-size:13px;color:#666;margin-bottom:16px}}
                        .summary{{display:flex;gap:16px;margin-bottom:20px}}
                        .badge{{padding:10px 20px;border-radius:6px;text-align:center;color:#fff}}
                        .HIGH{{background:#c0392b}} .MEDIUM{{background:#e67e22}} .LOW{{background:#27ae60}} .TOTAL{{background:#2980b9}}
                        .badge .num{{font-size:28px;font-weight:bold}} .badge .lbl{{font-size:12px}}
                        table{{border-collapse:collapse;width:100%}} th,td{{border:1px solid #ddd;padding:8px;text-align:left;vertical-align:top}}
                        th{{background:#f2f2f2;font-size:13px}} td{{font-size:13px}}
                      </style></head><body>
                      <h1>A2A Security Scan Report</h1>
                      <div class="meta">Target: {record_name} | Generated: {generated_at}</div>
                      <h2>Summary</h2>
                      <div class="summary">
                        <div class="badge HIGH"><div class="num">{severity_counts.get("HIGH",0)}</div><div class="lbl">High</div></div>
                        <div class="badge MEDIUM"><div class="num">{severity_counts.get("MEDIUM",0)}</div><div class="lbl">Medium</div></div>
                        <div class="badge LOW"><div class="num">{severity_counts.get("LOW",0)}</div><div class="lbl">Low</div></div>
                        <div class="badge TOTAL"><div class="num">{len(findings_list)}</div><div class="lbl">Total</div></div>
                      </div>
                      <h2>Findings</h2>
                      <table><thead><tr><th>Severity</th><th>Analyzer</th><th>Threat</th><th>Category</th><th>Summary</th><th>Details</th></tr></thead>
                      <tbody>{rows}</tbody></table>
                      <p style="margin-top:20px;font-size:12px;color:#999">Generated by A2A Scanner</p>
                      </body></html>"""

                      # Upload to S3 and generate presigned URL
                      reports_bucket = os.environ.get("reportsBucket", "")
                      report_url = ""
                      if reports_bucket:
                          s3 = boto3.client("s3", region_name=REGION)
                          record_id = record.get("registryRecordId", record.get("id", "unknown"))
                          s3_key = f"ai-scan-reports/{record_id}-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}.html"
                          s3.put_object(Bucket=reports_bucket, Key=s3_key, Body=html.encode("utf-8"), ContentType="text/html")
                          report_url = s3.generate_presigned_url(
                              "get_object",
                              Params={"Bucket": reports_bucket, "Key": s3_key},
                              ExpiresIn=604800  # 7 days
                          )
                          print(f"AI scan report uploaded: s3://{reports_bucket}/{s3_key}")

                      return summary, report_url, findings_list
                  except ImportError:
                      print("Warning: a2ascanner not available")
                      return ":shield: *AI Scan:* Scanner unavailable", "", []
                  except Exception as e:
                      print(f"Warning: AI scan failed: {e}")
                      return f":shield: *AI Scan:* Scan error - {str(e)}", "", []
              return asyncio.run(_scan())

          def lambda_handler(event, context):
              slackIncomingHookUrl = os.getenv("slackIncomingHookUrl")
              slackChannelName = os.getenv("slackChannelName")

              #print(f"Event recd: {event}")

              registryId = event["detail"]["registryId"]
              registryRecordId = event["detail"]["registryRecordId"]

              # Fetch registry record details via REST API
              record = {}
              try:
                  record = get_registry_record(registryId, registryRecordId)
                  #print(f"Registry record: {json.dumps(record)}")
              except Exception as e:
                  print(f"Warning: could not fetch registry record details: {e}")

              record_name = record.get("name", "N/A")
              record_description = record.get("description", "N/A")
              record_protocol = record.get("descriptorType", "N/A")
              record_status = record.get("status", "N/A")

              # Search for top 2 similar records by name and description
              similar_records_text = ""
              try:
                  search_query = f"{record_name} {record_description}".strip()
                  search_resp = search_registry_records(registryId, search_query, max_results=2, descriptor_type=record_protocol)
                  #print(search_resp)
                  matches = [
                      r for r in search_resp.get("registryRecords", [])
                      if r.get("recordId") != registryRecordId
                  ][:2]
                  if matches:
                      lines = ["\n*Similar Records Found in Registry:*"]
                      for i, m in enumerate(matches, 1):
                          lines.append(
                              f"  {i}. *{m.get('name', 'N/A')}* (ID: {m.get('recordId', 'N/A')})\n"
                              f"     Descriptor Type: {m.get('descriptorType', 'N/A')} | Status: {m.get('status', 'N/A')}\n"
                              f"     Description: {m.get('description', 'N/A')}"
                          )
                      similar_records_text = "\n".join(lines)
                  #print(f"Similar records search result: {json.dumps(search_resp)}")
              except Exception as e:
                  print(f"Warning: could not search for similar records: {e}")

              # Run AI scan inline (A2A records only)
              ai_scan_summary, ai_scan_report_url, ai_scan_findings = "", "", []
              if record.get("descriptorType") == "A2A":
                  ai_scan_summary, ai_scan_report_url, ai_scan_findings = run_ai_scan(record)
              ai_scan_text = f"\n{ai_scan_summary}" if ai_scan_summary else ""
              if ai_scan_report_url:
                  ai_scan_text += f"  |  <{ai_scan_report_url}|:page_facing_up: View Full AI Scan Report>"

              # Write AI scan findings to DynamoDB
              try:
                  scan_table = os.environ.get("scanResultsTable", "")
                  if scan_table and record.get("descriptorType") == "A2A":
                      ddb = boto3.resource("dynamodb", region_name=REGION)
                      table = ddb.Table(scan_table)
                      table.put_item(Item={
                          "registryId": registryId,
                          "registryRecordId": registryRecordId,
                          "scanFindings": json.dumps(ai_scan_findings),
                          "scanSummary": ai_scan_summary,
                          "reportUrl": ai_scan_report_url,
                          "scannedAt": datetime.now(timezone.utc).isoformat(),
                      })
                      print(f"AI scan findings written to DynamoDB table: {scan_table}")
              except Exception as e:
                  print(f"Warning: could not write scan findings to DynamoDB: {e}")

              approve_cmd = (
                  f"aws bedrock-agentcore-control update-registry-record-status"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
                  f" --status APPROVED"
                  f" --status-reason 'Admin approved via CLI'"
              )
              reject_cmd = (
                  f"aws bedrock-agentcore-control update-registry-record-status"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
                  f" --status REJECTED"
                  f" --status-reason 'Admin rejected via CLI'"
              )

              getdetails_cmd = (
                  f"aws bedrock-agentcore-control get-registry-record"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
              )

              payload = json.dumps({
                  "channel": slackChannelName,
                  "username": "AWS Agent Registry Bot",
                  "icon_emoji": ":aws:",
                  "text": (
                      f"A new registry record has been submitted for approval\n"
                      f"*Registry Id:* {registryId}\n"
                      f"*Registry Record Id:* {registryRecordId}\n"
                      f"*Name:* {record_name}\n"
                      f"*Description:* {record_description}\n"
                      f"*Descriptor Type:* {record_protocol}\n"
                      f"*Status:* {record_status}\n"
                      f"{similar_records_text}"
                      f"{ai_scan_text}\n\n"
                      f":white_check_mark: *To Approve, run:*\n```{approve_cmd}```\n"
                      f":x: *To Reject, run:*\n```{reject_cmd}```\n"
                      f":mag: *To Get more details, run:*\n```{getdetails_cmd}```"
                  )
              })

              http = urllib3.PoolManager()
              response = http.request(
                  "POST",
                  slackIncomingHookUrl,
                  body=payload,
                  headers={"Content-Type": "application/json"}
              )
              print(f"Slack response — status: {response.status}, body: {response.data.decode('utf-8')}")

              return {"statusCode": 200}

Finding 7: CKV_AWS_117

  • Severity: HIGH
  • Scanner: checkov
  • Rule ID: CKV_AWS_117
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:65-344

Description:
Ensure that AWS Lambda function is configured inside a VPC

Code Snippet:

CICDFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${Prefix}-lambda-cicd
      Runtime: python3.14
      Handler: index.lambda_handler
      Role: !GetAtt CICDFunctionRole.Arn
      Timeout: 60
      KmsKeyArn: !GetAtt LambdaEnvEncryptionKey.Arn
      Layers:
        - !Ref CICDDependenciesLayer
      Environment:
        Variables:
          slackIncomingHookUrl: !Ref SlackIncomingHookUrl
          slackChannelName: !Ref SlackChannelName
          reportsBucket: !Ref LambdaLayerBucket
          scanResultsTable: !Ref RegistryRecordMetadataTable
      Code:
        ZipFile: |
          import os
          import json
          import urllib3
          import boto3
          from datetime import datetime, timezone

          REGION = os.environ.get("AWS_REGION", "us-west-2")
          print(f"Boto3 version: {boto3.__version__}")

          # boto3 clients for the two AgentCore Registry planes
          registry_cp_client = boto3.client("bedrock-agentcore-control", region_name=REGION)
          registry_dp_client = boto3.client("bedrock-agentcore", region_name=REGION)

          def get_registry_record(registry_id, record_id):
              resp = registry_cp_client.get_registry_record(registryId=registry_id, recordId=record_id)
              metadata = resp.get("ResponseMetadata", {})
              aws_request_id = metadata['HTTPHeaders']['x-amzn-requestid']
              timestamp = metadata.get("HTTPHeaders", {}).get("date", datetime.now(timezone.utc).isoformat())
              print(f"get_registry_record metadata: requestId={aws_request_id}, timestamp={timestamp}")
              # Return the record fields, excluding boto3 response metadata
              return {k: v for k, v in resp.items() if k != "ResponseMetadata"}

          def search_registry_records(registry_id, query, max_results=2, descriptor_type=None):
              kwargs = dict(
                  searchQuery=query,
                  registryIds=[registry_id],
                  maxResults=max_results,
              )
              if descriptor_type:
                  kwargs['filters'] = {"descriptorType": {"$eq": descriptor_type}}
  
              print(f"Search parameters: {kwargs}")
              
              resp = registry_dp_client.search_registry_records(**kwargs)
              metadata = resp.get("ResponseMetadata", {})
              aws_request_id = metadata['HTTPHeaders']['x-amzn-requestid']
              timestamp = metadata.get("HTTPHeaders", {}).get("date", datetime.now(timezone.utc).isoformat())
              resp_body = {k: v for k, v in resp.items() if k != "ResponseMetadata"}

              if 'registryRecords' not in resp_body:
                  print(f"search_registry_records query: {query}, registry id: {registry_id}, error: {resp_body}")
              else:
                  print(f"search_registry_records query: {query}, registry id: {registry_id}, # records found: {len(resp_body['registryRecords'])}")

              print(f"search_registry_records metadata: requestId={aws_request_id}, timestamp={timestamp}")
              return resp_body

          def run_ai_scan(record):
              """Run Cisco AI A2A scanner on the registry record and return (summary, report_url)."""
              import asyncio
              async def _scan():
                  try:
                      from a2ascanner import Scanner
                      # Extract the A2A agent card from the nested descriptor structure
                      inline = record.get("descriptors", {}).get("a2a", {}).get("agentCard", {}).get("inlineContent", "{}")
                      agent_card = json.loads(inline) if isinstance(inline, str) else inline
                      #print(f"Agent Card: {agent_card}")

                      scanner = Scanner()
                      result = await scanner.scan_agent_card(agent_card)
                      severity_counts = {}

                      for f in result.findings:
                          sev = getattr(f, "severity", "INFO")
                          severity_counts[sev] = severity_counts.get(sev, 0) + 1

                      parts = [f"{v} {k}" for k, v in severity_counts.items()] if severity_counts else ["No issues found"]
                      summary = f":shield: *AI Scan Findings:* {', '.join(parts)}"
                      findings_list = [f.to_dict() for f in result.findings]
                      
                      print(f"AI scan findings: {json.dumps(findings_list, indent=2)}")

                      # Build HTML report
                      record_name = record.get("name", "Unknown Agent")
                      generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
                      rows = ""
                      for f in findings_list:
                          sev = f.get("severity", "INFO")
                          sev_color = {"HIGH": "#c0392b", "MEDIUM": "#e67e22", "LOW": "#27ae60"}.get(sev, "#7f8c8d")
                          details_str = json.dumps(f.get("details", {}), indent=2)
                          rows += f"""<tr>
                              <td style="color:{sev_color};font-weight:bold">{sev}</td>
                              <td>{f.get("analyzer","")}</td>
                              <td>{f.get("threat_name","")}</td>
                              <td>{f.get("scanner_category") or "—"}</td>
                              <td>{f.get("summary","")}</td>
                              <td><pre style="margin:0;font-size:11px">{details_str}</pre></td>
                          </tr>"""
                      html = f"""<!DOCTYPE html><html><head><meta charset="utf-8">
                      <title>A2A Security Scan Report</title>
                      <style>
                        body{{font-family:Arial,sans-serif;margin:20px;color:#333}}
                        h1{{font-size:20px}} .meta{{font-size:13px;color:#666;margin-bottom:16px}}
                        .summary{{display:flex;gap:16px;margin-bottom:20px}}
                        .badge{{padding:10px 20px;border-radius:6px;text-align:center;color:#fff}}
                        .HIGH{{background:#c0392b}} .MEDIUM{{background:#e67e22}} .LOW{{background:#27ae60}} .TOTAL{{background:#2980b9}}
                        .badge .num{{font-size:28px;font-weight:bold}} .badge .lbl{{font-size:12px}}
                        table{{border-collapse:collapse;width:100%}} th,td{{border:1px solid #ddd;padding:8px;text-align:left;vertical-align:top}}
                        th{{background:#f2f2f2;font-size:13px}} td{{font-size:13px}}
                      </style></head><body>
                      <h1>A2A Security Scan Report</h1>
                      <div class="meta">Target: {record_name} | Generated: {generated_at}</div>
                      <h2>Summary</h2>
                      <div class="summary">
                        <div class="badge HIGH"><div class="num">{severity_counts.get("HIGH",0)}</div><div class="lbl">High</div></div>
                        <div class="badge MEDIUM"><div class="num">{severity_counts.get("MEDIUM",0)}</div><div class="lbl">Medium</div></div>
                        <div class="badge LOW"><div class="num">{severity_counts.get("LOW",0)}</div><div class="lbl">Low</div></div>
                        <div class="badge TOTAL"><div class="num">{len(findings_list)}</div><div class="lbl">Total</div></div>
                      </div>
                      <h2>Findings</h2>
                      <table><thead><tr><th>Severity</th><th>Analyzer</th><th>Threat</th><th>Category</th><th>Summary</th><th>Details</th></tr></thead>
                      <tbody>{rows}</tbody></table>
                      <p style="margin-top:20px;font-size:12px;color:#999">Generated by A2A Scanner</p>
                      </body></html>"""

                      # Upload to S3 and generate presigned URL
                      reports_bucket = os.environ.get("reportsBucket", "")
                      report_url = ""
                      if reports_bucket:
                          s3 = boto3.client("s3", region_name=REGION)
                          record_id = record.get("registryRecordId", record.get("id", "unknown"))
                          s3_key = f"ai-scan-reports/{record_id}-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}.html"
                          s3.put_object(Bucket=reports_bucket, Key=s3_key, Body=html.encode("utf-8"), ContentType="text/html")
                          report_url = s3.generate_presigned_url(
                              "get_object",
                              Params={"Bucket": reports_bucket, "Key": s3_key},
                              ExpiresIn=604800  # 7 days
                          )
                          print(f"AI scan report uploaded: s3://{reports_bucket}/{s3_key}")

                      return summary, report_url, findings_list
                  except ImportError:
                      print("Warning: a2ascanner not available")
                      return ":shield: *AI Scan:* Scanner unavailable", "", []
                  except Exception as e:
                      print(f"Warning: AI scan failed: {e}")
                      return f":shield: *AI Scan:* Scan error - {str(e)}", "", []
              return asyncio.run(_scan())

          def lambda_handler(event, context):
              slackIncomingHookUrl = os.getenv("slackIncomingHookUrl")
              slackChannelName = os.getenv("slackChannelName")

              #print(f"Event recd: {event}")

              registryId = event["detail"]["registryId"]
              registryRecordId = event["detail"]["registryRecordId"]

              # Fetch registry record details via REST API
              record = {}
              try:
                  record = get_registry_record(registryId, registryRecordId)
                  #print(f"Registry record: {json.dumps(record)}")
              except Exception as e:
                  print(f"Warning: could not fetch registry record details: {e}")

              record_name = record.get("name", "N/A")
              record_description = record.get("description", "N/A")
              record_protocol = record.get("descriptorType", "N/A")
              record_status = record.get("status", "N/A")

              # Search for top 2 similar records by name and description
              similar_records_text = ""
              try:
                  search_query = f"{record_name} {record_description}".strip()
                  search_resp = search_registry_records(registryId, search_query, max_results=2, descriptor_type=record_protocol)
                  #print(search_resp)
                  matches = [
                      r for r in search_resp.get("registryRecords", [])
                      if r.get("recordId") != registryRecordId
                  ][:2]
                  if matches:
                      lines = ["\n*Similar Records Found in Registry:*"]
                      for i, m in enumerate(matches, 1):
                          lines.append(
                              f"  {i}. *{m.get('name', 'N/A')}* (ID: {m.get('recordId', 'N/A')})\n"
                              f"     Descriptor Type: {m.get('descriptorType', 'N/A')} | Status: {m.get('status', 'N/A')}\n"
                              f"     Description: {m.get('description', 'N/A')}"
                          )
                      similar_records_text = "\n".join(lines)
                  #print(f"Similar records search result: {json.dumps(search_resp)}")
              except Exception as e:
                  print(f"Warning: could not search for similar records: {e}")

              # Run AI scan inline (A2A records only)
              ai_scan_summary, ai_scan_report_url, ai_scan_findings = "", "", []
              if record.get("descriptorType") == "A2A":
                  ai_scan_summary, ai_scan_report_url, ai_scan_findings = run_ai_scan(record)
              ai_scan_text = f"\n{ai_scan_summary}" if ai_scan_summary else ""
              if ai_scan_report_url:
                  ai_scan_text += f"  |  <{ai_scan_report_url}|:page_facing_up: View Full AI Scan Report>"

              # Write AI scan findings to DynamoDB
              try:
                  scan_table = os.environ.get("scanResultsTable", "")
                  if scan_table and record.get("descriptorType") == "A2A":
                      ddb = boto3.resource("dynamodb", region_name=REGION)
                      table = ddb.Table(scan_table)
                      table.put_item(Item={
                          "registryId": registryId,
                          "registryRecordId": registryRecordId,
                          "scanFindings": json.dumps(ai_scan_findings),
                          "scanSummary": ai_scan_summary,
                          "reportUrl": ai_scan_report_url,
                          "scannedAt": datetime.now(timezone.utc).isoformat(),
                      })
                      print(f"AI scan findings written to DynamoDB table: {scan_table}")
              except Exception as e:
                  print(f"Warning: could not write scan findings to DynamoDB: {e}")

              approve_cmd = (
                  f"aws bedrock-agentcore-control update-registry-record-status"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
                  f" --status APPROVED"
                  f" --status-reason 'Admin approved via CLI'"
              )
              reject_cmd = (
                  f"aws bedrock-agentcore-control update-registry-record-status"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
                  f" --status REJECTED"
                  f" --status-reason 'Admin rejected via CLI'"
              )

              getdetails_cmd = (
                  f"aws bedrock-agentcore-control get-registry-record"
                  f" --registry-id {registryId}"
                  f" --record-id {registryRecordId}"
              )

              payload = json.dumps({
                  "channel": slackChannelName,
                  "username": "AWS Agent Registry Bot",
                  "icon_emoji": ":aws:",
                  "text": (
                      f"A new registry record has been submitted for approval\n"
                      f"*Registry Id:* {registryId}\n"
                      f"*Registry Record Id:* {registryRecordId}\n"
                      f"*Name:* {record_name}\n"
                      f"*Description:* {record_description}\n"
                      f"*Descriptor Type:* {record_protocol}\n"
                      f"*Status:* {record_status}\n"
                      f"{similar_records_text}"
                      f"{ai_scan_text}\n\n"
                      f":white_check_mark: *To Approve, run:*\n```{approve_cmd}```\n"
                      f":x: *To Reject, run:*\n```{reject_cmd}```\n"
                      f":mag: *To Get more details, run:*\n```{getdetails_cmd}```"
                  )
              })

              http = urllib3.PoolManager()
              response = http.request(
                  "POST",
                  slackIncomingHookUrl,
                  body=payload,
                  headers={"Content-Type": "application/json"}
              )
              print(f"Slack response — status: {response.status}, body: {response.data.decode('utf-8')}")

              return {"statusCode": 200}

Finding 8: CKV_AWS_28

  • Severity: HIGH
  • Scanner: checkov
  • Rule ID: CKV_AWS_28
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:459-477

Description:
Ensure DynamoDB point in time recovery (backup) is enabled

Code Snippet:

RegistryRecordMetadataTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub ${Prefix}-registry-record-metadata
      BillingMode: PAY_PER_REQUEST
      SSESpecification:
        SSEEnabled: true
        SSEType: KMS
        KMSMasterKeyId: !GetAtt DynamoDBEncryptionKey.Arn
      AttributeDefinitions:
        - AttributeName: registryId
          AttributeType: S
        - AttributeName: registryRecordId
          AttributeType: S
      KeySchema:
        - AttributeName: registryId
          KeyType: HASH
        - AttributeName: registryRecordId
          KeyType: RANGE

Finding 9: AwsSolutions-DDB3

  • Severity: MEDIUM
  • Scanner: cdk-nag
  • Rule ID: AwsSolutions-DDB3
  • Location: 01-tutorials/10-Agent-Registry/01-advanced/admin-approval-workflow/cfn_eventbridge.yaml:80

Description:
The DynamoDB table does not have Point-in-time Recovery enabled.

Exception Reason: N/A

Code Snippet:

Resources:
  RegistryRecordMetadataTable:
    Properties:
      AttributeDefinitions:
        - AttributeName: registryId
          AttributeType: S
        - AttributeName: registryRecordId
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      KeySchema:
        - AttributeName: registryId


<!-- ASH-SECURITY-SCAN-COMMENT -->

@dhawalkp dhawalkp merged commit fddfadb into awslabs:main Apr 10, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

01-tutorials 01-tutorials

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants