Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions data/workflow/databases.sh
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@ case "${SELECTION}" in
push_back "${TMP_PATH}/kalamari.fasta"
INPUT_TYPE="FASTA_LIST"
;;
*)
if [ ! -f "${SELECTION}" ]; then
fail "Local file not found: ${SELECTION}"
fi
push_back "${SELECTION}"
date "+%s" > "${TMP_PATH}/version"
INPUT_TYPE="LOCAL_FASTA"
;;
esac

if notExists "${OUTDB}.dbtype"; then
Expand All @@ -312,6 +320,23 @@ case "${INPUT_TYPE}" in
rm -f -- "$i"
done
;;
"LOCAL_FASTA")
eval "set -- $ARR"
# Fused pipeline: makepaddedseqdb reads FASTA directly (createdb +
# splitsequence + padding in a single streaming pass), then
# createindex --index-subset 2 streams SequenceLookup with constant
# memory via MADV_DONTNEED.
# shellcheck disable=SC2086
"${MMSEQS}" makepaddedseqdb "${@}" "${OUTDB}" \
--headers-split-mode 1 ${THREADS_PAR} \
|| fail "makepaddedseqdb died"
mkdir -p "${TMP_PATH}/indexdb"
# shellcheck disable=SC2086
"${MMSEQS}" createindex "${OUTDB}" "${TMP_PATH}/indexdb" \
--split 1 --index-subset 2 ${THREADS_PAR} \
|| fail "createindex died"
rm -rf -- "${TMP_PATH}/indexdb"
;;
"FSA")
# shellcheck disable=SC2086
"${MMSEQS}" createdb "${TMP_PATH}/"*.fsa "${OUTDB}" ${COMP_PAR} --gpu 1 \
Expand Down
17 changes: 12 additions & 5 deletions src/MMseqsBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,13 @@ std::vector<Command> baseCommands = {
{"tmpDir", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::directory }}},
{"databases", databases, &par.databases, COMMAND_DATABASE_CREATION,
"List and download databases",
NULL,
"# Download a known database\n"
"mmseqs databases UniRef30_2302 db tmp\n\n"
"# Index a local FASTA file (use ./relative or /absolute paths)\n"
"mmseqs databases ./input.fasta.gz db tmp\n"
"mmseqs databases /data/rnacentral_active.fasta.gz db tmp\n\n",
"Milot Mirdita <milot@mirdita.de>",
"<name> <o:sequenceDB> <tmpDir>",
"<name|path> <o:sequenceDB> <tmpDir>",
CITATION_TAXONOMY|CITATION_MMSEQS2, {{"selection", 0, DbType::ZERO_OR_ALL, &DbValidator::empty },
{"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb },
{"tmpDir", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::directory }}},
Expand All @@ -149,10 +153,13 @@ std::vector<Command> baseCommands = {
{"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::flatfile }}},
{"makepaddedseqdb", makepaddedseqdb, &par.makepaddedseqdb, COMMAND_HIDDEN,
"Generate a padded sequence DB",
"Generate a padded sequence DB",
"# From existing sequence DB\n"
"mmseqs makepaddedseqdb sequenceDB paddedDB\n\n"
"# Directly from FASTA (fused createdb+splitsequence+makepaddedseqdb)\n"
"mmseqs makepaddedseqdb input.fasta paddedDB --max-seq-len 10000\n",
"Milot Mirdita <milot@mirdita.de> & Martin Steinegger <martin.steinegger@snu.ac.kr>",
"<i:sequenceDB> <o:sequenceDB>",
CITATION_GPU, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA|DbType::NEED_HEADER, &DbValidator::sequenceDb },
"<i:fastaFile|sequenceDB> <o:sequenceDB>",
CITATION_GPU, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::allDbAndFlat },
{"sequenceIndexDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }}},
{"appenddbtoindex", appenddbtoindex, &par.appenddbtoindex, COMMAND_HIDDEN,
NULL,
Expand Down
5 changes: 5 additions & 0 deletions src/commons/Parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,11 @@ Parameters::Parameters():
makepaddedseqdb.push_back(&PARAM_MASK_PROBABILTY);
makepaddedseqdb.push_back(&PARAM_MASK_LOWER_CASE);
makepaddedseqdb.push_back(&PARAM_MASK_N_REPEAT);
makepaddedseqdb.push_back(&PARAM_MAX_SEQ_LEN);
makepaddedseqdb.push_back(&PARAM_SEQUENCE_OVERLAP);
makepaddedseqdb.push_back(&PARAM_HEADER_SPLIT_MODE);
makepaddedseqdb.push_back(&PARAM_DB_TYPE);
makepaddedseqdb.push_back(&PARAM_SHUFFLE);
makepaddedseqdb.push_back(&PARAM_WRITE_LOOKUP);
makepaddedseqdb.push_back(&PARAM_THREADS);
makepaddedseqdb.push_back(&PARAM_V);
Expand Down
1 change: 1 addition & 0 deletions src/commons/Parameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class Parameters {
static const int INDEX_SUBSET_NO_PREFILTER = 2;
static const int INDEX_SUBSET_NO_ALIGNMENT = 4;
static const int INDEX_SUBSET_NO_SEQUENCE_LOOKUP = 8;
static const int INDEX_SUBSET_NO_STREAM = 16;


static std::vector<int> getOutputFormat(int formatMode, const std::string &outformat, bool &needSequences, bool &needBacktrace, bool &needFullHeaders,
Expand Down
199 changes: 199 additions & 0 deletions src/prefiltering/PrefilteringIndexReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@
#include "ExtendedSubstitutionMatrix.h"
#include "FileUtil.h"
#include "IndexBuilder.h"
#include "Masker.h"
#include "Parameters.h"

#ifdef OPENMP
#include <omp.h>
#endif

#include <sys/mman.h>

extern const char* index_version_compatible;
unsigned int PrefilteringIndexReader::VERSION = 0;
unsigned int PrefilteringIndexReader::META = 1;
Expand Down Expand Up @@ -49,6 +56,185 @@ std::string PrefilteringIndexReader::indexName(const std::string &outDB) {
return result;
}

// Stream SequenceLookup data directly to the index file without the
// O(totalSeqBytes) heap allocation that IndexBuilder::fillDatabase uses.
// Encodes + masks each sequence and writes via DBWriter streaming API.
// Produces byte-identical output to the bulk-allocation path.
//
// Memory is kept constant by calling posix_madvise(MADV_DONTNEED) on
// consumed mmap pages, so the kernel reclaims them instead of letting
// page cache grow to input size.
static void streamSequenceLookupToIndex(DBWriter &writer, unsigned int splitIdx, unsigned int keyOffset,
DBReader<unsigned int> *dbr, size_t dbFrom, size_t dbSize,
BaseMatrix *subMat, int seqType, int maxSeqLen, int kmerSize,
bool hasSpacedKmer, const std::string &spacedKmerPattern,
int maskMode, int maskLowerCase, float maskProb, int maskNrepeats,
int targetSearchMode) {
Debug(Debug::INFO) << "Streaming SequenceLookup to index (no bulk allocation)\n";

const bool isProfile = Parameters::isEqualDbtype(seqType, Parameters::DBTYPE_HMM_PROFILE);
const bool isTargetSimiliarKmerSearch = isProfile || targetSearchMode;

// Compute offsets and total data size from DB sequence lengths
size_t aaDbSize = 0;
size_t *offsets = new size_t[dbSize + 1];
offsets[0] = 0;
for (size_t id = dbFrom; id < dbFrom + dbSize; id++) {
size_t idLocal = id - dbFrom;
aaDbSize += dbr->getSeqLen(id);
offsets[idLocal + 1] = offsets[idLocal] + dbr->getSeqLen(id);
}

// SEQCOUNT
Debug(Debug::INFO) << "Write SEQCOUNT (" << (keyOffset + PrefilteringIndexReader::SEQCOUNT) << ")\n";
size_t tablesize = dbSize;
writer.writeData((char *) &tablesize, 1 * sizeof(size_t), keyOffset + PrefilteringIndexReader::SEQCOUNT, splitIdx);
writer.alignToPageSize(splitIdx);

// SEQINDEXDATASIZE
Debug(Debug::INFO) << "Write SEQINDEXDATASIZE (" << (keyOffset + PrefilteringIndexReader::SEQINDEXDATASIZE) << ")\n";
int64_t seqindexDataSize = (int64_t) aaDbSize;
writer.writeData((char *) &seqindexDataSize, 1 * sizeof(int64_t), keyOffset + PrefilteringIndexReader::SEQINDEXDATASIZE, splitIdx);
writer.alignToPageSize(splitIdx);

// SEQINDEXSEQOFFSET
Debug(Debug::INFO) << "Write SEQINDEXSEQOFFSET (" << (keyOffset + PrefilteringIndexReader::SEQINDEXSEQOFFSET) << ")\n";
writer.writeData((char *) offsets, (dbSize + 1) * sizeof(size_t), keyOffset + PrefilteringIndexReader::SEQINDEXSEQOFFSET, splitIdx);
writer.alignToPageSize(splitIdx);

// Stream SEQINDEXDATA: encode + mask each sequence and write directly
Debug(Debug::INFO) << "Write SEQINDEXDATA (" << (keyOffset + PrefilteringIndexReader::SEQINDEXDATA) << ") [streaming]\n";

bool needMasking = !isTargetSimiliarKmerSearch && (maskMode > 0 || maskNrepeats > 0 || maskLowerCase > 0);

// Allocate chunk buffer for parallel encode+mask, serial write
const size_t CHUNK_SEQ_COUNT = 100000;
size_t maxChunkBytes = 0;
for (size_t start = 0; start < dbSize; start += CHUNK_SEQ_COUNT) {
size_t end = std::min(start + CHUNK_SEQ_COUNT, dbSize);
size_t bytes = offsets[end] - offsets[start];
maxChunkBytes = std::max(maxChunkBytes, bytes);
}
char *chunkBuffer = new char[maxChunkBytes];

// Page cache management: release consumed mmap pages so memory stays
// constant regardless of input size. We track the global byte offset
// of consumed data and periodically call posix_madvise(MADV_DONTNEED)
// on the pages behind us.
const size_t PAGE_SIZE = sysconf(_SC_PAGESIZE);
const size_t ADVISE_CHUNK = 64 * 1024 * 1024; // release pages every 64 MB

size_t dataFileCnt = dbr->getDataFileCnt();

// Precompute cumulative file-start offsets: fileCumulOff[f] is the global
// byte offset where file f begins. fileCumulOff[dataFileCnt] = totalDataSize.
size_t *fileCumulOff = new size_t[dataFileCnt + 1];
fileCumulOff[0] = 0;
for (size_t f = 0; f < dataFileCnt; f++) {
fileCumulOff[f + 1] = fileCumulOff[f] + dbr->getDataSizeForFile(f);
}

#ifdef HAVE_POSIX_MADVISE
for (size_t f = 0; f < dataFileCnt; f++) {
size_t fileSize = dbr->getDataSizeForFile(f);
if (fileSize > 0) {
posix_madvise(dbr->getDataForFile(f), fileSize, POSIX_MADV_SEQUENTIAL);
}
}
#endif

// Global byte offset up to which we've advised DONTNEED (page-aligned).
size_t advisedUpTo = 0;

Debug::Progress progress(dbSize);
writer.writeStart(splitIdx);
for (size_t chunkStart = 0; chunkStart < dbSize; chunkStart += CHUNK_SEQ_COUNT) {
size_t chunkEnd = std::min(chunkStart + CHUNK_SEQ_COUNT, dbSize);
size_t chunkBytes = offsets[chunkEnd] - offsets[chunkStart];

// Parallel encode+mask into chunkBuffer
#pragma omp parallel
{
unsigned int thread_idx = 0;
#ifdef OPENMP
thread_idx = static_cast<unsigned int>(omp_get_thread_num());
#endif
Masker *masker = NULL;
if (needMasking) {
masker = new Masker(*subMat);
}
Sequence seq(maxSeqLen, seqType, subMat, kmerSize, hasSpacedKmer,
false, true, spacedKmerPattern);

#pragma omp for schedule(dynamic, 100)
for (size_t i = chunkStart; i < chunkEnd; i++) {
progress.updateProgress();
size_t id = dbFrom + i;
seq.resetCurrPos();
char *seqData = dbr->getData(id, thread_idx);
unsigned int qKey = dbr->getDbKey(id);
seq.mapSequence(i, qKey, seqData, dbr->getSeqLen(id));

if (isTargetSimiliarKmerSearch) {
unsigned char *src = isProfile ? seq.numConsensusSequence : seq.numSequence;
memcpy(chunkBuffer + (offsets[i] - offsets[chunkStart]), src, seq.L);
} else {
if (masker != NULL) {
masker->maskSequence(seq, maskMode > 0, maskProb, maskLowerCase > 0, maskNrepeats);
}
memcpy(chunkBuffer + (offsets[i] - offsets[chunkStart]), seq.numSequence, seq.L);
}
}

if (masker != NULL) {
delete masker;
}
} // end omp parallel

// Serial write of the entire chunk
writer.writeAdd(chunkBuffer, chunkBytes, splitIdx);

#ifdef HAVE_POSIX_MADVISE
// Release consumed mmap pages to keep memory constant.
size_t lastId = dbFrom + chunkEnd - 1;
size_t seqEnd = dbr->getOffset(lastId) + dbr->getEntryLen(lastId);
size_t pageAlignedEnd = (seqEnd / PAGE_SIZE) * PAGE_SIZE;
if (pageAlignedEnd > advisedUpTo + ADVISE_CHUNK) {
// Advise DONTNEED on all pages in [advisedUpTo, pageAlignedEnd)
// across whichever data files they span.
for (size_t f = 0; f < dataFileCnt; f++) {
size_t fileStart = fileCumulOff[f];
size_t fileEnd = fileCumulOff[f + 1];
// Intersect [advisedUpTo, pageAlignedEnd) with [fileStart, fileEnd)
size_t rangeStart = std::max(advisedUpTo, fileStart);
size_t rangeEnd = std::min(pageAlignedEnd, fileEnd);
if (rangeStart >= rangeEnd) {
continue;
}
// Convert to file-local offsets, page-align start upward
size_t localStart = ((rangeStart - fileStart + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE;
size_t localEnd = (rangeEnd - fileStart) / PAGE_SIZE * PAGE_SIZE;
if (localStart < localEnd) {
posix_madvise(dbr->getDataForFile(f) + localStart,
localEnd - localStart, POSIX_MADV_DONTNEED);
}
}
advisedUpTo = pageAlignedEnd;
}
#endif
}
// Sentinel byte to match original SequenceLookup buffer layout (dataSize+1 allocation)
char sentinel = '\0';
writer.writeAdd(&sentinel, 1, splitIdx);
writer.writeEnd(keyOffset + PrefilteringIndexReader::SEQINDEXDATA, splitIdx, true, true);
writer.alignToPageSize(splitIdx);

delete[] chunkBuffer;
delete[] offsets;
delete[] fileCumulOff;
dbr->remapData();
}

void PrefilteringIndexReader::createIndexFile(const std::string &outDB,
DBReader<unsigned int> *dbr1, DBReader<unsigned int> *dbr2,
DBReader<unsigned int> *hdbr1, DBReader<unsigned int> *hdbr2,
Expand Down Expand Up @@ -225,6 +411,19 @@ void PrefilteringIndexReader::createIndexFile(const std::string &outDB,
continue;
}

// When only SequenceLookup is needed (no k-mer index), stream it directly
// to disk without the O(totalSeqBytes) heap allocation.
// Use --index-subset 18 (2|16) to force the old bulk-allocation path.
const bool noStream = (indexSubset & Parameters::INDEX_SUBSET_NO_STREAM) != 0;
if (needSequenceLookup && !needKmerIndex && !noStream) {
unsigned int keyOffset = 1000 * s;
streamSequenceLookupToIndex(writer, SPLIT_INDX + s, keyOffset,
dbr1, dbFrom, dbSize, subMat, seqType, maxSeqLen, kmerSize,
hasSpacedKmer, spacedKmerPattern, maskMode, maskLowerCase,
maskProb, maskNrepeats, targetSearchMode);
continue;
}

IndexTable * indexTable;
if(needKmerIndex){
indexTable = new IndexTable(adjustAlphabetSize, kmerSize, false);
Expand Down
Loading