From b140af90dd59b0fb3e2beed6bfc5a961a5204db9 Mon Sep 17 00:00:00 2001 From: deacon-mp Date: Wed, 20 May 2026 10:29:31 -0400 Subject: [PATCH] atomic_svc: refresh ART/STIX at startup, degrade gracefully offline clone_atomic_red_team_repo() now ensures the Atomic Red Team checkout is both present AND current on every startup: * missing/empty -> shallow clone * already present -> git fetch --depth 1 + fast-forward to latest HEAD This refreshes the atomic tests and the bundled ATT&CK STIX (enterprise-attack.json) together, so the 45MB STIX is pulled at startup rather than committed into the plugin, and stays current instead of being frozen at first-boot state. All network operations degrade gracefully: if the host is offline (or git is unavailable / the checkout is corrupt), a warning is logged and the existing on-disk copy is used; if nothing is on disk the import is skipped rather than crashing plugin enable. _populate_dict_techniques_tactics() likewise tolerates a missing enterprise-attack.json, falling back to the 'redcanary-unknown' tactic instead of raising FileNotFoundError. Co-Authored-By: Claude Opus 4.7 --- app/atomic_svc.py | 68 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/app/atomic_svc.py b/app/atomic_svc.py index 2f1beb5..94192a6 100644 --- a/app/atomic_svc.py +++ b/app/atomic_svc.py @@ -7,7 +7,7 @@ import yaml from collections import defaultdict -from subprocess import DEVNULL, STDOUT, check_call +from subprocess import DEVNULL, PIPE, CalledProcessError, run from app.utility.base_world import BaseWorld from app.utility.base_service import BaseService @@ -41,16 +41,65 @@ def __init__(self): async def clone_atomic_red_team_repo(self, repo_url=None): """ - Clone the Atomic Red Team repository. You can use a specific url via - the `repo_url` parameter (eg. if you want to use a fork). + Ensure the Atomic Red Team repository is present and up to date. + + On every startup this: + * clones the repo (shallow) if it is missing or empty; + * otherwise fetches the latest HEAD and fast-forwards to it. + + Both the atomic tests (atomics/) and the bundled ATT&CK STIX + (atomic_red_team/enterprise-attack.json) are refreshed together, so the + STIX is downloaded at startup rather than committed into this plugin. + + Every network operation degrades gracefully: if the host is offline a + warning is logged and whatever copy is already on disk is used. If + nothing is on disk and the clone fails, the import is skipped instead of + crashing the plugin. You can use a specific url via the `repo_url` + parameter (eg. if you want to use a fork). """ if not repo_url: repo_url = 'https://github.com/redcanaryco/atomic-red-team.git' if not os.path.exists(self.repo_dir) or not os.listdir(self.repo_dir): self.log.debug('cloning repo %s' % repo_url) - check_call(['git', 'clone', '--depth', '1', repo_url, self.repo_dir], stdout=DEVNULL, stderr=STDOUT) - self.log.debug('clone complete') + try: + run(['git', 'clone', '--depth', '1', repo_url, self.repo_dir], + check=True, stdout=DEVNULL, stderr=DEVNULL) + self.log.debug('clone complete') + except (CalledProcessError, OSError) as e: + self.log.warning('Could not clone Atomic Red Team repo (offline?): %s. ' + 'Atomic abilities will be unavailable until a startup ' + 'with network access succeeds.' % e) + return + + # Repo already on disk -> refresh it, but never fail if offline. + await self._update_atomic_red_team_repo() + + async def _update_atomic_red_team_repo(self): + """ + Fast-forward an existing shallow ART checkout to the latest upstream HEAD. + + A `git reset --hard` also restores the bundled enterprise-attack.json if + it went missing. Any failure (no network, git absent, corrupt checkout) + is logged and swallowed so the existing on-disk copy keeps working. + """ + try: + run(['git', 'fetch', '--depth', '1', 'origin', 'HEAD'], + cwd=self.repo_dir, check=True, stdout=DEVNULL, stderr=DEVNULL) + local = run(['git', 'rev-parse', 'HEAD'], cwd=self.repo_dir, check=True, + stdout=PIPE, stderr=DEVNULL, text=True).stdout.strip() + remote = run(['git', 'rev-parse', 'FETCH_HEAD'], cwd=self.repo_dir, check=True, + stdout=PIPE, stderr=DEVNULL, text=True).stdout.strip() + if local == remote: + self.log.debug('Atomic Red Team repo already up to date') + return + self.log.debug('updating Atomic Red Team repo to latest') + run(['git', 'reset', '--hard', 'FETCH_HEAD'], + cwd=self.repo_dir, check=True, stdout=DEVNULL, stderr=DEVNULL) + self.log.debug('Atomic Red Team repo updated') + except (CalledProcessError, OSError) as e: + self.log.warning('Could not update Atomic Red Team repo (offline?): %s. ' + 'Using the existing on-disk copy.' % e) async def populate_data_directory(self, path_yaml=None): """ @@ -107,9 +156,18 @@ async def _populate_dict_techniques_tactics(self): """ Populate internal dictionary used to match techniques to corresponding tactics. Use the file 'enterprise-attack.json' located in the Atomic Red Team repository. + + If the STIX file is missing (eg. the repo could not be cloned because the + host is offline), the mapping is left empty and abilities fall back to the + 'redcanary-unknown' tactic rather than failing the import. """ enterprise_attack_path = os.path.join(self.repo_dir, 'atomic_red_team', 'enterprise-attack.json') + if not os.path.isfile(enterprise_attack_path): + self.log.warning('ATT&CK STIX (%s) not found; importing abilities without tactic ' + 'mapping (offline?).' % enterprise_attack_path) + return + with open(enterprise_attack_path, 'r') as f: mitre_json = json.load(f)