Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ea267d8
add function to write output files to adf
anu1217 Jan 16, 2026
d863350
fix write_to_adf() and make function to write to sqlite
anu1217 Jan 19, 2026
daa624f
extend new function
anu1217 Jan 19, 2026
5cae818
add flux spectrum shape and t_irr to db
anu1217 Jan 20, 2026
242c707
Apply suggestions from code review
anu1217 Apr 17, 2026
48fd7e4
switch to .read() instead of .readlines()
anu1217 Apr 17, 2026
77c53af
Merge branch 'make_adf' of github.com:anu1217/activationDB into make_adf
anu1217 Apr 17, 2026
cf8dd3a
change variable names
anu1217 Apr 17, 2026
972d346
remove openmc dependency
anu1217 Apr 17, 2026
1099775
store vit j data externally
anu1217 Apr 17, 2026
dc11956
move empty flux file check, pass flux entries instead of flux_str
anu1217 Apr 17, 2026
56d610d
remove write_to_adf()
anu1217 Apr 17, 2026
2acf99e
Merge branch 'svalinn:main' into make_adf
anu1217 Apr 23, 2026
abbaf7c
change script setup
anu1217 Apr 28, 2026
0596773
delete file with bins
anu1217 Apr 28, 2026
c19b444
add docstrings
anu1217 Apr 28, 2026
6dd6dce
remove unused packages
anu1217 Apr 28, 2026
2a99216
remove unused packages
anu1217 Apr 28, 2026
2665308
unindent line
anu1217 Apr 28, 2026
0c61846
modify function names, move integer interval check
anu1217 May 5, 2026
df144c7
move functions around, create test df, add docstrings
anu1217 May 7, 2026
7889b05
change function name
anu1217 May 7, 2026
41595a8
normalize flux
anu1217 May 8, 2026
7b48ffe
separate returned values from test for separation of concerns
anu1217 May 8, 2026
f23a894
remove redundant test
anu1217 May 8, 2026
45323ad
remove returned value from test
anu1217 May 8, 2026
2fd5e55
Delete unused file
anu1217 May 8, 2026
65ee23a
make actual pytests
anu1217 May 19, 2026
191a922
add tests for parse_flux_str and normalize_flux
anu1217 May 19, 2026
53a81ca
add test for flux mapping
anu1217 May 19, 2026
af6dd8f
rename scripts
anu1217 May 19, 2026
6a30a8d
separate opening and closing of connection, don't record index in dat…
anu1217 May 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions tools/adf_to_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import numpy as np
import sqlite3


def open_flux_file(flux_file):
with open(flux_file, 'r') as flux_data:
flux_str = flux_data.read()
all_flux_entries = np.array(flux_str.split(), dtype=float)
if len(all_flux_entries) == 0:
raise Exception("The chosen flux file is empty.")
return all_flux_entries


def parse_flux_str(all_flux_entries, num_groups):
'''
Uses provided list of flux lines and group structure applied to the run to create an array of flux entries, with:
# rows = # of intervals = total # flux entries / # group structure bins
# columns = # group structure bins
:param: all_flux_entries: (data (numpy array) from ALARA flux file)
:param: num_groups : total number (int) of energy groups from group structure
'''
if len(all_flux_entries) % num_groups != 0:
raise Exception("The number of intervals must be an integer.")
num_intervals = len(all_flux_entries) // num_groups
flux_array = all_flux_entries.reshape(num_intervals, num_groups)
return flux_array

def normalize_flux(flux_array):
'''
Obtain the total flux by summing over the bin widths of the flux array,
then normalize the spectrum by the total flux in each interval.
:param: flux_array: (numpy array of shape # intervals x # energy groups)
'''
total_flux = np.sum(flux_array, axis=1)
#norm_flux_arr = 2D array of shape num_intervals x num_groups
norm_flux_arr = flux_array / total_flux.reshape(len(total_flux), 1)
return norm_flux_arr


def modify_adf_for_db(adf):
'''
Filters the adf for the pre-shutdown state and the number density.
Removes columns that do not add information required for the database.
:param: adf: ALARA DFrame object
'''
adf = adf.filter_rows(filter_dict={
"time": -1,
"variable": adf.VARIABLE_ENUM["Number Density"]
})
#Remove some columns:
adf.drop(columns=[
'time', 'time_unit', 'variable', 'var_unit', 'block', 'block_num'
],
inplace=True)
#Rename some columns:
adf.rename(columns={'value': 'num_dens_(atoms/cm3)'}, inplace=True)

return adf


def map_adf_flux_tirr(adf, norm_flux_arr, t_irr_arr_mod):
'''
Finds the unique block names in the adf and maps the correct flux spectrum
to the block. Assigns a column to store irradiation time.
:param: norm_flux_arr: numpy array of flux spectrum shape (# intervals x # energy groups)
:param: t_irr_arr_mod: numpy array of irradiation times where the total number of entries
is the number of rows in the adf
'''

block_names = adf['block_name'].unique()
flux_map = dict(zip(block_names, norm_flux_arr))

# Normalized flux spectrum shape:
adf['flux_spec_shape'] = adf['block_name'].map(flux_map)
adf['t_irr'] = t_irr_arr_mod
return adf

