Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions arc/job/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,32 @@ def determine_run_time(self):
else:
self.run_time = None

@staticmethod
def _rotate_csv_if_needed(csv_path: str, max_lines: int = 10000, line_count: Optional[int] = None) -> None:
"""
Rotate a CSV file if it reaches or exceeds ``max_lines`` lines (including the header).
The archived file is renamed with a timestamp suffix in the same directory.
A fresh file (with headers) will be created by the caller as needed.

Args:
csv_path (str): The path to the CSV file.
max_lines (int): The maximum number of lines before rotation is triggered.
line_count (int, optional): Pre-computed line count to avoid re-reading the file.
"""
if not os.path.isfile(csv_path):
return
if line_count is None:
with open(csv_path, 'r') as f:
line_count = sum(1 for _ in f)
if line_count >= max_lines:
local_time = datetime.datetime.now().strftime("%d_%m_%y")
base, ext = os.path.splitext(csv_path)
archive_path = f"{base}.old.{local_time}{ext}"
try:
os.rename(csv_path, archive_path)
except FileNotFoundError:
pass # Another process already rotated the file.
Comment thread
calvinp0 marked this conversation as resolved.

def _set_job_number(self):
"""
Used as the entry number in the database, as well as the job name on the server.
Expand All @@ -686,12 +712,16 @@ def _set_job_number(self):
writer.writerow(row)
with open(csv_path, 'r') as f:
reader = csv.reader(f, dialect='excel')
line_count = 0
job_num = 0
for _ in reader:
line_count += 1
job_num += 1
if job_num == 100000:
job_num = 0
self.job_num = job_num
# Rotate after counting to avoid a second full read of the file.
self._rotate_csv_if_needed(csv_path, max_lines=10000, line_count=line_count)
# 2. Set other related attributes job_name and job_server_name.
self.job_server_name = self.job_server_name or 'a' + str(self.job_num)
if self.conformer is not None and self.job_name is None:
Expand Down Expand Up @@ -728,6 +758,7 @@ def write_completed_job_to_csv_file(self):
self.determine_job_status()
local_arc_path_ = local_arc_path if os.path.isdir(local_arc_path) else ARC_PATH
csv_path = os.path.join(local_arc_path_, 'completed_jobs.csv')
self._rotate_csv_if_needed(csv_path)
if os.path.isfile(csv_path):
# check that this is the updated version
with open(csv_path, 'r') as f:
Expand Down
79 changes: 72 additions & 7 deletions arc/job/adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
"""

import datetime
import glob
import math
import os
import tempfile
import time
import shutil
import unittest
Expand All @@ -17,7 +19,7 @@

