Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
0e4b31d
renaming camel case looks strange
sebastian9991 Oct 28, 2025
9bab346
update: adding kuzu to dependencies
sebastian9991 Oct 28, 2025
54bfabd
update: preliminary kuzu database script
sebastian9991 Oct 28, 2025
be0eb05
construct id w/ tqdm, args with new paramater
sebastian9991 Oct 28, 2025
49b80c0
initalizing graphdb from strings as unique id
sebastian9991 Oct 28, 2025
633fca3
fix for kuzu syntax
sebastian9991 Oct 28, 2025
8ce252e
fix for kuzu syntax
sebastian9991 Oct 28, 2025
544a3a1
removing ts in edge kuzu init
sebastian9991 Oct 28, 2025
ec30daf
header option
sebastian9991 Oct 28, 2025
b33bb70
cardinality change
sebastian9991 Oct 28, 2025
307e138
checking database existence before init
sebastian9991 Oct 28, 2025
5b5061f
printing node heads
sebastian9991 Oct 28, 2025
d9cbde9
database is not considered a folder
sebastian9991 Oct 28, 2025
605c1bc
featurestore, graphstore logging size
sebastian9991 Oct 28, 2025
d18b321
adding logging for feature and graph stores attributes
sebastian9991 Oct 28, 2025
2cef0f1
adding logging for feature and graph stores attributes
sebastian9991 Oct 28, 2025
be687c8
typo in read me, full path for gnn experiments
sebastian9991 Oct 30, 2025
4dc9a40
writing features, labels and domains to numpy as per kuzu tutorial
sebastian9991 Oct 30, 2025
efd5762
fixing dqr path
sebastian9991 Oct 30, 2025
e8c6370
correcting index, score -> pc1
sebastian9991 Oct 30, 2025
e76c0b6
correcting index, score -> pc1
sebastian9991 Oct 30, 2025
faa4848
correcting kuzu format
sebastian9991 Oct 30, 2025
5412ce9
removing graphdb in absolute path
sebastian9991 Oct 30, 2025
dbc00a8
updating dataframe usage
sebastian9991 Oct 30, 2025
c98950e
printing node heads
sebastian9991 Oct 30, 2025
16f0960
logging for tables
sebastian9991 Oct 30, 2025
c44db04
printing head
sebastian9991 Oct 30, 2025
c2048c4
removing graphdb in path
sebastian9991 Oct 30, 2025
75ff435
fixing graphdb paths
sebastian9991 Oct 30, 2025
c768f4a
fixing graphdb path, checke incorrect path
sebastian9991 Oct 30, 2025
5acc09e
numpy object to U256
sebastian9991 Oct 30, 2025
c4c9343
fixing string object type in numpy
sebastian9991 Oct 30, 2025
45e68c1
removing .npy of domains, using csv
sebastian9991 Oct 30, 2025
21c9b19
execute for feature and label
sebastian9991 Oct 30, 2025
3d936ac
domain, ts loading from csv only
sebastian9991 Oct 30, 2025
254eda7
fixing dataframe execute
sebastian9991 Oct 30, 2025
9925520
fixing space
sebastian9991 Oct 30, 2025
2c1c38d
mapping domains to ids, overhaul changes to previous idea
sebastian9991 Nov 3, 2025
153a5bc
mapping and kuzu construct check for already existing files
sebastian9991 Nov 4, 2025
4a6ec8f
fix: missing parantheses on conn.execute string
sebastian9991 Nov 4, 2025
108b42f
fix: ids.npy -> nid.npy
sebastian9991 Nov 4, 2025
b66ed4e
build domain_id is more robust to missing files
sebastian9991 Nov 4, 2025
fcba921
fix: ts.npy added in COPY
sebastian9991 Nov 4, 2025
aab9fda
view of x features
sebastian9991 Nov 4, 2025
ffdd6bd
printing feature and graph store features
sebastian9991 Nov 4, 2025
9e4d4a0
fix: printing nid
sebastian9991 Nov 4, 2025
2442369
viewing graph and feature store
sebastian9991 Nov 4, 2025
40e47b5
viewing graph and feature store, using get tensor
sebastian9991 Nov 4, 2025
86c304e
viewing graph and feature store, using get tensor, with index
sebastian9991 Nov 4, 2025
34290f8
using tensor attribute
sebastian9991 Nov 4, 2025
ae79f7e
checking type for lazy tensor
sebastian9991 Nov 4, 2025
6ff2aa9
switching print to log
sebastian9991 Nov 4, 2025
b051e52
try, catch for accessing feature store
sebastian9991 Nov 4, 2025
e9dde67
adding final confirmation log
sebastian9991 Nov 4, 2025
2cf30e7
remove: TensorAttribute
sebastian9991 Nov 4, 2025
892f46f
adding faulthandler
sebastian9991 Nov 4, 2025
a017a5b
adding assertions for npy datatypes
sebastian9991 Nov 4, 2025
9fe361c
logging datatypes and dimensions
sebastian9991 Nov 4, 2025
6ff37ab
fix: logging format
sebastian9991 Nov 4, 2025
3cbff6c
printing nid
sebastian9991 Nov 4, 2025
acf4fbe
trying get_as_torch_geomtric function
sebastian9991 Nov 4, 2025
6854ac2
printing feature x
sebastian9991 Nov 4, 2025
ca81a00
setting kuzu=0.0.7
sebastian9991 Nov 4, 2025
4261bff
path is now str as per 0.0.7
sebastian9991 Nov 4, 2025
77bd30a
torch_geometric==2.3.1
sebastian9991 Nov 5, 2025
f36ad43
upgraded torch-geometric
sebastian9991 Nov 5, 2025
31b745a
upgraded torch-geometric
sebastian9991 Nov 5, 2025
38a0310
upgrade Kuzu
sebastian9991 Nov 5, 2025
a659c8b
update dependencies to match tutorial
sebastian9991 Nov 5, 2025
5ee69ce
adding numpy
sebastian9991 Nov 5, 2025
7e875b6
update: attempting neighborLoader as per Kuzu tutorial
sebastian9991 Nov 5, 2025
eee6a3d
fix: test and train count sum to count
sebastian9991 Nov 6, 2025
c566d6b
logging
sebastian9991 Nov 6, 2025
cc30d71
logging count return
sebastian9991 Nov 6, 2025
2817f59
logging sample of train, test mask
sebastian9991 Nov 6, 2025
bedb5e3
using random perm
sebastian9991 Nov 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ For information on installations of these additional libraries see [pyg-lib](htt
To run our baseline static experimentation:

```sh
uv run tgrag/experiments/main.py
uv run tgrag/experiments/gnn_experiments/main.py
```

Alternatively, you can design your own configuration, updating the model parameters:

```sh
uv run tgrag/experiments/main.py --config configs/your_config.yaml
uv run tgrag/experiments/gnn_experiments/main.py --config configs/your_config.yaml
```

To learn more about making a contribution to CrediGraph see our [contribution guide](./.github/CONTRIBUTION.md)
Expand Down
13 changes: 10 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "tgrag"
version = "0.1.0"
description = "Temporal Graph Analysis on Large-Scale Web Data"
readme = "README.md"
requires-python = ">=3.9"
requires-python = ">=3.10"
authors = [
]
dependencies = [
Expand All @@ -19,14 +19,18 @@ dependencies = [
"scipy>=1.13.1",
"sentence-transformers>=5.0.0",
"tldextract>=5.3.0",
"torch>=2.7.1",
"torch-geometric>=2.6.1",
"torch==2.0.1",
"torcheval>=0.0.7",
"ujson>=5.10.0",
"warcio>=1.7.5",
"cloudscraper>=1.2.71",
"BeautifulSoup4>=4.13.0",
"jsonpath-ng>=1.7.0",
"kuzu==0.0.7",
"torch-geometric>=2.6.1",
"torchvision==0.15.2",
"torchaudio==2.0.2",
"numpy>=2.0.2",
]

[tool.flit.module]
Expand Down Expand Up @@ -112,3 +116,6 @@ exclude_lines = [
"register_parameter",
"torch.cuda.is_available",
]

[tool.uv.sources]
torch-geometric = { git = "https://github.com/pyg-team/pytorch_geometric.git" }
332 changes: 332 additions & 0 deletions tgrag/experiments/tgl_experiments/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
import argparse
import faulthandler
import logging
import pickle
from multiprocessing import cpu_count
from pathlib import Path
from typing import Tuple, cast

import kuzu
import numpy as np
import pandas as pd
import torch
from torch_geometric.data import FeatureStore, GraphStore
from torch_geometric.loader import NeighborLoader
from tqdm import tqdm

from tgrag.utils.args import DataArguments, ModelArguments, parse_args
from tgrag.utils.logger import setup_logging
from tgrag.utils.path import get_root_dir, get_scratch
from tgrag.utils.seed import seed_everything

parser = argparse.ArgumentParser(
description='TGL Experiments.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
'--config-file',
type=str,
default='configs/tgl/base.yaml',
help='Path to yaml configuration file to use',
)


def construct_kuzu_format(
db_path: Path,
node_csv: Path,
dqr_csv: Path,
seed: int = 42,
D: int = 128,
chunk_size: int = 1_000_000,
) -> None:
dqr = pd.read_csv(dqr_csv)
rng = np.random.default_rng(seed=seed)

x_list = []
y_list = []
ts_list = []

x_path = db_path / 'x.npy'
y_path = db_path / 'y.npy'
ts_path = db_path / 'ts.npy'

if x_path.exists() and y_path.exists() and ts_path.exists():
y = np.load(y_path, mmap_mode='r')
x = np.load(x_path, mmap_mode='r')
ts = np.load(ts_path, mmap_mode='r')
logging.info(
f'x: {x.shape}, {x.dtype}; y: {y.shape}, {y.dtype}; ts: {ts.shape}, {ts.dtype}'
)

assert (
np.load(x_path).dtype == np.float32
), f'x.npy has wrong dtype: {np.load(x_path).dtype}'
assert (
np.load(y_path).dtype == np.float32
), f'y.npy has wrong dtype: {np.load(y_path).dtype}'
assert (
np.load(ts_path).dtype == np.int64
), f'ts.npy has wrong dtype: {np.load(ts_path).dtype}'
logging.info(f'x.npy, y.npy and ts.npy at {db_path} already exists, returning.')
return

logging.info(f'Processing {node_csv} in chunks of {chunk_size:,} rows...')
for chunk in tqdm(
pd.read_csv(node_csv, chunksize=chunk_size),
desc='Reading vertices',
unit='chunk',
):
chunk = chunk.merge(dqr, on='domain', how='left')
chunk['pc1'].fillna(-1.0, inplace=True)

x_chunk = rng.normal(size=(len(chunk), D)).astype(np.float32)

x_list.append(x_chunk)
y_list.append(chunk['pc1'].astype(np.float32).values)
ts_list.append(chunk['ts'].astype(np.int64).values)

x = np.vstack(x_list)
y = np.concatenate(y_list)
ts = np.concatenate(ts_list)

logging.info(f'Saving arrays to {db_path}...')
np.save(x_path, x)
np.save(y_path, y)
np.save(ts_path, ts)

logging.info(f'Saved x{list(x.shape)}, y[{y.shape[0]}]')


def build_domain_id_mapping(
node_csv: Path, edge_csv: Path, out_dir: Path, chunk_size: int = 1_000_000
) -> None:
out_dir.mkdir(parents=True, exist_ok=True)
nid_map_path = out_dir / 'nid_map.pkl'
nid_array_path = out_dir / 'nid.npy'
edges_out_path = out_dir / 'edges_with_id.csv'

if nid_array_path.exists() and nid_map_path.exists() and edges_out_path.exists():
logging.info(
f'nid.npy, nid_map.pkl and edges with IDs already exists at {out_dir}, returning.'
)
return

logging.info(f'Building domain to id mapping from {node_csv}...')
domain_to_id = {}
next_id = 0
domain_list = []

if not (nid_array_path.exists() and nid_map_path.exists()):
for chunk in tqdm(
pd.read_csv(node_csv, chunksize=chunk_size),
desc='Reading vertices',
unit='chunk',
):
for domain in chunk['domain'].astype(str):
if domain not in domain_to_id:
domain_to_id[domain] = next_id
domain_list.append(domain)
next_id += 1

logging.info(f'Total unique domains: {len(domain_to_id):,}')
np.save(nid_array_path, np.arange(len(domain_list), dtype=np.int64))
with open(nid_map_path, 'wb') as f:
pickle.dump(domain_to_id, f)

if not edges_out_path.exists():
logging.info(f'Rewriting {edge_csv} to {edges_out_path} with ID mapping...')
with open(edges_out_path, 'w') as fout:
fout.write('src_id,dst_id,ts\n')

for chunk in tqdm(
pd.read_csv(edge_csv, chunksize=chunk_size),
desc='Rewriting edges',
unit='chunk',
):
chunk['src_id'] = chunk['src'].map(domain_to_id)
chunk['dst_id'] = chunk['dst'].map(domain_to_id)

chunk[['src_id', 'dst_id', 'ts']].astype(
{'src_id': 'int64', 'dst_id': 'int64'}
).to_csv(fout, header=False, index=False)

logging.info(
f'Finished. Saved nid_map.pkl, nid.npy, and edges_with_id.csv to {out_dir}'
)


def initialize_graph_db(
db_path: Path, buffer: int = 40
) -> Tuple[kuzu.Database, kuzu.Connection]:
logging.info('Connecting graph storage backend')
graph_db_path = db_path / 'graphdb'
edges_csv = db_path / 'edges_with_id.csv'

if graph_db_path.exists():
logging.info(f'Existing database found at {db_path}, skipping initalization.')
db = kuzu.Database(str(graph_db_path), buffer_pool_size=buffer * 1024**3)
conn = kuzu.Connection(db, num_threads=cpu_count())
return db, conn

db = kuzu.Database(str(graph_db_path), buffer_pool_size=buffer * 1024**3)
conn = kuzu.Connection(db, num_threads=cpu_count())

conn.execute(
'CREATE NODE TABLE domain(nid INT64, x FLOAT[128], ts INT64, y FLOAT, PRIMARY KEY(nid));'
)
conn.execute('CREATE REL TABLE link(FROM domain TO domain, ts INT64, MANY_MANY);')
conn.execute(
f'COPY domain FROM ("{db_path / "nid.npy"}", "{db_path / "x.npy"}", "{db_path / "ts.npy"}", "{db_path / "y.npy"}") BY COLUMN;'
)
conn.execute(f'COPY link FROM "{edges_csv}" (HEADER=true);')
logging.info('Graph database initialized')
return db, conn


def construct_dataset(conn: kuzu.connection) -> Tuple[torch.Tensor, torch.Tensor]:
count_result = conn.execute(
"""
MATCH (d:domain) RETURN count(*);
"""
)
count = count_result.get_next()[0]
logging.info(f'Match (d:domain) RETURN count(*) -> {count}')

train_count = int(0.6 * count)
count - train_count

indices = torch.randperm(count)
train_ids = indices[:train_count]
test_ids = indices[train_count:]

train_mask = torch.zeros(count, dtype=torch.bool)
test_mask = torch.zeros(count, dtype=torch.bool)

train_mask[train_ids] = True
test_mask[test_ids] = True
return train_mask, test_mask


def run_scalable_gnn(
data_arguments: DataArguments,
model_arguments: ModelArguments,
train_mask: torch.Tensor,
test_mask: torch.Tensor,
feature_store: FeatureStore,
graph_store: GraphStore,
) -> None:
loader = NeighborLoader(
data=(feature_store, graph_store),
input_nodes=('domain', train_mask),
num_neighbors={('domain', 'link', 'domain'): model_arguments.num_neighbors},
batch_size=model_arguments.batch_size,
shuffle=True,
num_workers=4,
pin_memory=True,
persistent_workers=True,
)
logging.info('Train loader created')

next_data = next(iter(loader))
logging.info(f'{next_data}')

# logging.info('*** Training ***')
# for run in tqdm(range(model_arguments.runs), desc='Runs'):
# model = Model(
# model_name=model_arguments.model,
# normalization=model_arguments.normalization,
# in_channels=data.x.shape[1],
# hidden_channels=model_arguments.hidden_channels,
# out_channels=model_arguments.embedding_dimension,
# num_layers=model_arguments.num_layers,
# dropout=model_arguments.dropout,
# ).to(device)
# optimizer = torch.optim.AdamW(
# model.parameters(), lr=model_arguments.lr, weight_decay=5e-4
# )
# for _ in tqdm(range(1, 1 + model_arguments.epochs), desc='Epochs'):
# pass
#


def main() -> None:
faulthandler.enable()
root = get_root_dir()
scratch = get_scratch()
args = parser.parse_args()
config_file_path = root / args.config_file
meta_args, experiment_args = parse_args(config_file_path)
setup_logging(meta_args.log_file_path)
seed_everything(meta_args.global_seed)

db_path = scratch / cast(str, meta_args.database_folder)
node_path = scratch / cast(str, meta_args.node_file)
edge_path = scratch / cast(str, meta_args.edge_file)
dqr_path = root / 'data' / 'dqr' / 'domain_pc1.csv'

build_domain_id_mapping(node_csv=node_path, edge_csv=edge_path, out_dir=db_path)
construct_kuzu_format(db_path=db_path, node_csv=node_path, dqr_csv=dqr_path)

db, conn = initialize_graph_db(db_path=db_path)

# try:
# df = conn.execute('MATCH (n:domain) RETURN n LIMIT 5').get_as_df()
# print(df.head())
# except RuntimeError as e:
# print('No domain table found:', e)
#
# node_df = conn.execute(
# """
# MATCH (n:domain)
# RETURN n.nid AS domain, n.ts AS ts, n.x as RNI
# LIMIT 5
# """
# ).get_as_df()
#
# print('=== Example node records ===')
# print(node_df.head(), '\n')
#
# edge_df = conn.execute(
# """
# MATCH (a:domain)-[r:link]->(b:domain)
# RETURN a.nid AS src, b.nid AS dst, r.ts AS ts
# LIMIT 5
# """
# ).get_as_df()

# print('=== Example edge records ===')
# print(edge_df.head())

feature_store, graph_store = db.get_torch_geometric_remote_backend()

logging.info(f'Feature store keys: {feature_store.get_all_tensor_attrs()}')
logging.info(f'Graph store keys: {graph_store.get_all_edge_attrs()}')

logging.info('View of feature and graph store:')
try:
y_subset = feature_store['domain', 'x', 0]
logging.info(type(y_subset))
except Exception as e:
logging.exception(f'Error accessing feature store {e}')

train_mask, test_mask = construct_dataset(conn=conn)
logging.info(f'Train_mask size: {train_mask.size()}')
logging.info(f'Sample of train mask: {train_mask[0:5]}')
logging.info(f'Test_mask size: {test_mask.size()}')
logging.info(f'Sample of test mask: {test_mask[0:5]}')
for experiment, experiment_arg in experiment_args.exp_args.items():
logging.info(f'\n**Running**: {experiment}')
run_scalable_gnn(
data_arguments=experiment_arg.data_args,
model_arguments=experiment_arg.model_args,
train_mask=train_mask,
test_mask=test_mask,
feature_store=feature_store,
graph_store=graph_store,
)

logging.info('Completed.')


if __name__ == '__main__':
main()
Loading