def write_to_sqlite(adf, sqlite_conn):
'''
Initialize a connection to a SQLite database, and write the adf
to it. Catches any errors produced during this process.
'''
try:
adf.to_sql('number_densities',
sqlite_conn,
if_exists='append',
method="multi",
index=False)
sqlite_conn.commit()
except sqlite3.OperationalError as error:
print(error)
return sqlite_conn.cursor()


def close_sqlite_conn(cursor):
'''
Closes inidividual SQLite cursor objects, and the connection
to the database. To be executed after running write_to_sqlite()
or anytime a SQLite connection has been established.
:param: cursor: SQLite cursor object
'''
try:
cursor.close()
if cursor.connection:
cursor.connection.close()
except sqlite3.OperationalError as error:
print(error)
15 changes: 0 additions & 15 deletions tools/iter_dt_out.yaml

This file was deleted.

80 changes: 0 additions & 80 deletions tools/script_template.py

This file was deleted.

111 changes: 111 additions & 0 deletions tools/test_adf_to_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import numpy as np
import alara_output_processing as aop
import adf_to_sqlite as ats
import sqlite3
import pytest

@pytest.mark.parametrize( "all_flux_entries, num_groups, exp_flux_arr",
[(np.array([2,4,6,8,10,2,4,6,8,10]), 5, np.array([[2,4,6,8,10], [2,4,6,8,10]]))
])

def test_parse_flux_str(all_flux_entries, num_groups, exp_flux_arr):
obs_flux_arr = ats.parse_flux_str(all_flux_entries, num_groups)
assert obs_flux_arr.all() == exp_flux_arr.all()

@pytest.mark.parametrize( "flux_arr, exp_norm_flux_arr",
[
(np.array([[2,4,6,8,10], [2,4,6,8,10]]), np.array([[num / 30 for num in [2,4,6,8,10]],
[num / 30 for num in [2,4,6,8,10]]
])
)
])

def test_normalize_flux(flux_arr, exp_norm_flux_arr):
obs_norm_flux_arr = ats.normalize_flux(flux_arr)
assert obs_norm_flux_arr.all() == exp_norm_flux_arr.all()

@pytest.mark.parametrize( "adf",
[
(aop.ALARADFrame(data=
{"value": [5.678e-11]*2,
"time": [-1]*2,
"time_unit" : ["s"]*2,
"variable": [0]*2,
"var_unit": ["atoms/cm3"]*2,
"block" : [1,2],
"block_name": ["Be", "W"],
"block_num": [2]*2,
"nuclide": ["h-1"]*2,
"half_life": ["-1"]*2,
"run_lbl": ["test_case", "new_test_case"]}))
])

def test_modify_adf_for_db(adf):
'''
Ensure that the expected columns exist in the adf.
'''
adf = ats.modify_adf_for_db(adf)
assert not any(col in adf for col in [
"value",
"time",
"time_unit",
"variable",
"var_unit",
"block",
"block_num",
])
assert (all(col in adf
for col in ["num_dens_(atoms/cm3)", "half_life", "block_name"]))

@pytest.mark.parametrize( "test_adf, norm_flux_arr, t_irr_arr_mod, exp_mod_adf",
[
(aop.ALARADFrame(data=
{"num_dens_(atoms/cm3)": [5.678e-11]*2,
"block_name": ["Be", "W"],
"nuclide": ["h-1"]*2,
"half_life": ["-1"]*2,
"run_lbl": ["test_case", "new_test_case"]}),
np.array([[num / 25 for num in [1,3,5,7,9]], [num / 30 for num in [2,4,6,8,10]]
]),
np.array([7.5]*2),
aop.ALARADFrame(data=
{"num_dens_(atoms/cm3)": [5.678e-11]*2,
"block_name": ["Be", "W"],
"nuclide": ["h-1"]*2,
"half_life": ["-1"]*2,
"run_lbl": ["test_case", "new_test_case"],
"flux_spec_shape" : [np.array([num / 25 for num in [1,3,5,7,9]]),
np.array([num / 30 for num in [2,4,6,8,10]])],
"t_irr" : [7.5]*2
})
)
])

def test_map_adf_flux_tirr(test_adf, norm_flux_arr, t_irr_arr_mod, exp_mod_adf):
obs_mod_adf = ats.map_adf_flux_tirr(test_adf, norm_flux_arr, t_irr_arr_mod)
assert obs_mod_adf["flux_spec_shape"].equals(exp_mod_adf["flux_spec_shape"]) == True
assert all(obs_mod_adf["t_irr"]) == all(exp_mod_adf["t_irr"])

@pytest.mark.parametrize( "mod_adf",
[
(aop.ALARADFrame(data=
{"num_dens_(atoms/cm3)": [5.678e-11]*2,
"block_name": ["Be", "W"],
"nuclide": ["h-1"]*2,
"half_life": ["-1"]*2,
"run_lbl": ["test_case", "new_test_case"]}))
])

def test_write_to_sqlite(mod_adf):
cursor = ats.write_to_sqlite(mod_adf, sqlite3.connect(":memory:"))
cursor.execute("SELECT * FROM number_densities")
result = cursor.fetchall()
assert len(result) == len(mod_adf["half_life"])
assert len(result[0]) == 5

@pytest.mark.parametrize( "test_cursor",
[sqlite3.connect(":memory:").cursor()])

def test_close_sqlite_conn(test_cursor):
# This function has a built-in test in the form of catching operational errors
ats.close_sqlite_conn(test_cursor)