diff --git a/data/workflow/databases.sh b/data/workflow/databases.sh index c093da1cc..cfd14ec17 100644 --- a/data/workflow/databases.sh +++ b/data/workflow/databases.sh @@ -299,6 +299,14 @@ case "${SELECTION}" in push_back "${TMP_PATH}/kalamari.fasta" INPUT_TYPE="FASTA_LIST" ;; + *) + if [ ! -f "${SELECTION}" ]; then + fail "Local file not found: ${SELECTION}" + fi + push_back "${SELECTION}" + date "+%s" > "${TMP_PATH}/version" + INPUT_TYPE="LOCAL_FASTA" + ;; esac if notExists "${OUTDB}.dbtype"; then @@ -312,6 +320,23 @@ case "${INPUT_TYPE}" in rm -f -- "$i" done ;; + "LOCAL_FASTA") + eval "set -- $ARR" + # Fused pipeline: makepaddedseqdb reads FASTA directly (createdb + + # splitsequence + padding in a single streaming pass), then + # createindex --index-subset 2 streams SequenceLookup with constant + # memory via MADV_DONTNEED. + # shellcheck disable=SC2086 + "${MMSEQS}" makepaddedseqdb "${@}" "${OUTDB}" \ + --headers-split-mode 1 ${THREADS_PAR} \ + || fail "makepaddedseqdb died" + mkdir -p "${TMP_PATH}/indexdb" + # shellcheck disable=SC2086 + "${MMSEQS}" createindex "${OUTDB}" "${TMP_PATH}/indexdb" \ + --split 1 --index-subset 2 ${THREADS_PAR} \ + || fail "createindex died" + rm -rf -- "${TMP_PATH}/indexdb" + ;; "FSA") # shellcheck disable=SC2086 "${MMSEQS}" createdb "${TMP_PATH}/"*.fsa "${OUTDB}" ${COMP_PAR} --gpu 1 \ diff --git a/src/MMseqsBase.cpp b/src/MMseqsBase.cpp index 1fa167298..6ce5a04ce 100644 --- a/src/MMseqsBase.cpp +++ b/src/MMseqsBase.cpp @@ -129,9 +129,13 @@ std::vector baseCommands = { {"tmpDir", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::directory }}}, {"databases", databases, &par.databases, COMMAND_DATABASE_CREATION, "List and download databases", - NULL, + "# Download a known database\n" + "mmseqs databases UniRef30_2302 db tmp\n\n" + "# Index a local FASTA file (use ./relative or /absolute paths)\n" + "mmseqs databases ./input.fasta.gz db tmp\n" + "mmseqs databases /data/rnacentral_active.fasta.gz db tmp\n\n", "Milot Mirdita ", - " ", + " ", CITATION_TAXONOMY|CITATION_MMSEQS2, {{"selection", 0, DbType::ZERO_OR_ALL, &DbValidator::empty }, {"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }, {"tmpDir", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::directory }}}, @@ -149,10 +153,13 @@ std::vector baseCommands = { {"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::flatfile }}}, {"makepaddedseqdb", makepaddedseqdb, &par.makepaddedseqdb, COMMAND_HIDDEN, "Generate a padded sequence DB", - "Generate a padded sequence DB", + "# From existing sequence DB\n" + "mmseqs makepaddedseqdb sequenceDB paddedDB\n\n" + "# Directly from FASTA (fused createdb+splitsequence+makepaddedseqdb)\n" + "mmseqs makepaddedseqdb input.fasta paddedDB --max-seq-len 10000\n", "Milot Mirdita & Martin Steinegger ", - " ", - CITATION_GPU, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA|DbType::NEED_HEADER, &DbValidator::sequenceDb }, + " ", + CITATION_GPU, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::allDbAndFlat }, {"sequenceIndexDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }}}, {"appenddbtoindex", appenddbtoindex, &par.appenddbtoindex, COMMAND_HIDDEN, NULL, diff --git a/src/commons/Parameters.cpp b/src/commons/Parameters.cpp index ddd41d2c4..8b11207a1 100644 --- a/src/commons/Parameters.cpp +++ b/src/commons/Parameters.cpp @@ -974,6 +974,11 @@ Parameters::Parameters(): makepaddedseqdb.push_back(&PARAM_MASK_PROBABILTY); makepaddedseqdb.push_back(&PARAM_MASK_LOWER_CASE); makepaddedseqdb.push_back(&PARAM_MASK_N_REPEAT); + makepaddedseqdb.push_back(&PARAM_MAX_SEQ_LEN); + makepaddedseqdb.push_back(&PARAM_SEQUENCE_OVERLAP); + makepaddedseqdb.push_back(&PARAM_HEADER_SPLIT_MODE); + makepaddedseqdb.push_back(&PARAM_DB_TYPE); + makepaddedseqdb.push_back(&PARAM_SHUFFLE); makepaddedseqdb.push_back(&PARAM_WRITE_LOOKUP); makepaddedseqdb.push_back(&PARAM_THREADS); makepaddedseqdb.push_back(&PARAM_V); diff --git a/src/commons/Parameters.h b/src/commons/Parameters.h index 1e0eb1e41..29a3d8fdc 100644 --- a/src/commons/Parameters.h +++ b/src/commons/Parameters.h @@ -194,6 +194,7 @@ class Parameters { static const int INDEX_SUBSET_NO_PREFILTER = 2; static const int INDEX_SUBSET_NO_ALIGNMENT = 4; static const int INDEX_SUBSET_NO_SEQUENCE_LOOKUP = 8; + static const int INDEX_SUBSET_NO_STREAM = 16; static std::vector getOutputFormat(int formatMode, const std::string &outformat, bool &needSequences, bool &needBacktrace, bool &needFullHeaders, diff --git a/src/prefiltering/PrefilteringIndexReader.cpp b/src/prefiltering/PrefilteringIndexReader.cpp index 951eeb3b5..9fe08af7f 100644 --- a/src/prefiltering/PrefilteringIndexReader.cpp +++ b/src/prefiltering/PrefilteringIndexReader.cpp @@ -4,8 +4,15 @@ #include "ExtendedSubstitutionMatrix.h" #include "FileUtil.h" #include "IndexBuilder.h" +#include "Masker.h" #include "Parameters.h" +#ifdef OPENMP +#include +#endif + +#include + extern const char* index_version_compatible; unsigned int PrefilteringIndexReader::VERSION = 0; unsigned int PrefilteringIndexReader::META = 1; @@ -49,6 +56,185 @@ std::string PrefilteringIndexReader::indexName(const std::string &outDB) { return result; } +// Stream SequenceLookup data directly to the index file without the +// O(totalSeqBytes) heap allocation that IndexBuilder::fillDatabase uses. +// Encodes + masks each sequence and writes via DBWriter streaming API. +// Produces byte-identical output to the bulk-allocation path. +// +// Memory is kept constant by calling posix_madvise(MADV_DONTNEED) on +// consumed mmap pages, so the kernel reclaims them instead of letting +// page cache grow to input size. +static void streamSequenceLookupToIndex(DBWriter &writer, unsigned int splitIdx, unsigned int keyOffset, + DBReader *dbr, size_t dbFrom, size_t dbSize, + BaseMatrix *subMat, int seqType, int maxSeqLen, int kmerSize, + bool hasSpacedKmer, const std::string &spacedKmerPattern, + int maskMode, int maskLowerCase, float maskProb, int maskNrepeats, + int targetSearchMode) { + Debug(Debug::INFO) << "Streaming SequenceLookup to index (no bulk allocation)\n"; + + const bool isProfile = Parameters::isEqualDbtype(seqType, Parameters::DBTYPE_HMM_PROFILE); + const bool isTargetSimiliarKmerSearch = isProfile || targetSearchMode; + + // Compute offsets and total data size from DB sequence lengths + size_t aaDbSize = 0; + size_t *offsets = new size_t[dbSize + 1]; + offsets[0] = 0; + for (size_t id = dbFrom; id < dbFrom + dbSize; id++) { + size_t idLocal = id - dbFrom; + aaDbSize += dbr->getSeqLen(id); + offsets[idLocal + 1] = offsets[idLocal] + dbr->getSeqLen(id); + } + + // SEQCOUNT + Debug(Debug::INFO) << "Write SEQCOUNT (" << (keyOffset + PrefilteringIndexReader::SEQCOUNT) << ")\n"; + size_t tablesize = dbSize; + writer.writeData((char *) &tablesize, 1 * sizeof(size_t), keyOffset + PrefilteringIndexReader::SEQCOUNT, splitIdx); + writer.alignToPageSize(splitIdx); + + // SEQINDEXDATASIZE + Debug(Debug::INFO) << "Write SEQINDEXDATASIZE (" << (keyOffset + PrefilteringIndexReader::SEQINDEXDATASIZE) << ")\n"; + int64_t seqindexDataSize = (int64_t) aaDbSize; + writer.writeData((char *) &seqindexDataSize, 1 * sizeof(int64_t), keyOffset + PrefilteringIndexReader::SEQINDEXDATASIZE, splitIdx); + writer.alignToPageSize(splitIdx); + + // SEQINDEXSEQOFFSET + Debug(Debug::INFO) << "Write SEQINDEXSEQOFFSET (" << (keyOffset + PrefilteringIndexReader::SEQINDEXSEQOFFSET) << ")\n"; + writer.writeData((char *) offsets, (dbSize + 1) * sizeof(size_t), keyOffset + PrefilteringIndexReader::SEQINDEXSEQOFFSET, splitIdx); + writer.alignToPageSize(splitIdx); + + // Stream SEQINDEXDATA: encode + mask each sequence and write directly + Debug(Debug::INFO) << "Write SEQINDEXDATA (" << (keyOffset + PrefilteringIndexReader::SEQINDEXDATA) << ") [streaming]\n"; + + bool needMasking = !isTargetSimiliarKmerSearch && (maskMode > 0 || maskNrepeats > 0 || maskLowerCase > 0); + + // Allocate chunk buffer for parallel encode+mask, serial write + const size_t CHUNK_SEQ_COUNT = 100000; + size_t maxChunkBytes = 0; + for (size_t start = 0; start < dbSize; start += CHUNK_SEQ_COUNT) { + size_t end = std::min(start + CHUNK_SEQ_COUNT, dbSize); + size_t bytes = offsets[end] - offsets[start]; + maxChunkBytes = std::max(maxChunkBytes, bytes); + } + char *chunkBuffer = new char[maxChunkBytes]; + + // Page cache management: release consumed mmap pages so memory stays + // constant regardless of input size. We track the global byte offset + // of consumed data and periodically call posix_madvise(MADV_DONTNEED) + // on the pages behind us. + const size_t PAGE_SIZE = sysconf(_SC_PAGESIZE); + const size_t ADVISE_CHUNK = 64 * 1024 * 1024; // release pages every 64 MB + + size_t dataFileCnt = dbr->getDataFileCnt(); + + // Precompute cumulative file-start offsets: fileCumulOff[f] is the global + // byte offset where file f begins. fileCumulOff[dataFileCnt] = totalDataSize. + size_t *fileCumulOff = new size_t[dataFileCnt + 1]; + fileCumulOff[0] = 0; + for (size_t f = 0; f < dataFileCnt; f++) { + fileCumulOff[f + 1] = fileCumulOff[f] + dbr->getDataSizeForFile(f); + } + +#ifdef HAVE_POSIX_MADVISE + for (size_t f = 0; f < dataFileCnt; f++) { + size_t fileSize = dbr->getDataSizeForFile(f); + if (fileSize > 0) { + posix_madvise(dbr->getDataForFile(f), fileSize, POSIX_MADV_SEQUENTIAL); + } + } +#endif + + // Global byte offset up to which we've advised DONTNEED (page-aligned). + size_t advisedUpTo = 0; + + Debug::Progress progress(dbSize); + writer.writeStart(splitIdx); + for (size_t chunkStart = 0; chunkStart < dbSize; chunkStart += CHUNK_SEQ_COUNT) { + size_t chunkEnd = std::min(chunkStart + CHUNK_SEQ_COUNT, dbSize); + size_t chunkBytes = offsets[chunkEnd] - offsets[chunkStart]; + + // Parallel encode+mask into chunkBuffer +#pragma omp parallel + { + unsigned int thread_idx = 0; +#ifdef OPENMP + thread_idx = static_cast(omp_get_thread_num()); +#endif + Masker *masker = NULL; + if (needMasking) { + masker = new Masker(*subMat); + } + Sequence seq(maxSeqLen, seqType, subMat, kmerSize, hasSpacedKmer, + false, true, spacedKmerPattern); + +#pragma omp for schedule(dynamic, 100) + for (size_t i = chunkStart; i < chunkEnd; i++) { + progress.updateProgress(); + size_t id = dbFrom + i; + seq.resetCurrPos(); + char *seqData = dbr->getData(id, thread_idx); + unsigned int qKey = dbr->getDbKey(id); + seq.mapSequence(i, qKey, seqData, dbr->getSeqLen(id)); + + if (isTargetSimiliarKmerSearch) { + unsigned char *src = isProfile ? seq.numConsensusSequence : seq.numSequence; + memcpy(chunkBuffer + (offsets[i] - offsets[chunkStart]), src, seq.L); + } else { + if (masker != NULL) { + masker->maskSequence(seq, maskMode > 0, maskProb, maskLowerCase > 0, maskNrepeats); + } + memcpy(chunkBuffer + (offsets[i] - offsets[chunkStart]), seq.numSequence, seq.L); + } + } + + if (masker != NULL) { + delete masker; + } + } // end omp parallel + + // Serial write of the entire chunk + writer.writeAdd(chunkBuffer, chunkBytes, splitIdx); + +#ifdef HAVE_POSIX_MADVISE + // Release consumed mmap pages to keep memory constant. + size_t lastId = dbFrom + chunkEnd - 1; + size_t seqEnd = dbr->getOffset(lastId) + dbr->getEntryLen(lastId); + size_t pageAlignedEnd = (seqEnd / PAGE_SIZE) * PAGE_SIZE; + if (pageAlignedEnd > advisedUpTo + ADVISE_CHUNK) { + // Advise DONTNEED on all pages in [advisedUpTo, pageAlignedEnd) + // across whichever data files they span. + for (size_t f = 0; f < dataFileCnt; f++) { + size_t fileStart = fileCumulOff[f]; + size_t fileEnd = fileCumulOff[f + 1]; + // Intersect [advisedUpTo, pageAlignedEnd) with [fileStart, fileEnd) + size_t rangeStart = std::max(advisedUpTo, fileStart); + size_t rangeEnd = std::min(pageAlignedEnd, fileEnd); + if (rangeStart >= rangeEnd) { + continue; + } + // Convert to file-local offsets, page-align start upward + size_t localStart = ((rangeStart - fileStart + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE; + size_t localEnd = (rangeEnd - fileStart) / PAGE_SIZE * PAGE_SIZE; + if (localStart < localEnd) { + posix_madvise(dbr->getDataForFile(f) + localStart, + localEnd - localStart, POSIX_MADV_DONTNEED); + } + } + advisedUpTo = pageAlignedEnd; + } +#endif + } + // Sentinel byte to match original SequenceLookup buffer layout (dataSize+1 allocation) + char sentinel = '\0'; + writer.writeAdd(&sentinel, 1, splitIdx); + writer.writeEnd(keyOffset + PrefilteringIndexReader::SEQINDEXDATA, splitIdx, true, true); + writer.alignToPageSize(splitIdx); + + delete[] chunkBuffer; + delete[] offsets; + delete[] fileCumulOff; + dbr->remapData(); +} + void PrefilteringIndexReader::createIndexFile(const std::string &outDB, DBReader *dbr1, DBReader *dbr2, DBReader *hdbr1, DBReader *hdbr2, @@ -225,6 +411,19 @@ void PrefilteringIndexReader::createIndexFile(const std::string &outDB, continue; } + // When only SequenceLookup is needed (no k-mer index), stream it directly + // to disk without the O(totalSeqBytes) heap allocation. + // Use --index-subset 18 (2|16) to force the old bulk-allocation path. + const bool noStream = (indexSubset & Parameters::INDEX_SUBSET_NO_STREAM) != 0; + if (needSequenceLookup && !needKmerIndex && !noStream) { + unsigned int keyOffset = 1000 * s; + streamSequenceLookupToIndex(writer, SPLIT_INDX + s, keyOffset, + dbr1, dbFrom, dbSize, subMat, seqType, maxSeqLen, kmerSize, + hasSpacedKmer, spacedKmerPattern, maskMode, maskLowerCase, + maskProb, maskNrepeats, targetSearchMode); + continue; + } + IndexTable * indexTable; if(needKmerIndex){ indexTable = new IndexTable(adjustAlphabetSize, kmerSize, false); diff --git a/src/util/makepaddedseqdb.cpp b/src/util/makepaddedseqdb.cpp index ab75a9be0..a0efddcd4 100644 --- a/src/util/makepaddedseqdb.cpp +++ b/src/util/makepaddedseqdb.cpp @@ -6,15 +6,513 @@ #include "SubstitutionMatrix.h" #include "tantan.h" #include "Masker.h" +#include "KSeqWrapper.h" +#include "FastSort.h" +#include "Orf.h" +#include "itoa.h" +#include "FileUtil.h" + +#include +#include +#include +#include +#include +#include #ifdef OPENMP #include #endif +struct EntryMeta { + std::string header; // "name comment\n" (with trailing newline) + size_t seqLen; // sequence length (no sequence bytes stored) + size_t sequenceOffset; // byte offset in FASTA where sequence starts + int newlineCount; // newlines within sequence (for multi-line FASTA) +}; + +struct SortEntry { + size_t entryIdx; // index into entries[] + size_t startPos; // chunk start (0 if not split) + size_t chunkLen; // chunk length + unsigned int shuffledKey; // key after createdb shuffle +}; + +struct ChunkWriteInfo { + size_t startPos; // chunk start within FASTA sequence + size_t chunkLen; // chunk length + size_t dataOffset; // pre-computed byte offset in output data file +}; + +static int makepaddedseqdbFromFasta(Parameters &par) { + // ============================== + // Pass 1: Metadata collection (no sequence storage) + // ============================== + std::vector entries; + size_t maxEntrySeqLen = 0; + int dbType = par.dbType; + bool canMmap = false; + { + size_t sampleCount = 0; + size_t isNuclCnt = 0; + const size_t testForNucSequence = 100; + KSeqWrapper *kseq = KSeqFactory(par.db1.c_str()); + while (kseq->ReadEntry()) { + const KSeqWrapper::KSeqEntry &e = kseq->entry; + if (e.name.l == 0) { + Debug(Debug::ERROR) << "Fasta entry " << entries.size() << " is invalid\n"; + EXIT(EXIT_FAILURE); + } + EntryMeta meta; + meta.header.append(e.name.s, e.name.l); + if (e.comment.l > 0) { + meta.header.append(" ", 1); + meta.header.append(e.comment.s, e.comment.l); + } + meta.header.push_back('\n'); + meta.seqLen = e.sequence.l; + meta.sequenceOffset = e.sequenceOffset; + meta.newlineCount = e.newlineCount; + maxEntrySeqLen = std::max(maxEntrySeqLen, e.sequence.l); + + // Auto-detect nucleotide/amino acid (same heuristic as createdb) + if (dbType == 0) { + if (sampleCount < 10 || (sampleCount % 100) == 0) { + if (sampleCount < testForNucSequence) { + size_t cnt = 0; + for (size_t j = 0; j < e.sequence.l; j++) { + switch (toupper(e.sequence.s[j])) { + case 'T': case 'A': case 'G': case 'C': case 'U': case 'N': + cnt++; + break; + } + } + const float nuclDNAFraction = static_cast(cnt) / static_cast(e.sequence.l); + if (nuclDNAFraction > 0.9f) { + isNuclCnt++; + } + } + sampleCount++; + } + } + + entries.push_back(std::move(meta)); + } + canMmap = (kseq->type == KSeqWrapper::KSEQ_FILE); + delete kseq; + + if (entries.empty()) { + Debug(Debug::ERROR) << "The input file has no entries\n"; + EXIT(EXIT_FAILURE); + } + + if (dbType == 0) { + if (isNuclCnt == sampleCount) { + dbType = Parameters::DBTYPE_NUCLEOTIDES; + } else { + dbType = Parameters::DBTYPE_AMINO_ACIDS; + } + } + } + Debug(Debug::INFO) << "Database type: " << Parameters::getDbTypeName(dbType) << "\n"; + + // createdb always writes DBTYPE_AMINO_ACIDS to the dbtype file, + // so makepaddedseqdb always sees DBTYPE_AMINO_ACIDS as input type + int seqType = Parameters::DBTYPE_AMINO_ACIDS; + + // ============================== + // Compute output layout + // ============================== + + // Shuffle (group by id % shuffleSplits, flatten in group order) + const unsigned int shuffleSplits = par.shuffleDatabase ? 32 : 1; + std::vector> splits(shuffleSplits); + for (size_t i = 0; i < entries.size(); i++) { + splits[i % shuffleSplits].push_back(i); + } + std::vector shuffledOrder; + shuffledOrder.reserve(entries.size()); + for (unsigned int s = 0; s < shuffleSplits; s++) { + for (size_t j = 0; j < splits[s].size(); j++) { + shuffledOrder.push_back(splits[s][j]); + } + } + + // Generate SortEntries in shuffled order + bool doSplit = (par.maxSeqLen < 65535); + std::vector sortEntries; + for (size_t k = 0; k < shuffledOrder.size(); k++) { + size_t origId = shuffledOrder[k]; + size_t seqLen = entries[origId].seqLen; + + if (doSplit && seqLen > static_cast(par.maxSeqLen)) { + size_t effectiveStep = static_cast(par.maxSeqLen) - par.sequenceOverlap; + size_t splitCnt = static_cast( + ceilf(static_cast(seqLen) / static_cast(effectiveStep))); + for (size_t split = 0; split < splitCnt; split++) { + size_t startPos = split * effectiveStep; + size_t chunkLen = std::min(static_cast(par.maxSeqLen), seqLen - startPos); + SortEntry se; + se.entryIdx = origId; + se.startPos = startPos; + se.chunkLen = chunkLen; + se.shuffledKey = static_cast(k); + sortEntries.push_back(se); + } + } else { + SortEntry se; + se.entryIdx = origId; + se.startPos = 0; + se.chunkLen = seqLen; + se.shuffledKey = static_cast(k); + sortEntries.push_back(se); + } + } + + // Sort by (chunkLen+2 ASC, index_position DESC) + size_t totalSize = sortEntries.size(); + std::vector> sortArray(totalSize); + for (size_t i = 0; i < totalSize; i++) { + sortArray[i] = std::make_pair(i, sortEntries[i].chunkLen + 2); + } + SORT_PARALLEL(sortArray.begin(), sortArray.end(), + [](const std::pair &a, const std::pair &b) { + if (a.second != b.second) return a.second < b.second; + return a.first > b.first; + }); + + // Compute data file offsets — cumulative sum of aligned sizes + const int ALIGN = 4; + std::vector dataOffsets(totalSize); + size_t cumOffset = 0; + for (size_t i = 0; i < totalSize; i++) { + dataOffsets[i] = cumOffset; + size_t chunkLen = sortEntries[sortArray[i].first].chunkLen; + size_t encodedSize = chunkLen + (ALIGN - chunkLen % ALIGN) % ALIGN; + cumOffset += encodedSize; + } + size_t totalDataSize = cumOffset; + + // Build entryChunkMap[origFastaId] = chunks to write for that entry + std::vector> entryChunkMap(entries.size()); + for (size_t i = 0; i < totalSize; i++) { + size_t origIdx = sortArray[i].first; + const SortEntry &se = sortEntries[origIdx]; + ChunkWriteInfo info; + info.startPos = se.startPos; + info.chunkLen = se.chunkLen; + info.dataOffset = dataOffsets[i]; + entryChunkMap[se.entryIdx].push_back(info); + } + + // ============================== + // Write headers, lookup, source (no sequence data needed) + // ============================== + + size_t maxChunkLen = doSplit ? static_cast(par.maxSeqLen) : maxEntrySeqLen; + + DBWriter dbhw(par.hdr2.c_str(), par.hdr2Index.c_str(), 1, false, Parameters::DBTYPE_GENERIC_DB); + dbhw.open(); + + struct LookupInfo { + unsigned int outputKey; + std::string parsedName; + unsigned int fileNumber; + }; + std::vector lookupEntries(totalSize); + + char orfBuffer[1024]; + + for (size_t i = 0; i < totalSize; i++) { + size_t origIdx = sortArray[i].first; + const SortEntry &se = sortEntries[origIdx]; + const EntryMeta &meta = entries[se.entryIdx]; + + unsigned int outputKey = static_cast(i); + unsigned int dbKey = se.shuffledKey; + + if (doSplit && par.headerSplitMode == 0) { + size_t fromPos = se.startPos; + size_t toPos = se.startPos + se.chunkLen - 1; + size_t bufferLen = Orf::writeOrfHeader(orfBuffer, dbKey, fromPos, toPos, 0, 0); + dbhw.writeData(orfBuffer, bufferLen, outputKey, 0); + lookupEntries[i].parsedName = Util::parseFastaHeader(orfBuffer); + lookupEntries[i].fileNumber = static_cast(origIdx); + } else { + dbhw.writeData(meta.header.c_str(), meta.header.length(), outputKey, 0); + lookupEntries[i].parsedName = Util::parseFastaHeader(meta.header.c_str()); + lookupEntries[i].fileNumber = doSplit ? static_cast(origIdx) : dbKey; + } + lookupEntries[i].outputKey = outputKey; + } + + dbhw.close(true, false); + + // Write lookup file + if (par.writeLookup == true) { + std::string lookupFile = par.db2 + ".lookup"; + FILE *file = FileUtil::openAndDelete(lookupFile.c_str(), "w"); + std::string buffer; + buffer.reserve(2048); + DBReader::LookupEntry entry; + for (size_t i = 0; i < totalSize; i++) { + entry.id = lookupEntries[i].outputKey; + entry.entryName = lookupEntries[i].parsedName; + entry.fileNumber = lookupEntries[i].fileNumber; + DBReader::lookupEntryToBuffer(buffer, entry); + size_t written = fwrite(buffer.c_str(), sizeof(char), buffer.size(), file); + if (written != buffer.size()) { + Debug(Debug::ERROR) << "Cannot write to lookup file " << lookupFile << "\n"; + EXIT(EXIT_FAILURE); + } + buffer.clear(); + } + if (fclose(file) != 0) { + Debug(Debug::ERROR) << "Cannot close file " << lookupFile << "\n"; + EXIT(EXIT_FAILURE); + } + } + + // Write source file + { + std::string sourceFile = par.db2 + ".source"; + FILE *source = FileUtil::openAndDelete(sourceFile.c_str(), "w"); + std::string sourceName = FileUtil::baseName(par.db1); + char buffer[4096]; + size_t len = snprintf(buffer, sizeof(buffer), "0\t%s\n", sourceName.c_str()); + size_t written = fwrite(buffer, sizeof(char), len, source); + if (written != len) { + Debug(Debug::ERROR) << "Cannot write to source file " << sourceFile << "\n"; + EXIT(EXIT_FAILURE); + } + if (fclose(source) != 0) { + Debug(Debug::ERROR) << "Cannot close file " << sourceFile << "\n"; + EXIT(EXIT_FAILURE); + } + } + + // ============================== + // Pass 2: Streaming encode with pwrite + // ============================== + + // Open output data file and pre-allocate + std::string dataFilePath = par.db2; + int dataFd = ::open(dataFilePath.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (dataFd < 0) { + Debug(Debug::ERROR) << "Cannot open data file " << dataFilePath << "\n"; + EXIT(EXIT_FAILURE); + } + if (totalDataSize > 0 && ftruncate(dataFd, totalDataSize) != 0) { + Debug(Debug::ERROR) << "Cannot pre-allocate data file " << dataFilePath << "\n"; + EXIT(EXIT_FAILURE); + } + + if (canMmap) { + // mmap the input FASTA for parallel random access + int mmapFd = ::open(par.db1.c_str(), O_RDONLY); + if (mmapFd < 0) { + Debug(Debug::ERROR) << "Cannot open FASTA file for mmap: " << par.db1 << "\n"; + EXIT(EXIT_FAILURE); + } + struct stat fileStat; + if (fstat(mmapFd, &fileStat) != 0) { + Debug(Debug::ERROR) << "Cannot stat FASTA file: " << par.db1 << "\n"; + EXIT(EXIT_FAILURE); + } + size_t fastaFileSize = fileStat.st_size; + char *fastaData = (char *)mmap(NULL, fastaFileSize, PROT_READ, MAP_PRIVATE, mmapFd, 0); + if (fastaData == MAP_FAILED) { + Debug(Debug::ERROR) << "Cannot mmap FASTA file: " << par.db1 << "\n"; + EXIT(EXIT_FAILURE); + } +#ifdef HAVE_POSIX_MADVISE + posix_madvise(fastaData, fastaFileSize, POSIX_MADV_RANDOM); +#endif + + SubstitutionMatrix subMat(par.scoringMatrixFile.values.aminoacid().c_str(), 2.0, par.scoreBias); + + Debug::Progress progress(entries.size()); + +#pragma omp parallel +{ + Masker masker(subMat); + Sequence seq(maxChunkLen, seqType, &subMat, 0, false, false); + std::string result; + result.reserve(maxChunkLen + ALIGN); + char *seqBuf = new char[maxEntrySeqLen + 1]; + size_t charSeqBufferSize = maxChunkLen + 1; + unsigned char *charSequence = NULL; + if (par.maskMode) { + charSequence = (unsigned char *)malloc(charSeqBufferSize * sizeof(char)); + } + +#pragma omp for schedule(dynamic, 100) + for (size_t entryIdx = 0; entryIdx < entries.size(); entryIdx++) { + progress.updateProgress(); + const EntryMeta &meta = entries[entryIdx]; + const std::vector &chunks = entryChunkMap[entryIdx]; + + // Strip newlines from mmap'd data into contiguous buffer + const char *raw = fastaData + meta.sequenceOffset; + size_t rawLen = meta.seqLen + meta.newlineCount; + size_t cleanLen = 0; + for (size_t r = 0; r < rawLen && cleanLen < meta.seqLen; r++) { + if (raw[r] != '\n' && raw[r] != '\r') { + seqBuf[cleanLen++] = raw[r]; + } + } + + for (size_t c = 0; c < chunks.size(); c++) { + const ChunkWriteInfo &chunk = chunks[c]; + const char *chunkData = seqBuf + chunk.startPos; + size_t chunkLen = chunk.chunkLen; + + seq.mapSequence(0, 0, chunkData, chunkLen); + + if (charSequence != NULL) { + if (static_cast(seq.L) >= charSeqBufferSize) { + charSeqBufferSize = static_cast(seq.L * 1.5); + charSequence = (unsigned char *)realloc(charSequence, charSeqBufferSize * sizeof(char)); + } + memcpy(charSequence, seq.numSequence, seq.L); + masker.maskSequence(seq, par.maskMode, par.maskProb, par.maskLowerCaseMode, par.maskNrepeats); + for (int j = 0; j < seq.L; j++) { + result.append(1, (seq.numSequence[j] == masker.maskLetterNum) ? charSequence[j] + 32 : charSequence[j]); + } + } else { + for (int j = 0; j < seq.L; j++) { + char aa = chunkData[j]; + result.append(1, (islower(aa)) ? seq.numSequence[j] + 32 : seq.numSequence[j]); + } + } + + const size_t seqPadding = (seq.L % ALIGN == 0) ? 0 : ALIGN - seq.L % ALIGN; + result.append(seqPadding, static_cast(24)); + + pwrite(dataFd, result.c_str(), result.size(), chunk.dataOffset); + result.clear(); + } + } + + delete[] seqBuf; + if (charSequence != NULL) { + free(charSequence); + } +} + + munmap(fastaData, fastaFileSize); + ::close(mmapFd); + } else { + // Fallback: sequential Pass 2 for compressed/stdin inputs + SubstitutionMatrix subMat(par.scoringMatrixFile.values.aminoacid().c_str(), 2.0, par.scoreBias); + Masker masker(subMat); + Sequence seq(maxChunkLen, seqType, &subMat, 0, false, false); + std::string result; + result.reserve(maxChunkLen + ALIGN); + + size_t charSeqBufferSize = maxChunkLen + 1; + unsigned char *charSequence = NULL; + if (par.maskMode) { + charSequence = (unsigned char *)malloc(charSeqBufferSize * sizeof(char)); + } + + Debug::Progress progress(entries.size()); + + KSeqWrapper *kseq = KSeqFactory(par.db1.c_str()); + size_t entryIdx = 0; + while (kseq->ReadEntry()) { + progress.updateProgress(); + + const KSeqWrapper::KSeqEntry &e = kseq->entry; + const std::vector &chunks = entryChunkMap[entryIdx]; + + for (size_t c = 0; c < chunks.size(); c++) { + const ChunkWriteInfo &chunk = chunks[c]; + const char *chunkData = e.sequence.s + chunk.startPos; + size_t chunkLen = chunk.chunkLen; + + seq.mapSequence(0, 0, chunkData, chunkLen); + + if (charSequence != NULL) { + if (static_cast(seq.L) >= charSeqBufferSize) { + charSeqBufferSize = static_cast(seq.L * 1.5); + charSequence = (unsigned char *)realloc(charSequence, charSeqBufferSize * sizeof(char)); + } + memcpy(charSequence, seq.numSequence, seq.L); + masker.maskSequence(seq, par.maskMode, par.maskProb, par.maskLowerCaseMode, par.maskNrepeats); + for (int j = 0; j < seq.L; j++) { + result.append(1, (seq.numSequence[j] == masker.maskLetterNum) ? charSequence[j] + 32 : charSequence[j]); + } + } else { + for (int j = 0; j < seq.L; j++) { + char aa = chunkData[j]; + result.append(1, (islower(aa)) ? seq.numSequence[j] + 32 : seq.numSequence[j]); + } + } + + const size_t seqPadding = (seq.L % ALIGN == 0) ? 0 : ALIGN - seq.L % ALIGN; + result.append(seqPadding, static_cast(24)); + + ssize_t written = pwrite(dataFd, result.c_str(), result.size(), chunk.dataOffset); + if (written != static_cast(result.size())) { + Debug(Debug::ERROR) << "Cannot write to data file\n"; + EXIT(EXIT_FAILURE); + } + + result.clear(); + } + + entryIdx++; + } + delete kseq; + + if (charSequence != NULL) { + free(charSequence); + } + } + + ::close(dataFd); + + // Write index file + { + FILE *idxFile = FileUtil::openAndDelete(par.db2Index.c_str(), "w"); + char buffer[1024]; + for (size_t i = 0; i < totalSize; i++) { + size_t chunkLen = sortEntries[sortArray[i].first].chunkLen; + size_t len = DBWriter::indexToBuffer(buffer, static_cast(i), dataOffsets[i], chunkLen + 2); + size_t written = fwrite(buffer, sizeof(char), len, idxFile); + if (written != len) { + Debug(Debug::ERROR) << "Cannot write to index file\n"; + EXIT(EXIT_FAILURE); + } + } + if (fclose(idxFile) != 0) { + Debug(Debug::ERROR) << "Cannot close index file\n"; + EXIT(EXIT_FAILURE); + } + } + + // Write dbtype file + { + int paddedDbType = DBReader::setExtendedDbtype(seqType, Parameters::DBTYPE_EXTENDED_GPU); + DBWriter::writeDbtypeFile(par.db2.c_str(), paddedDbType, false); + } + + return EXIT_SUCCESS; +} + int makepaddedseqdb(int argc, const char **argv, const Command &command) { Parameters &par = Parameters::getInstance(); + // Match splitsequence defaults so fused FASTA path behaves identically + par.maxSeqLen = 10000; par.parseParameters(argc, argv, command, true, 0, 0); + // Detect FASTA input: if no .dbtype file exists, run fused FASTA codepath + bool isFastaInput = !FileUtil::fileExists(par.db1dbtype.c_str()); + if (isFastaInput) { + return makepaddedseqdbFromFasta(par); + } + + // Existing DB-input codepath (unchanged) const int mode = DBReader::USE_INDEX | DBReader::USE_DATA; DBReader dbr(par.db1.c_str(), par.db1Index.c_str(), par.threads, mode); dbr.open(DBReader::SORT_BY_LENGTH); @@ -141,4 +639,4 @@ int makepaddedseqdb(int argc, const char **argv, const Command &command) { } dbr.close(); return EXIT_SUCCESS; -} \ No newline at end of file +} diff --git a/src/workflow/Databases.cpp b/src/workflow/Databases.cpp index 2bbe05397..b60d21e6d 100644 --- a/src/workflow/Databases.cpp +++ b/src/workflow/Databases.cpp @@ -276,9 +276,12 @@ int databases(int argc, const char **argv, const Command &command) { } } if (downloadIdx == -1) { - par.printUsageMessage(command, par.help ? MMseqsParameter::COMMAND_EXPERT : 0, description.c_str()); - Debug(Debug::ERROR) << "Selected database " << par.db1 << " was not found\n"; - EXIT(EXIT_FAILURE); + if (par.db1.find('/') == std::string::npos) { + par.printUsageMessage(command, par.help ? MMseqsParameter::COMMAND_EXPERT : 0, description.c_str()); + Debug(Debug::ERROR) << "Selected database " << par.db1 << " was not found\n"; + EXIT(EXIT_FAILURE); + } + // Path contains '/' — treat as local file, shell script validates existence } par.printParameters(command.cmd, argc, argv, par.databases); std::string tmpDir = par.db3; @@ -291,10 +294,14 @@ int databases(int argc, const char **argv, const Command &command) { par.filenames.push_back(tmpDir); CommandCaller cmd; - for (size_t i = 0; i < usedDownloads[downloadIdx].environment.size(); ++i) { - cmd.addVariable(usedDownloads[downloadIdx].environment[i].key, usedDownloads[downloadIdx].environment[i].value); + if (downloadIdx >= 0) { + for (size_t i = 0; i < usedDownloads[downloadIdx].environment.size(); ++i) { + cmd.addVariable(usedDownloads[downloadIdx].environment[i].key, usedDownloads[downloadIdx].environment[i].value); + } + cmd.addVariable("TAXONOMY", usedDownloads[downloadIdx].hasTaxonomy ? "TRUE" : NULL); + } else { + cmd.addVariable("TAXONOMY", NULL); } - cmd.addVariable("TAXONOMY", usedDownloads[downloadIdx].hasTaxonomy ? "TRUE" : NULL); cmd.addVariable("REMOVE_TMP", par.removeTmpFiles ? "TRUE" : NULL); cmd.addVariable("VERB_PAR", par.createParameterString(par.onlyverbosity).c_str()); cmd.addVariable("COMP_PAR", par.createParameterString(par.verbandcompression).c_str()); @@ -303,7 +310,11 @@ int databases(int argc, const char **argv, const Command &command) { cmd.addVariable("THREADS_PAR", par.createParameterString(par.onlythreads).c_str()); cmd.addVariable("THREADS_COMP_PAR", par.createParameterString(par.threadsandcompression).c_str()); std::string program = tmpDir + "/download.sh"; - FileUtil::writeFile(program, usedDownloads[downloadIdx].script, usedDownloads[downloadIdx].scriptLength); + if (downloadIdx >= 0) { + FileUtil::writeFile(program, usedDownloads[downloadIdx].script, usedDownloads[downloadIdx].scriptLength); + } else { + FileUtil::writeFile(program, databases_sh, databases_sh_len); + } cmd.execProgram(program.c_str(), par.filenames); // Should never get here diff --git a/util/benchmarks/createindex_streaming/gen_fasta.py b/util/benchmarks/createindex_streaming/gen_fasta.py new file mode 100755 index 000000000..4a82664e9 --- /dev/null +++ b/util/benchmarks/createindex_streaming/gen_fasta.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""Generate a synthetic nucleotide FASTA with long sequences. + +Sequences are long enough (default 50 KB) to trigger splitting with +the default --max-seq-len 10000. Used by the benchmark/correctness suite. +""" + +import argparse +import random +import sys + + +def parse_size(s): + """Parse a human-readable size string (e.g. '100M', '5G') to bytes.""" + s = s.strip().upper() + multipliers = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in multipliers: + return int(float(s[:-1]) * multipliers[s[-1]]) + return int(s) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--size', default='100M', + help='Target total FASTA size (e.g. 100M, 1G)') + parser.add_argument('--seq-len', type=int, default=50000, + help='Length of each sequence in bases (default: 50000)') + parser.add_argument('--seed', type=int, default=42, + help='Random seed for reproducibility') + parser.add_argument('-o', '--output', default='/dev/stdout', + help='Output file (default: stdout)') + args = parser.parse_args() + + target_bytes = parse_size(args.size) + seq_len = args.seq_len + rng = random.Random(args.seed) + bases = 'ACGT' + line_width = 80 + + written = 0 + seq_id = 0 + + out = open(args.output, 'w') if args.output != '/dev/stdout' else sys.stdout + try: + while written < target_bytes: + header = f'>synth_{seq_id} len={seq_len}\n' + out.write(header) + written += len(header) + + remaining = seq_len + while remaining > 0: + chunk = min(remaining, line_width) + line = ''.join(rng.choice(bases) for _ in range(chunk)) + out.write(line) + out.write('\n') + written += chunk + 1 + remaining -= chunk + + seq_id += 1 + finally: + if out is not sys.stdout: + out.close() + + print(f'Generated {seq_id} sequences, ~{written} bytes', file=sys.stderr) + + +if __name__ == '__main__': + main() diff --git a/util/benchmarks/createindex_streaming/run b/util/benchmarks/createindex_streaming/run new file mode 100755 index 000000000..fdbb141b4 --- /dev/null +++ b/util/benchmarks/createindex_streaming/run @@ -0,0 +1,531 @@ +#!/usr/bin/env python3 +""" +Benchmark & correctness suite for streaming SequenceLookup in createindex. + +When --index-subset 2 is used (no k-mer index, only SequenceLookup), the new +streaming path writes encoded+masked sequences directly to the .idx file without +the O(totalSeqBytes) heap allocation. + +This script verifies that the streaming path produces byte-identical SequenceLookup +data compared to the original bulk-allocation path, then measures wall-time and +peak system memory (MemAvailable drop) at various input sizes. + +Usage: + cd util/benchmarks/createindexdb_faster + ./run # default: correctness + bench 1M..50M + ./run --fast # quick smoke test: 1M only + ./run --slow # full suite: 1M..2000M + ./run /path/to/mmseqs # use a specific binary + ./run /path/to/mmseqs --fast + ./run /path/to/mmseqs --slow +""" + +import os +import shutil +import subprocess +import sys +import tempfile + +# ── Configuration ───────────────────────────────────────────────────────────── + +CORRECTNESS_SIZES_FAST = ["1M"] +CORRECTNESS_SIZES = ["1M", "10M", "50M"] + +BENCH_SIZES_FAST = ["1M"] +BENCH_SIZES = ["1M", "10M", "50M", "100M", "500M"] +BENCH_SIZES_SLOW = ["1M", "10M", "50M", "100M", "500M", "1000M", "2000M"] + +DATA_CACHE_DIR = "/tmp/bench_createindex_stream_data" + +# Keys in the .idx index file for the SequenceLookup data +SEQLOOKUP_KEYS = { + 13: "SEQCOUNT", + 14: "SEQINDEXDATA", + 15: "SEQINDEXDATASIZE", + 16: "SEQINDEXSEQOFFSET", +} + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def parse_args(argv): + """Return (mmseqs_path, mode) where mode is 'fast', 'slow', or 'default'.""" + flags = {"--fast", "--slow"} + mode = "default" + if "--fast" in argv[1:]: + mode = "fast" + elif "--slow" in argv[1:]: + mode = "slow" + rest = [a for a in argv[1:] if a not in flags] + + if rest: + p = os.path.realpath(rest[0]) + if os.path.isfile(p) and os.access(p, os.X_OK): + return p, mode + sys.exit(f"Error: {rest[0]} is not an executable") + + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.normpath(os.path.join(script_dir, "../../..")) + default = os.path.join(repo_root, "build", "src", "mmseqs") + if os.path.isfile(default) and os.access(default, os.X_OK): + return default, mode + sys.exit( + f"mmseqs not found at {default}\n" + "Build first (cd build && make -j) or pass the path: ./run /path/to/mmseqs" + ) + + +def run_cmd(args, **kwargs): + """Run a command silently, raising on failure.""" + subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + check=True, **kwargs) + + +def read_meminfo(): + """Read MemAvailable and SwapFree from /proc/meminfo (in kB).""" + mem_avail = 0 + swap_free = 0 + with open("/proc/meminfo") as f: + for line in f: + if line.startswith("MemAvailable:"): + mem_avail = int(line.split()[1]) + elif line.startswith("SwapFree:"): + swap_free = int(line.split()[1]) + return mem_avail, swap_free + + +def measure(args): + """Run *args*; return (wall_seconds, peak_mem_kb, cpu_percent). + + Uses fork/exec/wait4 to get rusage (user+sys CPU time), and polls + /proc/meminfo for system-level peak memory (MemAvailable drop). + """ + import threading, time as _time + + mem_avail_before, _ = read_meminfo() + min_avail = mem_avail_before + stop = threading.Event() + + def poll(): + nonlocal min_avail + while not stop.is_set(): + try: + avail, _ = read_meminfo() + if avail < min_avail: + min_avail = avail + except (FileNotFoundError, ValueError): + pass + stop.wait(0.1) + + poller = threading.Thread(target=poll, daemon=True) + poller.start() + + t0 = _time.monotonic() + pid = os.fork() + if pid == 0: + devnull = os.open(os.devnull, os.O_WRONLY) + os.dup2(devnull, 1) + os.dup2(devnull, 2) + os.close(devnull) + try: + os.execvp(args[0], args) + except OSError: + os._exit(127) + else: + _, status, rusage = os.wait4(pid, 0) + elapsed = _time.monotonic() - t0 + exit_code = os.waitstatus_to_exitcode(status) + if exit_code != 0: + raise subprocess.CalledProcessError(exit_code, args) + + stop.set() + poller.join() + + cpu_s = rusage.ru_utime + rusage.ru_stime + cpu_pct = round(100 * cpu_s / elapsed, 1) if elapsed > 0 else 0 + + peak_mem_kb = mem_avail_before - min_avail + if peak_mem_kb < 0: + peak_mem_kb = 0 + return elapsed, peak_mem_kb, cpu_pct + + +def drop_page_cache_for_prefix(prefix): + """Advise kernel to drop page cache for all files matching prefix. + + Uses posix_fadvise(FADV_DONTNEED) which works without root, unlike + /proc/sys/vm/drop_caches. This ensures that subsequent mmap reads + will fault pages in from disk, making MemAvailable drops visible. + """ + d = os.path.dirname(prefix) or "." + base = os.path.basename(prefix) + if not os.path.isdir(d): + return + for name in os.listdir(d): + if name.startswith(base): + path = os.path.join(d, name) + if not os.path.isfile(path): + continue + try: + fd = os.open(path, os.O_RDONLY) + os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED) + os.close(fd) + except (OSError, AttributeError): + pass + + +def gen_fasta(gen_script, size, output): + """Generate a synthetic FASTA file, using a persistent cache.""" + if os.path.isfile(output): + return + os.makedirs(os.path.dirname(output), exist_ok=True) + tmp = output + f".tmp.{os.getpid()}" + subprocess.run( + [sys.executable, gen_script, "--size", size, "-o", tmp], + check=True, stderr=subprocess.DEVNULL, + ) + os.rename(tmp, output) + + +def read_idx_index(path): + """Parse .idx.index file -> {key: (offset, length)}.""" + entries = {} + with open(path) as f: + for line in f: + parts = line.strip().split('\t') + key, offset, length = int(parts[0]), int(parts[1]), int(parts[2]) + entries[key] = (offset, length) + return entries + + +def get_seqindexdatasize(idx_index, idx_data_path): + """Read the SEQINDEXDATASIZE value from the index.""" + if 15 not in idx_index: + return None + off, length = idx_index[15] + with open(idx_data_path, 'rb') as f: + f.seek(off) + import struct + data = f.read(8) # int64_t + return struct.unpack('= 1024: + return f"{mb / 1024:.2f} GB" + return f"{mb:.1f} MB" + + +# ── Main ────────────────────────────────────────────────────────────────────── + +def main(): + mmseqs, mode = parse_args(sys.argv) + script_dir = os.path.dirname(os.path.abspath(__file__)) + gen_script = os.path.join(script_dir, "gen_fasta.py") + + + if mode == "fast": + correctness_sizes = CORRECTNESS_SIZES_FAST + bench_sizes = BENCH_SIZES_FAST + elif mode == "slow": + correctness_sizes = CORRECTNESS_SIZES + bench_sizes = BENCH_SIZES_SLOW + else: + correctness_sizes = CORRECTNESS_SIZES + bench_sizes = BENCH_SIZES + + print(f"Using mmseqs: {mmseqs}") + print(f"Mode: {mode}") + print() + + # Ephemeral directory for correctness/perf intermediates (cleaned up) + tmpdir = tempfile.mkdtemp(prefix="bench_createindex_stream_") + print(f"Working directory: {tmpdir}") + print(f"Data cache: {DATA_CACHE_DIR}") + print() + + try: + _run(mmseqs, gen_script, tmpdir, correctness_sizes, bench_sizes) + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +def _run(mmseqs, gen_script, tmpdir, correctness_sizes, bench_sizes): + all_sizes = sorted(set(correctness_sizes + bench_sizes), + key=lambda s: _parse_size(s)) + + # ── Generate FASTA files + padded DBs ────────────────────────────────── + # Cached in DATA_CACHE_DIR so they survive across runs. + # We use GPU-padded DBs (makepaddedseqdb) for all tests because that's + # the real use case. With --index-subset 2: + # Reference: original path (fillDatabase builds SequenceLookup in bulk) + # Test: streaming path (writes SequenceLookup directly to disk) + # + # The streaming path activates when needSequenceLookup=true && needKmerIndex=false, + # which is exactly --index-subset 2. + + os.makedirs(DATA_CACHE_DIR, exist_ok=True) + + print("========================================") + print(" Generating test data") + print("========================================") + + fasta_paths = {} + paddb_paths = {} + + for size in all_sizes: + fasta = os.path.join(DATA_CACHE_DIR, f"test_{size}.fasta") + fasta_cached = os.path.isfile(fasta) + gen_fasta(gen_script, size, fasta) + fasta_paths[size] = fasta + mb = os.path.getsize(fasta) / 1024**2 + + paddb = os.path.join(DATA_CACHE_DIR, f"paddb_{size}") + paddb_cached = os.path.isfile(paddb + ".dbtype") + if not paddb_cached: + run_cmd([mmseqs, "makepaddedseqdb", fasta, paddb, + "--max-seq-len", "10000", "--threads", "1"]) + paddb_paths[size] = paddb + + tag = "cached" if (fasta_cached and paddb_cached) else "generated" + print(f" {size:>6s} {mb:8.1f} MB ({tag})") + print() + + # ── Correctness ─────────────────────────────────────────────────────── + # Compare SequenceLookup entries between: + # Reference: createindex --index-subset 18 (2|16: no k-mer index, bulk SequenceLookup via fillDatabase) + # Test: createindex --index-subset 2 (no k-mer index, streaming SequenceLookup) + # + # Both should produce identical SEQCOUNT, SEQINDEXDATASIZE, SEQINDEXSEQOFFSET, + # and SEQINDEXDATA (for the meaningful bytes). + + print("========================================") + print(" Correctness: streaming vs bulk-allocation SequenceLookup") + print("========================================") + print() + + total_checks = 0 + total_pass = 0 + all_correct = True + + for size in correctness_sizes: + paddb = paddb_paths[size] + tmp1 = os.path.join(tmpdir, f"tmp_ref_{size}") + tmp2 = os.path.join(tmpdir, f"tmp_test_{size}") + + # Reference: --index-subset 18 (2|16: bulk SequenceLookup, no streaming) + run_cmd([mmseqs, "createindex", paddb, tmp1, + "--threads", "1", "--index-subset", "18", "--split", "1"]) + ref_idx = paddb + ".idx" + ref_idx_index = ref_idx + ".index" + ref_data_copy = os.path.join(tmpdir, f"ref_{size}.idx") + ref_index_copy = os.path.join(tmpdir, f"ref_{size}.idx.index") + shutil.copy2(ref_idx, ref_data_copy) + shutil.copy2(ref_idx_index, ref_index_copy) + + # Remove index so createindex will recreate + remove_idx_files(paddb) + + # Test: --index-subset 2 (no k-mer index, streaming SequenceLookup) + run_cmd([mmseqs, "createindex", paddb, tmp2, + "--threads", "1", "--index-subset", "2", "--split", "1"]) + test_idx = paddb + ".idx" + test_idx_index = test_idx + ".index" + + n_match, n_total, mismatch_list = compare_seqlookup_entries( + ref_index_copy, ref_data_copy, + test_idx_index, test_idx) + + total_checks += n_total + total_pass += n_match + size_ok = n_match == n_total + + if not size_ok: + all_correct = False + status = "PASS" if size_ok else "FAIL" + detail = (f"all {n_total} SequenceLookup entries match" + if size_ok + else f"mismatches: {', '.join(mismatch_list)}") + print(f" {size:>6s} {status} ({detail})") + + # Clean up + remove_idx_files(paddb) + for p in [ref_data_copy, ref_index_copy]: + if os.path.exists(p): + os.remove(p) + + print() + if all_correct: + print(f" Result: ALL PASSED ({total_pass}/{total_checks} comparisons)") + else: + print(f" Result: FAILURES detected ({total_pass}/{total_checks} passed)") + print() + print(" Aborting -- correctness must pass before benchmarking.") + sys.exit(1) + print() + + # ── Performance benchmarks ──────────────────────────────────────────── + # Measure createindex on GPU-padded DBs: + # Streaming: --index-subset 2 (new streaming path, no bulk allocation) + # Bulk alloc: --index-subset 18 (2|16: bulk SequenceLookup via fillDatabase) + # + # Both use --split 1 and all available threads. + + threads = str(os.cpu_count()) + + print("========================================") + print(f" Performance: streaming vs bulk-allocation (threads={threads})") + print("========================================") + print() + + rows = [] # (size, stream_time, stream_mem, stream_cpu, bulk_time, bulk_mem, bulk_cpu) + + for size in bench_sizes: + paddb = paddb_paths[size] + perf_tmp = os.path.join(tmpdir, f"perf_tmp_{size}") + os.makedirs(perf_tmp, exist_ok=True) + + # Drop page cache so mmap faults show up in MemAvailable delta + drop_page_cache_for_prefix(paddb) + + # Streaming path (--index-subset 2) + st, sm, sc = measure([mmseqs, "createindex", paddb, perf_tmp, + "--threads", threads, "--index-subset", "2", "--split", "1"]) + remove_idx_files(paddb) + + # Drop page cache again for fair comparison + drop_page_cache_for_prefix(paddb) + + # Bulk allocation path (--index-subset 18: 2|16, no streaming) + bt, bm, bc = measure([mmseqs, "createindex", paddb, perf_tmp, + "--threads", threads, "--index-subset", "18", "--split", "1"]) + remove_idx_files(paddb) + + rows.append((size, st, sm, sc, bt, bm, bc)) + + # ── Wall-time + CPU% table ──────────────────────────────────────────── + print("--- Wall-time / CPU% ---") + print() + print(f" {'SIZE':8s} {'STREAMING':>16s} {'BULK-ALLOC':>16s} {'TIME RATIO':>10s}") + print(f" {'--------':8s} {'----------------':>16s} {'----------------':>16s} {'----------':>10s}") + for size, st, sm, sc, bt, bm, bc in rows: + ratio = f"{st / bt:.2f}x" if bt > 0 else "n/a" + print(f" {size:8s} {st:7.3f}s {sc:6.1f}% {bt:7.3f}s {bc:6.1f}% {ratio:>10s}") + print() + + # ── Peak memory table ───────────────────────────────────────────────── + print("--- Peak memory (system-level MemAvailable drop) ---") + print() + print(f" {'SIZE':8s} {'STREAMING':>10s} {'BULK-ALLOC':>12s} {'RATIO':>10s}") + print(f" {'--------':8s} {'----------':>10s} {'------------':>12s} {'----------':>10s}") + for size, st, sm, sc, bt, bm, bc in rows: + ratio = f"{sm / bm:.2f}x" if bm > 0 else "n/a" + s_str = fmt_mem(sm) + b_str = fmt_mem(bm) + print(f" {size:8s} {s_str:>10s} {b_str:>12s} {ratio:>10s}") + print() + + # ── Combined summary ────────────────────────────────────────────────── + print("--- Combined summary ---") + print() + hdr = (f" {'SIZE':8s} | {'STREAMING (time / cpu / mem)':>30s}" + f" | {'BULK-ALLOC (time / cpu / mem)':>32s}" + f" | {'TIME RATIO':>10s} | {'MEM RATIO':>10s}") + sep = (f" {'--------':8s} | {'------------------------------':>30s}" + f" | {'--------------------------------':>32s}" + f" | {'----------':>10s} | {'----------':>10s}") + print(hdr) + print(sep) + for size, st, sm, sc, bt, bm, bc in rows: + t_ratio = f"{st / bt:.2f}x" if bt > 0 else "n/a" + m_ratio = f"{sm / bm:.2f}x" if bm > 0 else "n/a" + s_str = fmt_mem(sm) + b_str = fmt_mem(bm) + print(f" {size:8s} | {st:7.3f}s {sc:6.1f}% / {s_str:>10s}" + f" | {bt:7.3f}s {bc:6.1f}% / {b_str:>12s}" + f" | {t_ratio:>10s} | {m_ratio:>10s}") + print() + print("Done.") + + +# ── Utilities ───────────────────────────────────────────────────────────────── + +def _parse_size(s): + """'10M' -> 10_485_760 (for sorting only).""" + s = s.strip().upper() + m = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in m: + return int(float(s[:-1]) * m[s[-1]]) + return int(s) + + +if __name__ == "__main__": + main() diff --git a/util/benchmarks/createindex_streaming/run.log b/util/benchmarks/createindex_streaming/run.log new file mode 100644 index 000000000..7e948bccb --- /dev/null +++ b/util/benchmarks/createindex_streaming/run.log @@ -0,0 +1,67 @@ +Mode: slow + +Working directory: /tmp/bench_createindex_stream_nkk87fmx +Data cache: /tmp/bench_createindex_stream_data + +======================================== + Generating test data +======================================== + 1M 1.0 MB (cached) + 10M 10.0 MB (cached) + 50M 50.0 MB (cached) + 100M 100.0 MB (cached) + 500M 500.0 MB (cached) + 1000M 1000.0 MB (cached) + 2000M 2000.0 MB (cached) + +======================================== + Correctness: streaming vs bulk-allocation SequenceLookup +======================================== + + 1M PASS (all 4 SequenceLookup entries match) + 10M PASS (all 4 SequenceLookup entries match) + 50M PASS (all 4 SequenceLookup entries match) + + Result: ALL PASSED (12/12 comparisons) + +======================================== + Performance: streaming vs bulk-allocation (threads=64) +======================================== + +--- Wall-time / CPU% --- + + SIZE STREAMING BULK-ALLOC TIME RATIO + -------- ---------------- ---------------- ---------- + 1M 0.073s 1516.5% 0.058s 1782.8% 1.25x + 10M 0.144s 1117.6% 0.145s 1109.5% 1.00x + 50M 0.278s 1524.0% 0.285s 1462.2% 0.98x + 100M 0.435s 1880.9% 0.427s 1770.3% 1.02x + 500M 1.908s 2084.3% 1.910s 2066.5% 1.00x + 1000M 3.707s 2147.0% 3.668s 2137.3% 1.01x + 2000M 7.302s 2179.2% 7.210s 2174.4% 1.01x + +--- Peak memory (system-level MemAvailable drop) --- + + SIZE STREAMING BULK-ALLOC RATIO + -------- ---------- ------------ ---------- + 1M 0.0 MB 0.0 MB n/a + 10M 17.2 MB 19.4 MB 0.88x + 50M 45.3 MB 47.2 MB 0.96x + 100M 132.2 MB 132.4 MB 1.00x + 500M 501.2 MB 547.6 MB 0.92x + 1000M 986.9 MB 1009.9 MB 0.98x + 2000M 1023.1 MB 1.83 GB 0.54x + +--- Combined summary --- + + SIZE | STREAMING (time / cpu / mem) | BULK-ALLOC (time / cpu / mem) | TIME RATIO | MEM RATIO + -------- | ------------------------------ | -------------------------------- | ---------- | ---------- + 1M | 0.073s 1516.5% / 0.0 MB | 0.058s 1782.8% / 0.0 MB | 1.25x | n/a + 10M | 0.144s 1117.6% / 17.2 MB | 0.145s 1109.5% / 19.4 MB | 1.00x | 0.88x + 50M | 0.278s 1524.0% / 45.3 MB | 0.285s 1462.2% / 47.2 MB | 0.98x | 0.96x + 100M | 0.435s 1880.9% / 132.2 MB | 0.427s 1770.3% / 132.4 MB | 1.02x | 1.00x + 500M | 1.908s 2084.3% / 501.2 MB | 1.910s 2066.5% / 547.6 MB | 1.00x | 0.92x + 1000M | 3.707s 2147.0% / 986.9 MB | 3.668s 2137.3% / 1009.9 MB | 1.01x | 0.98x + 2000M | 7.302s 2179.2% / 1023.1 MB | 7.210s 2174.4% / 1.83 GB | 1.01x | 0.54x + +Done. diff --git a/util/benchmarks/makepaddedseqdb_fused/gen_fasta.py b/util/benchmarks/makepaddedseqdb_fused/gen_fasta.py new file mode 100755 index 000000000..4a82664e9 --- /dev/null +++ b/util/benchmarks/makepaddedseqdb_fused/gen_fasta.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""Generate a synthetic nucleotide FASTA with long sequences. + +Sequences are long enough (default 50 KB) to trigger splitting with +the default --max-seq-len 10000. Used by the benchmark/correctness suite. +""" + +import argparse +import random +import sys + + +def parse_size(s): + """Parse a human-readable size string (e.g. '100M', '5G') to bytes.""" + s = s.strip().upper() + multipliers = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in multipliers: + return int(float(s[:-1]) * multipliers[s[-1]]) + return int(s) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--size', default='100M', + help='Target total FASTA size (e.g. 100M, 1G)') + parser.add_argument('--seq-len', type=int, default=50000, + help='Length of each sequence in bases (default: 50000)') + parser.add_argument('--seed', type=int, default=42, + help='Random seed for reproducibility') + parser.add_argument('-o', '--output', default='/dev/stdout', + help='Output file (default: stdout)') + args = parser.parse_args() + + target_bytes = parse_size(args.size) + seq_len = args.seq_len + rng = random.Random(args.seed) + bases = 'ACGT' + line_width = 80 + + written = 0 + seq_id = 0 + + out = open(args.output, 'w') if args.output != '/dev/stdout' else sys.stdout + try: + while written < target_bytes: + header = f'>synth_{seq_id} len={seq_len}\n' + out.write(header) + written += len(header) + + remaining = seq_len + while remaining > 0: + chunk = min(remaining, line_width) + line = ''.join(rng.choice(bases) for _ in range(chunk)) + out.write(line) + out.write('\n') + written += chunk + 1 + remaining -= chunk + + seq_id += 1 + finally: + if out is not sys.stdout: + out.close() + + print(f'Generated {seq_id} sequences, ~{written} bytes', file=sys.stderr) + + +if __name__ == '__main__': + main() diff --git a/util/benchmarks/makepaddedseqdb_fused/run b/util/benchmarks/makepaddedseqdb_fused/run new file mode 100755 index 000000000..b322f18ba --- /dev/null +++ b/util/benchmarks/makepaddedseqdb_fused/run @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +""" +Benchmark & correctness suite for the fused makepaddedseqdb FASTA codepath. + +Usage: + cd util/benchmarks/makepaddedseqdb_fused + ./run # auto-detect mmseqs from build/src/mmseqs + ./run /path/to/mmseqs # use a specific binary +""" + +import filecmp +import os +import shutil +import subprocess +import sys +import tempfile + +# ── Configuration ───────────────────────────────────────────────────────────── + +CORRECTNESS_SIZES = ["1M", "10M", "50M", "100M"] +BENCH_SIZES = ["1M", "10M", "50M", "100M", "500M"] +EXTENSIONS = ["", ".index", ".dbtype", "_h", "_h.index", ".lookup"] +FASTA_CACHE_DIR = "/tmp/bench_fused_fasta_cache" + +# Correctness test matrix: (sequence-overlap, headers-split-mode) +CORRECTNESS_CONFIGS = [ + (0, 0), # no overlap, ORF headers (original test) + (300, 0), # default splitsequence overlap, ORF headers + (100, 0), # custom overlap, ORF headers + (0, 1), # no overlap, verbatim headers + (300, 1), # default overlap, verbatim headers + (100, 1), # custom overlap, verbatim headers +] + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def find_mmseqs(argv): + """Resolve the mmseqs binary path.""" + if len(argv) >= 2: + p = os.path.realpath(argv[1]) + if os.path.isfile(p) and os.access(p, os.X_OK): + return p + sys.exit(f"Error: {argv[1]} is not an executable") + + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.normpath(os.path.join(script_dir, "../../..")) + default = os.path.join(repo_root, "build", "src", "mmseqs") + if os.path.isfile(default) and os.access(default, os.X_OK): + return default + sys.exit( + f"mmseqs not found at {default}\n" + "Build first (cd build && make -j) or pass the path: ./run /path/to/mmseqs" + ) + + +def run_cmd(args, **kwargs): + """Run a command silently, raising on failure.""" + subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + check=True, **kwargs) + + +def measure(args): + """Run *args*; return (wall_seconds, peak_rss_kb, cpu_percent). + + Uses fork/exec/wait4 to get rusage (user+sys CPU time and peak RSS). + """ + import time as _time + + t0 = _time.monotonic() + pid = os.fork() + if pid == 0: + devnull = os.open(os.devnull, os.O_WRONLY) + os.dup2(devnull, 1) + os.dup2(devnull, 2) + os.close(devnull) + try: + os.execvp(args[0], args) + except OSError: + os._exit(127) + else: + _, status, rusage = os.wait4(pid, 0) + elapsed = _time.monotonic() - t0 + exit_code = os.waitstatus_to_exitcode(status) + if exit_code != 0: + raise subprocess.CalledProcessError(exit_code, args) + + cpu_s = rusage.ru_utime + rusage.ru_stime + cpu_pct = round(100 * cpu_s / elapsed, 1) if elapsed > 0 else 0 + peak_rss_kb = rusage.ru_maxrss + return elapsed, peak_rss_kb, cpu_pct + + +def gen_fasta(gen_script, size, output): + """Generate a synthetic FASTA file, using a persistent cache.""" + if os.path.isfile(output): + return + os.makedirs(os.path.dirname(output), exist_ok=True) + tmp = output + f".tmp.{os.getpid()}" + subprocess.run( + [sys.executable, gen_script, "--size", size, "-o", tmp], + check=True, stderr=subprocess.DEVNULL, + ) + os.rename(tmp, output) + + +def files_identical(a, b): + """True if two files are byte-for-byte identical.""" + return filecmp.cmp(a, b, shallow=False) + + +def fmt_mb(kb): + return f"{kb / 1024:.1f} MB" + + +def config_label(overlap, hdr_mode): + return f"overlap={overlap},hdr={hdr_mode}" + + +# ── Main ────────────────────────────────────────────────────────────────────── + +def main(): + mmseqs = find_mmseqs(sys.argv) + script_dir = os.path.dirname(os.path.abspath(__file__)) + gen_script = os.path.join(script_dir, "gen_fasta.py") + + print(f"Using mmseqs: {mmseqs}") + print() + + tmpdir = tempfile.mkdtemp(prefix="bench_fused_") + print(f"Working directory: {tmpdir}") + print() + + try: + _run(mmseqs, gen_script, tmpdir) + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +def _run(mmseqs, gen_script, tmpdir): + # ── Generate FASTA files ────────────────────────────────────────────── + all_sizes = sorted(set(CORRECTNESS_SIZES + BENCH_SIZES), + key=lambda s: _parse_size(s)) + + print("========================================") + print(" Generating test FASTA files") + print("========================================") + print(f" Cache: {FASTA_CACHE_DIR}") + fasta_paths = {} + for size in all_sizes: + path = os.path.join(FASTA_CACHE_DIR, f"test_{size}.fasta") + cached = os.path.isfile(path) + gen_fasta(gen_script, size, path) + fasta_paths[size] = path + mb = os.path.getsize(path) / 1024**2 + tag = "cached" if cached else "generated" + print(f" {size:>6s} {mb:8.1f} MB ({tag})") + print() + + # ── Correctness ─────────────────────────────────────────────────────── + print("========================================") + print(" Correctness: fused vs 3-step reference") + print("========================================") + print() + print(f" Configs: {len(CORRECTNESS_CONFIGS)}") + for overlap, hdr_mode in CORRECTNESS_CONFIGS: + print(f" {config_label(overlap, hdr_mode)}") + print() + + total_checks = 0 + total_pass = 0 + all_correct = True + + # createdb output is the same for all configs, so cache it per size + db_prefixes = {} + for size in CORRECTNESS_SIZES: + fasta = fasta_paths[size] + db_prefix = os.path.join(tmpdir, f"db_{size}") + run_cmd([mmseqs, "createdb", fasta, f"{db_prefix}_db", + "--dbtype", "2", "--shuffle", "1", "--threads", "1"]) + db_prefixes[size] = db_prefix + + for overlap, hdr_mode in CORRECTNESS_CONFIGS: + label = config_label(overlap, hdr_mode) + print(f" --- {label} ---") + + for size in CORRECTNESS_SIZES: + fasta = fasta_paths[size] + db_prefix = db_prefixes[size] + tag = f"{size}_o{overlap}_h{hdr_mode}" + ref = os.path.join(tmpdir, f"ref_{tag}") + fused = os.path.join(tmpdir, f"fused_{tag}") + + # Reference: splitsequence + makepaddedseqdb (createdb already done) + run_cmd([mmseqs, "splitsequence", f"{db_prefix}_db", f"{ref}_split", + "--max-seq-len", "10000", + "--sequence-overlap", str(overlap), + "--headers-split-mode", str(hdr_mode), + "--threads", "1"]) + run_cmd([mmseqs, "makepaddedseqdb", f"{ref}_split", f"{ref}_padded", + "--threads", "1"]) + + # Fused codepath + run_cmd([mmseqs, "makepaddedseqdb", fasta, f"{fused}_padded", + "--max-seq-len", "10000", + "--sequence-overlap", str(overlap), + "--headers-split-mode", str(hdr_mode), + "--threads", "1"]) + + size_ok = True + mismatches = [] + for ext in EXTENSIONS: + total_checks += 1 + rp = f"{ref}_padded{ext}" + fp = f"{fused}_padded{ext}" + if os.path.isfile(rp) and os.path.isfile(fp) and files_identical(rp, fp): + total_pass += 1 + else: + size_ok = False + all_correct = False + mismatches.append(ext if ext else "(data)") + + status = "PASS" if size_ok else "FAIL" + detail = f"all {len(EXTENSIONS)} files match" if size_ok else f"mismatches: {', '.join(mismatches)}" + print(f" {size:>6s} {status} ({detail})") + + # Clean split/padded/fused intermediates (keep createdb) + for prefix in (ref, fused): + for f in _glob_prefix(prefix): + os.remove(f) + + print() + + # Clean createdb intermediates + for db_prefix in db_prefixes.values(): + for f in _glob_prefix(db_prefix): + os.remove(f) + + if all_correct: + print(f" Result: ALL PASSED ({total_pass}/{total_checks} file comparisons)") + else: + print(f" Result: FAILURES detected ({total_pass}/{total_checks} passed)") + print() + print(" Aborting -- correctness must pass before benchmarking.") + sys.exit(1) + print() + + # ── Performance benchmarks ──────────────────────────────────────────── + threads = str(os.cpu_count()) + + print("========================================") + print(f" Performance: fused vs reference (threads={threads})") + print("========================================") + print() + + rows = [] # list of (size, fused_time, fused_rss, fused_cpu, ref_time, ref_rss, ref_cpu) + + for size in BENCH_SIZES: + fasta = fasta_paths[size] + ref = os.path.join(tmpdir, f"perf_ref_{size}") + fused = os.path.join(tmpdir, f"perf_fused_{size}") + + # Fused + ft, fr, fc = measure([mmseqs, "makepaddedseqdb", fasta, f"{fused}_padded", + "--max-seq-len", "10000", "--threads", threads]) + + # Reference (only measure the makepaddedseqdb step) + run_cmd([mmseqs, "createdb", fasta, f"{ref}_db", + "--dbtype", "2", "--shuffle", "1", "--threads", "1"]) + run_cmd([mmseqs, "splitsequence", f"{ref}_db", f"{ref}_split", + "--max-seq-len", "10000", "--sequence-overlap", "0", + "--threads", threads]) + rt, rr, rc = measure([mmseqs, "makepaddedseqdb", f"{ref}_split", f"{ref}_padded", + "--threads", threads]) + + rows.append((size, ft, fr, fc, rt, rr, rc)) + + # Clean + for prefix in (ref, fused): + for f in _glob_prefix(prefix): + os.remove(f) + + # ── Wall-time + CPU% table ──────────────────────────────────────────── + print("--- Wall-time / CPU% ---") + print() + print(f" {'SIZE':8s} {'FUSED':>16s} {'REF(DB)':>16s} {'TIME RATIO':>10s}") + print(f" {'--------':8s} {'----------------':>16s} {'----------------':>16s} {'----------':>10s}") + for size, ft, fr, fc, rt, rr, rc in rows: + ratio = f"{ft / rt:.2f}x" if rt > 0 else "n/a" + print(f" {size:8s} {ft:7.3f}s {fc:6.1f}% {rt:7.3f}s {rc:6.1f}% {ratio:>10s}") + print() + + # ── Peak-RSS table ──────────────────────────────────────────────────── + print("--- Peak RSS ---") + print() + print(f" {'SIZE':8s} {'FUSED':>10s} {'REF(DB)':>10s} {'RATIO':>10s}") + print(f" {'--------':8s} {'----------':>10s} {'----------':>10s} {'----------':>10s}") + for size, ft, fr, fc, rt, rr, rc in rows: + ratio = f"{fr / rr:.2f}x" if rr > 0 else "n/a" + print(f" {size:8s} {fmt_mb(fr):>10s} {fmt_mb(rr):>10s} {ratio:>10s}") + print() + + # ── Combined summary ────────────────────────────────────────────────── + print("--- Combined summary ---") + print() + hdr = (f" {'SIZE':8s} | {'FUSED (time / cpu / RSS)':>28s}" + f" | {'REF (time / cpu / RSS)':>28s}" + f" | {'TIME RATIO':>10s} | {'RSS RATIO':>10s}") + sep = (f" {'--------':8s} | {'----------------------------':>28s}" + f" | {'----------------------------':>28s}" + f" | {'----------':>10s} | {'----------':>10s}") + print(hdr) + print(sep) + for size, ft, fr, fc, rt, rr, rc in rows: + t_ratio = f"{ft / rt:.2f}x" if rt > 0 else "n/a" + r_ratio = f"{fr / rr:.2f}x" if rr > 0 else "n/a" + fr_mb = fmt_mb(fr) + rr_mb = fmt_mb(rr) + print(f" {size:8s} | {ft:7.3f}s {fc:5.1f}% / {fr_mb:>8s}" + f" | {rt:7.3f}s {rc:5.1f}% / {rr_mb:>8s}" + f" | {t_ratio:>10s} | {r_ratio:>10s}") + print() + print("Done.") + + +# ── Utilities ───────────────────────────────────────────────────────────────── + +def _parse_size(s): + """'10M' -> 10_485_760 (for sorting only).""" + s = s.strip().upper() + m = {'B': 1, 'K': 1024, 'M': 1024**2, 'G': 1024**3, 'T': 1024**4} + if s[-1] in m: + return int(float(s[:-1]) * m[s[-1]]) + return int(s) + + +def _glob_prefix(prefix): + """Yield all files whose path starts with *prefix*.""" + d = os.path.dirname(prefix) + base = os.path.basename(prefix) + if not os.path.isdir(d): + return + for name in os.listdir(d): + if name.startswith(base): + yield os.path.join(d, name) + + +if __name__ == "__main__": + main() diff --git a/util/benchmarks/makepaddedseqdb_fused/run.log b/util/benchmarks/makepaddedseqdb_fused/run.log new file mode 100644 index 000000000..1af1a950a --- /dev/null +++ b/util/benchmarks/makepaddedseqdb_fused/run.log @@ -0,0 +1,97 @@ +Working directory: /tmp/bench_fused_6fj_g1fs + +======================================== + Generating test FASTA files +======================================== + Cache: /tmp/bench_fused_fasta_cache + 1M 1.0 MB (cached) + 10M 10.0 MB (cached) + 50M 50.0 MB (cached) + 100M 100.0 MB (cached) + 500M 500.0 MB (cached) + +======================================== + Correctness: fused vs 3-step reference +======================================== + + Configs: 6 + overlap=0,hdr=0 + overlap=300,hdr=0 + overlap=100,hdr=0 + overlap=0,hdr=1 + overlap=300,hdr=1 + overlap=100,hdr=1 + + --- overlap=0,hdr=0 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=300,hdr=0 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=100,hdr=0 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=0,hdr=1 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=300,hdr=1 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + --- overlap=100,hdr=1 --- + 1M PASS (all 6 files match) + 10M PASS (all 6 files match) + 50M PASS (all 6 files match) + 100M PASS (all 6 files match) + + Result: ALL PASSED (144/144 file comparisons) + +======================================== + Performance: fused vs reference (threads=64) +======================================== + +--- Wall-time / CPU% --- + + SIZE FUSED REF(DB) TIME RATIO + -------- ---------------- ---------------- ---------- + 1M 0.025s 2495.1% 0.119s 550.0% 0.21x + 10M 0.047s 1473.6% 0.160s 506.3% 0.29x + 50M 0.081s 1503.4% 0.188s 630.5% 0.43x + 100M 0.112s 1887.2% 0.248s 693.7% 0.45x + 500M 0.468s 2434.4% 0.649s 850.0% 0.72x + +--- Peak RSS --- + + SIZE FUSED REF(DB) RATIO + -------- ---------- ---------- ---------- + 1M 11.6 MB 11.6 MB 1.00x + 10M 18.0 MB 11.6 MB 1.55x + 50M 55.5 MB 74.0 MB 0.75x + 100M 103.0 MB 201.5 MB 0.51x + 500M 498.1 MB 973.5 MB 0.51x + +--- Combined summary --- + + SIZE | FUSED (time / cpu / RSS) | REF (time / cpu / RSS) | TIME RATIO | RSS RATIO + -------- | ---------------------------- | ---------------------------- | ---------- | ---------- + 1M | 0.025s 2495.1% / 11.6 MB | 0.119s 550.0% / 11.6 MB | 0.21x | 1.00x + 10M | 0.047s 1473.6% / 18.0 MB | 0.160s 506.3% / 11.6 MB | 0.29x | 1.55x + 50M | 0.081s 1503.4% / 55.5 MB | 0.188s 630.5% / 74.0 MB | 0.43x | 0.75x + 100M | 0.112s 1887.2% / 103.0 MB | 0.248s 693.7% / 201.5 MB | 0.45x | 0.51x + 500M | 0.468s 2434.4% / 498.1 MB | 0.649s 850.0% / 973.5 MB | 0.72x | 0.51x + +Done.