Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions app/atomic_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import yaml

from collections import defaultdict
from subprocess import DEVNULL, STDOUT, check_call
from subprocess import DEVNULL, PIPE, CalledProcessError, run

from app.utility.base_world import BaseWorld
from app.utility.base_service import BaseService
Expand Down Expand Up @@ -41,16 +41,65 @@ def __init__(self):

async def clone_atomic_red_team_repo(self, repo_url=None):
"""
Clone the Atomic Red Team repository. You can use a specific url via
the `repo_url` parameter (eg. if you want to use a fork).
Ensure the Atomic Red Team repository is present and up to date.

On every startup this:
* clones the repo (shallow) if it is missing or empty;
* otherwise fetches the latest HEAD and fast-forwards to it.

Both the atomic tests (atomics/) and the bundled ATT&CK STIX
(atomic_red_team/enterprise-attack.json) are refreshed together, so the
STIX is downloaded at startup rather than committed into this plugin.

Every network operation degrades gracefully: if the host is offline a
warning is logged and whatever copy is already on disk is used. If
nothing is on disk and the clone fails, the import is skipped instead of
crashing the plugin. You can use a specific url via the `repo_url`
parameter (eg. if you want to use a fork).
"""
if not repo_url:
repo_url = 'https://github.com/redcanaryco/atomic-red-team.git'

if not os.path.exists(self.repo_dir) or not os.listdir(self.repo_dir):
self.log.debug('cloning repo %s' % repo_url)
check_call(['git', 'clone', '--depth', '1', repo_url, self.repo_dir], stdout=DEVNULL, stderr=STDOUT)
self.log.debug('clone complete')
try:
run(['git', 'clone', '--depth', '1', repo_url, self.repo_dir],
check=True, stdout=DEVNULL, stderr=DEVNULL)
self.log.debug('clone complete')
except (CalledProcessError, OSError) as e:
self.log.warning('Could not clone Atomic Red Team repo (offline?): %s. '
'Atomic abilities will be unavailable until a startup '
'with network access succeeds.' % e)
return

# Repo already on disk -> refresh it, but never fail if offline.
await self._update_atomic_red_team_repo()

async def _update_atomic_red_team_repo(self):
"""
Fast-forward an existing shallow ART checkout to the latest upstream HEAD.

A `git reset --hard` also restores the bundled enterprise-attack.json if
it went missing. Any failure (no network, git absent, corrupt checkout)
is logged and swallowed so the existing on-disk copy keeps working.
"""
try:
run(['git', 'fetch', '--depth', '1', 'origin', 'HEAD'],
cwd=self.repo_dir, check=True, stdout=DEVNULL, stderr=DEVNULL)
local = run(['git', 'rev-parse', 'HEAD'], cwd=self.repo_dir, check=True,
stdout=PIPE, stderr=DEVNULL, text=True).stdout.strip()
remote = run(['git', 'rev-parse', 'FETCH_HEAD'], cwd=self.repo_dir, check=True,
stdout=PIPE, stderr=DEVNULL, text=True).stdout.strip()
if local == remote:
self.log.debug('Atomic Red Team repo already up to date')
return
self.log.debug('updating Atomic Red Team repo to latest')
run(['git', 'reset', '--hard', 'FETCH_HEAD'],
cwd=self.repo_dir, check=True, stdout=DEVNULL, stderr=DEVNULL)
self.log.debug('Atomic Red Team repo updated')
except (CalledProcessError, OSError) as e:
self.log.warning('Could not update Atomic Red Team repo (offline?): %s. '
'Using the existing on-disk copy.' % e)

async def populate_data_directory(self, path_yaml=None):
"""
Expand Down Expand Up @@ -107,9 +156,18 @@ async def _populate_dict_techniques_tactics(self):
"""
Populate internal dictionary used to match techniques to corresponding tactics.
Use the file 'enterprise-attack.json' located in the Atomic Red Team repository.

If the STIX file is missing (eg. the repo could not be cloned because the
host is offline), the mapping is left empty and abilities fall back to the
'redcanary-unknown' tactic rather than failing the import.
"""
enterprise_attack_path = os.path.join(self.repo_dir, 'atomic_red_team', 'enterprise-attack.json')

if not os.path.isfile(enterprise_attack_path):
self.log.warning('ATT&CK STIX (%s) not found; importing abilities without tactic '
'mapping (offline?).' % enterprise_attack_path)
return

with open(enterprise_attack_path, 'r') as f:
mitre_json = json.load(f)

Expand Down
Loading