from arc.common import ARC_TESTING_PATH
from arc.imports import settings
from arc.job.adapter import DataPoint, JobEnum, JobTypeEnum, JobExecutionTypeEnum
from arc.job.adapter import DataPoint, JobAdapter, JobEnum, JobTypeEnum, JobExecutionTypeEnum
from arc.job.adapters.gaussian import GaussianAdapter
from arc.level import Level
from arc.species import ARCSpecies
Expand Down Expand Up @@ -194,28 +196,34 @@ def setUpClass(cls):
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
testing=True,
)
# Copy the PBS time limit fixture into the directory structure the adapter expects.
stl_dir = os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit')
err_dest = os.path.join(stl_dir, 'calcs', 'Species', 'spc1', 'opt_101')
os.makedirs(err_dest, exist_ok=True)
shutil.copy(os.path.join(ARC_TESTING_PATH, 'server', 'pbs', 'timelimit', 'err.txt'),
os.path.join(err_dest, 'err.txt'))
cls.job_5 = GaussianAdapter(execution_type='queue',
job_name='spc1',
job_name='opt_101',
job_type='opt',
job_id='123456',
job_num=101,
job_server_name = 'server3',
job_server_name='server3',
level=Level(method='cbs-qb3'),
project='test',
project_directory=os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'),
project_directory=stl_dir,
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
server='server3',
testing=True,
)
cls.job_6 = GaussianAdapter(execution_type='queue',
job_name='spc1',
job_name='opt_101',
job_type='opt',
job_id='123456',
job_num=101,
job_server_name = 'server1',
job_server_name='server1',
level=Level(method='cbs-qb3'),
project='test',
project_directory=os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'),
project_directory=stl_dir,
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
testing=True,
queue='short_queue',
Expand Down Expand Up @@ -471,5 +479,62 @@ def tearDownClass(cls):
shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'), ignore_errors=True)


class TestRotateCSV(unittest.TestCase):
    """
    Contains unit tests for the CSV rotation logic.
    """

    def _make_csv(self, path, num_lines):
        """Helper to create a CSV file with a header and ``num_lines - 1`` data rows."""
        with open(path, 'w') as f:
            f.write('col1,col2\n')
            for i in range(num_lines - 1):
                f.write(f'{i},data\n')

    def test_no_rotation_below_threshold(self):
        """Test that no rotation occurs when the file is below the threshold."""
        with tempfile.TemporaryDirectory() as tmp:
            csv_path = os.path.join(tmp, 'jobs.csv')
            self._make_csv(csv_path, 10)
            JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertTrue(os.path.isfile(csv_path))
            self.assertEqual(glob.glob(os.path.join(tmp, 'jobs.old.*.csv')), [])

    def test_rotation_at_threshold(self):
        """Test that the file is rotated when it reaches the threshold."""
        with tempfile.TemporaryDirectory() as tmp:
            csv_path = os.path.join(tmp, 'jobs.csv')
            self._make_csv(csv_path, 50)
            JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertFalse(os.path.isfile(csv_path))
            archives = glob.glob(os.path.join(tmp, 'jobs.old.*.csv'))
            self.assertEqual(len(archives), 1)

    def test_no_error_for_missing_file(self):
        """Test that rotation is a no-op when the file does not exist."""
        with tempfile.TemporaryDirectory() as tmp:
            # Build the path inside a fresh temporary directory so it is guaranteed
            # not to exist; a hard-coded /tmp path could collide with a real file.
            missing = os.path.join(tmp, 'nonexistent_arc_test.csv')
            JobAdapter._rotate_csv_if_needed(missing)

    def test_multiple_rotations(self):
        """Test that multiple rotations produce distinct archive files."""
        with tempfile.TemporaryDirectory() as tmp:
            csv_path = os.path.join(tmp, 'jobs.csv')
            # First rotation on "day 1"
            self._make_csv(csv_path, 50)
            with patch('arc.job.adapter.datetime') as mock_dt:
                mock_dt.datetime.now.return_value = datetime.datetime(2026, 1, 15)
                mock_dt.timedelta = datetime.timedelta
                JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertFalse(os.path.isfile(csv_path))
            # Second rotation on "day 2"
            self._make_csv(csv_path, 50)
            with patch('arc.job.adapter.datetime') as mock_dt:
                mock_dt.datetime.now.return_value = datetime.datetime(2026, 2, 20)
                mock_dt.timedelta = datetime.timedelta
                JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertFalse(os.path.isfile(csv_path))
            archives = glob.glob(os.path.join(tmp, 'jobs.old.*.csv'))
            self.assertEqual(len(archives), 2)


Comment on lines +537 to +538
Copy link

Copilot AI Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rotation tests don’t cover the same-day collision case implied by the current date-only archive suffix. Adding a test that performs two rotations with the same mocked date/time (and asserting two distinct archives) would prevent regressions once the archive naming is made unique.

Suggested change
def test_same_day_multiple_rotations(self):
"""Test that two rotations on the same day produce distinct archive files."""
with tempfile.TemporaryDirectory() as tmp:
csv_path = os.path.join(tmp, 'jobs.csv')
# Both rotations use the same mocked datetime
fixed_dt = datetime.datetime(2026, 3, 1, 12, 0, 0)
# First rotation on the fixed "day/time"
self._make_csv(csv_path, 50)
with patch('arc.job.adapter.datetime') as mock_dt:
mock_dt.datetime.now.return_value = fixed_dt
mock_dt.timedelta = datetime.timedelta
JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
self.assertFalse(os.path.isfile(csv_path))
# Second rotation on the same fixed "day/time"
self._make_csv(csv_path, 50)
with patch('arc.job.adapter.datetime') as mock_dt:
mock_dt.datetime.now.return_value = fixed_dt
mock_dt.timedelta = datetime.timedelta
JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
self.assertFalse(os.path.isfile(csv_path))
archives = glob.glob(os.path.join(tmp, 'jobs.old.*.csv'))
# Expect two distinct archives even though the datetime is the same
self.assertEqual(len(archives), 2)
self.assertEqual(len({os.path.basename(p) for p in archives}), 2)

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, it is highly improbable that this scenario will occur

# Run the test suite with verbose output when this module is executed directly.
if __name__ == '__main__':
    unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))

This file was deleted.

This file was deleted.

Loading