From 0ca58da6bc435a959b9878b7dd8951576249a0f3 Mon Sep 17 00:00:00 2001 From: Anton Vorontsov Date: Fri, 10 Apr 2026 21:46:58 +0000 Subject: [PATCH 1/3] databases: accept local FASTA paths for easy nucleotide DB creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Building a nucleotide database from a FASTA file previously required manually chaining createdb, splitsequence, makepaddedseqdb, and createindex with the right flags. Now a single command does it: mmseqs databases ./input.fasta.gz outdb tmp Both relative (./...) and absolute (/...) paths work — any argument containing '/' that isn't a known database name is treated as a local file. Protein inputs are rejected with a clear error since the indexing pipeline is nucleotide-specific. This keeps `databases` as the single entry point to maintain indexing requirements, and makes it suitable for reindexing external or already manually downloaded databases. --- data/workflow/databases.sh | 44 ++++++++++++++++++++++++++++++++++++++ src/MMseqsBase.cpp | 8 +++++-- src/workflow/Databases.cpp | 25 ++++++++++++++++------ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/data/workflow/databases.sh b/data/workflow/databases.sh index c093da1cc..fab3313c3 100644 --- a/data/workflow/databases.sh +++ b/data/workflow/databases.sh @@ -299,6 +299,14 @@ case "${SELECTION}" in push_back "${TMP_PATH}/kalamari.fasta" INPUT_TYPE="FASTA_LIST" ;; + *) + if [ ! -f "${SELECTION}" ]; then + fail "Local file not found: ${SELECTION}" + fi + push_back "${SELECTION}" + date "+%s" > "${TMP_PATH}/version" + INPUT_TYPE="LOCAL_FASTA" + ;; esac if notExists "${OUTDB}.dbtype"; then @@ -312,6 +320,42 @@ case "${INPUT_TYPE}" in rm -f -- "$i" done ;; + "LOCAL_FASTA") + eval "set -- $ARR" + export MMSEQS_FORCE_MERGE=1 + # shellcheck disable=SC2086 + set +e + CREATEDB_LOG=$("${MMSEQS}" createdb "${@}" "${TMP_PATH}/createdb" ${THREADS_COMP_PAR} 2>&1) + CREATEDB_RET=$? + set -e + printf "%s\n" "${CREATEDB_LOG}" + [ $CREATEDB_RET -ne 0 ] && fail "createdb died" + case "${CREATEDB_LOG}" in + *"Database type: Nucleotide"*) ;; + *) fail "Local file indexing only supports nucleotide databases" ;; + esac + # shellcheck disable=SC2086 + "${MMSEQS}" splitsequence "${TMP_PATH}/createdb" "${TMP_PATH}/splitdb" \ + --sequence-overlap 0 --sequence-split-mode 0 --headers-split-mode 1 \ + ${THREADS_PAR} \ + || fail "splitsequence died" + # shellcheck disable=SC2086 + "${MMSEQS}" makepaddedseqdb "${TMP_PATH}/splitdb" "${OUTDB}" \ + ${THREADS_PAR} \ + || fail "makepaddedseqdb died" + mkdir -p "${TMP_PATH}/indexdb" + # shellcheck disable=SC2086 + "${MMSEQS}" createindex "${OUTDB}" "${TMP_PATH}/indexdb" \ + --split 1 --index-subset 2 ${THREADS_PAR} \ + || fail "createindex died" + rm -rf -- "${TMP_PATH}/indexdb" + if [ -n "${REMOVE_TMP}" ]; then + # shellcheck disable=SC2086 + "${MMSEQS}" rmdb "${TMP_PATH}/createdb" ${VERB_PAR} + # shellcheck disable=SC2086 + "${MMSEQS}" rmdb "${TMP_PATH}/splitdb" ${VERB_PAR} + fi + ;; "FSA") # shellcheck disable=SC2086 "${MMSEQS}" createdb "${TMP_PATH}/"*.fsa "${OUTDB}" ${COMP_PAR} --gpu 1 \ diff --git a/src/MMseqsBase.cpp b/src/MMseqsBase.cpp index 1fa167298..52dae74c0 100644 --- a/src/MMseqsBase.cpp +++ b/src/MMseqsBase.cpp @@ -129,9 +129,13 @@ std::vector baseCommands = { {"tmpDir", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::directory }}}, {"databases", databases, &par.databases, COMMAND_DATABASE_CREATION, "List and download databases", - NULL, + "# Download a known database\n" + "mmseqs databases UniRef30_2302 db tmp\n\n" + "# Index a local FASTA file (use ./relative or /absolute paths)\n" + "mmseqs databases ./input.fasta.gz db tmp\n" + "mmseqs databases /data/rnacentral_active.fasta.gz db tmp\n\n", "Milot Mirdita ", - " ", + " ", CITATION_TAXONOMY|CITATION_MMSEQS2, {{"selection", 0, DbType::ZERO_OR_ALL, &DbValidator::empty }, {"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }, {"tmpDir", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::directory }}}, diff --git a/src/workflow/Databases.cpp b/src/workflow/Databases.cpp index 2bbe05397..b60d21e6d 100644 --- a/src/workflow/Databases.cpp +++ b/src/workflow/Databases.cpp @@ -276,9 +276,12 @@ int databases(int argc, const char **argv, const Command &command) { } } if (downloadIdx == -1) { - par.printUsageMessage(command, par.help ? MMseqsParameter::COMMAND_EXPERT : 0, description.c_str()); - Debug(Debug::ERROR) << "Selected database " << par.db1 << " was not found\n"; - EXIT(EXIT_FAILURE); + if (par.db1.find('/') == std::string::npos) { + par.printUsageMessage(command, par.help ? MMseqsParameter::COMMAND_EXPERT : 0, description.c_str()); + Debug(Debug::ERROR) << "Selected database " << par.db1 << " was not found\n"; + EXIT(EXIT_FAILURE); + } + // Path contains '/' — treat as local file, shell script validates existence } par.printParameters(command.cmd, argc, argv, par.databases); std::string tmpDir = par.db3; @@ -291,10 +294,14 @@ int databases(int argc, const char **argv, const Command &command) { par.filenames.push_back(tmpDir); CommandCaller cmd; - for (size_t i = 0; i < usedDownloads[downloadIdx].environment.size(); ++i) { - cmd.addVariable(usedDownloads[downloadIdx].environment[i].key, usedDownloads[downloadIdx].environment[i].value); + if (downloadIdx >= 0) { + for (size_t i = 0; i < usedDownloads[downloadIdx].environment.size(); ++i) { + cmd.addVariable(usedDownloads[downloadIdx].environment[i].key, usedDownloads[downloadIdx].environment[i].value); + } + cmd.addVariable("TAXONOMY", usedDownloads[downloadIdx].hasTaxonomy ? "TRUE" : NULL); + } else { + cmd.addVariable("TAXONOMY", NULL); } - cmd.addVariable("TAXONOMY", usedDownloads[downloadIdx].hasTaxonomy ? "TRUE" : NULL); cmd.addVariable("REMOVE_TMP", par.removeTmpFiles ? "TRUE" : NULL); cmd.addVariable("VERB_PAR", par.createParameterString(par.onlyverbosity).c_str()); cmd.addVariable("COMP_PAR", par.createParameterString(par.verbandcompression).c_str()); @@ -303,7 +310,11 @@ int databases(int argc, const char **argv, const Command &command) { cmd.addVariable("THREADS_PAR", par.createParameterString(par.onlythreads).c_str()); cmd.addVariable("THREADS_COMP_PAR", par.createParameterString(par.threadsandcompression).c_str()); std::string program = tmpDir + "/download.sh"; - FileUtil::writeFile(program, usedDownloads[downloadIdx].script, usedDownloads[downloadIdx].scriptLength); + if (downloadIdx >= 0) { + FileUtil::writeFile(program, usedDownloads[downloadIdx].script, usedDownloads[downloadIdx].scriptLength); + } else { + FileUtil::writeFile(program, databases_sh, databases_sh_len); + } cmd.execProgram(program.c_str(), par.filenames); // Should never get here From 650a1d6398ec9e43b89fd553b130c61f13010f40 Mon Sep 17 00:00:00 2001 From: Anton Vorontsov Date: Thu, 23 Apr 2026 22:50:47 +0000 Subject: [PATCH 2/3] makepaddedseqdb: fuse createdb+splitsequence for direct FASTA input, parallelize with mmap+OpenMP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When makepaddedseqdb detects no .dbtype file for the input, it runs a fused codepath that accepts FASTA directly, eliminating intermediate file I/O from the 3-step createdb -> splitsequence -> makepaddedseqdb pipeline. The existing DB-input codepath is unchanged. The fused path replicates the exact behavior of the 3-step pipeline: FASTA parsing via KSeqWrapper, createdb-style shuffle (group by id%32), splitsequence-style chunking with ORF headers, length-based sorting, padded encoding, and lookup/source file generation. Output is byte-for-byte identical to the 3-step pipeline (144/144 correctness checks across 6 configs x 4 sizes). Uses a two-pass streaming approach to avoid O(N) memory: Pass 1: stream FASTA collecting only metadata (headers + lengths) Layout: compute shuffle/sort/offsets from metadata alone Write: headers, lookup, source (no sequence data needed) Pass 2: re-read FASTA, encode each chunk, pwrite() to pre-computed offset Pass 2 (encode + pwrite) is parallelized with mmap + OpenMP: record sequenceOffset/newlineCount during Pass 1, then mmap the input FASTA and process entries in parallel — each thread strips newlines from its mmap'd region into a thread-local buffer, encodes, and pwrites to pre-computed offsets. For compressed/stdin inputs, falls back to the original sequential path. Default --max-seq-len to 10000 to match splitsequence. Without this, the fused FASTA codepath would not split sequences unless --max-seq-len was explicitly passed, producing different output than the 3-step pipeline. Replace the 4-step LOCAL_FASTA pipeline in databases.sh (createdb -> splitsequence -> makepaddedseqdb -> createindex) with 2 steps: makepaddedseqdb (fused single-pass) createindex --index-subset 2 (streaming + MADV_DONTNEED) This eliminates intermediate databases and their cleanup, reduces disk I/O by ~2x, and uses constant memory for indexing regardless of input size. Benchmark (64 threads, fused vs reference 3-step makepaddedseqdb): SIZE FUSED REF(DB) TIME RSS 1M 0.025s 2495% 0.119s 550% 0.21x 1.00x 10M 0.047s 1474% 0.160s 506% 0.29x 1.55x 50M 0.081s 1503% 0.188s 631% 0.43x 0.75x 100M 0.112s 1887% 0.248s 694% 0.45x 0.51x 500M 0.468s 2434% 0.649s 850% 0.72x 0.51x Includes a self-contained correctness and performance benchmark suite in util/benchmarks/makepaddedseqdb_fused/. --- data/workflow/databases.sh | 31 +- src/MMseqsBase.cpp | 9 +- src/commons/Parameters.cpp | 5 + src/util/makepaddedseqdb.cpp | 500 +++++++++++++++++- .../makepaddedseqdb_fused/gen_fasta.py | 68 +++ util/benchmarks/makepaddedseqdb_fused/run | 348 ++++++++++++ util/benchmarks/makepaddedseqdb_fused/run.log | 97 ++++ 7 files changed, 1029 insertions(+), 29 deletions(-) create mode 100755 util/benchmarks/makepaddedseqdb_fused/gen_fasta.py create mode 100755 util/benchmarks/makepaddedseqdb_fused/run create mode 100644 util/benchmarks/makepaddedseqdb_fused/run.log diff --git a/data/workflow/databases.sh b/data/workflow/databases.sh index fab3313c3..cfd14ec17 100644 --- a/data/workflow/databases.sh +++ b/data/workflow/databases.sh @@ -322,26 +322,13 @@ case "${INPUT_TYPE}" in ;; "LOCAL_FASTA") eval "set -- $ARR" - export MMSEQS_FORCE_MERGE=1 + # Fused pipeline: makepaddedseqdb reads FASTA directly (createdb + + # splitsequence + padding in a single streaming pass), then + # createindex --index-subset 2 streams SequenceLookup with constant + # memory via MADV_DONTNEED. # shellcheck disable=SC2086 - set +e - CREATEDB_LOG=$("${MMSEQS}" createdb "${@}" "${TMP_PATH}/createdb" ${THREADS_COMP_PAR} 2>&1) - CREATEDB_RET=$? - set -e - printf "%s\n" "${CREATEDB_LOG}" - [ $CREATEDB_RET -ne 0 ] && fail "createdb died" - case "${CREATEDB_LOG}" in - *"Database type: Nucleotide"*) ;; - *) fail "Local file indexing only supports nucleotide databases" ;; - esac - # shellcheck disable=SC2086 - "${MMSEQS}" splitsequence "${TMP_PATH}/createdb" "${TMP_PATH}/splitdb" \ - --sequence-overlap 0 --sequence-split-mode 0 --headers-split-mode 1 \ - ${THREADS_PAR} \ - || fail "splitsequence died" - # shellcheck disable=SC2086 - "${MMSEQS}" makepaddedseqdb "${TMP_PATH}/splitdb" "${OUTDB}" \ - ${THREADS_PAR} \ + "${MMSEQS}" makepaddedseqdb "${@}" "${OUTDB}" \ + --headers-split-mode 1 ${THREADS_PAR} \ || fail "makepaddedseqdb died" mkdir -p "${TMP_PATH}/indexdb" # shellcheck disable=SC2086 @@ -349,12 +336,6 @@ case "${INPUT_TYPE}" in --split 1 --index-subset 2 ${THREADS_PAR} \ || fail "createindex died" rm -rf -- "${TMP_PATH}/indexdb" - if [ -n "${REMOVE_TMP}" ]; then - # shellcheck disable=SC2086 - "${MMSEQS}" rmdb "${TMP_PATH}/createdb" ${VERB_PAR} - # shellcheck disable=SC2086 - "${MMSEQS}" rmdb "${TMP_PATH}/splitdb" ${VERB_PAR} - fi ;; "FSA") # shellcheck disable=SC2086 diff --git a/src/MMseqsBase.cpp b/src/MMseqsBase.cpp index 52dae74c0..6ce5a04ce 100644 --- a/src/MMseqsBase.cpp +++ b/src/MMseqsBase.cpp @@ -153,10 +153,13 @@ std::vector baseCommands = { {"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::flatfile }}}, {"makepaddedseqdb", makepaddedseqdb, &par.makepaddedseqdb, COMMAND_HIDDEN, "Generate a padded sequence DB", - "Generate a padded sequence DB", + "# From existing sequence DB\n" + "mmseqs makepaddedseqdb sequenceDB paddedDB\n\n" + "# Directly from FASTA (fused createdb+splitsequence+makepaddedseqdb)\n" + "mmseqs makepaddedseqdb input.fasta paddedDB --max-seq-len 10000\n", "Milot Mirdita & Martin Steinegger ", - " ", - CITATION_GPU, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA|DbType::NEED_HEADER, &DbValidator::sequenceDb }, + " ", + CITATION_GPU, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::allDbAndFlat }, {"sequenceIndexDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }}}, {"appenddbtoindex", appenddbtoindex, &par.appenddbtoindex, COMMAND_HIDDEN, NULL, diff --git a/src/commons/Parameters.cpp b/src/commons/Parameters.cpp index ddd41d2c4..8b11207a1 100644 --- a/src/commons/Parameters.cpp +++ b/src/commons/Parameters.cpp @@ -974,6 +974,11 @@ Parameters::Parameters(): makepaddedseqdb.push_back(&PARAM_MASK_PROBABILTY); makepaddedseqdb.push_back(&PARAM_MASK_LOWER_CASE); makepaddedseqdb.push_back(&PARAM_MASK_N_REPEAT); + makepaddedseqdb.push_back(&PARAM_MAX_SEQ_LEN); + makepaddedseqdb.push_back(&PARAM_SEQUENCE_OVERLAP); + makepaddedseqdb.push_back(&PARAM_HEADER_SPLIT_MODE); + makepaddedseqdb.push_back(&PARAM_DB_TYPE); + makepaddedseqdb.push_back(&PARAM_SHUFFLE); makepaddedseqdb.push_back(&PARAM_WRITE_LOOKUP); makepaddedseqdb.push_back(&PARAM_THREADS); makepaddedseqdb.push_back(&PARAM_V); diff --git a/src/util/makepaddedseqdb.cpp b/src/util/makepaddedseqdb.cpp index ab75a9be0..a0efddcd4 100644 --- a/src/util/makepaddedseqdb.cpp +++ b/src/util/makepaddedseqdb.cpp @@ -6,15 +6,513 @@ #include "SubstitutionMatrix.h" #include "tantan.h" #include "Masker.h" +#include "KSeqWrapper.h" +#include "FastSort.h" +#include "Orf.h" +#include "itoa.h" +#include "FileUtil.h" + +#include +#include +#include +#include +#include +#include #ifdef OPENMP #include #endif +struct EntryMeta { + std::string header; // "name comment\n" (with trailing newline) + size_t seqLen; // sequence length (no sequence bytes stored) + size_t sequenceOffset; // byte offset in FASTA where sequence starts + int newlineCount; // newlines within sequence (for multi-line FASTA) +}; + +struct SortEntry { + size_t entryIdx; // index into entries[] + size_t startPos; // chunk start (0 if not split) + size_t chunkLen; // chunk length + unsigned int shuffledKey; // key after createdb shuffle +}; + +struct ChunkWriteInfo { + size_t startPos; // chunk start within FASTA sequence + size_t chunkLen; // chunk length + size_t dataOffset; // pre-computed byte offset in output data file +}; + +static int makepaddedseqdbFromFasta(Parameters &par) { + // ============================== + // Pass 1: Metadata collection (no sequence storage) + // ============================== + std::vector entries; + size_t maxEntrySeqLen = 0; + int dbType = par.dbType; + bool canMmap = false; + { + size_t sampleCount = 0; + size_t isNuclCnt = 0; + const size_t testForNucSequence = 100; + KSeqWrapper *kseq = KSeqFactory(par.db1.c_str()); + while (kseq->ReadEntry()) { + const KSeqWrapper::KSeqEntry &e = kseq->entry; + if (e.name.l == 0) { + Debug(Debug::ERROR) << "Fasta entry " << entries.size() << " is invalid\n"; + EXIT(EXIT_FAILURE); + } + EntryMeta meta; + meta.header.append(e.name.s, e.name.l); + if (e.comment.l > 0) { + meta.header.append(" ", 1); + meta.header.append(e.comment.s, e.comment.l); + } + meta.header.push_back('\n'); + meta.seqLen = e.sequence.l; + meta.sequenceOffset = e.sequenceOffset; + meta.newlineCount = e.newlineCount; + maxEntrySeqLen = std::max(maxEntrySeqLen, e.sequence.l); + + // Auto-detect nucleotide/amino acid (same heuristic as createdb) + if (dbType == 0) { + if (sampleCount < 10 || (sampleCount % 100) == 0) { + if (sampleCount < testForNucSequence) { + size_t cnt = 0; + for (size_t j = 0; j < e.sequence.l; j++) { + switch (toupper(e.sequence.s[j])) { + case 'T': case 'A': case 'G': case 'C': case 'U': case 'N': + cnt++; + break; + } + } + const float nuclDNAFraction = static_cast(cnt) / static_cast(e.sequence.l); + if (nuclDNAFraction > 0.9f) { + isNuclCnt++; + } + } + sampleCount++; + } + } + + entries.push_back(std::move(meta)); + } + canMmap = (kseq->type == KSeqWrapper::KSEQ_FILE); + delete kseq; + + if (entries.empty()) { + Debug(Debug::ERROR) << "The input file has no entries\n"; + EXIT(EXIT_FAILURE); + } + + if (dbType == 0) { + if (isNuclCnt == sampleCount) { + dbType = Parameters::DBTYPE_NUCLEOTIDES; + } else { + dbType = Parameters::DBTYPE_AMINO_ACIDS; + } + } + } + Debug(Debug::INFO) << "Database type: " << Parameters::getDbTypeName(dbType) << "\n"; + + // createdb always writes DBTYPE_AMINO_ACIDS to the dbtype file, + // so makepaddedseqdb always sees DBTYPE_AMINO_ACIDS as input type + int seqType = Parameters::DBTYPE_AMINO_ACIDS; + + // ============================== + // Compute output layout + // ============================== + + // Shuffle (group by id % shuffleSplits, flatten in group order) + const unsigned int shuffleSplits = par.shuffleDatabase ? 32 : 1; + std::vector> splits(shuffleSplits); + for (size_t i = 0; i < entries.size(); i++) { + splits[i % shuffleSplits].push_back(i); + } + std::vector shuffledOrder; + shuffledOrder.reserve(entries.size()); + for (unsigned int s = 0; s < shuffleSplits; s++) { + for (size_t j = 0; j < splits[s].size(); j++) { + shuffledOrder.push_back(splits[s][j]); + } + } + + // Generate SortEntries in shuffled order + bool doSplit = (par.maxSeqLen < 65535); + std::vector sortEntries; + for (size_t k = 0; k < shuffledOrder.size(); k++) { + size_t origId = shuffledOrder[k]; + size_t seqLen = entries[origId].seqLen; + + if (doSplit && seqLen > static_cast(par.maxSeqLen)) { + size_t effectiveStep = static_cast(par.maxSeqLen) - par.sequenceOverlap; + size_t splitCnt = static_cast( + ceilf(static_cast(seqLen) / static_cast(effectiveStep))); + for (size_t split = 0; split < splitCnt; split++) { + size_t startPos = split * effectiveStep; + size_t chunkLen = std::min(static_cast(par.maxSeqLen), seqLen - startPos); + SortEntry se; + se.entryIdx = origId; + se.startPos = startPos; + se.chunkLen = chunkLen; + se.shuffledKey = static_cast(k); + sortEntries.push_back(se); + } + } else { + SortEntry se; + se.entryIdx = origId; + se.startPos = 0; + se.chunkLen = seqLen; + se.shuffledKey = static_cast(k); + sortEntries.push_back(se); + } + } + + // Sort by (chunkLen+2 ASC, index_position DESC) + size_t totalSize = sortEntries.size(); + std::vector> sortArray(totalSize); + for (size_t i = 0; i < totalSize; i++) { + sortArray[i] = std::make_pair(i, sortEntries[i].chunkLen + 2); + } + SORT_PARALLEL(sortArray.begin(), sortArray.end(), + [](const std::pair &a, const std::pair &b) { + if (a.second != b.second) return a.second < b.second; + return a.first > b.first; + }); + + // Compute data file offsets — cumulative sum of aligned sizes + const int ALIGN = 4; + std::vector dataOffsets(totalSize); + size_t cumOffset = 0; + for (size_t i = 0; i < totalSize; i++) { + dataOffsets[i] = cumOffset; + size_t chunkLen = sortEntries[sortArray[i].first].chunkLen; + size_t encodedSize = chunkLen + (ALIGN - chunkLen % ALIGN) % ALIGN; + cumOffset += encodedSize; + } + size_t totalDataSize = cumOffset; + + // Build entryChunkMap[origFastaId] = chunks to write for that entry + std::vector> entryChunkMap(entries.size()); + for (size_t i = 0; i < totalSize; i++) { + size_t origIdx = sortArray[i].first; + const SortEntry &se = sortEntries[origIdx]; + ChunkWriteInfo info; + info.startPos = se.startPos; + info.chunkLen = se.chunkLen; + info.dataOffset = dataOffsets[i]; + entryChunkMap[se.entryIdx].push_back(info); + } + + // ============================== + // Write headers, lookup, source (no sequence data needed) + // ============================== + + size_t maxChunkLen = doSplit ? static_cast(par.maxSeqLen) : maxEntrySeqLen; + + DBWriter dbhw(par.hdr2.c_str(), par.hdr2Index.c_str(), 1, false, Parameters::DBTYPE_GENERIC_DB); + dbhw.open(); + + struct LookupInfo { + unsigned int outputKey; + std::string parsedName; + unsigned int fileNumber; + }; + std::vector lookupEntries(totalSize); + + char orfBuffer[1024]; + + for (size_t i = 0; i < totalSize; i++) { + size_t origIdx = sortArray[i].first; + const SortEntry &se = sortEntries[origIdx]; + const EntryMeta &meta = entries[se.entryIdx]; + + unsigned int outputKey = static_cast(i); + unsigned int dbKey = se.shuffledKey; + + if (doSplit && par.headerSplitMode == 0) { + size_t fromPos = se.startPos; + size_t toPos = se.startPos + se.chunkLen - 1; + size_t bufferLen = Orf::writeOrfHeader(orfBuffer, dbKey, fromPos, toPos, 0, 0); + dbhw.writeData(orfBuffer, bufferLen, outputKey, 0); + lookupEntries[i].parsedName = Util::parseFastaHeader(orfBuffer); + lookupEntries[i].fileNumber = static_cast(origIdx); + } else { + dbhw.writeData(meta.header.c_str(), meta.header.length(), outputKey, 0); + lookupEntries[i].parsedName = Util::parseFastaHeader(meta.header.c_str()); + lookupEntries[i].fileNumber = doSplit ? static_cast(origIdx) : dbKey; + } + lookupEntries[i].outputKey = outputKey; + } + + dbhw.close(true, false); + + // Write lookup file + if (par.writeLookup == true) { + std::string lookupFile = par.db2 + ".lookup"; + FILE *file = FileUtil::openAndDelete(lookupFile.c_str(), "w"); + std::string buffer; + buffer.reserve(2048); + DBReader::LookupEntry entry; + for (size_t i = 0; i < totalSize; i++) { + entry.id = lookupEntries[i].outputKey; + entry.entryName = lookupEntries[i].parsedName; + entry.fileNumber = lookupEntries[i].fileNumber; + DBReader::lookupEntryToBuffer(buffer, entry); + size_t written = fwrite(buffer.c_str(), sizeof(char), buffer.size(), file); + if (written != buffer.size()) { + Debug(Debug::ERROR) << "Cannot write to lookup file " << lookupFile << "\n"; + EXIT(EXIT_FAILURE); + } + buffer.clear(); + } + if (fclose(file) != 0) { + Debug(Debug::ERROR) << "Cannot close file " << lookupFile << "\n"; + EXIT(EXIT_FAILURE); + } + } + + // Write source file + { + std::string sourceFile = par.db2 + ".source"; + FILE *source = FileUtil::openAndDelete(sourceFile.c_str(), "w"); + std::string sourceName = FileUtil::baseName(par.db1); + char buffer[4096]; + size_t len = snprintf(buffer, sizeof(buffer), "0\t%s\n", sourceName.c_str()); + size_t written = fwrite(buffer, sizeof(char), len, source); + if (written != len) { + Debug(Debug::ERROR) << "Cannot write to source file " << sourceFile << "\n"; + EXIT(EXIT_FAILURE); + } + if (fclose(source) != 0) { + Debug(Debug::ERROR) << "Cannot close file " << sourceFile << "\n"; + EXIT(EXIT_FAILURE); + } + } + + // ============================== + // Pass 2: Streaming encode with pwrite + // ============================== + + // Open output data file and pre-allocate + std::string dataFilePath = par.db2; + int dataFd = ::open(dataFilePath.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (dataFd < 0) { + Debug(Debug::ERROR) << "Cannot open data file " << dataFilePath << "\n"; + EXIT(EXIT_FAILURE); + } + if (totalDataSize > 0 && ftruncate(dataFd, totalDataSize) != 0) { + Debug(Debug::ERROR) << "Cannot pre-allocate data file " << dataFilePath << "\n"; + EXIT(EXIT_FAILURE); + } + + if (canMmap) { + // mmap the input FASTA for parallel random access + int mmapFd = ::open(par.db1.c_str(), O_RDONLY); + if (mmapFd < 0) { + Debug(Debug::ERROR) << "Cannot open FASTA file for mmap: " << par.db1 << "\n"; + EXIT(EXIT_FAILURE); + } + struct stat fileStat; + if (fstat(mmapFd, &fileStat) != 0) { + Debug(Debug::ERROR) << "Cannot stat FASTA file: " << par.db1 << "\n"; + EXIT(EXIT_FAILURE); + } + size_t fastaFileSize = fileStat.st_size; + char *fastaData = (char *)mmap(NULL, fastaFileSize, PROT_READ, MAP_PRIVATE, mmapFd, 0); + if (fastaData == MAP_FAILED) { + Debug(Debug::ERROR) << "Cannot mmap FASTA file: " << par.db1 << "\n"; + EXIT(EXIT_FAILURE); + } +#ifdef HAVE_POSIX_MADVISE + posix_madvise(fastaData, fastaFileSize, POSIX_MADV_RANDOM); +#endif + + SubstitutionMatrix subMat(par.scoringMatrixFile.values.aminoacid().c_str(), 2.0, par.scoreBias); + + Debug::Progress progress(entries.size()); + +#pragma omp parallel +{ + Masker masker(subMat); + Sequence seq(maxChunkLen, seqType, &subMat, 0, false, false); + std::string result; + result.reserve(maxChunkLen + ALIGN); + char *seqBuf = new char[maxEntrySeqLen + 1]; + size_t charSeqBufferSize = maxChunkLen + 1; + unsigned char *charSequence = NULL; + if (par.maskMode) { + charSequence = (unsigned char *)malloc(charSeqBufferSize * sizeof(char)); + } + +#pragma omp for schedule(dynamic, 100) + for (size_t entryIdx = 0; entryIdx < entries.size(); entryIdx++) { + progress.updateProgress(); + const EntryMeta &meta = entries[entryIdx]; + const std::vector &chunks = entryChunkMap[entryIdx]; + + // Strip newlines from mmap'd data into contiguous buffer + const char *raw = fastaData + meta.sequenceOffset; + size_t rawLen = meta.seqLen + meta.newlineCount; + size_t cleanLen = 0; + for (size_t r = 0; r < rawLen && cleanLen < meta.seqLen; r++) { + if (raw[r] != '\n' && raw[r] != '\r') { + seqBuf[cleanLen++] = raw[r]; + } + } + + for (size_t c = 0; c < chunks.size(); c++) { + const ChunkWriteInfo &chunk = chunks[c]; + const char *chunkData = seqBuf + chunk.startPos; + size_t chunkLen = chunk.chunkLen; + + seq.mapSequence(0, 0, chunkData, chunkLen); + + if (charSequence != NULL) { + if (static_cast(seq.L) >= charSeqBufferSize) { + charSeqBufferSize = static_cast(seq.L * 1.5); + charSequence = (unsigned char *)realloc(charSequence, charSeqBufferSize * sizeof(char)); + } + memcpy(charSequence, seq.numSequence, seq.L); + masker.maskSequence(seq, par.maskMode, par.maskProb, par.maskLowerCaseMode, par.maskNrepeats); + for (int j = 0; j < seq.L; j++) { + result.append(1, (seq.numSequence[j] == masker.maskLetterNum) ? charSequence[j] + 32 : charSequence[j]); + } + } else { + for (int j = 0; j < seq.L; j++) { + char aa = chunkData[j]; + result.append(1, (islower(aa)) ? seq.numSequence[j] + 32 : seq.numSequence[j]); + } + } + + const size_t seqPadding = (seq.L % ALIGN == 0) ? 0 : ALIGN - seq.L % ALIGN; + result.append(seqPadding, static_cast(24)); + + pwrite(dataFd, result.c_str(), result.size(), chunk.dataOffset); + result.clear(); + } + } + + delete[] seqBuf; + if (charSequence != NULL) { + free(charSequence); + } +} + + munmap(fastaData, fastaFileSize); + ::close(mmapFd); + } else { + // Fallback: sequential Pass 2 for compressed/stdin inputs + SubstitutionMatrix subMat(par.scoringMatrixFile.values.aminoacid().c_str(), 2.0, par.scoreBias); + Masker masker(subMat); + Sequence seq(maxChunkLen, seqType, &subMat, 0, false, false); + std::string result; + result.reserve(maxChunkLen + ALIGN); + + size_t charSeqBufferSize = maxChunkLen + 1; + unsigned char *charSequence = NULL; + if (par.maskMode) { + charSequence = (unsigned char *)malloc(charSeqBufferSize * sizeof(char)); + } + + Debug::Progress progress(entries.size()); + + KSeqWrapper *kseq = KSeqFactory(par.db1.c_str()); + size_t entryIdx = 0; + while (kseq->ReadEntry()) { + progress.updateProgress(); + + const KSeqWrapper::KSeqEntry &e = kseq->entry; + const std::vector &chunks = entryChunkMap[entryIdx]; + + for (size_t c = 0; c < chunks.size(); c++) { + const ChunkWriteInfo &chunk = chunks[c]; + const char *chunkData = e.sequence.s + chunk.startPos; + size_t chunkLen = chunk.chunkLen; + + seq.mapSequence(0, 0, chunkData, chunkLen); + + if (charSequence != NULL) { + if (static_cast(seq.L) >= charSeqBufferSize) { + charSeqBufferSize = static_cast(seq.L * 1.5); + charSequence = (unsigned char *)realloc(charSequence, charSeqBufferSize * sizeof(char)); + } + memcpy(charSequence, seq.numSequence, seq.L); + masker.maskSequence(seq, par.maskMode, par.maskProb, par.maskLowerCaseMode, par.maskNrepeats); + for (int j = 0; j < seq.L; j++) { + result.append(1, (seq.numSequence[j] == masker.maskLetterNum) ? charSequence[j] + 32 : charSequence[j]); + } + } else { + for (int j = 0; j < seq.L; j++) { + char aa = chunkData[j]; + result.append(1, (islower(aa)) ? seq.numSequence[j] + 32 : seq.numSequence[j]); + } + } + + const size_t seqPadding = (seq.L % ALIGN == 0) ? 0 : ALIGN - seq.L % ALIGN; + result.append(seqPadding, static_cast(24)); + + ssize_t written = pwrite(dataFd, result.c_str(), result.size(), chunk.dataOffset); + if (written != static_cast(result.size())) { + Debug(Debug::ERROR) << "Cannot write to data file\n"; + EXIT(EXIT_FAILURE); + } + + result.clear(); + } + + entryIdx++; + } + delete kseq; + + if (charSequence != NULL) { + free(charSequence); + } + } + + ::close(dataFd); + + // Write index file + { + FILE *idxFile = FileUtil::openAndDelete(par.db2Index.c_str(), "w"); + char buffer[1024]; + for (size_t i = 0; i < totalSize; i++) { + size_t chunkLen = sortEntries[sortArray[i].first].chunkLen; + size_t len = DBWriter::indexToBuffer(buffer, static_cast(i), dataOffsets[i], chunkLen + 2); + size_t written = fwrite(buffer, sizeof(char), len, idxFile); + if (written != len) { + Debug(Debug::ERROR) << "Cannot write to index file\n"; + EXIT(EXIT_FAILURE); + } + } + if (fclose(idxFile) != 0) { + Debug(Debug::ERROR) << "Cannot close index file\n"; + EXIT(EXIT_FAILURE); + } + } + + // Write dbtype file + { + int paddedDbType = DBReader::setExtendedDbtype(seqType, Parameters::DBTYPE_EXTENDED_GPU); + DBWriter::writeDbtypeFile(par.db2.c_str(), paddedDbType, false); + } + + return EXIT_SUCCESS; +} + int makepaddedseqdb(int argc, const char **argv, const Command &command) { Parameters &par = Parameters::getInstance(); + // Match splitsequence defaults so fused FASTA path behaves identically + par.maxSeqLen = 10000; par.parseParameters(argc, argv, command, true, 0, 0); + // Detect FASTA input: if no .dbtype file exists, run fused FASTA codepath + bool isFastaInput = !FileUtil::fileExists(par.db1dbtype.c_str()); + if (isFastaInput) { + return makepaddedseqdbFromFasta(par); + } + + // Existing DB-input codepath (unchanged) const int mode = DBReader::USE_INDEX | DBReader::USE_DATA; DBReader dbr(par.db1.c_str(), par.db1Index.c_str(), par.threads, mode); dbr.open(DBReader::SORT_BY_LENGTH); @@ -141,4 +639,4 @@ int makepaddedseqdb(int argc, const char **argv, const Command &command) { } dbr.close(); return EXIT_SUCCESS; -} \ No newline at end of file +} diff --git a/util/benchmarks/makepaddedseqdb_fused/gen_fasta.py b/util/benchmarks/makepaddedseqdb_fused/gen_fasta.py new file mode 100755 index 000000000..4a82664e9 --- /dev/null +++ b/util/benchmarks/makepaddedseqdb_fused/gen_fasta.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""Generate a synthetic nucleotide FASTA with long sequences. + +Sequences are long enough (default 50 KB) to trigger splitting with +the default --max-seq-len 10000. Used by the benchmark/correctness suite. +""" + +import argparse +import random +import sys + + +def parse_size(s): + """Parse a human-readable size string (e.g. '100M', '5G') to bytes.""" + s = s.strip().upper() + multipliers = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in multipliers: + return int(float(s[:-1]) * multipliers[s[-1]]) + return int(s) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--size', default='100M', + help='Target total FASTA size (e.g. 100M, 1G)') + parser.add_argument('--seq-len', type=int, default=50000, + help='Length of each sequence in bases (default: 50000)') + parser.add_argument('--seed', type=int, default=42, + help='Random seed for reproducibility') + parser.add_argument('-o', '--output', default='/dev/stdout', + help='Output file (default: stdout)') + args = parser.parse_args() + + target_bytes = parse_size(args.size) + seq_len = args.seq_len + rng = random.Random(args.seed) + bases = 'ACGT' + line_width = 80 + + written = 0 + seq_id = 0 + + out = open(args.output, 'w') if args.output != '/dev/stdout' else sys.stdout + try: + while written < target_bytes: + header = f'>synth_{seq_id} len={seq_len}\n' + out.write(header) + written += len(header) + + remaining = seq_len + while remaining > 0: + chunk = min(remaining, line_width) + line = ''.join(rng.choice(bases) for _ in range(chunk)) + out.write(line) + out.write('\n') + written += chunk + 1 + remaining -= chunk + + seq_id += 1 + finally: + if out is not sys.stdout: + out.close() + + print(f'Generated {seq_id} sequences, ~{written} bytes', file=sys.stderr) + + +if __name__ == '__main__': + main() diff --git a/util/benchmarks/makepaddedseqdb_fused/run b/util/benchmarks/makepaddedseqdb_fused/run new file mode 100755 index 000000000..b322f18ba --- /dev/null +++ b/util/benchmarks/makepaddedseqdb_fused/run @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +""" +Benchmark & correctness suite for the fused makepaddedseqdb FASTA codepath. + +Usage: + cd util/benchmarks/makepaddedseqdb_fused + ./run # auto-detect mmseqs from build/src/mmseqs + ./run /path/to/mmseqs # use a specific binary +""" + +import filecmp +import os +import shutil +import subprocess +import sys +import tempfile + +# ── Configuration ───────────────────────────────────────────────────────────── + +CORRECTNESS_SIZES = ["1M", "10M", "50M", "100M"] +BENCH_SIZES = ["1M", "10M", "50M", "100M", "500M"] +EXTENSIONS = ["", ".index", ".dbtype", "_h", "_h.index", ".lookup"] +FASTA_CACHE_DIR = "/tmp/bench_fused_fasta_cache" + +# Correctness test matrix: (sequence-overlap, headers-split-mode) +CORRECTNESS_CONFIGS = [ + (0, 0), # no overlap, ORF headers (original test) + (300, 0), # default splitsequence overlap, ORF headers + (100, 0), # custom overlap, ORF headers + (0, 1), # no overlap, verbatim headers + (300, 1), # default overlap, verbatim headers + (100, 1), # custom overlap, verbatim headers +] + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def find_mmseqs(argv): + """Resolve the mmseqs binary path.""" + if len(argv) >= 2: + p = os.path.realpath(argv[1]) + if os.path.isfile(p) and os.access(p, os.X_OK): + return p + sys.exit(f"Error: {argv[1]} is not an executable") + + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.normpath(os.path.join(script_dir, "../../..")) + default = os.path.join(repo_root, "build", "src", "mmseqs") + if os.path.isfile(default) and os.access(default, os.X_OK): + return default + sys.exit( + f"mmseqs not found at {default}\n" + "Build first (cd build && make -j) or pass the path: ./run /path/to/mmseqs" + ) + + +def run_cmd(args, **kwargs): + """Run a command silently, raising on failure.""" + subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + check=True, **kwargs) + + +def measure(args): + """Run *args*; return (wall_seconds, peak_rss_kb, cpu_percent). + + Uses fork/exec/wait4 to get rusage (user+sys CPU time and peak RSS). + """ + import time as _time + + t0 = _time.monotonic() + pid = os.fork() + if pid == 0: + devnull = os.open(os.devnull, os.O_WRONLY) + os.dup2(devnull, 1) + os.dup2(devnull, 2) + os.close(devnull) + try: + os.execvp(args[0], args) + except OSError: + os._exit(127) + else: + _, status, rusage = os.wait4(pid, 0) + elapsed = _time.monotonic() - t0 + exit_code = os.waitstatus_to_exitcode(status) + if exit_code != 0: + raise subprocess.CalledProcessError(exit_code, args) + + cpu_s = rusage.ru_utime + rusage.ru_stime + cpu_pct = round(100 * cpu_s / elapsed, 1) if elapsed > 0 else 0 + peak_rss_kb = rusage.ru_maxrss + return elapsed, peak_rss_kb, cpu_pct + + +def gen_fasta(gen_script, size, output): + """Generate a synthetic FASTA file, using a persistent cache.""" + if os.path.isfile(output): + return + os.makedirs(os.path.dirname(output), exist_ok=True) + tmp = output + f".tmp.{os.getpid()}" + subprocess.run( + [sys.executable, gen_script, "--size", size, "-o", tmp], + check=True, stderr=subprocess.DEVNULL, + ) + os.rename(tmp, output) + + +def files_identical(a, b): + """True if two files are byte-for-byte identical.""" + return filecmp.cmp(a, b, shallow=False) + + +def fmt_mb(kb): + return f"{kb / 1024:.1f} MB" + + +def config_label(overlap, hdr_mode): + return f"overlap={overlap},hdr={hdr_mode}" + + +# ── Main ────────────────────────────────────────────────────────────────────── + +def main(): + mmseqs = find_mmseqs(sys.argv) + script_dir = os.path.dirname(os.path.abspath(__file__)) + gen_script = os.path.join(script_dir, "gen_fasta.py") + + print(f"Using mmseqs: {mmseqs}") + print() + + tmpdir = tempfile.mkdtemp(prefix="bench_fused_") + print(f"Working directory: {tmpdir}") + print() + + try: + _run(mmseqs, gen_script, tmpdir) + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +def _run(mmseqs, gen_script, tmpdir): + # ── Generate FASTA files ────────────────────────────────────────────── + all_sizes = sorted(set(CORRECTNESS_SIZES + BENCH_SIZES), + key=lambda s: _parse_size(s)) + + print("========================================") + print(" Generating test FASTA files") + print("========================================") + print(f" Cache: {FASTA_CACHE_DIR}") + fasta_paths = {} + for size in all_sizes: + path = os.path.join(FASTA_CACHE_DIR, f"test_{size}.fasta") + cached = os.path.isfile(path) + gen_fasta(gen_script, size, path) + fasta_paths[size] = path + mb = os.path.getsize(path) / 1024**2 + tag = "cached" if cached else "generated" + print(f" {size:>6s} {mb:8.1f} MB ({tag})") + print() + + # ── Correctness ─────────────────────────────────────────────────────── + print("========================================") + print(" Correctness: fused vs 3-step reference") + print("========================================") + print() + print(f" Configs: {len(CORRECTNESS_CONFIGS)}") + for overlap, hdr_mode in CORRECTNESS_CONFIGS: + print(f" {config_label(overlap, hdr_mode)}") + print() + + total_checks = 0 + total_pass = 0 + all_correct = True + + # createdb output is the same for all configs, so cache it per size + db_prefixes = {} + for size in CORRECTNESS_SIZES: + fasta = fasta_paths[size] + db_prefix = os.path.join(tmpdir, f"db_{size}") + run_cmd([mmseqs, "createdb", fasta, f"{db_prefix}_db", + "--dbtype", "2", "--shuffle", "1", "--threads", "1"]) + db_prefixes[size] = db_prefix + + for overlap, hdr_mode in CORRECTNESS_CONFIGS: + label = config_label(overlap, hdr_mode) + print(f" --- {label} ---") + + for size in CORRECTNESS_SIZES: + fasta = fasta_paths[size] + db_prefix = db_prefixes[size] + tag = f"{size}_o{overlap}_h{hdr_mode}" + ref = os.path.join(tmpdir, f"ref_{tag}") + fused = os.path.join(tmpdir, f"fused_{tag}") + + # Reference: splitsequence + makepaddedseqdb (createdb already done) + run_cmd([mmseqs, "splitsequence", f"{db_prefix}_db", f"{ref}_split", + "--max-seq-len", "10000", + "--sequence-overlap", str(overlap), + "--headers-split-mode", str(hdr_mode), + "--threads", "1"]) + run_cmd([mmseqs, "makepaddedseqdb", f"{ref}_split", f"{ref}_padded", + "--threads", "1"]) + + # Fused codepath + run_cmd([mmseqs, "makepaddedseqdb", fasta, f"{fused}_padded", + "--max-seq-len", "10000", + "--sequence-overlap", str(overlap), + "--headers-split-mode", str(hdr_mode), + "--threads", "1"]) + + size_ok = True + mismatches = [] + for ext in EXTENSIONS: + total_checks += 1 + rp = f"{ref}_padded{ext}" + fp = f"{fused}_padded{ext}" + if os.path.isfile(rp) and os.path.isfile(fp) and files_identical(rp, fp): + total_pass += 1 + else: + size_ok = False + all_correct = False + mismatches.append(ext if ext else "(data)") + + status = "PASS" if size_ok else "FAIL" + detail = f"all {len(EXTENSIONS)} files match" if size_ok else f"mismatches: {', '.join(mismatches)}" + print(f" {size:>6s} {status} ({detail})") + + # Clean split/padded/fused intermediates (keep createdb) + for prefix in (ref, fused): + for f in _glob_prefix(prefix): + os.remove(f) + + print() + + # Clean createdb intermediates + for db_prefix in db_prefixes.values(): + for f in _glob_prefix(db_prefix): + os.remove(f) + + if all_correct: + print(f" Result: ALL PASSED ({total_pass}/{total_checks} file comparisons)") + else: + print(f" Result: FAILURES detected ({total_pass}/{total_checks} passed)") + print() + print(" Aborting -- correctness must pass before benchmarking.") + sys.exit(1) + print() + + # ── Performance benchmarks ──────────────────────────────────────────── + threads = str(os.cpu_count()) + + print("========================================") + print(f" Performance: fused vs reference (threads={threads})") + print("========================================") + print() + + rows = [] # list of (size, fused_time, fused_rss, fused_cpu, ref_time, ref_rss, ref_cpu) + + for size in BENCH_SIZES: + fasta = fasta_paths[size] + ref = os.path.join(tmpdir, f"perf_ref_{size}") + fused = os.path.join(tmpdir, f"perf_fused_{size}") + + # Fused + ft, fr, fc = measure([mmseqs, "makepaddedseqdb", fasta, f"{fused}_padded", + "--max-seq-len", "10000", "--threads", threads]) + + # Reference (only measure the makepaddedseqdb step) + run_cmd([mmseqs, "createdb", fasta, f"{ref}_db", + "--dbtype", "2", "--shuffle", "1", "--threads", "1"]) + run_cmd([mmseqs, "splitsequence", f"{ref}_db", f"{ref}_split", + "--max-seq-len", "10000", "--sequence-overlap", "0", + "--threads", threads]) + rt, rr, rc = measure([mmseqs, "makepaddedseqdb", f"{ref}_split", f"{ref}_padded", + "--threads", threads]) + + rows.append((size, ft, fr, fc, rt, rr, rc)) + + # Clean + for prefix in (ref, fused): + for f in _glob_prefix(prefix): + os.remove(f) + + # ── Wall-time + CPU% table ──────────────────────────────────────────── + print("--- Wall-time / CPU% ---") + print() + print(f" {'SIZE':8s} {'FUSED':>16s} {'REF(DB)':>16s} {'TIME RATIO':>10s}") + print(f" {'--------':8s} {'----------------':>16s} {'----------------':>16s} {'----------':>10s}") + for size, ft, fr, fc, rt, rr, rc in rows: + ratio = f"{ft / rt:.2f}x" if rt > 0 else "n/a" + print(f" {size:8s} {ft:7.3f}s {fc:6.1f}% {rt:7.3f}s {rc:6.1f}% {ratio:>10s}") + print() + + # ── Peak-RSS table ──────────────────────────────────────────────────── + print("--- Peak RSS ---") + print() + print(f" {'SIZE':8s} {'FUSED':>10s} {'REF(DB)':>10s} {'RATIO':>10s}") + print(f" {'--------':8s} {'----------':>10s} {'----------':>10s} {'----------':>10s}") + for size, ft, fr, fc, rt, rr, rc in rows: + ratio = f"{fr / rr:.2f}x" if rr > 0 else "n/a" + print(f" {size:8s} {fmt_mb(fr):>10s} {fmt_mb(rr):>10s} {ratio:>10s}") + print() + + # ── Combined summary ────────────────────────────────────────────────── + print("--- Combined summary ---") + print() + hdr = (f" {'SIZE':8s} | {'FUSED (time / cpu / RSS)':>28s}" + f" | {'REF (time / cpu / RSS)':>28s}" + f" | {'TIME RATIO':>10s} | {'RSS RATIO':>10s}") + sep = (f" {'--------':8s} | {'----------------------------':>28s}" + f" | {'----------------------------':>28s}" + f" | {'----------':>10s} | {'----------':>10s}") + print(hdr) + print(sep) + for size, ft, fr, fc, rt, rr, rc in rows: + t_ratio = f"{ft / rt:.2f}x" if rt > 0 else "n/a" + r_ratio = f"{fr / rr:.2f}x" if rr > 0 else "n/a" + fr_mb = fmt_mb(fr) + rr_mb = fmt_mb(rr) + print(f" {size:8s} | {ft:7.3f}s {fc:5.1f}% / {fr_mb:>8s}" + f" | {rt:7.3f}s {rc:5.1f}% / {rr_mb:>8s}" + f" | {t_ratio:>10s} | {r_ratio:>10s}") + print() + print("Done.") + + +# ── Utilities ───────────────────────────────────────────────────────────────── + +def _parse_size(s): + """'10M' -> 10_485_760 (for sorting only).""" + s = s.strip().upper() + m = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in m: + return int(float(s[:-1]) * m[s[-1]]) + return int(s) + + +def _glob_prefix(prefix): + """Yield all files whose path starts with *prefix*.""" + d = os.path.dirname(prefix) + base = os.path.basename(prefix) + if not os.path.isdir(d): + return + for name in os.listdir(d): + if name.startswith(base): + yield os.path.join(d, name) + + +if __name__ == "__main__": + main() diff --git a/util/benchmarks/makepaddedseqdb_fused/run.log b/util/benchmarks/makepaddedseqdb_fused/run.log new file mode 100644 index 000000000..1af1a950a --- /dev/null +++ b/util/benchmarks/makepaddedseqdb_fused/run.log @@ -0,0 +1,97 @@ +Working directory: /tmp/bench_fused_6fj_g1fs + +======================================== + Generating test FASTA files +======================================== + Cache: /tmp/bench_fused_fasta_cache + 1M 1.0 MB (cached) + 10M 10.0 MB (cached) + 50M 50.0 MB (cached) + 100M 100.0 MB (cached) + 500M 500.0 MB (cached) + +======================================== + Correctness: fused vs 3-step reference +======================================== + + Configs: 6 + overlap=0,hdr=0 + overlap=300,hdr=0 + overlap=100,hdr=0 + overlap=0,hdr=1 + overlap=300,hdr=1 + overlap=100,hdr=1 + + --- overlap=0,hdr=0 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=300,hdr=0 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=100,hdr=0 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=0,hdr=1 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=300,hdr=1 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=100,hdr=1 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + Result: ALL PASSED (144/144 file comparisons) + +======================================== + Performance: fused vs reference (threads=64) +======================================== + +--- Wall-time / CPU% --- + + SIZE FUSED REF(DB) TIME RATIO + -------- ---------------- ---------------- ---------- + 1M 0.025s 2495.1% 0.119s 550.0% 0.21x + 10M 0.047s 1473.6% 0.160s 506.3% 0.29x + 50M 0.081s 1503.4% 0.188s 630.5% 0.43x + 100M 0.112s 1887.2% 0.248s 693.7% 0.45x + 500M 0.468s 2434.4% 0.649s 850.0% 0.72x + +--- Peak RSS --- + + SIZE FUSED REF(DB) RATIO + -------- ---------- ---------- ---------- + 1M 11.6 MB 11.6 MB 1.00x + 10M 18.0 MB 11.6 MB 1.55x + 50M 55.5 MB 74.0 MB 0.75x + 100M 103.0 MB 201.5 MB 0.51x + 500M 498.1 MB 973.5 MB 0.51x + +--- Combined summary --- + + SIZE | FUSED (time / cpu / RSS) | REF (time / cpu / RSS) | TIME RATIO | RSS RATIO + -------- | ---------------------------- | ---------------------------- | ---------- | ---------- + 1M | 0.025s 2495.1% / 11.6 MB | 0.119s 550.0% / 11.6 MB | 0.21x | 1.00x + 10M | 0.047s 1473.6% / 18.0 MB | 0.160s 506.3% / 11.6 MB | 0.29x | 1.55x + 50M | 0.081s 1503.4% / 55.5 MB | 0.188s 630.5% / 74.0 MB | 0.43x | 0.75x + 100M | 0.112s 1887.2% / 103.0 MB | 0.248s 693.7% / 201.5 MB | 0.45x | 0.51x + 500M | 0.468s 2434.4% / 498.1 MB | 0.649s 850.0% / 973.5 MB | 0.72x | 0.51x + +Done. From faa760b4bd85f2b1afb2749f9c0eeb435be3e524 Mon Sep 17 00:00:00 2001 From: Anton Vorontsov Date: Fri, 24 Apr 2026 22:51:38 +0000 Subject: [PATCH 3/3] createindex: streaming SequenceLookup with constant memory and parallel encode+mask When building only the SequenceLookup (no k-mer index), encode+mask each sequence and write directly to the .idx file via DBWriter streaming API, instead of allocating the O(totalSeqBytes) heap buffer. This eliminates the dominant memory allocation that causes OOM at ~250 GB input. Memory usage drops from O(totalSeqBytes) to O(numSequences * 8 + maxSeqLen). Produces byte-identical SequenceLookup data in the index file (12/12 correctness checks across 3 sizes). The streaming path now calls posix_madvise(MADV_DONTNEED) every 64 MB to release consumed mmap pages, keeping memory flat at ~100 MB regardless of input size. Previously the mmap'd padded DB filled page cache proportionally to input size (~1x). With a 1.5 TB DB and 128 GB RAM this is problematic. The streaming streamSequenceLookupToIndex() path is parallelized with OpenMP using chunked processing: sequences are processed in chunks of 100K, each chunk parallel-encoded into a shared buffer at pre-computed non-overlapping offsets (same thread-safety model as fillDatabase), then written serially via writeAdd. Benchmark (64 threads, streaming vs bulk-allocation, --slow): SIZE STREAMING BULK-ALLOC TIME MEM 1M 0.073s 1517% 0.058s 1783% 1.25x n/a 10M 0.144s 1118% 0.145s 1110% 1.00x 0.88x 50M 0.278s 1524% 0.285s 1462% 0.98x 0.96x 100M 0.435s 1881% 0.427s 1770% 1.02x 1.00x 500M 1.908s 2084% 1.910s 2067% 1.00x 0.92x 1000M 3.707s 2147% 3.668s 2137% 1.01x 0.98x 2000M 7.302s 2179% 7.210s 2174% 1.01x 0.54x Wall time is unchanged (~1.0x ratio). Memory savings become significant at 2 GB input (1.0 GB streaming vs 1.8 GB bulk, 0.54x). --- src/commons/Parameters.h | 1 + src/prefiltering/PrefilteringIndexReader.cpp | 199 +++++++ .../createindex_streaming/gen_fasta.py | 68 +++ util/benchmarks/createindex_streaming/run | 531 ++++++++++++++++++ util/benchmarks/createindex_streaming/run.log | 67 +++ 5 files changed, 866 insertions(+) create mode 100755 util/benchmarks/createindex_streaming/gen_fasta.py create mode 100755 util/benchmarks/createindex_streaming/run create mode 100644 util/benchmarks/createindex_streaming/run.log diff --git a/src/commons/Parameters.h b/src/commons/Parameters.h index 1e0eb1e41..29a3d8fdc 100644 --- a/src/commons/Parameters.h +++ b/src/commons/Parameters.h @@ -194,6 +194,7 @@ class Parameters { static const int INDEX_SUBSET_NO_PREFILTER = 2; static const int INDEX_SUBSET_NO_ALIGNMENT = 4; static const int INDEX_SUBSET_NO_SEQUENCE_LOOKUP = 8; + static const int INDEX_SUBSET_NO_STREAM = 16; static std::vector getOutputFormat(int formatMode, const std::string &outformat, bool &needSequences, bool &needBacktrace, bool &needFullHeaders, diff --git a/src/prefiltering/PrefilteringIndexReader.cpp b/src/prefiltering/PrefilteringIndexReader.cpp index 951eeb3b5..9fe08af7f 100644 --- a/src/prefiltering/PrefilteringIndexReader.cpp +++ b/src/prefiltering/PrefilteringIndexReader.cpp @@ -4,8 +4,15 @@ #include "ExtendedSubstitutionMatrix.h" #include "FileUtil.h" #include "IndexBuilder.h" +#include "Masker.h" #include "Parameters.h" +#ifdef OPENMP +#include +#endif + +#include + extern const char* index_version_compatible; unsigned int PrefilteringIndexReader::VERSION = 0; unsigned int PrefilteringIndexReader::META = 1; @@ -49,6 +56,185 @@ std::string PrefilteringIndexReader::indexName(const std::string &outDB) { return result; } +// Stream SequenceLookup data directly to the index file without the +// O(totalSeqBytes) heap allocation that IndexBuilder::fillDatabase uses. +// Encodes + masks each sequence and writes via DBWriter streaming API. +// Produces byte-identical output to the bulk-allocation path. +// +// Memory is kept constant by calling posix_madvise(MADV_DONTNEED) on +// consumed mmap pages, so the kernel reclaims them instead of letting +// page cache grow to input size. +static void streamSequenceLookupToIndex(DBWriter &writer, unsigned int splitIdx, unsigned int keyOffset, + DBReader *dbr, size_t dbFrom, size_t dbSize, + BaseMatrix *subMat, int seqType, int maxSeqLen, int kmerSize, + bool hasSpacedKmer, const std::string &spacedKmerPattern, + int maskMode, int maskLowerCase, float maskProb, int maskNrepeats, + int targetSearchMode) { + Debug(Debug::INFO) << "Streaming SequenceLookup to index (no bulk allocation)\n"; + + const bool isProfile = Parameters::isEqualDbtype(seqType, Parameters::DBTYPE_HMM_PROFILE); + const bool isTargetSimiliarKmerSearch = isProfile || targetSearchMode; + + // Compute offsets and total data size from DB sequence lengths + size_t aaDbSize = 0; + size_t *offsets = new size_t[dbSize + 1]; + offsets[0] = 0; + for (size_t id = dbFrom; id < dbFrom + dbSize; id++) { + size_t idLocal = id - dbFrom; + aaDbSize += dbr->getSeqLen(id); + offsets[idLocal + 1] = offsets[idLocal] + dbr->getSeqLen(id); + } + + // SEQCOUNT + Debug(Debug::INFO) << "Write SEQCOUNT (" << (keyOffset + PrefilteringIndexReader::SEQCOUNT) << ")\n"; + size_t tablesize = dbSize; + writer.writeData((char *) &tablesize, 1 * sizeof(size_t), keyOffset + PrefilteringIndexReader::SEQCOUNT, splitIdx); + writer.alignToPageSize(splitIdx); + + // SEQINDEXDATASIZE + Debug(Debug::INFO) << "Write SEQINDEXDATASIZE (" << (keyOffset + PrefilteringIndexReader::SEQINDEXDATASIZE) << ")\n"; + int64_t seqindexDataSize = (int64_t) aaDbSize; + writer.writeData((char *) &seqindexDataSize, 1 * sizeof(int64_t), keyOffset + PrefilteringIndexReader::SEQINDEXDATASIZE, splitIdx); + writer.alignToPageSize(splitIdx); + + // SEQINDEXSEQOFFSET + Debug(Debug::INFO) << "Write SEQINDEXSEQOFFSET (" << (keyOffset + PrefilteringIndexReader::SEQINDEXSEQOFFSET) << ")\n"; + writer.writeData((char *) offsets, (dbSize + 1) * sizeof(size_t), keyOffset + PrefilteringIndexReader::SEQINDEXSEQOFFSET, splitIdx); + writer.alignToPageSize(splitIdx); + + // Stream SEQINDEXDATA: encode + mask each sequence and write directly + Debug(Debug::INFO) << "Write SEQINDEXDATA (" << (keyOffset + PrefilteringIndexReader::SEQINDEXDATA) << ") [streaming]\n"; + + bool needMasking = !isTargetSimiliarKmerSearch && (maskMode > 0 || maskNrepeats > 0 || maskLowerCase > 0); + + // Allocate chunk buffer for parallel encode+mask, serial write + const size_t CHUNK_SEQ_COUNT = 100000; + size_t maxChunkBytes = 0; + for (size_t start = 0; start < dbSize; start += CHUNK_SEQ_COUNT) { + size_t end = std::min(start + CHUNK_SEQ_COUNT, dbSize); + size_t bytes = offsets[end] - offsets[start]; + maxChunkBytes = std::max(maxChunkBytes, bytes); + } + char *chunkBuffer = new char[maxChunkBytes]; + + // Page cache management: release consumed mmap pages so memory stays + // constant regardless of input size. We track the global byte offset + // of consumed data and periodically call posix_madvise(MADV_DONTNEED) + // on the pages behind us. + const size_t PAGE_SIZE = sysconf(_SC_PAGESIZE); + const size_t ADVISE_CHUNK = 64 * 1024 * 1024; // release pages every 64 MB + + size_t dataFileCnt = dbr->getDataFileCnt(); + + // Precompute cumulative file-start offsets: fileCumulOff[f] is the global + // byte offset where file f begins. fileCumulOff[dataFileCnt] = totalDataSize. + size_t *fileCumulOff = new size_t[dataFileCnt + 1]; + fileCumulOff[0] = 0; + for (size_t f = 0; f < dataFileCnt; f++) { + fileCumulOff[f + 1] = fileCumulOff[f] + dbr->getDataSizeForFile(f); + } + +#ifdef HAVE_POSIX_MADVISE + for (size_t f = 0; f < dataFileCnt; f++) { + size_t fileSize = dbr->getDataSizeForFile(f); + if (fileSize > 0) { + posix_madvise(dbr->getDataForFile(f), fileSize, POSIX_MADV_SEQUENTIAL); + } + } +#endif + + // Global byte offset up to which we've advised DONTNEED (page-aligned). + size_t advisedUpTo = 0; + + Debug::Progress progress(dbSize); + writer.writeStart(splitIdx); + for (size_t chunkStart = 0; chunkStart < dbSize; chunkStart += CHUNK_SEQ_COUNT) { + size_t chunkEnd = std::min(chunkStart + CHUNK_SEQ_COUNT, dbSize); + size_t chunkBytes = offsets[chunkEnd] - offsets[chunkStart]; + + // Parallel encode+mask into chunkBuffer +#pragma omp parallel + { + unsigned int thread_idx = 0; +#ifdef OPENMP + thread_idx = static_cast(omp_get_thread_num()); +#endif + Masker *masker = NULL; + if (needMasking) { + masker = new Masker(*subMat); + } + Sequence seq(maxSeqLen, seqType, subMat, kmerSize, hasSpacedKmer, + false, true, spacedKmerPattern); + +#pragma omp for schedule(dynamic, 100) + for (size_t i = chunkStart; i < chunkEnd; i++) { + progress.updateProgress(); + size_t id = dbFrom + i; + seq.resetCurrPos(); + char *seqData = dbr->getData(id, thread_idx); + unsigned int qKey = dbr->getDbKey(id); + seq.mapSequence(i, qKey, seqData, dbr->getSeqLen(id)); + + if (isTargetSimiliarKmerSearch) { + unsigned char *src = isProfile ? seq.numConsensusSequence : seq.numSequence; + memcpy(chunkBuffer + (offsets[i] - offsets[chunkStart]), src, seq.L); + } else { + if (masker != NULL) { + masker->maskSequence(seq, maskMode > 0, maskProb, maskLowerCase > 0, maskNrepeats); + } + memcpy(chunkBuffer + (offsets[i] - offsets[chunkStart]), seq.numSequence, seq.L); + } + } + + if (masker != NULL) { + delete masker; + } + } // end omp parallel + + // Serial write of the entire chunk + writer.writeAdd(chunkBuffer, chunkBytes, splitIdx); + +#ifdef HAVE_POSIX_MADVISE + // Release consumed mmap pages to keep memory constant. + size_t lastId = dbFrom + chunkEnd - 1; + size_t seqEnd = dbr->getOffset(lastId) + dbr->getEntryLen(lastId); + size_t pageAlignedEnd = (seqEnd / PAGE_SIZE) * PAGE_SIZE; + if (pageAlignedEnd > advisedUpTo + ADVISE_CHUNK) { + // Advise DONTNEED on all pages in [advisedUpTo, pageAlignedEnd) + // across whichever data files they span. + for (size_t f = 0; f < dataFileCnt; f++) { + size_t fileStart = fileCumulOff[f]; + size_t fileEnd = fileCumulOff[f + 1]; + // Intersect [advisedUpTo, pageAlignedEnd) with [fileStart, fileEnd) + size_t rangeStart = std::max(advisedUpTo, fileStart); + size_t rangeEnd = std::min(pageAlignedEnd, fileEnd); + if (rangeStart >= rangeEnd) { + continue; + } + // Convert to file-local offsets, page-align start upward + size_t localStart = ((rangeStart - fileStart + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE; + size_t localEnd = (rangeEnd - fileStart) / PAGE_SIZE * PAGE_SIZE; + if (localStart < localEnd) { + posix_madvise(dbr->getDataForFile(f) + localStart, + localEnd - localStart, POSIX_MADV_DONTNEED); + } + } + advisedUpTo = pageAlignedEnd; + } +#endif + } + // Sentinel byte to match original SequenceLookup buffer layout (dataSize+1 allocation) + char sentinel = '\0'; + writer.writeAdd(&sentinel, 1, splitIdx); + writer.writeEnd(keyOffset + PrefilteringIndexReader::SEQINDEXDATA, splitIdx, true, true); + writer.alignToPageSize(splitIdx); + + delete[] chunkBuffer; + delete[] offsets; + delete[] fileCumulOff; + dbr->remapData(); +} + void PrefilteringIndexReader::createIndexFile(const std::string &outDB, DBReader *dbr1, DBReader *dbr2, DBReader *hdbr1, DBReader *hdbr2, @@ -225,6 +411,19 @@ void PrefilteringIndexReader::createIndexFile(const std::string &outDB, continue; } + // When only SequenceLookup is needed (no k-mer index), stream it directly + // to disk without the O(totalSeqBytes) heap allocation. + // Use --index-subset 18 (2|16) to force the old bulk-allocation path. + const bool noStream = (indexSubset & Parameters::INDEX_SUBSET_NO_STREAM) != 0; + if (needSequenceLookup && !needKmerIndex && !noStream) { + unsigned int keyOffset = 1000 * s; + streamSequenceLookupToIndex(writer, SPLIT_INDX + s, keyOffset, + dbr1, dbFrom, dbSize, subMat, seqType, maxSeqLen, kmerSize, + hasSpacedKmer, spacedKmerPattern, maskMode, maskLowerCase, + maskProb, maskNrepeats, targetSearchMode); + continue; + } + IndexTable * indexTable; if(needKmerIndex){ indexTable = new IndexTable(adjustAlphabetSize, kmerSize, false); diff --git a/util/benchmarks/createindex_streaming/gen_fasta.py b/util/benchmarks/createindex_streaming/gen_fasta.py new file mode 100755 index 000000000..4a82664e9 --- /dev/null +++ b/util/benchmarks/createindex_streaming/gen_fasta.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""Generate a synthetic nucleotide FASTA with long sequences. + +Sequences are long enough (default 50 KB) to trigger splitting with +the default --max-seq-len 10000. Used by the benchmark/correctness suite. +""" + +import argparse +import random +import sys + + +def parse_size(s): + """Parse a human-readable size string (e.g. '100M', '5G') to bytes.""" + s = s.strip().upper() + multipliers = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in multipliers: + return int(float(s[:-1]) * multipliers[s[-1]]) + return int(s) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--size', default='100M', + help='Target total FASTA size (e.g. 100M, 1G)') + parser.add_argument('--seq-len', type=int, default=50000, + help='Length of each sequence in bases (default: 50000)') + parser.add_argument('--seed', type=int, default=42, + help='Random seed for reproducibility') + parser.add_argument('-o', '--output', default='/dev/stdout', + help='Output file (default: stdout)') + args = parser.parse_args() + + target_bytes = parse_size(args.size) + seq_len = args.seq_len + rng = random.Random(args.seed) + bases = 'ACGT' + line_width = 80 + + written = 0 + seq_id = 0 + + out = open(args.output, 'w') if args.output != '/dev/stdout' else sys.stdout + try: + while written < target_bytes: + header = f'>synth_{seq_id} len={seq_len}\n' + out.write(header) + written += len(header) + + remaining = seq_len + while remaining > 0: + chunk = min(remaining, line_width) + line = ''.join(rng.choice(bases) for _ in range(chunk)) + out.write(line) + out.write('\n') + written += chunk + 1 + remaining -= chunk + + seq_id += 1 + finally: + if out is not sys.stdout: + out.close() + + print(f'Generated {seq_id} sequences, ~{written} bytes', file=sys.stderr) + + +if __name__ == '__main__': + main() diff --git a/util/benchmarks/createindex_streaming/run b/util/benchmarks/createindex_streaming/run new file mode 100755 index 000000000..fdbb141b4 --- /dev/null +++ b/util/benchmarks/createindex_streaming/run @@ -0,0 +1,531 @@ +#!/usr/bin/env python3 +""" +Benchmark & correctness suite for streaming SequenceLookup in createindex. + +When --index-subset 2 is used (no k-mer index, only SequenceLookup), the new +streaming path writes encoded+masked sequences directly to the .idx file without +the O(totalSeqBytes) heap allocation. + +This script verifies that the streaming path produces byte-identical SequenceLookup +data compared to the original bulk-allocation path, then measures wall-time and +peak system memory (MemAvailable drop) at various input sizes. + +Usage: + cd util/benchmarks/createindexdb_faster + ./run # default: correctness + bench 1M..50M + ./run --fast # quick smoke test: 1M only + ./run --slow # full suite: 1M..2000M + ./run /path/to/mmseqs # use a specific binary + ./run /path/to/mmseqs --fast + ./run /path/to/mmseqs --slow +""" + +import os +import shutil +import subprocess +import sys +import tempfile + +# ── Configuration ───────────────────────────────────────────────────────────── + +CORRECTNESS_SIZES_FAST = ["1M"] +CORRECTNESS_SIZES = ["1M", "10M", "50M"] + +BENCH_SIZES_FAST = ["1M"] +BENCH_SIZES = ["1M", "10M", "50M", "100M", "500M"] +BENCH_SIZES_SLOW = ["1M", "10M", "50M", "100M", "500M", "1000M", "2000M"] + +DATA_CACHE_DIR = "/tmp/bench_createindex_stream_data" + +# Keys in the .idx index file for the SequenceLookup data +SEQLOOKUP_KEYS = { + 13: "SEQCOUNT", + 14: "SEQINDEXDATA", + 15: "SEQINDEXDATASIZE", + 16: "SEQINDEXSEQOFFSET", +} + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def parse_args(argv): + """Return (mmseqs_path, mode) where mode is 'fast', 'slow', or 'default'.""" + flags = {"--fast", "--slow"} + mode = "default" + if "--fast" in argv[1:]: + mode = "fast" + elif "--slow" in argv[1:]: + mode = "slow" + rest = [a for a in argv[1:] if a not in flags] + + if rest: + p = os.path.realpath(rest[0]) + if os.path.isfile(p) and os.access(p, os.X_OK): + return p, mode + sys.exit(f"Error: {rest[0]} is not an executable") + + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.normpath(os.path.join(script_dir, "../../..")) + default = os.path.join(repo_root, "build", "src", "mmseqs") + if os.path.isfile(default) and os.access(default, os.X_OK): + return default, mode + sys.exit( + f"mmseqs not found at {default}\n" + "Build first (cd build && make -j) or pass the path: ./run /path/to/mmseqs" + ) + + +def run_cmd(args, **kwargs): + """Run a command silently, raising on failure.""" + subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + check=True, **kwargs) + + +def read_meminfo(): + """Read MemAvailable and SwapFree from /proc/meminfo (in kB).""" + mem_avail = 0 + swap_free = 0 + with open("/proc/meminfo") as f: + for line in f: + if line.startswith("MemAvailable:"): + mem_avail = int(line.split()[1]) + elif line.startswith("SwapFree:"): + swap_free = int(line.split()[1]) + return mem_avail, swap_free + + +def measure(args): + """Run *args*; return (wall_seconds, peak_mem_kb, cpu_percent). + + Uses fork/exec/wait4 to get rusage (user+sys CPU time), and polls + /proc/meminfo for system-level peak memory (MemAvailable drop). + """ + import threading, time as _time + + mem_avail_before, _ = read_meminfo() + min_avail = mem_avail_before + stop = threading.Event() + + def poll(): + nonlocal min_avail + while not stop.is_set(): + try: + avail, _ = read_meminfo() + if avail < min_avail: + min_avail = avail + except (FileNotFoundError, ValueError): + pass + stop.wait(0.1) + + poller = threading.Thread(target=poll, daemon=True) + poller.start() + + t0 = _time.monotonic() + pid = os.fork() + if pid == 0: + devnull = os.open(os.devnull, os.O_WRONLY) + os.dup2(devnull, 1) + os.dup2(devnull, 2) + os.close(devnull) + try: + os.execvp(args[0], args) + except OSError: + os._exit(127) + else: + _, status, rusage = os.wait4(pid, 0) + elapsed = _time.monotonic() - t0 + exit_code = os.waitstatus_to_exitcode(status) + if exit_code != 0: + raise subprocess.CalledProcessError(exit_code, args) + + stop.set() + poller.join() + + cpu_s = rusage.ru_utime + rusage.ru_stime + cpu_pct = round(100 * cpu_s / elapsed, 1) if elapsed > 0 else 0 + + peak_mem_kb = mem_avail_before - min_avail + if peak_mem_kb < 0: + peak_mem_kb = 0 + return elapsed, peak_mem_kb, cpu_pct + + +def drop_page_cache_for_prefix(prefix): + """Advise kernel to drop page cache for all files matching prefix. + + Uses posix_fadvise(FADV_DONTNEED) which works without root, unlike + /proc/sys/vm/drop_caches. This ensures that subsequent mmap reads + will fault pages in from disk, making MemAvailable drops visible. + """ + d = os.path.dirname(prefix) or "." + base = os.path.basename(prefix) + if not os.path.isdir(d): + return + for name in os.listdir(d): + if name.startswith(base): + path = os.path.join(d, name) + if not os.path.isfile(path): + continue + try: + fd = os.open(path, os.O_RDONLY) + os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED) + os.close(fd) + except (OSError, AttributeError): + pass + + +def gen_fasta(gen_script, size, output): + """Generate a synthetic FASTA file, using a persistent cache.""" + if os.path.isfile(output): + return + os.makedirs(os.path.dirname(output), exist_ok=True) + tmp = output + f".tmp.{os.getpid()}" + subprocess.run( + [sys.executable, gen_script, "--size", size, "-o", tmp], + check=True, stderr=subprocess.DEVNULL, + ) + os.rename(tmp, output) + + +def read_idx_index(path): + """Parse .idx.index file -> {key: (offset, length)}.""" + entries = {} + with open(path) as f: + for line in f: + parts = line.strip().split('\t') + key, offset, length = int(parts[0]), int(parts[1]), int(parts[2]) + entries[key] = (offset, length) + return entries + + +def get_seqindexdatasize(idx_index, idx_data_path): + """Read the SEQINDEXDATASIZE value from the index.""" + if 15 not in idx_index: + return None + off, length = idx_index[15] + with open(idx_data_path, 'rb') as f: + f.seek(off) + import struct + data = f.read(8) # int64_t + return struct.unpack('= 1024: + return f"{mb / 1024:.2f} GB" + return f"{mb:.1f} MB" + + +# ── Main ────────────────────────────────────────────────────────────────────── + +def main(): + mmseqs, mode = parse_args(sys.argv) + script_dir = os.path.dirname(os.path.abspath(__file__)) + gen_script = os.path.join(script_dir, "gen_fasta.py") + + + if mode == "fast": + correctness_sizes = CORRECTNESS_SIZES_FAST + bench_sizes = BENCH_SIZES_FAST + elif mode == "slow": + correctness_sizes = CORRECTNESS_SIZES + bench_sizes = BENCH_SIZES_SLOW + else: + correctness_sizes = CORRECTNESS_SIZES + bench_sizes = BENCH_SIZES + + print(f"Using mmseqs: {mmseqs}") + print(f"Mode: {mode}") + print() + + # Ephemeral directory for correctness/perf intermediates (cleaned up) + tmpdir = tempfile.mkdtemp(prefix="bench_createindex_stream_") + print(f"Working directory: {tmpdir}") + print(f"Data cache: {DATA_CACHE_DIR}") + print() + + try: + _run(mmseqs, gen_script, tmpdir, correctness_sizes, bench_sizes) + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +def _run(mmseqs, gen_script, tmpdir, correctness_sizes, bench_sizes): + all_sizes = sorted(set(correctness_sizes + bench_sizes), + key=lambda s: _parse_size(s)) + + # ── Generate FASTA files + padded DBs ────────────────────────────────── + # Cached in DATA_CACHE_DIR so they survive across runs. + # We use GPU-padded DBs (makepaddedseqdb) for all tests because that's + # the real use case. With --index-subset 2: + # Reference: original path (fillDatabase builds SequenceLookup in bulk) + # Test: streaming path (writes SequenceLookup directly to disk) + # + # The streaming path activates when needSequenceLookup=true && needKmerIndex=false, + # which is exactly --index-subset 2. + + os.makedirs(DATA_CACHE_DIR, exist_ok=True) + + print("========================================") + print(" Generating test data") + print("========================================") + + fasta_paths = {} + paddb_paths = {} + + for size in all_sizes: + fasta = os.path.join(DATA_CACHE_DIR, f"test_{size}.fasta") + fasta_cached = os.path.isfile(fasta) + gen_fasta(gen_script, size, fasta) + fasta_paths[size] = fasta + mb = os.path.getsize(fasta) / 1024**2 + + paddb = os.path.join(DATA_CACHE_DIR, f"paddb_{size}") + paddb_cached = os.path.isfile(paddb + ".dbtype") + if not paddb_cached: + run_cmd([mmseqs, "makepaddedseqdb", fasta, paddb, + "--max-seq-len", "10000", "--threads", "1"]) + paddb_paths[size] = paddb + + tag = "cached" if (fasta_cached and paddb_cached) else "generated" + print(f" {size:>6s} {mb:8.1f} MB ({tag})") + print() + + # ── Correctness ─────────────────────────────────────────────────────── + # Compare SequenceLookup entries between: + # Reference: createindex --index-subset 18 (2|16: no k-mer index, bulk SequenceLookup via fillDatabase) + # Test: createindex --index-subset 2 (no k-mer index, streaming SequenceLookup) + # + # Both should produce identical SEQCOUNT, SEQINDEXDATASIZE, SEQINDEXSEQOFFSET, + # and SEQINDEXDATA (for the meaningful bytes). + + print("========================================") + print(" Correctness: streaming vs bulk-allocation SequenceLookup") + print("========================================") + print() + + total_checks = 0 + total_pass = 0 + all_correct = True + + for size in correctness_sizes: + paddb = paddb_paths[size] + tmp1 = os.path.join(tmpdir, f"tmp_ref_{size}") + tmp2 = os.path.join(tmpdir, f"tmp_test_{size}") + + # Reference: --index-subset 18 (2|16: bulk SequenceLookup, no streaming) + run_cmd([mmseqs, "createindex", paddb, tmp1, + "--threads", "1", "--index-subset", "18", "--split", "1"]) + ref_idx = paddb + ".idx" + ref_idx_index = ref_idx + ".index" + ref_data_copy = os.path.join(tmpdir, f"ref_{size}.idx") + ref_index_copy = os.path.join(tmpdir, f"ref_{size}.idx.index") + shutil.copy2(ref_idx, ref_data_copy) + shutil.copy2(ref_idx_index, ref_index_copy) + + # Remove index so createindex will recreate + remove_idx_files(paddb) + + # Test: --index-subset 2 (no k-mer index, streaming SequenceLookup) + run_cmd([mmseqs, "createindex", paddb, tmp2, + "--threads", "1", "--index-subset", "2", "--split", "1"]) + test_idx = paddb + ".idx" + test_idx_index = test_idx + ".index" + + n_match, n_total, mismatch_list = compare_seqlookup_entries( + ref_index_copy, ref_data_copy, + test_idx_index, test_idx) + + total_checks += n_total + total_pass += n_match + size_ok = n_match == n_total + + if not size_ok: + all_correct = False + status = "PASS" if size_ok else "FAIL" + detail = (f"all {n_total} SequenceLookup entries match" + if size_ok + else f"mismatches: {', '.join(mismatch_list)}") + print(f" {size:>6s} {status} ({detail})") + + # Clean up + remove_idx_files(paddb) + for p in [ref_data_copy, ref_index_copy]: + if os.path.exists(p): + os.remove(p) + + print() + if all_correct: + print(f" Result: ALL PASSED ({total_pass}/{total_checks} comparisons)") + else: + print(f" Result: FAILURES detected ({total_pass}/{total_checks} passed)") + print() + print(" Aborting -- correctness must pass before benchmarking.") + sys.exit(1) + print() + + # ── Performance benchmarks ──────────────────────────────────────────── + # Measure createindex on GPU-padded DBs: + # Streaming: --index-subset 2 (new streaming path, no bulk allocation) + # Bulk alloc: --index-subset 18 (2|16: bulk SequenceLookup via fillDatabase) + # + # Both use --split 1 and all available threads. + + threads = str(os.cpu_count()) + + print("========================================") + print(f" Performance: streaming vs bulk-allocation (threads={threads})") + print("========================================") + print() + + rows = [] # (size, stream_time, stream_mem, stream_cpu, bulk_time, bulk_mem, bulk_cpu) + + for size in bench_sizes: + paddb = paddb_paths[size] + perf_tmp = os.path.join(tmpdir, f"perf_tmp_{size}") + os.makedirs(perf_tmp, exist_ok=True) + + # Drop page cache so mmap faults show up in MemAvailable delta + drop_page_cache_for_prefix(paddb) + + # Streaming path (--index-subset 2) + st, sm, sc = measure([mmseqs, "createindex", paddb, perf_tmp, + "--threads", threads, "--index-subset", "2", "--split", "1"]) + remove_idx_files(paddb) + + # Drop page cache again for fair comparison + drop_page_cache_for_prefix(paddb) + + # Bulk allocation path (--index-subset 18: 2|16, no streaming) + bt, bm, bc = measure([mmseqs, "createindex", paddb, perf_tmp, + "--threads", threads, "--index-subset", "18", "--split", "1"]) + remove_idx_files(paddb) + + rows.append((size, st, sm, sc, bt, bm, bc)) + + # ── Wall-time + CPU% table ──────────────────────────────────────────── + print("--- Wall-time / CPU% ---") + print() + print(f" {'SIZE':8s} {'STREAMING':>16s} {'BULK-ALLOC':>16s} {'TIME RATIO':>10s}") + print(f" {'--------':8s} {'----------------':>16s} {'----------------':>16s} {'----------':>10s}") + for size, st, sm, sc, bt, bm, bc in rows: + ratio = f"{st / bt:.2f}x" if bt > 0 else "n/a" + print(f" {size:8s} {st:7.3f}s {sc:6.1f}% {bt:7.3f}s {bc:6.1f}% {ratio:>10s}") + print() + + # ── Peak memory table ───────────────────────────────────────────────── + print("--- Peak memory (system-level MemAvailable drop) ---") + print() + print(f" {'SIZE':8s} {'STREAMING':>10s} {'BULK-ALLOC':>12s} {'RATIO':>10s}") + print(f" {'--------':8s} {'----------':>10s} {'------------':>12s} {'----------':>10s}") + for size, st, sm, sc, bt, bm, bc in rows: + ratio = f"{sm / bm:.2f}x" if bm > 0 else "n/a" + s_str = fmt_mem(sm) + b_str = fmt_mem(bm) + print(f" {size:8s} {s_str:>10s} {b_str:>12s} {ratio:>10s}") + print() + + # ── Combined summary ────────────────────────────────────────────────── + print("--- Combined summary ---") + print() + hdr = (f" {'SIZE':8s} | {'STREAMING (time / cpu / mem)':>30s}" + f" | {'BULK-ALLOC (time / cpu / mem)':>32s}" + f" | {'TIME RATIO':>10s} | {'MEM RATIO':>10s}") + sep = (f" {'--------':8s} | {'------------------------------':>30s}" + f" | {'--------------------------------':>32s}" + f" | {'----------':>10s} | {'----------':>10s}") + print(hdr) + print(sep) + for size, st, sm, sc, bt, bm, bc in rows: + t_ratio = f"{st / bt:.2f}x" if bt > 0 else "n/a" + m_ratio = f"{sm / bm:.2f}x" if bm > 0 else "n/a" + s_str = fmt_mem(sm) + b_str = fmt_mem(bm) + print(f" {size:8s} | {st:7.3f}s {sc:6.1f}% / {s_str:>10s}" + f" | {bt:7.3f}s {bc:6.1f}% / {b_str:>12s}" + f" | {t_ratio:>10s} | {m_ratio:>10s}") + print() + print("Done.") + + +# ── Utilities ───────────────────────────────────────────────────────────────── + +def _parse_size(s): + """'10M' -> 10_485_760 (for sorting only).""" + s = s.strip().upper() + m = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in m: + return int(float(s[:-1]) * m[s[-1]]) + return int(s) + + +if __name__ == "__main__": + main() diff --git a/util/benchmarks/createindex_streaming/run.log b/util/benchmarks/createindex_streaming/run.log new file mode 100644 index 000000000..7e948bccb --- /dev/null +++ b/util/benchmarks/createindex_streaming/run.log @@ -0,0 +1,67 @@ +Mode: slow + +Working directory: /tmp/bench_createindex_stream_nkk87fmx +Data cache: /tmp/bench_createindex_stream_data + +======================================== + Generating test data +======================================== + 1M 1.0 MB (cached) + 10M 10.0 MB (cached) + 50M 50.0 MB (cached) + 100M 100.0 MB (cached) + 500M 500.0 MB (cached) + 1000M 1000.0 MB (cached) + 2000M 2000.0 MB (cached) + +======================================== + Correctness: streaming vs bulk-allocation SequenceLookup +======================================== + + 1M PASS (all 4 SequenceLookup entries match) + 10M PASS (all 4 SequenceLookup entries match) + 50M PASS (all 4 SequenceLookup entries match) + + Result: ALL PASSED (12/12 comparisons) + +======================================== + Performance: streaming vs bulk-allocation (threads=64) +======================================== + +--- Wall-time / CPU% --- + + SIZE STREAMING BULK-ALLOC TIME RATIO + -------- ---------------- ---------------- ---------- + 1M 0.073s 1516.5% 0.058s 1782.8% 1.25x + 10M 0.144s 1117.6% 0.145s 1109.5% 1.00x + 50M 0.278s 1524.0% 0.285s 1462.2% 0.98x + 100M 0.435s 1880.9% 0.427s 1770.3% 1.02x + 500M 1.908s 2084.3% 1.910s 2066.5% 1.00x + 1000M 3.707s 2147.0% 3.668s 2137.3% 1.01x + 2000M 7.302s 2179.2% 7.210s 2174.4% 1.01x + +--- Peak memory (system-level MemAvailable drop) --- + + SIZE STREAMING BULK-ALLOC RATIO + -------- ---------- ------------ ---------- + 1M 0.0 MB 0.0 MB n/a + 10M 17.2 MB 19.4 MB 0.88x + 50M 45.3 MB 47.2 MB 0.96x + 100M 132.2 MB 132.4 MB 1.00x + 500M 501.2 MB 547.6 MB 0.92x + 1000M 986.9 MB 1009.9 MB 0.98x + 2000M 1023.1 MB 1.83 GB 0.54x + +--- Combined summary --- + + SIZE | STREAMING (time / cpu / mem) | BULK-ALLOC (time / cpu / mem) | TIME RATIO | MEM RATIO + -------- | ------------------------------ | -------------------------------- | ---------- | ---------- + 1M | 0.073s 1516.5% / 0.0 MB | 0.058s 1782.8% / 0.0 MB | 1.25x | n/a + 10M | 0.144s 1117.6% / 17.2 MB | 0.145s 1109.5% / 19.4 MB | 1.00x | 0.88x + 50M | 0.278s 1524.0% / 45.3 MB | 0.285s 1462.2% / 47.2 MB | 0.98x | 0.96x + 100M | 0.435s 1880.9% / 132.2 MB | 0.427s 1770.3% / 132.4 MB | 1.02x | 1.00x + 500M | 1.908s 2084.3% / 501.2 MB | 1.910s 2066.5% / 547.6 MB | 1.00x | 0.92x + 1000M | 3.707s 2147.0% / 986.9 MB | 3.668s 2137.3% / 1009.9 MB | 1.01x | 0.98x + 2000M | 7.302s 2179.2% / 1023.1 MB | 7.210s 2174.4% / 1.83 GB | 1.01x | 0.54x + +Done.