Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions app/atomic_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ async def clone_atomic_red_team_repo(self, repo_url=None):

async def _update_atomic_red_team_repo(self):
"""
Fast-forward an existing shallow ART checkout to the latest upstream HEAD.
Fast-forward an existing shallow ART checkout to the latest upstream HEAD
and heal the bundled enterprise-attack.json if it went missing.

A `git reset --hard` also restores the bundled enterprise-attack.json if
it went missing. Any failure (no network, git absent, corrupt checkout)
is logged and swallowed so the existing on-disk copy keeps working.
Updating to the latest commit needs network and is best-effort: failures
(offline, git absent, corrupt checkout) are logged and swallowed so the
on-disk copy keeps working. Restoring a deleted tracked file (eg. the
STIX) is done from the local git object store and works offline.
"""
try:
run(['git', 'fetch', '--depth', '1', 'origin', 'HEAD'],
Expand All @@ -92,15 +94,28 @@ async def _update_atomic_red_team_repo(self):
stdout=PIPE, stderr=DEVNULL, text=True).stdout.strip()
if local == remote:
self.log.debug('Atomic Red Team repo already up to date')
return
self.log.debug('updating Atomic Red Team repo to latest')
run(['git', 'reset', '--hard', 'FETCH_HEAD'],
cwd=self.repo_dir, check=True, stdout=DEVNULL, stderr=DEVNULL)
self.log.debug('Atomic Red Team repo updated')
else:
self.log.debug('updating Atomic Red Team repo to latest')
run(['git', 'reset', '--hard', 'FETCH_HEAD'],
cwd=self.repo_dir, check=True, stdout=DEVNULL, stderr=DEVNULL)
self.log.debug('Atomic Red Team repo updated')
except (CalledProcessError, OSError) as e:
self.log.warning('Could not update Atomic Red Team repo (offline?): %s. '
'Using the existing on-disk copy.' % e)

# Heal a missing bundled STIX even when the repo is otherwise up to date
# (eg. the file was deleted but no new upstream commit exists to reset
# to). Restoring a tracked file from the local checkout needs no network.
stix_rel = os.path.join('atomic_red_team', 'enterprise-attack.json')
if not os.path.isfile(os.path.join(self.repo_dir, stix_rel)):
self.log.debug('enterprise-attack.json missing; restoring from local checkout')
try:
run(['git', 'checkout', '--', stix_rel],
cwd=self.repo_dir, check=True, stdout=DEVNULL, stderr=DEVNULL)
except (CalledProcessError, OSError) as e:
self.log.warning('Could not restore enterprise-attack.json from local '
'checkout: %s' % e)

async def populate_data_directory(self, path_yaml=None):
"""
Populate the 'data' directory with the Atomic Red Team abilities.
Expand Down
Loading