diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index b1b22323..4ea8dd44 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -5,12 +5,14 @@ import ( "fmt" "hash/fnv" "os" + "strings" "sync" "time" "github.com/alecthomas/units" "go.uber.org/zap" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/token" @@ -21,7 +23,7 @@ import ( // Launch as: // -// > go run ./cmd/index_analyzer/... ./data/*.index | tee ~/report.txt +// > go run ./cmd/index_analyzer/... ./data/*.info | tee ~/report.txt func main() { if len(os.Args) < 2 { fmt.Println("No args") @@ -73,45 +75,80 @@ func getCacheMaintainer() (*fracmanager.CacheMaintainer, func()) { } } +// basePath strips any known index suffix to return the fraction base path. +func basePath(path string) string { + for _, suffix := range []string{ + consts.InfoFileSuffix, + consts.TokenFileSuffix, + consts.OffsetsFileSuffix, + consts.IDFileSuffix, + consts.LIDFileSuffix, + } { + if strings.HasSuffix(path, suffix) { + return path[:len(path)-len(suffix)] + } + } + return path +} + +func openFile(path string) *os.File { + f, err := os.Open(path) + if err != nil { + panic(err) + } + return f +} + func analyzeIndex( path string, cm *fracmanager.CacheMaintainer, - reader *storage.ReadLimiter, + rl *storage.ReadLimiter, mergedTokensUniq map[string]map[string]int, allTokensValuesUniq map[string]int, ) Stats { + base := basePath(path) + indexCache := cm.CreateIndexCache() + + // Open per-section files. + infoFile := openFile(base + consts.InfoFileSuffix) + tokenFile := openFile(base + consts.TokenFileSuffix) + lidFile := openFile(base + consts.LIDFileSuffix) + defer infoFile.Close() + defer tokenFile.Close() + defer lidFile.Close() + + infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry) + tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry) + lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry) + + // --- Info --- var blockIndex uint32 - cache := cm.CreateIndexCache() - - f, err := os.Open(path) + infoData, _, err := infoReader.ReadIndexBlock(0, nil) if err != nil { - panic(err) + logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err)) } + var b sealed.BlockInfo + if err := b.Unpack(infoData); err != nil { + logger.Fatal("error unpacking block info", zap.Error(err)) + } + docsCount := int(b.Info.DocsTotal) - indexReader := storage.NewIndexReader(reader, f.Name(), f, cache.Registry) - - readBlock := func() []byte { - data, _, err := indexReader.ReadIndexBlock(blockIndex, nil) + // --- Tokens (.token file) --- + // Token blocks start at index 0, followed by an empty separator, then token table blocks. + blockIndex = 0 + readTokenBlock := func() []byte { + data, _, err := tokenReader.ReadIndexBlock(blockIndex, nil) blockIndex++ if err != nil { - logger.Fatal("error reading block", zap.String("file", f.Name()), zap.Error(err)) + logger.Fatal("error reading token block", zap.String("file", tokenFile.Name()), zap.Error(err)) } return data } - // load info - var b sealed.BlockInfo - if err := b.Unpack(readBlock()); err != nil { - logger.Fatal("error unpacking block info", zap.Error(err)) - } - - docsCount := int(b.Info.DocsTotal) - - // load tokens tokens := [][]byte{} for { - data := readBlock() - if len(data) == 0 { // empty block - is section separator + data := readTokenBlock() + if len(data) == 0 { // empty block - section separator break } block := token.Block{} @@ -123,11 +160,10 @@ func analyzeIndex( } } - // load tokens table tokenTableBlocks := []token.TableBlock{} for { - data := readBlock() - if len(data) == 0 { // empty block - is section separator + data := readTokenBlock() + if len(data) == 0 { // empty block - section separator break } block := token.TableBlock{} @@ -136,28 +172,25 @@ func analyzeIndex( } tokenTable := token.TableFromBlocks(tokenTableBlocks) - // skip position - blockIndex++ - - // skip IDS - for { - data := readBlock() - if len(data) == 0 { // empty block - is section separator - break + // --- LIDs (.lid file) --- + blockIndex = 0 + readLIDBlock := func() []byte { + data, _, err := lidReader.ReadIndexBlock(blockIndex, nil) + blockIndex++ + if err != nil { + logger.Fatal("error reading lid block", zap.String("file", lidFile.Name()), zap.Error(err)) } - blockIndex++ // skip RID - blockIndex++ // skip Param + return data } - // load LIDs tid := 0 lidsTotal := 0 lidsUniq := map[[16]byte]int{} lidsLens := make([]int, len(tokens)) tokenLIDs := []uint32{} for { - data := readBlock() - if len(data) == 0 { // empty block - is section separator + data := readLIDBlock() + if len(data) == 0 { // empty block - section separator break } diff --git a/consts/consts.go b/consts/consts.go index d3ef72ff..1ee0e5ea 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -59,8 +59,26 @@ const ( SdocsTmpFileSuffix = "._sdocs" SdocsDelFileSuffix = ".sdocs.del" + InfoFileSuffix = ".info" + InfoTmpFileSuffix = "._info" + + TokenFileSuffix = ".tokens" + TokenTmpFileSuffix = "._tokens" + + OffsetsFileSuffix = ".offsets" + OffsetsTmpFileSuffix = "._offsets" + + IDFileSuffix = ".ids" + IDTmpFileSuffix = "._ids" + + LIDFileSuffix = ".lids" + LIDTmpFileSuffix = "._lids" + + // IndexFileSuffix is the legacy single-file index format (pre-split). IndexFileSuffix = ".index" IndexTmpFileSuffix = "._index" + // TODO(dkharms): [IndexDelFileSuffix] is actually not necessary. + // We can remove it in the future releases. IndexDelFileSuffix = ".index.del" RemoteFractionSuffix = ".remote" diff --git a/frac/active.go b/frac/active.go index b586db51..6c4598df 100644 --- a/frac/active.go +++ b/frac/active.go @@ -3,7 +3,6 @@ package frac import ( "context" "io" - "math" "os" "path/filepath" "sync" @@ -26,9 +25,7 @@ import ( "github.com/ozontech/seq-db/util" ) -var ( - _ Fraction = (*Active)(nil) -) +var _ Fraction = (*Active)(nil) type Active struct { Config *Config @@ -61,16 +58,6 @@ type Active struct { indexer *ActiveIndexer } -const ( - systemMID = math.MaxUint64 - systemRID = math.MaxUint64 -) - -var systemSeqID = seq.ID{ - MID: systemMID, - RID: systemRID, -} - func NewActive( baseFileName string, activeIndexer *ActiveIndexer, @@ -109,8 +96,8 @@ func NewActive( } // use of 0 as keys in maps is prohibited – it's system key, so add first element - f.MIDs.Append(systemMID) - f.RIDs.Append(systemRID) + f.MIDs.Append(uint64(seq.SystemMID)) + f.RIDs.Append(uint64(seq.SystemRID)) logger.Info("active fraction created", zap.String("fraction", baseFileName)) @@ -121,7 +108,8 @@ func mustOpenMetaWriter( baseFileName string, readLimiter *storage.ReadLimiter, docsFile *os.File, - docsStats os.FileInfo) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) { + docsStats os.FileInfo, +) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) { legacyMetaFileName := baseFileName + consts.MetaFileSuffix if _, err := os.Stat(legacyMetaFileName); err == nil { diff --git a/frac/active_sealing_source.go b/frac/active_sealing_source.go index 43ca0239..8c960b41 100644 --- a/frac/active_sealing_source.go +++ b/frac/active_sealing_source.go @@ -3,14 +3,12 @@ package frac import ( "bytes" "encoding/binary" - "errors" "io" "iter" "os" "path/filepath" "slices" "time" - "unsafe" "github.com/alecthomas/units" "go.uber.org/zap" @@ -24,72 +22,62 @@ import ( "github.com/ozontech/seq-db/util" ) -// ActiveSealingSource transforms data from in-memory (frac.Active) storage -// into a format suitable for disk writing during index creation. -// -// The main purpose of this type is to provide access to sorted data -// through a set of iterators that allow sequential processing of -// data in sized blocks for disk writing: -// -// - TokenBlocks() - iterator for token blocks, sorted by fields and values -// - Fields() - iterator for sorted fields with maximum TIDs -// - IDsBlocks() - iterator for document ID blocks and their positions -// - TokenLIDs() - iterator for LID lists for each token -// - Docs() - iterator for documents themselves with duplicate handling -// -// All iterators work with pre-sorted data and return information -// in an order optimal for creating disk index structures. type ActiveSealingSource struct { - params common.SealParams // Sealing parameters - info *common.Info // fraction Info - created time.Time // Creation time of the source - sortedLIDs []uint32 // Sorted LIDs (Local ID) - oldToNewLIDs []uint32 // Mapping from old LIDs to new ones (after sorting) - mids *UInt64s // MIDs - rids *UInt64s // RIDs - fields []string // Sorted field names - fieldsMaxTIDs []uint32 // Maximum TIDs for each field - tids []uint32 // Sorted TIDs (Token ID) - tokens [][]byte // Tokens (values) by TID - lids []*TokenLIDs // LID lists for each token - docPosMap map[seq.ID]seq.DocPos // Original document positions - docPosSorted []seq.DocPos // Document positions after sorting - blocksOffsets []uint64 // Document block offsets - docsReader *storage.DocsReader // Document storage reader - lastErr error // Last error + params common.SealParams // Sealing parameters + + info *common.Info // fraction Info + created time.Time // Creation time of the source + + blocksOffsets []uint64 // Document block offsets + + sortedLIDs []uint32 // Sorted LIDs (Local ID) + oldToNewLIDs []uint32 // Mapping from old LIDs to new ones (after sorting) + + mids *UInt64s // MIDs + rids *UInt64s // RIDs + + fields []string // Sorted field names + fieldTid map[string][]uint32 // Each field contains sorted TIDs based on token value + tokens [][]byte // Tokens (values) by TID + lids []*TokenLIDs // LID lists for each token + + docPosMap map[seq.ID]seq.DocPos // Original document positions + docPosSorted []seq.DocPos // Document positions after sorting + docsReader *storage.DocsReader // Document storage reader + + lastErr error // Last error } -// NewActiveSealingSource creates a new data source for sealing -// based on an active in-memory index. func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSealingSource, error) { info := *active.info // copy + sortedLIDs := active.GetAllDocuments() + fields, fieldTid := sortFields(active.TokenList) - // Sort fields and get maximum TIDs for each field - sortedFields, fieldsMaxTIDs := sortFields(active.TokenList) + src := ActiveSealingSource{ + params: params, - // Sort tokens within each field - sortedTIDs := sortTokens(sortedFields, active.TokenList) + info: &info, + created: time.Now(), + + sortedLIDs: sortedLIDs, + oldToNewLIDs: makeInverser(sortedLIDs), // Create LID mapping + + mids: active.MIDs, + rids: active.RIDs, + + fields: fields, + fieldTid: fieldTid, + tokens: active.TokenList.tidToVal, + lids: active.TokenList.tidToLIDs, - src := ActiveSealingSource{ - params: params, - info: &info, - created: time.Now(), - sortedLIDs: sortedLIDs, - oldToNewLIDs: makeInverser(sortedLIDs), // Create LID mapping - mids: active.MIDs, - rids: active.RIDs, - fields: sortedFields, - tids: sortedTIDs, - fieldsMaxTIDs: fieldsMaxTIDs, - tokens: active.TokenList.tidToVal, - lids: active.TokenList.tidToLIDs, docPosMap: active.DocsPositions.idToPos, blocksOffsets: active.DocBlocks.vals, docsReader: &active.sortReader, } src.prepareInfo() + src.prepareLids() // Sort documents if not skipped in configuration if !active.Config.SkipSortDocs { @@ -101,49 +89,70 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe return &src, nil } -// sortFields sorts field names and calculates maximum TIDs for each field. -// Returns sorted field list and array of maximum TIDs. -func sortFields(tl *TokenList) ([]string, []uint32) { +func sortFields(tl *TokenList) ([]string, map[string][]uint32) { fields := make([]string, 0, len(tl.FieldTIDs)) - for field := range tl.FieldTIDs { + fieldTid := make(map[string][]uint32, len(tl.FieldTIDs)) + + for field, tids := range tl.FieldTIDs { fields = append(fields, field) - } - slices.Sort(fields) - pos := 0 - maxTIDs := make([]uint32, 0, len(fields)) - for _, field := range fields { - pos += len(tl.FieldTIDs[field]) - maxTIDs = append(maxTIDs, uint32(pos)) + // Make a copy because this memory is shared + // with concurrent readers (user search queries). + cp := slices.Clone(tids) + + slices.SortFunc(cp, func(i, j uint32) int { + return bytes.Compare(tl.tidToVal[i], tl.tidToVal[j]) + }) + + fieldTid[field] = cp } - return fields, maxTIDs + slices.Sort(fields) + return fields, fieldTid } -// sortTokens sorts tokens lexicographically within each field. -// Returns sorted list of TIDs. -func sortTokens(sortedFields []string, tl *TokenList) []uint32 { - pos := 0 - tids := make([]uint32, 0, len(tl.tidToVal)) - for _, field := range sortedFields { - tids = append(tids, tl.FieldTIDs[field]...) - chunk := tids[pos:] - slices.SortFunc(chunk, func(i, j uint32) int { - a := tl.tidToVal[i] - b := tl.tidToVal[j] - return bytes.Compare(a, b) // Sort by token value - }) - pos = len(tids) +func (src *ActiveSealingSource) ID() iter.Seq2[seq.ID, seq.DocPos] { + return func(yield func(seq.ID, seq.DocPos) bool) { + mids := src.mids.vals + rids := src.rids.vals + + // System ID and DocPos are not stored in `src.sortedLIDs`. + // However we do have to yield them to preserve 1-baseed indexing for ids. + if !yield(seq.SystemID, seq.SystemDocPos) { + return + } + + for i, lid := range src.sortedLIDs { + id := seq.ID{ + MID: seq.MID(mids[lid]), + RID: seq.RID(rids[lid]), + } + + // Documents were not sorted previously. + if len(src.docPosSorted) == 0 { + if !yield(id, src.docPosMap[id]) { + return + } + continue + } + + // `i` in range [0; len(src.sortedLIDs)) + // but lids indexes are 1-based. + if !yield(id, src.docPosSorted[i+1]) { + return + } + } } - return tids } -// LastError returns the last error that occurred during processing. +func (src *ActiveSealingSource) BlockOffsets() []uint64 { + return src.blocksOffsets +} + func (src *ActiveSealingSource) LastError() error { return src.lastErr } -// prepareInfo prepares metadata for disk writing. func (src *ActiveSealingSource) prepareInfo() { src.info.MetaOnDisk = 0 src.info.SealingTime = uint64(src.created.UnixMilli()) @@ -155,130 +164,46 @@ func (src *ActiveSealingSource) prepareInfo() { src.info.BuildDistribution(mids) } -// Info returns index metadata information. -func (src *ActiveSealingSource) Info() *common.Info { - return src.info +func (src *ActiveSealingSource) prepareLids() { + for _, tl := range src.lids[1:] { + tl.GetLIDs(src.mids, src.rids) + } } -// TokenBlocks returns an iterator for token blocks for disk writing. -// Tokens are pre-sorted: first by fields, then lexicographically within each field. -// Each block contains up to blockSize bytes of data for efficient writing. -func (src *ActiveSealingSource) TokenBlocks(blockSize int) iter.Seq[[][]byte] { - const tokenLengthSize = int(unsafe.Sizeof(uint32(0))) - return func(yield func([][]byte) bool) { - if len(src.tids) == 0 { - return - } - if blockSize <= 0 { - src.lastErr = errors.New("sealing: token block size must be > 0") - return - } - - actualSize := 0 - block := make([][]byte, 0, blockSize) - - // Iterate through all sorted TIDs - for _, tid := range src.tids { - if actualSize >= blockSize { - if !yield(block) { - return - } - actualSize = 0 - block = block[:0] - } - token := src.tokens[tid] - actualSize += tokenLengthSize // Add the size of the token length field - actualSize += len(token) // Add the size of the token itself - block = append(block, token) - } - yield(block) - } +func (src *ActiveSealingSource) Info() *common.Info { + return src.info } -// Fields returns an iterator for sorted fields and their maximum TIDs. -// Fields are sorted lexicographically, ensuring predictable order -// when building disk index structures. -func (src *ActiveSealingSource) Fields() iter.Seq2[string, uint32] { - return func(yield func(string, uint32) bool) { - for i, field := range src.fields { - if !yield(field, src.fieldsMaxTIDs[i]) { +func (src *ActiveSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[[]byte, []uint32]] { + return func(yield func(string, iter.Seq2[[]byte, []uint32]) bool) { + for _, field := range src.fields { + if !yield(field, src.tokensForField(field)) { return } } } } -// IDsBlocks returns an iterator for document ID blocks and corresponding positions. -// IDs are sorted. Block size is controlled by blockSize parameter for balance between -// performance and memory usage. -func (src *ActiveSealingSource) IDsBlocks(blockSize int) iter.Seq2[[]seq.ID, []seq.DocPos] { - return func(yield func([]seq.ID, []seq.DocPos) bool) { - mids := src.mids.vals - rids := src.rids.vals - - ids := make([]seq.ID, 0, blockSize) - pos := make([]seq.DocPos, 0, blockSize) - - // First reserved ID (system). This position is not used because Local IDs (LIDs) use 1-based indexing. - ids = append(ids, seq.ID{MID: seq.MID(mids[0]), RID: seq.RID(rids[0])}) - pos = append(pos, 0) - - // Iterate through sorted LIDs - for i, lid := range src.sortedLIDs { - if len(ids) == blockSize { - if !yield(ids, pos) { - return - } - ids = ids[:0] - pos = pos[:0] - } - id := seq.ID{MID: seq.MID(mids[lid]), RID: seq.RID(rids[lid])} - ids = append(ids, id) - - // Use sorted or original positions - if len(src.docPosSorted) == 0 { - pos = append(pos, src.docPosMap[id]) - } else { - pos = append(pos, src.docPosSorted[i+1]) // +1 for system document - } - } - yield(ids, pos) - } -} +func (src *ActiveSealingSource) tokensForField(field string) iter.Seq2[[]byte, []uint32] { + var lidsbuf []uint32 + return func(yield func([]byte, []uint32) bool) { + for _, tid := range src.fieldTid[field] { + token := src.tokens[tid] -// BlocksOffsets returns document block offsets. -func (src *ActiveSealingSource) BlocksOffsets() []uint64 { - return src.blocksOffsets -} + lids := src.lids[tid].GetLIDs(src.mids, src.rids) + lidsbuf = slices.Grow(lidsbuf[:0], len(lids)) -// TokenLIDs returns an iterator for LID lists for each token. -// LIDs are converted to new numbering after document sorting. -// Each iterator call returns a list of documents containing a specific token, -// in sorted order. -func (src *ActiveSealingSource) TokenLIDs() iter.Seq[[]uint32] { - return func(yield func([]uint32) bool) { - newLIDs := []uint32{} - - // For each sorted TID - for _, tid := range src.tids { - // Get original LIDs for this token - oldLIDs := src.lids[tid].GetLIDs(src.mids, src.rids) - newLIDs = slices.Grow(newLIDs[:0], len(oldLIDs)) - - // Convert old LIDs to new through mapping - for _, lid := range oldLIDs { - newLIDs = append(newLIDs, src.oldToNewLIDs[lid]) + for _, lid := range lids { + lidsbuf = append(lidsbuf, src.oldToNewLIDs[lid]) } - if !yield(newLIDs) { + if !yield(token, lidsbuf) { return } } } } -// makeInverser creates an array for converting old LIDs to new ones. -// sortedLIDs[i] = oldLID -> inverser[oldLID] = i+1 func makeInverser(sortedLIDs []uint32) []uint32 { inverser := make([]uint32, len(sortedLIDs)+1) for i, lid := range sortedLIDs { @@ -297,22 +222,18 @@ func (src *ActiveSealingSource) Docs() iter.Seq2[seq.ID, []byte] { curDoc []byte ) - // Iterate through ID and position blocks - for ids, pos := range src.IDsBlocks(consts.IDsPerBlock) { - for i, id := range ids { - if id == systemSeqID { - curDoc = nil // reserved system document (no payload) - } else if id != prev { - // If ID changed, read new document - if curDoc, src.lastErr = src.doc(pos[i]); src.lastErr != nil { - return - } - } - prev = id - if !yield(id, curDoc) { + for id, pos := range src.ID() { + if id == seq.SystemID { + curDoc = nil // reserved system document (no payload) + } else if id != prev { + if curDoc, src.lastErr = src.doc(pos); src.lastErr != nil { return } } + prev = id + if !yield(id, curDoc) { + return + } } } } diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 70af8f46..050e55db 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -24,7 +24,7 @@ import ( "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" - test_common "github.com/ozontech/seq-db/tests/common" + testcommon "github.com/ozontech/seq-db/tests/common" "github.com/ozontech/seq-db/tokenizer" ) @@ -38,9 +38,9 @@ func TestConcurrentAppendAndQuery(t *testing.T) { docs, bulks, fromTime, toTime := generatesMessages(numWriters*numMessagesPerWriter, bulkSize) - tmpDir := test_common.CreateTempDir() + tmpDir := testcommon.CreateTempDir() fracPath := filepath.Join(tmpDir, "test_fraction") - defer test_common.RemoveDir(fracPath) + defer testcommon.RemoveDir(fracPath) activeIndexer, stop := NewActiveIndexer(numIndexWorkers, 1000) defer stop() @@ -353,13 +353,17 @@ func seal(active *Active) (*Sealed, error) { return nil, err } indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - Registry: cache.NewCache[[]byte](nil, nil), + MIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), + Params: cache.NewCache[seqids.BlockParams](nil, nil), + LIDs: cache.NewCache[*lids.Block](nil, nil), + Tokens: cache.NewCache[*token.Block](nil, nil), + TokenTable: cache.NewCache[token.Table](nil, nil), + InfoRegistry: cache.NewCache[[]byte](nil, nil), + TokenRegistry: cache.NewCache[[]byte](nil, nil), + OffsetsRegistry: cache.NewCache[[]byte](nil, nil), + IDRegistry: cache.NewCache[[]byte](nil, nil), + LIDRegistry: cache.NewCache[[]byte](nil, nil), } sealed := NewSealedPreloaded( active.BaseFileName, diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8e44f566..8ac45512 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1833,8 +1833,8 @@ func (s *FractionTestSuite) TestFractionInfo() { "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1500), - "index on disk doesn't match. actual value: %d", info.MetaOnDisk) + s.Require().True(info.IndexOnDisk > uint64(1400) && info.IndexOnDisk < uint64(1600), + "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") } @@ -2085,13 +2085,17 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { s.Require().NoError(err, "Sealing failed") indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - Registry: cache.NewCache[[]byte](nil, nil), + MIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), + Params: cache.NewCache[seqids.BlockParams](nil, nil), + LIDs: cache.NewCache[*lids.Block](nil, nil), + Tokens: cache.NewCache[*token.Block](nil, nil), + TokenTable: cache.NewCache[token.Table](nil, nil), + InfoRegistry: cache.NewCache[[]byte](nil, nil), + TokenRegistry: cache.NewCache[[]byte](nil, nil), + OffsetsRegistry: cache.NewCache[[]byte](nil, nil), + IDRegistry: cache.NewCache[[]byte](nil, nil), + LIDRegistry: cache.NewCache[[]byte](nil, nil), } sealed := NewSealedPreloaded( @@ -2102,6 +2106,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { cache.NewCache[[]byte](nil, nil), s.config, ) + active.Release() return sealed } @@ -2273,13 +2278,17 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal sealed.Release() indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - Registry: cache.NewCache[[]byte](nil, nil), + MIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), + Params: cache.NewCache[seqids.BlockParams](nil, nil), + LIDs: cache.NewCache[*lids.Block](nil, nil), + Tokens: cache.NewCache[*token.Block](nil, nil), + TokenTable: cache.NewCache[token.Table](nil, nil), + InfoRegistry: cache.NewCache[[]byte](nil, nil), + TokenRegistry: cache.NewCache[[]byte](nil, nil), + OffsetsRegistry: cache.NewCache[[]byte](nil, nil), + IDRegistry: cache.NewCache[[]byte](nil, nil), + LIDRegistry: cache.NewCache[[]byte](nil, nil), } sealed = NewSealed( @@ -2288,7 +2297,10 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal indexCache, cache.NewCache[[]byte](nil, nil), nil, - s.config) + s.config, + false, + ) + s.fraction = sealed return sealed } @@ -2339,13 +2351,17 @@ func (s *RemoteFractionTestSuite) SetupTest() { s.Require().True(offloaded, "didn't offload frac") indexCache := &IndexCache{ - MIDs: cache.NewCache[[]byte](nil, nil), - RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), - Params: cache.NewCache[seqids.BlockParams](nil, nil), - LIDs: cache.NewCache[*lids.Block](nil, nil), - Tokens: cache.NewCache[*token.Block](nil, nil), - TokenTable: cache.NewCache[token.Table](nil, nil), - Registry: cache.NewCache[[]byte](nil, nil), + MIDs: cache.NewCache[[]byte](nil, nil), + RIDs: cache.NewCache[seqids.BlockRIDs](nil, nil), + Params: cache.NewCache[seqids.BlockParams](nil, nil), + LIDs: cache.NewCache[*lids.Block](nil, nil), + Tokens: cache.NewCache[*token.Block](nil, nil), + TokenTable: cache.NewCache[token.Table](nil, nil), + InfoRegistry: cache.NewCache[[]byte](nil, nil), + TokenRegistry: cache.NewCache[[]byte](nil, nil), + OffsetsRegistry: cache.NewCache[[]byte](nil, nil), + IDRegistry: cache.NewCache[[]byte](nil, nil), + LIDRegistry: cache.NewCache[[]byte](nil, nil), } remoteFrac := NewRemote( @@ -2356,7 +2372,10 @@ func (s *RemoteFractionTestSuite) SetupTest() { cache.NewCache[[]byte](nil, nil), sealed.info, s.config, - s3cli) + s3cli, + false, + ) + s.fraction = remoteFrac } } diff --git a/frac/index_cache.go b/frac/index_cache.go index 4536fa22..852fe51f 100644 --- a/frac/index_cache.go +++ b/frac/index_cache.go @@ -8,7 +8,14 @@ import ( ) type IndexCache struct { - Registry *cache.Cache[[]byte] + // Per-file registry caches (each IndexReader needs its own). + InfoRegistry *cache.Cache[[]byte] + TokenRegistry *cache.Cache[[]byte] + OffsetsRegistry *cache.Cache[[]byte] + IDRegistry *cache.Cache[[]byte] + LIDRegistry *cache.Cache[[]byte] + + // Block-level data caches shared across all readers. MIDs *cache.Cache[[]byte] RIDs *cache.Cache[seqids.BlockRIDs] Params *cache.Cache[seqids.BlockParams] @@ -18,11 +25,15 @@ type IndexCache struct { } func (s *IndexCache) Release() { + s.InfoRegistry.Release() + s.TokenRegistry.Release() + s.OffsetsRegistry.Release() + s.IDRegistry.Release() + s.LIDRegistry.Release() s.LIDs.Release() s.MIDs.Release() s.RIDs.Release() s.Params.Release() - s.Registry.Release() s.Tokens.Release() s.TokenTable.Release() } diff --git a/frac/remote.go b/frac/remote.go index c2088caa..33cf938f 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -23,9 +23,7 @@ import ( "github.com/ozontech/seq-db/util" ) -var ( - _ Fraction = (*Remote)(nil) -) +var _ Fraction = (*Remote)(nil) // Remote fraction is a fraction that is backed by remote storage. // @@ -45,9 +43,25 @@ type Remote struct { docsCache *cache.Cache[[]byte] docsReader storage.DocsReader - indexFile storage.ImmutableFile - indexCache *IndexCache - indexReader storage.IndexReader + // IsLegacy is true for fractions that use the old single .index file format. + IsLegacy bool + legacyFile storage.ImmutableFile + legacyReader storage.IndexReader + + // Per-section index files and their readers (new split format only). + infoFile storage.ImmutableFile + tokenFile storage.ImmutableFile + offsetsFile storage.ImmutableFile + idFile storage.ImmutableFile + lidFile storage.ImmutableFile + + infoReader storage.IndexReader + tokenReader storage.IndexReader + offsetsReader storage.IndexReader + idReader storage.IndexReader + lidReader storage.IndexReader + + indexCache *IndexCache loadMu *sync.RWMutex isLoaded bool @@ -66,6 +80,7 @@ func NewRemote( info *common.Info, config *Config, s3cli *s3.Client, + isLegacy bool, ) *Remote { f := &Remote{ ctx: ctx, @@ -80,7 +95,8 @@ func NewRemote( BaseFileName: baseFile, Config: config, - s3cli: s3cli, + s3cli: s3cli, + IsLegacy: isLegacy, } // Fast path if fraction-info cache exists AND it has valid index size. @@ -95,15 +111,15 @@ func NewRemote( // I wrote a small proposal on how we can reduce impact of such events. // https://github.com/ozontech/seq-db/issues/92 - if err := f.openIndex(); err != nil { + if err := f.openInfo(); err != nil { logger.Error( - "cannot open index file: any subsequent operation will fail", + "cannot open info file: any subsequent operation will fail", zap.String("fraction", filepath.Base(f.BaseFileName)), zap.Error(err), ) } - f.info = loadHeader(f.indexFile, f.indexReader) + f.info = loadInfo(f.infoReader) return f } @@ -140,20 +156,33 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e ) return nil, err } + + tokenReader := &f.tokenReader + lidReader := &f.lidReader + idReader := &f.idReader + + if f.IsLegacy { + tokenReader = &f.legacyReader + lidReader = &f.legacyReader + idReader = &f.legacyReader + } + return &sealedDataProvider{ - ctx: ctx, + ctx: ctx, + fractionTypeLabel: "remote", + info: f.info, config: f.Config, docsReader: &f.docsReader, blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, - lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), - tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), + lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( - &f.indexReader, + idReader, f.indexCache.MIDs, f.indexCache.RIDs, f.indexCache.Params, @@ -180,7 +209,14 @@ func (f *Remote) Suicide() { files := []string{ filepath.Base(f.BaseFileName) + consts.DocsFileSuffix, filepath.Base(f.BaseFileName) + consts.SdocsFileSuffix, + // Legacy single-file format. filepath.Base(f.BaseFileName) + consts.IndexFileSuffix, + // New split format. + filepath.Base(f.BaseFileName) + consts.InfoFileSuffix, + filepath.Base(f.BaseFileName) + consts.TokenFileSuffix, + filepath.Base(f.BaseFileName) + consts.OffsetsFileSuffix, + filepath.Base(f.BaseFileName) + consts.IDFileSuffix, + filepath.Base(f.BaseFileName) + consts.LIDFileSuffix, } err := f.s3cli.Remove(f.ctx, files...) @@ -213,34 +249,133 @@ func (f *Remote) load() error { return err } - (&Loader{}).Load(&f.blocksData, f.info, &f.indexReader) - f.isLoaded = true + if f.IsLegacy { + (&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader) + f.isLoaded = true + return nil + } + + (&Loader{}).Load(&f.blocksData, f.info, IndexReaders{ + Info: f.infoReader, + Token: f.tokenReader, + Offsets: f.offsetsReader, + ID: f.idReader, + LID: f.lidReader, + }) + f.isLoaded = true return nil } +func (f *Remote) openInfo() error { + if f.IsLegacy { + if f.legacyFile != nil { + return nil + } + + indexName := filepath.Base(f.BaseFileName) + consts.IndexFileSuffix + f.legacyFile = s3.NewReader(f.ctx, f.s3cli, indexName) + + f.legacyReader = storage.NewIndexReader( + f.readLimiter, indexName, + f.legacyFile, f.indexCache.InfoRegistry, + ) + + // infoReader is used by [loadInfo] + f.infoReader = f.legacyReader + return nil + } + + if f.infoFile != nil { + return nil + } + + return f.openRemoteFile(consts.InfoFileSuffix, func(file storage.ImmutableFile) { + f.infoFile = file + f.infoReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.InfoRegistry, + ) + }) +} + func (f *Remote) openIndex() error { - if f.indexFile != nil { + if err := f.openInfo(); err != nil { + return err + } + + if f.IsLegacy { return nil } - name := filepath.Base(f.BaseFileName) + consts.IndexFileSuffix + if f.tokenFile == nil { + if err := f.openRemoteFile(consts.TokenFileSuffix, func(file storage.ImmutableFile) { + f.tokenFile = file + f.tokenReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.TokenRegistry, + ) + }); err != nil { + return err + } + } + + if f.offsetsFile == nil { + if err := f.openRemoteFile(consts.OffsetsFileSuffix, func(file storage.ImmutableFile) { + f.offsetsFile = file + f.offsetsReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.OffsetsRegistry, + ) + }); err != nil { + return err + } + } + + if f.idFile == nil { + if err := f.openRemoteFile(consts.IDFileSuffix, func(file storage.ImmutableFile) { + f.idFile = file + f.idReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.IDRegistry, + ) + }); err != nil { + return err + } + } + + if f.lidFile == nil { + if err := f.openRemoteFile(consts.LIDFileSuffix, func(file storage.ImmutableFile) { + f.lidFile = file + f.lidReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.LIDRegistry, + ) + }); err != nil { + return err + } + } + + return nil +} + +func (f *Remote) openRemoteFile(suffix string, assign func(storage.ImmutableFile)) error { + name := filepath.Base(f.BaseFileName) + suffix ok, err := f.s3cli.Exists(f.ctx, name) if err != nil { return fmt.Errorf( "cannot check existence of %q file: %w", - consts.IndexFileSuffix, err, + suffix, err, ) } - if ok { - f.indexFile = s3.NewReader(f.ctx, f.s3cli, name) - f.indexReader = storage.NewIndexReader(f.readLimiter, f.indexFile.Name(), f.indexFile, f.indexCache.Registry) - return nil + if !ok { + return fmt.Errorf("missing %q file", suffix) } - return fmt.Errorf("missing %q file", consts.IndexFileSuffix) + assign(s3.NewReader(f.ctx, f.s3cli, name)) + return nil } func (f *Remote) openDocs() error { diff --git a/frac/sealed.go b/frac/sealed.go index c4c033d8..8d53cc93 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -24,9 +24,7 @@ import ( "github.com/ozontech/seq-db/util" ) -var ( - _ Fraction = (*Sealed)(nil) -) +var _ Fraction = (*Sealed)(nil) type Sealed struct { Config *Config @@ -39,9 +37,25 @@ type Sealed struct { docsCache *cache.Cache[[]byte] docsReader storage.DocsReader - indexFile *os.File - indexCache *IndexCache - indexReader storage.IndexReader + // IsLegacy is true for fractions that use the old single .index file format. + IsLegacy bool + legacyFile *os.File + legacyReader storage.IndexReader + + // Per-section index files and their readers (new split format only). + infoFile *os.File + tokenFile *os.File + offsetsFile *os.File + idFile *os.File + lidFile *os.File + + infoReader storage.IndexReader + tokenReader storage.IndexReader + offsetsReader storage.IndexReader + idReader storage.IndexReader + lidReader storage.IndexReader + + indexCache *IndexCache loadMu *sync.RWMutex isLoaded bool @@ -68,6 +82,7 @@ func NewSealed( docsCache *cache.Cache[[]byte], info *common.Info, config *Config, + isLegacy bool, ) *Sealed { f := &Sealed{ loadMu: &sync.RWMutex{}, @@ -76,6 +91,7 @@ func NewSealed( docsCache: docsCache, indexCache: indexCache, + IsLegacy: isLegacy, info: info, BaseFileName: baseFile, Config: config, @@ -83,44 +99,140 @@ func NewSealed( PartialSuicideMode: Off, } - // fast path if fraction-info cache exists AND it has valid index size + // Fast path: if info cache has valid index size, skip opening the info file now. if info != nil && info.IndexOnDisk > 0 { return f } - f.openIndex() - f.info = loadHeader(f.indexFile, f.indexReader) + f.openInfo() + f.info = loadInfo(f.infoReader) + f.info.IndexOnDisk = computeIndexOnDisk(f.BaseFileName, f.IsLegacy) return f } -func (f *Sealed) openIndex() { - if f.indexFile == nil { - var err error +func (f *Sealed) openInfo() { + if f.IsLegacy { + if f.legacyFile != nil { + return + } + name := f.BaseFileName + consts.IndexFileSuffix - f.indexFile, err = os.Open(name) + file, err := os.Open(name) if err != nil { - logger.Fatal("can't open index file", zap.String("file", name), zap.Error(err)) + logger.Fatal( + "can't open legacy index file", + zap.String("file", name), + zap.Error(err), + ) } - f.indexReader = storage.NewIndexReader(f.readLimiter, f.indexFile.Name(), f.indexFile, f.indexCache.Registry) + + f.legacyFile = file + f.legacyReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.InfoRegistry, + ) + + // infoReader is used by [loadInfo] + f.infoReader = f.legacyReader + return + } + + if f.infoFile != nil { + return + } + + name := f.BaseFileName + consts.InfoFileSuffix + file, err := os.Open(name) + if err != nil { + logger.Fatal( + "can't open info file", + zap.String("file", name), + zap.Error(err), + ) + } + + f.infoFile = file + f.infoReader = storage.NewIndexReader( + f.readLimiter, file.Name(), + file, f.indexCache.InfoRegistry, + ) +} + +func (f *Sealed) openIndex() { + f.openInfo() + if f.IsLegacy { + return + } + + if f.tokenFile == nil { + name := f.BaseFileName + consts.TokenFileSuffix + file, err := os.Open(name) + if err != nil { + logger.Fatal("can't open token file", zap.String("file", name), zap.Error(err)) + } + f.tokenFile = file + f.tokenReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.TokenRegistry) + } + + if f.offsetsFile == nil { + name := f.BaseFileName + consts.OffsetsFileSuffix + file, err := os.Open(name) + if err != nil { + logger.Fatal("can't open offsets file", zap.String("file", name), zap.Error(err)) + } + f.offsetsFile = file + f.offsetsReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.OffsetsRegistry) + } + + if f.idFile == nil { + name := f.BaseFileName + consts.IDFileSuffix + file, err := os.Open(name) + if err != nil { + logger.Fatal("can't open id file", zap.String("file", name), zap.Error(err)) + } + f.idFile = file + f.idReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.IDRegistry) + } + + if f.lidFile == nil { + name := f.BaseFileName + consts.LIDFileSuffix + file, err := os.Open(name) + if err != nil { + logger.Fatal("can't open lid file", zap.String("file", name), zap.Error(err)) + } + f.lidFile = file + f.lidReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.LIDRegistry) } } func (f *Sealed) openDocs() { - if f.docsFile == nil { - var err error - f.docsFile, err = os.Open(f.BaseFileName + consts.SdocsFileSuffix) // try first open *.sdocs file + if f.docsFile != nil { + return + } + + var err error + f.docsFile, err = os.Open(f.BaseFileName + consts.SdocsFileSuffix) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + logger.Fatal( + "can't open sdocs file", + zap.String("frac", f.BaseFileName), + zap.Error(err), + ) + } + + f.docsFile, err = os.Open(f.BaseFileName + consts.DocsFileSuffix) if err != nil { - if !errors.Is(err, os.ErrNotExist) { - logger.Fatal("can't open sdocs file", zap.String("frac", f.BaseFileName), zap.Error(err)) - } - f.docsFile, err = os.Open(f.BaseFileName + consts.DocsFileSuffix) // fallback to *.docs file - if err != nil { - logger.Fatal("can't open docs file", zap.String("frac", f.BaseFileName), zap.Error(err)) - } + logger.Fatal( + "can't open docs file", + zap.String("frac", f.BaseFileName), + zap.Error(err), + ) } - f.docsReader = storage.NewDocsReader(f.readLimiter, f.docsFile, f.docsCache) } + + f.docsReader = storage.NewDocsReader(f.readLimiter, f.docsFile, f.docsCache) } func NewSealedPreloaded( @@ -146,7 +258,7 @@ func NewSealedPreloaded( Config: config, } - // put the token table built during sealing into the cache of the sealed fraction + // Put token table built during sealing into the cache. indexCache.TokenTable.Get(token.CacheKeyTable, func() (token.Table, int) { return preloaded.TokenTable, preloaded.TokenTable.Size() }) @@ -172,18 +284,31 @@ func (f *Sealed) load() { f.loadMu.Lock() defer f.loadMu.Unlock() - if !f.isLoaded { + if f.isLoaded { + return + } - f.openDocs() - f.openIndex() + f.openDocs() + f.openIndex() - (&Loader{}).Load(&f.blocksData, f.info, &f.indexReader) + if f.IsLegacy { + (&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader) f.isLoaded = true + return } + + (&Loader{}).Load(&f.blocksData, f.info, IndexReaders{ + Info: f.infoReader, + Token: f.tokenReader, + Offsets: f.offsetsReader, + ID: f.idReader, + LID: f.lidReader, + }) + + f.isLoaded = true } -// Offload saves `.docs` (or `.sdocs`) and `.index` files into remote storage. -// It does not free any of the occupied memory (nor on disk nor in memory). +// Offload saves all index files and docs to remote storage. func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) { f.loadMu.Lock() f.openDocs() @@ -192,14 +317,22 @@ func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) g, gctx := errgroup.WithContext(ctx) g.Go(func() error { return u.Upload(gctx, f.docsFile) }) - g.Go(func() error { return u.Upload(gctx, f.indexFile) }) + + if f.IsLegacy { + g.Go(func() error { return u.Upload(gctx, f.legacyFile) }) + } else { + g.Go(func() error { return u.Upload(gctx, f.infoFile) }) + g.Go(func() error { return u.Upload(gctx, f.tokenFile) }) + g.Go(func() error { return u.Upload(gctx, f.offsetsFile) }) + g.Go(func() error { return u.Upload(gctx, f.idFile) }) + g.Go(func() error { return u.Upload(gctx, f.lidFile) }) + } if err := g.Wait(); err != nil { return true, err } remoteFracName := f.BaseFileName + consts.RemoteFractionSuffix - file, err := os.Create(remoteFracName) if err != nil { return true, err @@ -211,15 +344,31 @@ func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) } func (f *Sealed) Release() { - if f.docsFile != nil { - if err := f.docsFile.Close(); err != nil { - logger.Error("can't close docs file", zap.String("frac", f.BaseFileName), zap.Error(err)) + indexFiles := []*os.File{ + f.docsFile, + f.infoFile, + f.tokenFile, + f.offsetsFile, + f.idFile, + f.lidFile, + } + + if f.IsLegacy { + indexFiles = []*os.File{ + f.docsFile, + f.legacyFile, } } - if f.indexFile != nil { - if err := f.indexFile.Close(); err != nil { - logger.Error("can't close index file", zap.String("frac", f.BaseFileName), zap.Error(err)) + for _, file := range indexFiles { + if file != nil { + if err := file.Close(); err != nil { + logger.Error( + "can't close file", + zap.String("file", file.Name()), + zap.Error(err), + ) + } } } @@ -230,13 +379,14 @@ func (f *Sealed) Release() { func (f *Sealed) Suicide() { f.Release() - // make some atomic magic, to be more stable on removing fractions + // Rename docs atomically first — this commits the intent to delete. oldPath := f.BaseFileName + consts.DocsFileSuffix newPath := f.BaseFileName + consts.DocsDelFileSuffix if err := os.Rename(oldPath, newPath); err != nil && !errors.Is(err, os.ErrNotExist) { - logger.Error("can't rename docs file", - zap.String("old_path", oldPath), - zap.String("new_path", newPath), + logger.Error( + "can't rename docs file", + zap.String("old", oldPath), + zap.String("new", newPath), zap.Error(err), ) } @@ -244,9 +394,10 @@ func (f *Sealed) Suicide() { oldPath = f.BaseFileName + consts.SdocsFileSuffix newPath = f.BaseFileName + consts.SdocsDelFileSuffix if err := os.Rename(oldPath, newPath); err != nil && !errors.Is(err, os.ErrNotExist) { - logger.Error("can't rename sdocs file", - zap.String("old_path", oldPath), - zap.String("new_path", newPath), + logger.Error( + "can't rename sdocs file", + zap.String("old", oldPath), + zap.String("new", newPath), zap.Error(err), ) } @@ -255,28 +406,35 @@ func (f *Sealed) Suicide() { return } - oldPath = f.BaseFileName + consts.IndexFileSuffix - newPath = f.BaseFileName + consts.IndexDelFileSuffix - if err := os.Rename(oldPath, newPath); err != nil { - logger.Error("can't rename index file", - zap.String("old_path", oldPath), - zap.String("new_path", newPath), - zap.Error(err), - ) + // Delete all index files directly (they are regenerable; no atomic rename needed). + indexSuffixes := []string{ + consts.InfoFileSuffix, + consts.TokenFileSuffix, + consts.OffsetsFileSuffix, + consts.IDFileSuffix, + consts.LIDFileSuffix, } - rmPath := f.BaseFileName + consts.DocsDelFileSuffix - if err := os.Remove(rmPath); err != nil && !errors.Is(err, os.ErrNotExist) { - logger.Error("can't remove docs file", - zap.String("file", rmPath), - zap.Error(err), - ) + if f.IsLegacy { + indexSuffixes = []string{ + consts.IndexFileSuffix, + } + } + + for _, suffix := range indexSuffixes { + if err := os.Remove(f.BaseFileName + suffix); err != nil && !errors.Is(err, os.ErrNotExist) { + logger.Error( + "can't remove index file", + zap.String("file", f.BaseFileName+suffix), + zap.Error(err), + ) + } } - rmPath = f.BaseFileName + consts.SdocsDelFileSuffix - if err := os.Remove(rmPath); err != nil && !errors.Is(err, os.ErrNotExist) { - logger.Error("can't remove sdocs file", - zap.String("file", rmPath), + if err := os.Remove(f.BaseFileName + consts.DocsDelFileSuffix); err != nil && !errors.Is(err, os.ErrNotExist) { + logger.Error( + "can't remove docs del file", + zap.String("frac", f.BaseFileName), zap.Error(err), ) } @@ -285,10 +443,10 @@ func (f *Sealed) Suicide() { return } - rmPath = f.BaseFileName + consts.IndexDelFileSuffix - if err := os.Remove(rmPath); err != nil { - logger.Error("can't remove index file", - zap.String("file", rmPath), + if err := os.Remove(f.BaseFileName + consts.SdocsDelFileSuffix); err != nil && !errors.Is(err, os.ErrNotExist) { + logger.Error( + "can't remove sdocs del file", + zap.String("frac", f.BaseFileName), zap.Error(err), ) } @@ -301,19 +459,28 @@ func (f *Sealed) String() string { func (f *Sealed) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) { dp := f.createDataProvider(ctx) defer dp.release() - return dp.Fetch(ids) } func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) { dp := f.createDataProvider(ctx) defer dp.release() - return dp.Search(params) } func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { f.load() + + tokenReader := &f.tokenReader + lidReader := &f.lidReader + idReader := &f.idReader + + if f.IsLegacy { + tokenReader = &f.legacyReader + lidReader = &f.legacyReader + idReader = &f.legacyReader + } + return &sealedDataProvider{ ctx: ctx, fractionTypeLabel: "sealed", @@ -323,13 +490,13 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { docsReader: &f.docsReader, blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, - lidsLoader: lids.NewLoader(&f.indexReader, f.indexCache.LIDs), - tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.indexReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, &f.indexReader, f.indexCache.TokenTable), + lidsLoader: lids.NewLoader(lidReader, f.indexCache.LIDs), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( - &f.indexReader, + idReader, f.indexCache.MIDs, f.indexCache.RIDs, f.indexCache.Params, @@ -351,39 +518,48 @@ func (f *Sealed) IsIntersecting(from, to seq.MID) bool { return f.info.IsIntersecting(from, to) } -func loadHeader( - indexFile storage.ImmutableFile, - indexReader storage.IndexReader, -) *common.Info { - block, _, err := indexReader.ReadIndexBlock(0, nil) +func loadInfo(infoReader storage.IndexReader) *common.Info { + block, _, err := infoReader.ReadIndexBlock(0, nil) if err != nil { - logger.Fatal( - "error reading info block from index", - zap.String("file", indexFile.Name()), - zap.Error(err), - ) + logger.Fatal("error reading info block", zap.Error(err)) } var bi sealed.BlockInfo if err := bi.Unpack(block); err != nil { - logger.Fatal( - "error unpacking info block", - zap.String("file", indexFile.Name()), - zap.Error(err), - ) + logger.Fatal("error unpacking info block", zap.Error(err)) } - info := bi.Info - // set index size - stat, err := indexFile.Stat() - if err != nil { - logger.Fatal( - "can't stat index file", - zap.String("file", indexFile.Name()), - zap.Error(err), - ) + return bi.Info +} + +// computeIndexOnDisk returns the total on-disk size of index files for a local fraction. +func computeIndexOnDisk(basePath string, isLegacy bool) uint64 { + suffixes := []string{ + consts.InfoFileSuffix, + consts.TokenFileSuffix, + consts.OffsetsFileSuffix, + consts.IDFileSuffix, + consts.LIDFileSuffix, + } + + if isLegacy { + suffixes = []string{ + consts.IndexFileSuffix, + } + } + + var total int64 + for _, suffix := range suffixes { + st, err := os.Stat(basePath + suffix) + if err != nil { + logger.Fatal( + "can't stat index file", + zap.String("file", basePath+suffix), + zap.Error(err), + ) + } + total += st.Size() } - info.IndexOnDisk = uint64(stat.Size()) - return info + return uint64(total) } diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go index 14a5cac7..f91a4f9a 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/frac/sealed/sealing/blocks_builder.go @@ -2,8 +2,8 @@ package sealing import ( "encoding/binary" - "errors" "iter" + "unsafe" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/seqids" @@ -54,85 +54,109 @@ func (bb *blocksBuilder) LastError() error { return bb.lastErr } -// BuildTokenBlocks converts token batches into token blocks with field tables. The function creates an iterator -// that returns token blocks and corresponding field tables describing which fields are covered by which tokens -// in the block. -// -// Visualization of relationships between fields, tokens, and table entries: -// -// Field Ranges: <-------f1----------><------f2-------><------------f3------------><----------f4----------> -// Token Blocks: [.t1.t2.t3.t4.][.t5.t6.t7.t8.][.t9....etc...][.............][.............][.............] -// Field Entries: {-----f1------}{-f1-}{---f2--}{--f2--}{-f3--}{------f3-----}{-f3-}{----f4-}{-----f4------} -// -// So we split field ranges into field entries - sub-ranges of fields aligned to block boundaries. -// Each field table (token.FieldTable) links a field to a blocks and token ranges inside the blocks. -// -// Parameters: -// - tokenBatches: Iterator of token batches, where each batch becomes a separate block -// - fields: Iterator of [fieldName, maxTID] pairs for all fields in ascending TID order -// -// Returns: Iterator of [token block, field table for block] pairs, where field table contains -// information about which fields and their ranges are represented in this block. func (bb *blocksBuilder) BuildTokenBlocks( - tokenBatches iter.Seq[[][]byte], - fields iter.Seq2[string, uint32], + it iter.Seq2[string, iter.Seq2[[]byte, []uint32]], + accumulate func([]uint32) error, blockCapacity int, ) iter.Seq2[tokensSealBlock, []token.FieldTable] { return func(yield func(tokensSealBlock, []token.FieldTable) bool) { - // Create pull iterator for fields - convert Seq2 to a function that can be called on demand - getNextField, stop := iter.Pull2(fields) - defer stop() + accumulate := func(lids []uint32) error { + if err := accumulate(lids); err != nil { + bb.lastErr = err + return err + } + return nil + } var ( - hasMore bool - currentTID uint32 = 1 // Current TID to process - fieldMaxTID uint32 = 0 // Maximum TID of current field (0 = field not yet selected) - fieldName string // Current field name + block tokensSealBlock + blockIdx uint32 + blockSize int ) - // Iterate through all token blocks created from batches - for idx, block := range createTokensSealBlocks(tokenBatches) { - table := []token.FieldTable{} - // Process all TIDs in current block (from currentTID to block.ext.maxTID) - for currentTID <= block.ext.maxTID { - // If current field doesn't cover currentTID, get next field - // This happens when: 1) field not yet selected, 2) current field has ended - if fieldMaxTID < currentTID { - if fieldName, fieldMaxTID, hasMore = getNextField(); !hasMore { - bb.lastErr = errors.New("not enough fields to cover all TIDs") + var ( + currentTID uint32 + pendingTable []token.FieldTable + fieldName string + fieldEntryStartTID uint32 + ) + + emitFieldEntry := func() { + // Handle case when field does not have tokens. + if fieldName == "" || fieldEntryStartTID > currentTID { + return + } + + entry := newTokenTableEntry(fieldEntryStartTID, currentTID, blockIdx, block) + pendingTable = append(pendingTable, token.FieldTable{ + Field: fieldName, + Entries: []*token.TableEntry{entry}, + }) + } + + flushBlock := func() bool { + emitFieldEntry() + block.ext.maxTID = currentTID + + if !yield(block, pendingTable) { + return false + } + + block.payload.Payload = block.payload.Payload[:0] + block.payload.Offsets = block.payload.Offsets[:0] + block.ext.minTID = currentTID + 1 + + blockIdx++ + blockSize = 0 + + pendingTable = pendingTable[:0] + fieldEntryStartTID = currentTID + 1 + + return true + } + + block.ext.minTID = 1 + for field, tokIt := range it { + emitFieldEntry() + + fieldName = field + fieldEntryStartTID = currentTID + 1 + + for tok, lids := range tokIt { + tokenSize := int(unsafe.Sizeof(uint32(0))) + len(tok) + + if blockSize > 0 && blockSize+tokenSize > blockCapacity { + if !flushBlock() { return } } - // Entry covers TIDs from currentTID to min(fieldMaxTID, block.ext.maxTID) - entry := createTokenTableEntry(currentTID, fieldMaxTID, idx, block) - table = append(table, token.FieldTable{Field: fieldName, Entries: []*token.TableEntry{entry}}) - currentTID += entry.ValCount - } - if !yield(block, table) { - return // Consumer requested stop + block.payload.Offsets = append(block.payload.Offsets, uint32(len(block.payload.Payload))) + block.payload.Payload = binary.LittleEndian.AppendUint32(block.payload.Payload, uint32(len(tok))) + block.payload.Payload = append(block.payload.Payload, tok...) + + if err := accumulate(lids); err != nil { + bb.lastErr = err + return + } + + currentTID++ + blockSize += tokenSize } } - // Verify consistency - if currentTID-1 != fieldMaxTID { - bb.lastErr = errors.New("fields and tokens not consistent") - } else if _, _, hasMore = getNextField(); hasMore { - bb.lastErr = errors.New("excess field after processing all blocks") + if blockSize > 0 { + flushBlock() } } } -// createTokenTableEntry creates a token table entry for a field-block span. -// Calculates the range of tokens belonging to a field within a specific block. -// Parameters: -// - entryStartTID: Starting token ID for this entry -// - fieldMaxTID: Maximum token ID for the field -// - blockIndex: Index of the current token block -// - block: Current token block data -func createTokenTableEntry(entryStartTID, fieldMaxTID, blockIndex uint32, block tokensSealBlock) *token.TableEntry { +func newTokenTableEntry( + entryStartTID, entryEndTID uint32, + blockIndex uint32, block tokensSealBlock, +) *token.TableEntry { // Convert global TIDs to block-local indices firstIndex := entryStartTID - block.ext.minTID - lastIndex := min(fieldMaxTID, block.ext.maxTID) - block.ext.minTID + lastIndex := entryEndTID - block.ext.minTID // Extract min and max token values for the entry range minVal := string(block.payload.GetToken(int(firstIndex))) @@ -148,159 +172,105 @@ func createTokenTableEntry(entryStartTID, fieldMaxTID, blockIndex uint32, block } } -// BuildLIDsBlocks constructs LID blocks from Token LID sequences. -// Processes LIDs grouped by TID and creates optimally sized blocks: -// - Splits large LID sequences across multiple blocks -// - Tracks continuation status between blocks -// -// Parameters: -// - tokenLIDs: Sequence of LID arrays, one per TokenID, in TID order -// - blockCapacity: Maximum number of LIDs per block -// -// Returns: -// - iter.Seq[lidsSealBlock]: Sequence of sealed LID blocks -func (bb *blocksBuilder) BuildLIDsBlocks(tokenLIDs iter.Seq[[]uint32], blockCapacity int) iter.Seq[lidsSealBlock] { - return func(yield func(lidsSealBlock) bool) { - if blockCapacity <= 0 { - bb.lastErr = errors.New("sealing: LID block size must be > 0") - return - } - var ( - currentTID uint32 // Current TID being processed - currentBlock lidsSealBlock // Current block under construction - isEndOfToken bool // Flag for end of current token's LIDs - isContinued bool // Flag for block continuation - ) +// seqBlockID accumulates scalar (ID, position) pairs into sealed ID blocks. +// A new block is yielded every `blockCapacity` IDs. +func seqBlockID(ids iter.Seq2[seq.ID, seq.DocPos], blockCapacity int) iter.Seq[idsSealBlock] { + return func(yield func(idsSealBlock) bool) { + var block idsSealBlock - // Initialize first block - currentBlock.ext.minTID = 1 - currentBlock.payload = lids.Block{ - LIDs: make([]uint32, 0, blockCapacity), // Pre-allocate with capacity - Offsets: []uint32{0}, // Start with initial offset - } + for id, pos := range ids { + block.mids.Values = append(block.mids.Values, uint64(id.MID)) + block.rids.Values = append(block.rids.Values, uint64(id.RID)) + block.params.Values = append(block.params.Values, uint64(pos)) + + if len(block.mids.Values) == blockCapacity { + if !yield(block) { + return + } - // finalizeBlock prepares and yields the current block - finalizeBlock := func() bool { - if !isEndOfToken { - // Add final offset for current token if not already done - currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, uint32(len(currentBlock.payload.LIDs))) + block.mids.Values = block.mids.Values[:0] + block.rids.Values = block.rids.Values[:0] + block.params.Values = block.params.Values[:0] } - currentBlock.payload.IsLastLID = isEndOfToken // TODO(eguguchkin): Remove legacy field - currentBlock.ext.isContinued = isContinued // TODO(eguguchkin): Remove legacy field - isContinued = !isEndOfToken - return yield(currentBlock) } - // Process LIDs for each TID - for lidsBatch := range tokenLIDs { - currentTID++ + if len(block.mids.Values) > 0 { + yield(block) + } + } +} - for _, lid := range lidsBatch { - // Check if block reached capacity - if len(currentBlock.payload.LIDs) == blockCapacity { - if !finalizeBlock() { - return - } - // Initialize new block - currentBlock.ext.minTID = currentTID - currentBlock.payload.LIDs = currentBlock.payload.LIDs[:0] - currentBlock.payload.Offsets = currentBlock.payload.Offsets[:1] // Reset to initial offset - } +type lidBlocksAcc struct { + blockCapacity int - isEndOfToken = false - currentBlock.ext.maxTID = currentTID - currentBlock.payload.LIDs = append(currentBlock.payload.LIDs, lid) // Add each LID to the block - } + currentTID uint32 + currentBlock lidsSealBlock - // Store offset and mark end of current token - currentBlock.payload.Offsets = append(currentBlock.payload.Offsets, uint32(len(currentBlock.payload.LIDs))) - isEndOfToken = true - } + isEndOfToken bool + isContinued bool +} + +func newLIDBlocksAccumulator(blockCapacity int) *lidBlocksAcc { + a := &lidBlocksAcc{blockCapacity: blockCapacity} - // Yield the final block - finalizeBlock() + a.currentBlock.ext.minTID = 1 + a.currentBlock.payload = lids.Block{ + LIDs: make([]uint32, 0, blockCapacity), + Offsets: []uint32{0}, } + + return a } -// createIDsSealBlocks converts sequences of IDs and positions into sealed ID blocks. -// Transforms raw ID sequences into optimized block format for storage: -// - Processes IDs in batches for efficiency -// - Maintains correlation between IDs and their positions -// - Creates separate slices for MIDs, RIDs, and positions -// -// Parameters: -// - idsBatches: Sequence of ID batches with corresponding document positions +// Add processes LIDs of one token (must be called in TID order). // -// Returns: -// - iter.Seq[idsSealBlock]: Sequence of sealed ID blocks -func createIDsSealBlocks(idsBatches iter.Seq2[[]seq.ID, []seq.DocPos]) iter.Seq[idsSealBlock] { - return func(yield func(idsSealBlock) bool) { - block := idsSealBlock{} - - // Process each batch of IDs and positions - for ids, positions := range idsBatches { - // Reset block arrays for new batch - block.mids.Values = block.mids.Values[:0] - block.rids.Values = block.rids.Values[:0] - block.params.Values = block.params.Values[:0] - - // Convert each ID and position to storage format - for i, id := range ids { - block.mids.Values = append(block.mids.Values, uint64(id.MID)) - block.rids.Values = append(block.rids.Values, uint64(id.RID)) - block.params.Values = append(block.params.Values, uint64(positions[i])) +// For each block that fills up, `onBlock` is called immediately +// before the backing arrays are reset, so `onBlock` may read the +// block data but must not retain references to it. +func (a *lidBlocksAcc) Add(lidsbuf []uint32, onBlock func(lidsSealBlock) error) error { + a.currentTID++ + + for _, lid := range lidsbuf { + if len(a.currentBlock.payload.LIDs) == a.blockCapacity { + if err := onBlock(a.finalizeBlock()); err != nil { + return err } - // Yield completed block - if !yield(block) { - return - } + a.currentBlock.ext.minTID = a.currentTID + a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0] + a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1] } - } -} -// createTokensSealBlocks converts raw token sequences into sealed token blocks. -// Transforms batches of tokens into optimized storage format: -// - Merges a set of byte slices into a contiguous slice Payload and a slice of Offsets -// - Tracks token ID ranges for indexing [MinTID, MaxTID] -// -// Parameters: -// - tokenBatches: Sequence of token batches to process -// -// Returns: -// - iter.Seq[uint32, tokensSealBlock]: Sequence of sealed token blocks with their indexes -func createTokensSealBlocks(tokenBatches iter.Seq[[][]byte]) iter.Seq2[uint32, tokensSealBlock] { - return func(yield func(uint32, tokensSealBlock) bool) { - var ( - idx uint32 // 1-based block index - currentTID uint32 // Current token ID counter - block tokensSealBlock // Current block under construction - ) + a.isEndOfToken = false + a.currentBlock.ext.maxTID = a.currentTID + a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid) + } - // Process each batch of tokens - for tokens := range tokenBatches { - idx++ - // Initialize new block - block.ext.minTID = currentTID + 1 - block.payload.Payload = block.payload.Payload[:0] - block.payload.Offsets = block.payload.Offsets[:0] + a.isEndOfToken = true + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) - // Process each token in current batch - for _, tokenData := range tokens { - currentTID++ - // Store offset to current token - block.payload.Offsets = append(block.payload.Offsets, uint32(len(block.payload.Payload))) - // Store token length (little-endian) followed by token bytes - block.payload.Payload = binary.LittleEndian.AppendUint32(block.payload.Payload, uint32(len(tokenData))) - block.payload.Payload = append(block.payload.Payload, tokenData...) - } + return nil +} - block.ext.maxTID = currentTID +func (a *lidBlocksAcc) Flush() lidsSealBlock { + return a.finalizeBlock() +} - // Yield completed block - if !yield(idx, block) { - return - } - } +func (a *lidBlocksAcc) finalizeBlock() lidsSealBlock { + if !a.isEndOfToken { + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) } + + result := a.currentBlock + result.payload.IsLastLID = a.isEndOfToken + result.ext.isContinued = a.isContinued + + a.isContinued = !a.isEndOfToken + return result } diff --git a/frac/sealed/sealing/blocks_builder_test.go b/frac/sealed/sealing/blocks_builder_test.go index 80892ca2..4d32ad2a 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/frac/sealed/sealing/blocks_builder_test.go @@ -13,6 +13,8 @@ import ( "github.com/ozontech/seq-db/seq" ) +var _ Source = (*mockSource)(nil) + type mockSource struct { info common.Info tokens [][]byte @@ -25,68 +27,58 @@ type mockSource struct { lastError error } -func (m *mockSource) Info() common.Info { return m.info } - -func (m *mockSource) Fields() iter.Seq2[string, uint32] { - return func(yield func(string, uint32) bool) { - for i := range len(m.fields) { - if !yield(m.fields[i], m.fieldMaxTIDs[i]) { +func (m *mockSource) Info() *common.Info { return &m.info } + +func (m *mockSource) TokenTriplet() iter.Seq2[string, iter.Seq2[[]byte, []uint32]] { + return func(yield func(string, iter.Seq2[[]byte, []uint32]) bool) { + start := 0 + for i, field := range m.fields { + end := int(m.fieldMaxTIDs[i]) + tokenStart, tokenEnd := start, end + if !yield(field, func(yield func([]byte, []uint32) bool) { + for j := tokenStart; j < tokenEnd; j++ { + var lidsbuf []uint32 + if j < len(m.tokenLIDs) { + lidsbuf = m.tokenLIDs[j] + } + if !yield(m.tokens[j], lidsbuf) { + return + } + } + }) { return } + start = end } } } -func (m *mockSource) IDsBlocks(size int) iter.Seq2[[]seq.ID, []seq.DocPos] { - return func(yield func([]seq.ID, []seq.DocPos) bool) { - ids := make([]seq.ID, 0, size) - pos := make([]seq.DocPos, 0, size) +func (m *mockSource) ID() iter.Seq2[seq.ID, seq.DocPos] { + return func(yield func(seq.ID, seq.DocPos) bool) { for i, id := range m.ids { - if len(ids) == size { - if !yield(ids, pos) { - return - } - ids = ids[:0] - pos = pos[:0] + if !yield(id, m.pos[i]) { + return } - ids = append(ids, id) - pos = append(pos, m.pos[i]) } - yield(ids, pos) } } -func (m *mockSource) TokenBlocks(size int) iter.Seq[[][]byte] { - return func(yield func([][]byte) bool) { - block := [][]byte{} - blockSize := 0 - for _, token := range m.tokens { - if blockSize >= size { - if !yield(block) { - return - } - blockSize = 0 - block = block[:0] +func (m *mockSource) TokenAndLIDs() iter.Seq2[[]byte, []uint32] { + return func(yield func([]byte, []uint32) bool) { + for i, token := range m.tokens { + var lidsbuf []uint32 + if i < len(m.tokenLIDs) { + lidsbuf = m.tokenLIDs[i] } - block = append(block, token) - blockSize += len(token) + 4 - } - yield(block) - } -} - -func (m *mockSource) TokenLIDs() iter.Seq[[]uint32] { - return func(yield func([]uint32) bool) { - for _, lids := range m.tokenLIDs { - if !yield(lids) { + if !yield(token, lidsbuf) { return } } } } -func (m *mockSource) BlocksOffsets() []uint64 { return m.blocksOffsets } -func (m *mockSource) LastError() error { return m.lastError } +func (m *mockSource) BlockOffsets() []uint64 { return m.blocksOffsets } +func (m *mockSource) LastError() error { return m.lastError } func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { src := mockSource{ @@ -112,13 +104,43 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { }, fields: []string{"f1", "f2", "f3", "f4", "f5", "f6"}, fieldMaxTIDs: []uint32{2, 7, 9, 12, 13, 14}, + tokenLIDs: [][]uint32{ + {10, 20, 30, 40}, // 1 + {2}, // 2 + {3}, // 3 + {4}, // 4 + {5}, // 5 + {6}, // 6 + {7}, // 7 + {8}, // 8 + {9}, // 9 + {10}, // 10 + {11}, // 11 + {12}, // 12 + {13}, // 13 + {14}, // 14 + }, } // Block size in bytes. const blockSize = 24 - - bb := blocksBuilder{} - tokenBlocks := bb.BuildTokenBlocks(src.TokenBlocks(blockSize), src.Fields()) + const lidBlockCap = 3 + + var bb blocksBuilder + lidAccum := newLIDBlocksAccumulator(lidBlockCap) + var lidBlocks []lidsSealBlock + tokenBlocks := bb.BuildTokenBlocks( + src.TokenTriplet(), + func(lids []uint32) error { + return lidAccum.Add(lids, func(block lidsSealBlock) error { + block.payload.LIDs = slices.Clone(block.payload.LIDs) + block.payload.Offsets = slices.Clone(block.payload.Offsets) + lidBlocks = append(lidBlocks, block) + return nil + }) + }, + blockSize, + ) // In our test case, each token is 4 bytes long. Also for each token we use uint32 to encode the length. // So 3 tokens take up exactly 24 bytes. And we expect all token blocks to contain 3 tokens except the last one. @@ -128,11 +150,11 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { blockIndex := 0 allFieldsTables := []token.FieldTable{} - for block, fieldsTables := range tokenBlocks { - assert.Equal(t, expectedSizes[blockIndex], block.payload.Len()) - for i := range block.payload.Len() { + for result, fieldsTables := range tokenBlocks { + assert.Equal(t, expectedSizes[blockIndex], result.payload.Len()) + for i := range result.payload.Len() { tid++ - assert.Equal(t, src.tokens[tid-1], block.payload.GetToken(i)) + assert.Equal(t, src.tokens[tid-1], result.payload.GetToken(i)) } allFieldsTables = append(allFieldsTables, fieldsTables...) blockIndex++ @@ -149,7 +171,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { { StartIndex: 0, StartTID: 1, - BlockIndex: 1, + BlockIndex: 0, ValCount: 2, MinVal: "f1v1", MaxVal: "f1v2", @@ -161,21 +183,21 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { { StartIndex: 2, StartTID: 3, - BlockIndex: 1, + BlockIndex: 0, ValCount: 1, MinVal: "f2v1", MaxVal: "f2v1", }, { StartIndex: 0, StartTID: 4, - BlockIndex: 2, + BlockIndex: 1, ValCount: 3, MinVal: "f2v2", MaxVal: "f2v4", }, { StartIndex: 0, StartTID: 7, - BlockIndex: 3, + BlockIndex: 2, ValCount: 1, MinVal: "f2v5", MaxVal: "f2v5", @@ -187,7 +209,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { { StartIndex: 1, StartTID: 8, - BlockIndex: 3, + BlockIndex: 2, ValCount: 2, MinVal: "f3v1", MaxVal: "f3v2", @@ -199,7 +221,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { { StartIndex: 0, StartTID: 10, - BlockIndex: 4, + BlockIndex: 3, ValCount: 3, MinVal: "f4v1", MaxVal: "f4v3", @@ -211,7 +233,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { { StartIndex: 0, StartTID: 13, - BlockIndex: 5, + BlockIndex: 4, ValCount: 1, MinVal: "f5v1", MaxVal: "f5v1", @@ -223,7 +245,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { { StartIndex: 1, StartTID: 14, - BlockIndex: 5, + BlockIndex: 4, ValCount: 1, MinVal: "f6v1", MaxVal: "f6v1", @@ -233,6 +255,39 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { }, } assert.Equal(t, actualTokenTable.FieldsTables, expectedTokenTable.FieldsTables) + + finalBlock := lidAccum.Flush() + finalBlock.payload.LIDs = slices.Clone(finalBlock.payload.LIDs) + finalBlock.payload.Offsets = slices.Clone(finalBlock.payload.Offsets) + lidBlocks = append(lidBlocks, finalBlock) + + expectedLIDBlocks := []lidsSealBlock{ + { + ext: lidsExt{minTID: 1, maxTID: 1, isContinued: false}, + payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, + }, + { + ext: lidsExt{minTID: 1, maxTID: 3, isContinued: true}, + payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + }, + { + ext: lidsExt{minTID: 4, maxTID: 6, isContinued: false}, + payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + }, + { + ext: lidsExt{minTID: 7, maxTID: 9, isContinued: false}, + payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + }, + { + ext: lidsExt{minTID: 10, maxTID: 12, isContinued: false}, + payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, + }, + { + ext: lidsExt{minTID: 13, maxTID: 14, isContinued: false}, + payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, + }, + } + assert.Equal(t, expectedLIDBlocks, lidBlocks) } func TestBlocksBuilder_IDsBlocks(t *testing.T) { @@ -268,7 +323,7 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { i := 0 ids := []seq.ID{} pos := []seq.DocPos{} - for block := range createIDsSealBlocks(src.IDsBlocks(3)) { + for block := range seqBlockID(src.ID(), 3) { assert.Equal(t, expectedSizes[i], len(block.mids.Values)) assert.Equal(t, expectedSizes[i], len(block.rids.Values)) assert.Equal(t, expectedSizes[i], len(block.params.Values)) @@ -284,112 +339,3 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { assert.Equal(t, src.ids, ids) assert.Equal(t, src.pos, pos) } - -func TestBlocksBuilder_BuildLIDsBlocks(t *testing.T) { - src := mockSource{ - tokenLIDs: [][]uint32{ - { - 10, // block 1, tid 1 - 20, // block 1, tid 1 - 30, // block 1, tid 1 - - 40, // block 2, tid 1 - }, { - 11, // block 2, tid 2 - 21, // block 2, tid 2 - - 31, // block 3, tid 2 - 41, // block 3, tid 2 - }, { - 10, // block 3, tid 3 - - 11, // block 4, tid 3 - 20, // block 4, tid 3 - 21, // block 4, tid 3 - - }, { - 30, // block 5, tid 4 - 40, // block 5, tid 4 - 50, // block 5, tid 4 - - 60, // block 6, tid 4 - }, - }, - } - - expected := []lidsSealBlock{{ - ext: lidsExt{ - minTID: 1, - maxTID: 1, - isContinued: false, - }, - payload: lids.Block{ - LIDs: []uint32{10, 20, 30}, - Offsets: []uint32{0, 3}, - IsLastLID: false, - }, - }, { - ext: lidsExt{ - minTID: 1, - maxTID: 2, - isContinued: true, - }, - payload: lids.Block{ - LIDs: []uint32{40, 11, 21}, - Offsets: []uint32{0, 1, 3}, - IsLastLID: false, - }, - }, { - ext: lidsExt{ - minTID: 2, - maxTID: 3, - isContinued: true, - }, - payload: lids.Block{ - LIDs: []uint32{31, 41, 10}, - Offsets: []uint32{0, 2, 3}, - IsLastLID: false, - }, - }, { - ext: lidsExt{ - minTID: 3, - maxTID: 3, - isContinued: true, - }, - payload: lids.Block{ - LIDs: []uint32{11, 20, 21}, - Offsets: []uint32{0, 3}, - IsLastLID: true, - }, - }, { - ext: lidsExt{ - minTID: 4, - maxTID: 4, - isContinued: false, - }, - payload: lids.Block{ - LIDs: []uint32{30, 40, 50}, - Offsets: []uint32{0, 3}, - IsLastLID: false, - }, - }, { - ext: lidsExt{ - minTID: 4, - maxTID: 4, - isContinued: true, - }, - payload: lids.Block{ - LIDs: []uint32{60}, - Offsets: []uint32{0, 1}, - IsLastLID: true, - }}, - } - bb := blocksBuilder{} - blocks := []lidsSealBlock{} - for block := range bb.BuildLIDsBlocks(src.TokenLIDs(), 3) { - block.payload.LIDs = slices.Clone(block.payload.LIDs) // copy lids - block.payload.Offsets = slices.Clone(block.payload.Offsets) // copy offsets - blocks = append(blocks, block) - } - assert.Equal(t, expected, blocks) -} diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 491c7233..6c6d57eb 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -1,15 +1,8 @@ package sealing import ( - "bytes" - "encoding/binary" "io" - "iter" - "time" - "github.com/alecthomas/units" - - "github.com/ozontech/seq-db/bytespool" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" @@ -22,25 +15,32 @@ import ( "github.com/ozontech/seq-db/zstd" ) -// IndexSealer is responsible for creating and writing the index structure for sealed fractions. -// It organizes data into blocks, compresses them, and builds the complete index file with: -// - Multiple data sections (info, tokens, token table, offsets, IDs, LIDs) -// - Compression using ZSTD with configurable levels -// - Registry for quick access to block locations -// - PreloadedData structures for fast initialization instance of sealed fraction +// indexBlock is one compressed (or not) block with its registry metadata. +type indexBlock struct { + codec storage.Codec + payload []byte + rawLen uint32 + ext1 uint64 + ext2 uint64 +} + +func (i indexBlock) Bin(pos int64) (storage.IndexBlockHeader, []byte) { + return storage.NewIndexBlockHeader(pos, i.ext1, i.ext2, uint32(len(i.payload)), i.rawLen, i.codec), i.payload +} + type IndexSealer struct { - lastErr error // Last error encountered during processing - buf1 []byte // Reusable buffer for packing raw data before compression - buf2 []byte // Reusable buffer for compressed data - params common.SealParams // Configuration parameters for sealing process - - // PreloadedData structures built during sealing for fast initialization of sealed fraction - idsTable seqids.Table // Table mapping document IDs to blocks - lidsTable lids.Table // Table mapping token IDs to LID blocks - tokenTable token.Table // Table mapping fields to token blocks + params common.SealParams + + buf1 []byte + buf2 []byte + + idsTable seqids.Table + lidsTable lids.Table + tokenTable token.Table + + lastErr error } -// NewIndexSealer creates a new IndexSealer instance with the given parameters. func NewIndexSealer(params common.SealParams) *IndexSealer { return &IndexSealer{ params: params, @@ -49,295 +49,198 @@ func NewIndexSealer(params common.SealParams) *IndexSealer { } } -// indexBlock represents a single block of data in the index file. -// Each block can be compressed and contains metadata for efficient retrieval. -type indexBlock struct { - codec storage.Codec // Compression codec used (No compression or ZSTD) - payload []byte // The actual block data (may be compressed) - rawLen uint32 // Original uncompressed data length - ext1 uint64 // Extended metadata field 1 (block-specific usage) - ext2 uint64 // Extended metadata field 2 (block-specific usage) +func (s *IndexSealer) LIDsTable() lids.Table { + return s.lidsTable } -// Bin converts the indexBlock to its binary representation for storage. -// It creates a header with metadata and returns the header + payload. -// Parameters: -// - pos: The file position where this block will be written -// -// Returns: -// - storage.IndexBlockHeader: The block header with metadata -// - []byte: The payload data to write -func (i indexBlock) Bin(pos int64) (storage.IndexBlockHeader, []byte) { - header := storage.NewIndexBlockHeader(pos, i.ext1, i.ext2, uint32(len(i.payload)), i.rawLen, i.codec) - return header, i.payload +func (s *IndexSealer) TokenTable() token.Table { + return s.tokenTable } -// WriteIndex writes the complete index structure to the provided writer. -// The index file structure: -// +----------------+----------------+----------------+ -// | Prefix | Data Blocks | Registry | -// | (16 bytes) | (multiple) | (block headers)| -// +----------------+----------------+----------------+ -// -// Prefix contains: -// - 8 bytes: Position of registry start -// - 8 bytes: Size of registry -// -// Parameters: -// - ws: WriteSeeker to write the index data to -// - src: Source interface providing the data to be sealed -// -// Returns: -// - error: Any error encountered during writing -func (s *IndexSealer) WriteIndex(ws io.WriteSeeker, src Source) error { - const prefixSize = 16 // Size of prefix that will hold registry position and size - - // Skip prefix area initially - we'll write it at the end - if _, err := ws.Seek(prefixSize, io.SeekStart); err != nil { +func (s *IndexSealer) IDsTable() seqids.Table { + return s.idsTable +} + +// WriteOffsetsFile writes the .offsets file containing a single BlockOffsets block. +func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { + w, err := newWriter(ws) + if err != nil { return err } + defer w.release() - // Create buffers for headers and payload writing - hw := bytes.NewBuffer(nil) // Headers writer - collects all block headers - bw := bytespool.AcquireWriterSize(ws, int(units.MiB)) // Buffered writer for payload - defer bytespool.ReleaseWriter(bw) + offsets := sealed.BlockOffsets{ + IDsTotal: src.Info().DocsTotal + 1, + Offsets: src.BlockOffsets(), + } - // Write all index blocks and collect headers - if err := s.writeBlocks(prefixSize, bw, hw, src); err != nil { + if err := w.writeBlock(btypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil { return err } - if err := bw.Flush(); err != nil { + + // Emit trailing separator. + if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil { return err } - // Calculate registry position and size - size := hw.Len() // Registry size (all headers) - pos, err := ws.Seek(0, io.SeekEnd) // Current end position = registry start + return w.finalize() +} + +func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { + w, err := newWriter(ws) if err != nil { return err } + defer w.release() - // Write registry (all block headers) at the end of file - if _, err := bw.Write(hw.Bytes()); err != nil { - return err + for block := range seqBlockID(src.ID(), consts.IDsPerBlock) { + if err := w.writeBlock(btypeMid, s.packMIDsBlock(block)); err != nil { + return err + } + + if err := w.writeBlock(btypeRid, s.packRIDsBlock(block)); err != nil { + return err + } + + if err := w.writeBlock(btypeDocPos, s.packPosBlock(block)); err != nil { + return err + } } - if err := bw.Flush(); err != nil { + + // Emit trailing separator. + if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil { return err } - // Write prefix at beginning of file with registry metadata - prefix := make([]byte, 0, prefixSize) - prefix = binary.LittleEndian.AppendUint64(prefix, uint64(pos)) // Registry position - prefix = binary.LittleEndian.AppendUint64(prefix, uint64(size)) // Registry size - if _, err := ws.Seek(0, io.SeekStart); err != nil { + return w.finalize() +} + +func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) error { + tw, err := newWriter(tws) + if err != nil { return err } - if _, err = ws.Write(prefix); err != nil { + defer tw.release() + + lw, err := newWriter(lws) + if err != nil { return err } + defer lw.release() + + var ( + bb blocksBuilder + allFieldsTables []token.FieldTable + lidacc = newLIDBlocksAccumulator(consts.LIDBlockCap) + ) + + // NOTE(dkharms): This is so ugly but I cannot come up with other solution here. + accumulate := func(lids []uint32) error { + return lidacc.Add(lids, func(block lidsSealBlock) error { + return lw.writeBlock(btypeLid, s.packLIDsBlock(block)) + }) + } - return nil -} - -// writeBlocks processes all index blocks from the source and writes them to the output. -// It simultaneously writes payload data to one writer and headers to another. -// Parameters: -// - pos: Starting position for the first block -// - payloadWriter: Writer for block payload data -// - headersWriter: Writer for block headers (registry) -// - src: Data source -// -// Returns: -// - error: Any error encountered during processing -func (s *IndexSealer) writeBlocks(pos int, payloadWriter, headersWriter io.Writer, src Source) error { - // Process each index block from the source - for block := range s.indexBlocks(src) { - header, payload := block.Bin(int64(pos)) - // Write payload to main data section - if _, err := payloadWriter.Write(payload); err != nil { - return err - } - // Write header to registry - if _, err := headersWriter.Write(header); err != nil { + for block, fieldsTables := range bb.BuildTokenBlocks(src.TokenTriplet(), accumulate, consts.RegularBlockSize) { + if err := tw.writeBlock(btypeToken, s.packTokenBlock(block)); err != nil { return err } - pos += len(payload) // Advance position for next block + allFieldsTables = append(allFieldsTables, fieldsTables...) } - if s.lastErr != nil { + + if s.lastErr = util.CollapseErrors([]error{src.LastError(), bb.LastError()}); s.lastErr != nil { return s.lastErr } - return nil -} -// indexBlocks generates a sequence of index blocks from the source data. -// The blocks are organized in specific sections: -// 1. Info Section - Basic fraction metadata -// 2. Tokens Section - Token data blocks -// 3. Token Table Section - Field-to-token mapping table -// 4. Offsets Section - Document block offsets -// 5. IDs Section - Document ID blocks (MIDs, RIDs, Positions) -// 6. LIDs Section - Token ID to LID mapping blocks -// -// Returns: -// - iter.Seq[indexBlock]: Sequence of index blocks to write -func (s *IndexSealer) indexBlocks(src Source) iter.Seq[indexBlock] { - return func(yield func(indexBlock) bool) { - bb := blocksBuilder{} - blocksCounter := uint32(0) // Global block counter for indexing - statsOverall := startStats() // Overall statistics collector - - // Helper to push a block and update statistics - push := func(b indexBlock, statsSection *blocksStats) bool { - blocksCounter++ - statsOverall.takeStock(b) - statsSection.takeStock(b) - return yield(b) - } - - // Helper to write section separator (empty block) - sectionSeparator := func() bool { - blocksCounter++ - return yield(indexBlock{}) // empty block as separator - } + if err := s.finalizeLIDFile(lw, lidacc); err != nil { + return err + } - // SECTION 1: Info Section - statsInfo := startStats() - info := src.Info() - if !push(s.packInfoBlock(sealed.BlockInfo{Info: info}), &statsInfo) { - return - } + return s.finalizeTokenFile(tw, allFieldsTables) +} - // SECTION 2: Tokens Section - statsTokens := startStats() - allFieldsTables := []token.FieldTable{} - tokensBlocks := bb.BuildTokenBlocks(src.TokenBlocks(consts.RegularBlockSize), src.Fields()) - for block, fieldsTables := range tokensBlocks { - if !push(s.packTokenBlock(block), &statsTokens) { - return - } - allFieldsTables = append(allFieldsTables, fieldsTables...) - } - if s.lastErr = util.CollapseErrors([]error{src.LastError(), bb.LastError()}); s.lastErr != nil { - return - } +func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccum *lidBlocksAcc) error { + if err := w.writeBlock(btypeLid, s.packLIDsBlock(lidAccum.Flush())); err != nil { + return err + } - if !sectionSeparator() { - return - } + // Emit trailing separator. + if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil { + return err + } - // SECTION 3: Token Table Section - statsTokenTable := startStats() - tokenTableBlock := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} - if !push(s.packTokenTableBlock(tokenTableBlock), &statsTokenTable) { - return - } + return w.finalize() +} - if !sectionSeparator() { - return - } +func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error { + // Emit section separator. + if err := w.writeBlock(btypeToken, indexBlock{}); err != nil { + return err + } - // SECTION 4: Offsets Section - statsOffsets := startStats() - offsets := sealed.BlockOffsets{ - IDsTotal: info.DocsTotal + 1, // +1 for system ID at position zero - Offsets: src.BlocksOffsets(), - } - if !push(s.packBlocksOffsetsBlock(offsets), &statsOffsets) { - return - } + tokenTableBlock := token.TableBlock{FieldsTables: collapseOrderedFieldsTables(allFieldsTables)} + if err := w.writeBlock(btypeTokenTable, s.packTokenTableBlock(tokenTableBlock)); err != nil { + return err + } - // SECTION 5: IDs Section - s.idsTable.StartBlockIndex = blocksCounter // Record starting position for IDs blocks - statsMIDs, statsRIDs, statsParams := startStats(), startStats(), startStats() - for block := range createIDsSealBlocks(src.IDsBlocks(consts.IDsPerBlock)) { - if !push(s.packMIDsBlock(block), &statsMIDs) { - return - } - if !push(s.packRIDsBlock(block), &statsRIDs) { - return - } - if !push(s.packPosBlock(block), &statsParams) { - return - } - } - if s.lastErr = src.LastError(); s.lastErr != nil { - return - } + // Emit trailing separator. + if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil { + return err + } - if !sectionSeparator() { - return - } + return w.finalize() +} - // SECTION 6: LIDs Section - statsLIDs := startStats() - s.lidsTable.StartBlockIndex = blocksCounter - for block := range bb.BuildLIDsBlocks(src.TokenLIDs(), consts.LIDBlockCap) { - if !push(s.packLIDsBlock(block), &statsLIDs) { - return - } - } - if s.lastErr = util.CollapseErrors([]error{src.LastError(), bb.LastError()}); s.lastErr != nil { - return - } +func (s *IndexSealer) WriteInfoFile(ws io.WriteSeeker, src Source) error { + w, err := newWriter(ws) + if err != nil { + return err + } + defer w.release() - if !sectionSeparator() { - return - } + block := sealed.BlockInfo{Info: src.Info()} + if err := w.writeBlock(btypeInfo, s.packInfoBlock(block)); err != nil { + return err + } - // Log statistics for all sections - endTime := time.Now() - statsInfo.log("info", statsTokens.start) - statsTokens.log("tokens", statsTokenTable.start) - statsTokenTable.log("tokenTable", statsOffsets.start) - statsOffsets.log("offsets", statsMIDs.start) - statsMIDs.log("mids", statsLIDs.start) - statsRIDs.log("rids", statsLIDs.start) - statsParams.log("pos", statsLIDs.start) - statsLIDs.log("lids", endTime) - statsOverall.log("overall", endTime) + // Emit trailing separator. + if err := w.writeBlock(btypeBlackhole, indexBlock{}); err != nil { + return err } + + return w.finalize() } -// collapseOrderedFieldsTables merges field tables with identical field names -// Assumes the input array is already sorted by the Field property +// collapseOrderedFieldsTables merges FieldTables with the same field name. +// Assumes input is sorted by Field. func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { if len(src) == 0 { return nil } + current := src[0] - dst := []token.FieldTable{} + var dst []token.FieldTable for _, ft := range src[1:] { if current.Field == ft.Field { current.Entries = append(current.Entries, ft.Entries...) continue } + dst = append(dst, current) current = ft } - dst = append(dst, current) - return dst + + return append(dst, current) } -// newIndexBlock creates an uncompressed index block. func newIndexBlock(raw []byte) indexBlock { - return indexBlock{ - codec: storage.CodecNo, - rawLen: uint32(len(raw)), - payload: raw, - } + return indexBlock{codec: storage.CodecNo, rawLen: uint32(len(raw)), payload: raw} } -// newIndexBlockZSTD creates a compressed index block using ZSTD compression. -// Falls back to uncompressed if compression doesn't provide benefits. func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { s.buf2 = zstd.CompressLevel(raw, s.buf2[:0], level) - // Only use compression if it actually reduces size if len(s.buf2) < len(raw) { - return indexBlock{ - codec: storage.CodecZSTD, - rawLen: uint32(len(raw)), - payload: s.buf2, - } + return indexBlock{codec: storage.CodecZSTD, rawLen: uint32(len(raw)), payload: s.buf2} } return newIndexBlock(raw) } @@ -382,18 +285,22 @@ func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlo func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { // Get the last ID in the block (smallest due to descending order) last := len(block.mids.Values) - 1 + minID := seq.ID{ MID: seq.MID(block.mids.Values[last]), RID: seq.RID(block.rids.Values[last]), } + s.idsTable.MinBlockIDs = append(s.idsTable.MinBlockIDs, minID) // Store for PreloadedData // Packing block s.buf1 = block.mids.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) + // Store min MID and RID in extended metadata b.ext1 = uint64(minID.MID) b.ext2 = uint64(minID.RID) + return b } @@ -430,20 +337,6 @@ func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { b := s.newIndexBlockZSTD(s.buf1, s.params.LIDsZstdLevel) b.ext1 = ext1 // Legacy continuation flag b.ext2 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) // TID range - return b -} -// LIDsTable returns the built LIDs table for fast initialization of sealed fraction. -func (s *IndexSealer) LIDsTable() lids.Table { - return s.lidsTable -} - -// TokenTable returns the built token table for fast initialization of sealed fraction. -func (s *IndexSealer) TokenTable() token.Table { - return s.tokenTable -} - -// IDsTable returns the built IDs table for fast initialization of sealed fraction. -func (s *IndexSealer) IDsTable() seqids.Table { - return s.idsTable + return b } diff --git a/frac/sealed/sealing/sealer.go b/frac/sealed/sealing/sealer.go index 3eb00761..888f7973 100644 --- a/frac/sealed/sealing/sealer.go +++ b/frac/sealed/sealing/sealer.go @@ -14,87 +14,151 @@ import ( ) // Source interface defines the contract for data sources that can be sealed. -// Provides access to all necessary data components for index creation. +// Provides access to all necessary data components for index creation type Source interface { - Info() *common.Info // Fraction metadata information - IDsBlocks(size int) iter.Seq2[[]seq.ID, []seq.DocPos] // Ordered sequence of document IDs and their positions, divided into blocks - TokenBlocks(size int) iter.Seq[[][]byte] // Ordered sequence of tokens divided into blocks - Fields() iter.Seq2[string, uint32] // Ordered sequence of fields with their max field's TID value - TokenLIDs() iter.Seq[[]uint32] // Sequence of Token LIDs ordered by TID and LID - BlocksOffsets() []uint64 // Offsets of DocBlock's in the doc file - LastError() error // Last error encountered during data retrieval + // Info returns metadata describing this source. + Info() *common.Info + + // ID returns an iterator over stored document identifiers paired with + // their positions, in descending [seq.ID] order. + ID() iter.Seq2[seq.ID, seq.DocPos] + + // BlockOffsets returns byte offsets to each document block + // within this source's `.docs` file. + BlockOffsets() []uint64 + + // TokenTriplet iterates over fields in lexicographic order. + // For each field, it yields tokens (lexicographically sorted) + // paired with the local document ID list for that token. + TokenTriplet() iter.Seq2[string, iter.Seq2[[]byte, []uint32]] + + // LastError returns the last error encountered during iteration, + // or nil if no error occurred. + LastError() error +} + +func syncAndClose(f *os.File) error { + if err := f.Sync(); err != nil { + f.Close() + return err + } + return f.Close() +} + +func createAndWrite(tmpPath, finalPath string, write func(*os.File) error) error { + f, err := os.Create(tmpPath) + if err != nil { + return err + } + + if err := errors.Join(write(f), syncAndClose(f)); err != nil { + return err + } + + return os.Rename(tmpPath, finalPath) +} + +func createAndWriteBoth( + tmpPath1, finalPath1, + tmpPath2, finalPath2 string, + write func(*os.File, *os.File) error, +) error { + f1, err := os.Create(tmpPath1) + if err != nil { + return err + } + + f2, err := os.Create(tmpPath2) + if err != nil { + f1.Close() + return err + } + + writeErr := write(f1, f2) + if err := errors.Join(writeErr, syncAndClose(f1), syncAndClose(f2)); err != nil { + return err + } + + if err := os.Rename(tmpPath1, finalPath1); err != nil { + return err + } + + return os.Rename(tmpPath2, finalPath2) } -// Seal is the main entry point for sealing a fraction. -// It performs the complete sealing process: -// 1. Creates the index file structure -// 2. Writes all index blocks with compression -// 3. Builds PreloadedData structures for fast initialization of sealed fraction -// 4. Handles file system operations and error recovery -// -// Parameters: -// - src: Data source providing all fraction data -// - params: Sealing parameters including compression levels -// -// Returns: -// - *sealed.PreloadedData: Preloaded data structures for initialization of sealed fraction -// - error: Any error encountered during the sealing process +// Seal writes five index files (.info, .token, .offsets, .id, .lid) for the fraction +// and returns PreloadedData for fast initialization of the sealed fraction. func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { info := src.Info() - // Validate that we're not sealing an empty fraction if info.To == 0 { return nil, errors.New("sealing of an empty active fraction is not supported") } - // Create temporary index file (will be renamed on success) - indexFile, err := os.Create(info.Path + consts.IndexTmpFileSuffix) - if err != nil { - return nil, err - } + sealer := NewIndexSealer(params) - // Create index sealer and write the index structure - indexSealer := NewIndexSealer(params) - if err := indexSealer.WriteIndex(indexFile, src); err != nil { + if err := createAndWrite( + info.Path+consts.OffsetsTmpFileSuffix, + info.Path+consts.OffsetsFileSuffix, + func(f *os.File) error { return sealer.WriteOffsetsFile(f, src) }, + ); err != nil { return nil, err } - // Ensure data is flushed to disk - if err := indexFile.Sync(); err != nil { + if err := createAndWrite( + info.Path+consts.IDTmpFileSuffix, + info.Path+consts.IDFileSuffix, + func(f *os.File) error { return sealer.WriteIDFile(f, src) }, + ); err != nil { return nil, err } - // Get final file size for metadata - stat, err := indexFile.Stat() - if err != nil { + if err := createAndWriteBoth( + info.Path+consts.TokenTmpFileSuffix, info.Path+consts.TokenFileSuffix, + info.Path+consts.LIDTmpFileSuffix, info.Path+consts.LIDFileSuffix, + func(tokenF, lidF *os.File) error { return sealer.WriteTokenTriplet(tokenF, lidF, src) }, + ); err != nil { return nil, err } - info.IndexOnDisk = uint64(stat.Size()) - // Close file before renaming - if err := indexFile.Close(); err != nil { + if err := createAndWrite( + info.Path+consts.InfoTmpFileSuffix, + info.Path+consts.InfoFileSuffix, + func(f *os.File) error { return sealer.WriteInfoFile(f, src) }, + ); err != nil { return nil, err } - // Atomically rename temporary file to final name - if err := os.Rename(indexFile.Name(), info.Path+consts.IndexFileSuffix); err != nil { - return nil, err + util.MustSyncPath(filepath.Dir(info.Path)) + + // Compute total index size as sum of all 5 files. + var totalSize uint64 + for _, suffix := range []string{ + consts.InfoFileSuffix, + consts.TokenFileSuffix, + consts.OffsetsFileSuffix, + consts.IDFileSuffix, + consts.LIDFileSuffix, + } { + st, err := os.Stat(info.Path + suffix) + if err != nil { + return nil, err + } + totalSize += uint64(st.Size()) } - // Ensure directory metadata is synced to disk - util.MustSyncPath(filepath.Dir(info.Path)) + info.IndexOnDisk = totalSize + lidsTable := sealer.LIDsTable() - // Build preloaded data structure for fast query access - lidsTable := indexSealer.LIDsTable() - preloaded := sealed.PreloadedData{ + preloaded := &sealed.PreloadedData{ Info: info, - TokenTable: indexSealer.TokenTable(), + TokenTable: sealer.TokenTable(), BlocksData: sealed.BlocksData{ - IDsTable: indexSealer.IDsTable(), + IDsTable: sealer.IDsTable(), LIDsTable: &lidsTable, - BlocksOffsets: src.BlocksOffsets(), + BlocksOffsets: src.BlockOffsets(), }, } - return &preloaded, nil + return preloaded, nil } diff --git a/frac/sealed/sealing/stats.go b/frac/sealed/sealing/stats.go deleted file mode 100644 index 5b119d60..00000000 --- a/frac/sealed/sealing/stats.go +++ /dev/null @@ -1,42 +0,0 @@ -package sealing - -import ( - "time" - - "go.uber.org/zap" - - "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/util" -) - -type blocksStats struct { - start time.Time - len int - rawLen int - blocksCount int -} - -func startStats() blocksStats { - return blocksStats{start: time.Now()} -} - -func (s *blocksStats) takeStock(block indexBlock) { - s.blocksCount++ - s.len += len(block.payload) - s.rawLen += int(block.rawLen) -} - -func (s *blocksStats) log(name string, endTime time.Time) { - var ratio float64 - if s.len > 0 { - ratio = float64(s.rawLen) / float64(s.len) - } - logger.Info("seal block stats", - zap.String("type", name), - util.ZapUint64AsSizeStr("raw", uint64(s.rawLen)), - util.ZapUint64AsSizeStr("compressed", uint64(s.len)), - util.ZapFloat64WithPrec("ratio", ratio, 2), - zap.Uint64("blocks_count", uint64(s.blocksCount)), - util.ZapDurationWithPrec("write_duration_ms", endTime.Sub(s.start), "ms", 0), - ) -} diff --git a/frac/sealed/sealing/writer.go b/frac/sealed/sealing/writer.go new file mode 100644 index 00000000..c0e9e645 --- /dev/null +++ b/frac/sealed/sealing/writer.go @@ -0,0 +1,134 @@ +package sealing + +import ( + "bytes" + "encoding/binary" + "io" + + "github.com/alecthomas/units" + "go.uber.org/zap" + + "github.com/ozontech/seq-db/bytespool" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/util" +) + +const prefixSize = 16 + +const ( + btypeInfo = "info" + btypeOffset = "offset" + btypeToken = "token" + btypeTokenTable = "token-table" + btypeMid = "mid" + btypeRid = "rid" + btypeDocPos = "doc-pos" + btypeLid = "lid" + btypeBlackhole = "blackhole" +) + +// writer writes blocks incrementally to a single file using the +// [prefix][blocks][registry] format. +type writer struct { + ws io.WriteSeeker + + wpayload *bytespool.Writer + wheader bytes.Buffer + + pos int + stats map[string]blockstat +} + +type blockstat struct { + count int + raw int + compressed int + header int +} + +func (b blockstat) log(btype string) { + logger.Info( + "seal block stats", + zap.String("type", btype), + util.ZapUint64AsSizeStr("raw", uint64(b.raw)), + util.ZapUint64AsSizeStr("compressed", uint64(b.compressed)), + util.ZapUint64AsSizeStr("header", uint64(b.header)), + zap.Uint64("blocks_count", uint64(b.count)), + ) +} + +func newWriter(ws io.WriteSeeker) (*writer, error) { + if _, err := ws.Seek(prefixSize, io.SeekStart); err != nil { + return nil, err + } + + return &writer{ + ws: ws, + wpayload: bytespool.AcquireWriterSize(ws, int(units.MiB)), + pos: prefixSize, + stats: make(map[string]blockstat), + }, nil +} + +func (w *writer) writeBlock(btype string, block indexBlock) error { + header, payload := block.Bin(int64(w.pos)) + if _, err := w.wpayload.Write(payload); err != nil { + return err + } + + if btype != btypeBlackhole { + w.stats[btype] = blockstat{ + count: w.stats[btype].count + 1, + raw: w.stats[btype].raw + int(block.rawLen), + compressed: w.stats[btype].compressed + len(block.payload), + header: w.stats[btype].header + len(header), + } + } + + w.wheader.Write(header) + w.pos += len(payload) + + return nil +} + +func (w *writer) finalize() error { + if err := w.wpayload.Flush(); err != nil { + return err + } + + regpos, err := w.ws.Seek(0, io.SeekEnd) + if err != nil { + return err + } + + if _, err := w.wpayload.Write(w.wheader.Bytes()); err != nil { + return err + } + + if err := w.wpayload.Flush(); err != nil { + return err + } + + prefix := make([]byte, 0, prefixSize) + prefix = binary.LittleEndian.AppendUint64(prefix, uint64(regpos)) + prefix = binary.LittleEndian.AppendUint64(prefix, uint64(w.wheader.Len())) + + if _, err := w.ws.Seek(0, io.SeekStart); err != nil { + return err + } + + if _, err := w.ws.Write(prefix); err != nil { + return err + } + + for btype, stats := range w.stats { + stats.log(btype) + } + + return nil +} + +func (w *writer) release() { + bytespool.ReleaseWriter(w.wpayload) + w.wpayload = nil +} diff --git a/frac/sealed/token/provider.go b/frac/sealed/token/provider.go index 6d18ff68..a650c266 100644 --- a/frac/sealed/token/provider.go +++ b/frac/sealed/token/provider.go @@ -1,6 +1,7 @@ package token import ( + "math" "sort" ) @@ -15,9 +16,9 @@ type Provider struct { func NewProvider(loader *BlockLoader, entries []*TableEntry) *Provider { return &Provider{ - loader: loader, - entries: entries, - curEntry: nil, + loader: loader, + entries: entries, + curBlockIndex: math.MaxUint32, // sentinel: no block loaded yet } } diff --git a/frac/sealed/token/table_entry.go b/frac/sealed/token/table_entry.go index a16b9a55..6e1df9c9 100644 --- a/frac/sealed/token/table_entry.go +++ b/frac/sealed/token/table_entry.go @@ -12,7 +12,7 @@ type TableEntry struct { } func (t *TableEntry) GetIndexInTokensBlock(tid uint32) int { - return int(t.StartIndex + tid - t.StartTID) + return int(t.StartIndex + (tid - t.StartTID)) } func (t *TableEntry) getLastTID() uint32 { diff --git a/frac/sealed/token/table_loader.go b/frac/sealed/token/table_loader.go index 6c3a5936..0750de62 100644 --- a/frac/sealed/token/table_loader.go +++ b/frac/sealed/token/table_loader.go @@ -50,6 +50,7 @@ func (l *TableLoader) Load() Table { func TableFromBlocks(blocks []TableBlock) Table { table := make(Table) + for _, block := range blocks { for _, ft := range block.FieldsTables { fd, ok := table[ft.Field] @@ -62,13 +63,16 @@ func TableFromBlocks(blocks []TableBlock) Table { } else if minVal < fd.MinVal { fd.MinVal = minVal } + for _, e := range ft.Entries { e.MinVal = "" fd.Entries = append(fd.Entries, e) } + table[ft.Field] = fd } } + return table } @@ -89,10 +93,8 @@ func (l *TableLoader) readBlock() ([]byte, error) { } func (l *TableLoader) loadBlocks() ([]TableBlock, error) { - // todo: scan all headers in sealed_loader and remember startIndex for each sections - // todo: than use this startIndex to load sections on demand (do not scan every time) - l.i = 1 - for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { // skip actual token blocks, go for token table + l.i = 0 + for h := l.readHeader(); h.Len() > 0; h = l.readHeader() { // skip token blocks, go for token table } blocks := make([]TableBlock, 0) @@ -104,6 +106,7 @@ func (l *TableLoader) loadBlocks() ([]TableBlock, error) { tb.Unpack(blockData) blocks = append(blocks, tb) } + return blocks, nil } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index ae639862..28b9ef9f 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -16,36 +16,41 @@ import ( "github.com/ozontech/seq-db/util" ) -type Loader struct { - reader *storage.IndexReader +// LegacyLoader reads the old single .index file format by scanning blocks sequentially. +// Block indices stored in lids.Table and seqids.Table are absolute within the .index file, +// so the same IndexReader can be passed to all sub-loaders unchanged. +type LegacyLoader struct { + reader storage.IndexReader blockIndex uint32 - blockBuf []byte } -func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, indexReader *storage.IndexReader) { +// Load populates blocksData from a single legacy .index file. +// It starts at block 1 (block 0 is the Info block, already read by loadHeader). +func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, reader storage.IndexReader) { t := time.Now() - l.reader = indexReader - l.blockIndex = 1 // skipping info block that's already read + l.reader = reader + l.blockIndex = 1 // skip Info block at index 0 - l.skipTokens() + l.skipSection() // skip token blocks + l.skipSection() // skip token table blocks var err error - - if blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer); err != nil { - logger.Fatal("load ids error", zap.Error(err)) + blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer) + if err != nil { + logger.Fatal("legacy load ids error", zap.Error(err)) } - if blocksData.LIDsTable, err = l.loadLIDsBlocksTable(); err != nil { - logger.Fatal("load lids error", zap.Error(err)) + blocksData.LIDsTable, err = l.loadLIDs() + if err != nil { + logger.Fatal("legacy load lids error", zap.Error(err)) } took := time.Since(t) - docsTotalK := float64(info.DocsTotal) / 1000 indexOnDiskMb := util.SizeToUnit(info.IndexOnDisk, "mb") throughput := indexOnDiskMb / util.DurationToUnit(took, "s") - logger.Info("sealed fraction loaded", + logger.Info("sealed fraction loaded (legacy format)", zap.String("fraction", info.Path), util.ZapMsTsAsESTimeStr("creation_time", info.CreationTime), zap.String("from", info.From.String()), @@ -56,104 +61,231 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, indexRea ) } -func (l *Loader) nextIndexBlock() ([]byte, error) { - data, _, err := l.reader.ReadIndexBlock(l.blockIndex, l.blockBuf) - l.blockBuf = data - l.blockIndex++ - return data, err -} +// skipSection advances past one separator-delimited section (reads headers until Len() == 0). +func (l *LegacyLoader) skipSection() { + for { + h, err := l.reader.GetBlockHeader(l.blockIndex) + if err != nil { + logger.Panic("error reading block header", zap.Error(err)) + } -func (l *Loader) skipBlock() storage.IndexBlockHeader { - header, err := l.reader.GetBlockHeader(l.blockIndex) - if err != nil { - logger.Panic("error reading block header", zap.Error(err)) + l.blockIndex++ + if h.Len() == 0 { + return + } } - l.blockIndex++ - return header } -func (l *Loader) loadIDs(fracVersion config.BinaryDataVersion) (idsTable seqids.Table, blocksOffsets []uint64, err error) { - var result []byte +// loadIDs reads the BlockOffsets block and then scans MID/RID/Pos triplets. +func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Table, []uint64, error) { + var buf []byte - if result, err = l.nextIndexBlock(); err != nil { - return idsTable, nil, err + data, _, err := l.reader.ReadIndexBlock(l.blockIndex, buf) + if err != nil { + return seqids.Table{}, nil, err } - b := sealed.BlockOffsets{} - if err := b.Unpack(result); err != nil { - return idsTable, nil, err + var offsets sealed.BlockOffsets + if err := offsets.Unpack(data); err != nil { + return seqids.Table{}, nil, err } - blocksOffsets = b.Offsets - idsTable.IDsTotal = b.IDsTotal - idsTable.IDBlocksTotal = uint32(len(b.Offsets)) - idsTable.StartBlockIndex = l.blockIndex + // Move to the first block of ID section. + l.blockIndex++ + + table := seqids.Table{ + StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index + IDsTotal: offsets.IDsTotal, + IDBlocksTotal: uint32(len(offsets.Offsets)), + } for { - // get MIDs block header - header := l.skipBlock() - if header.Len() == 0 { + h, err := l.reader.GetBlockHeader(l.blockIndex) + if err != nil { + logger.Fatal("error reading id block header", zap.Error(err)) + } + + l.blockIndex++ + if h.Len() == 0 { break } - var mid seq.MID + mid := seq.MID(h.GetExt1()) if fracVersion < config.BinaryDataV2 { - mid = seq.MillisToMID(header.GetExt1()) - } else { - mid = seq.MID(header.GetExt1()) + mid = seq.MillisToMID(h.GetExt1()) } - idsTable.MinBlockIDs = append(idsTable.MinBlockIDs, seq.ID{ + table.MinBlockIDs = append(table.MinBlockIDs, seq.ID{ MID: mid, - RID: seq.RID(header.GetExt2()), + RID: seq.RID(h.GetExt2()), }) - // skipping RIDs and Pos blocks - l.skipBlock() - l.skipBlock() + l.blockIndex += 2 // skip RIDs and Pos blocks } - return idsTable, blocksOffsets, nil + return table, offsets.Offsets, nil } -func (l *Loader) skipTokens() { +// loadLIDs scans LID block headers, recording the absolute start index for lids.Table. +func (l *LegacyLoader) loadLIDs() (*lids.Table, error) { + startIndex := l.blockIndex // absolute index of first LID block in .index + + var ( + maxTIDs []uint32 + minTIDs []uint32 + isContinued []bool + ) + for { - // skip actual token blocks - header := l.skipBlock() - if header.Len() == 0 { + h, err := l.reader.GetBlockHeader(l.blockIndex) + if err != nil { + return nil, err + } + + l.blockIndex++ + if h.Len() == 0 { break } + + maxTIDs = append(maxTIDs, uint32(h.GetExt2()>>32)) + minTIDs = append(minTIDs, uint32(h.GetExt2()&0xFFFFFFFF)) + + isContinued = append(isContinued, h.GetExt1() == 1) } - for { - // skip token table - header := l.skipBlock() - if header.Len() == 0 { + return lids.NewTable(startIndex, minTIDs, maxTIDs, isContinued), nil +} + +// IndexReaders holds one IndexReader per split index file. +type IndexReaders struct { + Info storage.IndexReader + Token storage.IndexReader + Offsets storage.IndexReader + ID storage.IndexReader + LID storage.IndexReader +} + +// Loader reads the per-section index files to populate BlocksData. +// Token data is loaded lazily (BlockLoader / TableLoader use the Token reader directly). +// Info is loaded separately via loadHeader before Load is called. +type Loader struct { + buf []byte +} + +// Load populates blocksData from the .offsets, .id, and .lid files. +func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers IndexReaders) { + t := time.Now() + + var ( + err error + blockOffsets sealed.BlockOffsets + ) + + blockOffsets, err = l.loadBlocksOffsets(readers.Offsets) + if err != nil { + logger.Fatal("load offsets error", zap.Error(err)) + } + + blocksData.BlocksOffsets = blockOffsets.Offsets + blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer) + + blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID) + if err != nil { + logger.Fatal("load lids error", zap.Error(err)) + } + + took := time.Since(t) + docsTotalK := float64(info.DocsTotal) / 1000 + indexOnDiskMb := util.SizeToUnit(info.IndexOnDisk, "mb") + throughput := indexOnDiskMb / util.DurationToUnit(took, "s") + logger.Info("sealed fraction loaded", + zap.String("fraction", info.Path), + util.ZapMsTsAsESTimeStr("creation_time", info.CreationTime), + zap.String("from", info.From.String()), + zap.String("to", info.To.String()), + util.ZapFloat64WithPrec("docs_k", docsTotalK, 1), + util.ZapDurationWithPrec("took_ms", took, "ms", 1), + util.ZapFloat64WithPrec("throughput_mb_sec", throughput, 1), + ) +} + +// loadBlocksOffsets reads block 0 from the .offsets file. +func (l *Loader) loadBlocksOffsets(r storage.IndexReader) (sealed.BlockOffsets, error) { + data, _, err := r.ReadIndexBlock(0, l.buf) + l.buf = data + + if err != nil { + return sealed.BlockOffsets{}, err + } + + var b sealed.BlockOffsets + if err := b.Unpack(data); err != nil { + return sealed.BlockOffsets{}, err + } + + return b, nil +} + +// loadIDsTable scans block headers in the .id file to build seqids.Table. +// Blocks are stored as (MIDs, RIDs, Pos) triplets; we only need MIDs headers. +func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersion config.BinaryDataVersion) seqids.Table { + table := seqids.Table{ + StartBlockIndex: 0, + IDsTotal: idsTotal, + } + + for blockIdx := uint32(0); ; { + header, err := r.GetBlockHeader(blockIdx) + if err != nil { + logger.Fatal("error reading id block header", zap.Error(err)) + } + if header.Len() == 0 { // separator break } + + var mid seq.MID + if fracVersion < config.BinaryDataV2 { + mid = seq.MillisToMID(header.GetExt1()) + } else { + mid = seq.MID(header.GetExt1()) + } + + table.MinBlockIDs = append(table.MinBlockIDs, seq.ID{ + MID: mid, + RID: seq.RID(header.GetExt2()), + }) + + table.IDBlocksTotal++ + blockIdx += 3 // skip RIDs and Pos blocks } + + return table } -func (l *Loader) loadLIDsBlocksTable() (*lids.Table, error) { - maxTIDs := make([]uint32, 0) - minTIDs := make([]uint32, 0) - isContinued := make([]bool, 0) +// loadLIDsTable scans block headers in the .lid file to build lids.Table. +func (l *Loader) loadLIDsTable(r storage.IndexReader) (*lids.Table, error) { + var ( + maxTIDs []uint32 + minTIDs []uint32 + isContinued []bool + ) + + for blockIdx := uint32(0); ; blockIdx++ { + header, err := r.GetBlockHeader(blockIdx) + if err != nil { + return nil, err + } - startIndex := l.blockIndex - for { - header := l.skipBlock() if header.Len() == 0 { break } - ext1 := header.GetExt1() ext2 := header.GetExt2() - maxTIDs = append(maxTIDs, uint32(ext2>>32)) minTIDs = append(minTIDs, uint32(ext2&0xFFFFFFFF)) - isContinued = append(isContinued, ext1 == 1) + isContinued = append(isContinued, header.GetExt1() == 1) } - return lids.NewTable(startIndex, minTIDs, maxTIDs, isContinued), nil + return lids.NewTable(0, minTIDs, maxTIDs, isContinued), nil } diff --git a/fracmanager/cache_maintainer.go b/fracmanager/cache_maintainer.go index 70e5f956..2a6ac6dd 100644 --- a/fracmanager/cache_maintainer.go +++ b/fracmanager/cache_maintainer.go @@ -149,7 +149,12 @@ func (cm *CacheMaintainer) CreateIndexCache() *frac.IndexCache { LIDs: newCache[*lids.Block](cm, lidsName), Tokens: newCache[*token.Block](cm, tokensName), TokenTable: newCache[token.Table](cm, tokenTableName), - Registry: newCache[[]byte](cm, indexName), + // Each index file gets its own registry cache (they all use key=1 internally). + InfoRegistry: newCache[[]byte](cm, indexName), + TokenRegistry: newCache[[]byte](cm, indexName), + OffsetsRegistry: newCache[[]byte](cm, indexName), + IDRegistry: newCache[[]byte](cm, indexName), + LIDRegistry: newCache[[]byte](cm, indexName), } } diff --git a/fracmanager/frac_manifest.go b/fracmanager/frac_manifest.go index 2a258bda..a7f8b81c 100644 --- a/fracmanager/frac_manifest.go +++ b/fracmanager/frac_manifest.go @@ -19,20 +19,28 @@ import ( type fracManifest struct { basePath string // base path to fraction files (without extension) hasDocs bool // presence of main documents file - hasIndex bool // presence of index file hasMeta bool // presence of meta-information (legacy WAL format) hasWal bool // presence of WAL with meta (new WAL format) + hasIndex bool // presence of index file hasSdocs bool // presence of sorted documents hasRemote bool // presence of remote fraction + // Split index file flags + hasInfo bool + hasToken bool + hasOffsets bool + hasID bool + hasLID bool + // Deletion marker file flags hasDocsDel bool // documents deletion marker hasSdocsDel bool // sorted documents deletion marker hasIndexDel bool // index deletion marker +} - // Temporary file flags - hasIndexTmp bool // temporary index file - hasSdocsTmp bool // temporary sorted documents file +// hasAllIndexFiles reports whether all 5 split index files are present. +func (m *fracManifest) hasAllIndexFiles() bool { + return m.hasInfo && m.hasToken && m.hasOffsets && m.hasID && m.hasLID } // AddExtension adds information about a file with the specified extension @@ -52,6 +60,17 @@ func (m *fracManifest) AddExtension(ext string) error { case consts.RemoteFractionSuffix: m.hasRemote = true + case consts.InfoFileSuffix: + m.hasInfo = true + case consts.TokenFileSuffix: + m.hasToken = true + case consts.OffsetsFileSuffix: + m.hasOffsets = true + case consts.IDFileSuffix: + m.hasID = true + case consts.LIDFileSuffix: + m.hasLID = true + case consts.DocsDelFileSuffix: m.hasDocsDel = true case consts.SdocsDelFileSuffix: @@ -59,10 +78,13 @@ func (m *fracManifest) AddExtension(ext string) error { case consts.IndexDelFileSuffix: m.hasIndexDel = true - case consts.IndexTmpFileSuffix: - m.hasIndexTmp = true - case consts.SdocsTmpFileSuffix: - m.hasSdocsTmp = true + case consts.IndexTmpFileSuffix, + consts.InfoTmpFileSuffix, consts.TokenTmpFileSuffix, + consts.OffsetsTmpFileSuffix, consts.IDTmpFileSuffix, + consts.LIDTmpFileSuffix, consts.SdocsTmpFileSuffix: + + // Just handle temporary files (which were not commited). + // We will just drop them in all possible cases. default: return fmt.Errorf("unknown fraction file type %s", ext) @@ -88,13 +110,13 @@ func (m *fracManifest) Stage() fracStage { if m.hasRemote { return fracStageRemote } - if m.hasIndex && (m.hasSdocs || m.hasDocs) { + if (m.hasAllIndexFiles() || m.hasIndex) && (m.hasSdocs || m.hasDocs) { return fracStageSealed } if (m.hasMeta || m.hasWal) && m.hasDocs { return fracStageActive } - if m.hasDocsDel || m.hasIndexDel || m.hasSdocsDel { + if m.hasDocsDel || m.hasSdocsDel || m.hasIndexDel { return fracStageZombie } return fracStageUnknown @@ -108,7 +130,7 @@ func removeDocs(m *fracManifest) { } func removeSdocs(m *fracManifest) { - if m.hasDocs { + if m.hasSdocs { util.RemoveFile(m.basePath + consts.SdocsFileSuffix) m.hasSdocs = false } @@ -125,18 +147,23 @@ func removeMeta(m *fracManifest) { } } -func removeIndex(m *fracManifest) { - if m.hasIndex { - util.RemoveFile(m.basePath + consts.IndexFileSuffix) - m.hasIndex = false - } -} - -func removeIndexDel(m *fracManifest) { - if m.hasIndexDel { - util.RemoveFile(m.basePath + consts.IndexDelFileSuffix) - m.hasIndexDel = false +func removeIndexFiles(m *fracManifest) { + for _, suffix := range []string{ + consts.InfoFileSuffix, + consts.TokenFileSuffix, + consts.OffsetsFileSuffix, + consts.IDFileSuffix, + consts.LIDFileSuffix, + consts.IndexFileSuffix, + } { + util.RemoveFile(m.basePath + suffix) } + m.hasInfo = false + m.hasToken = false + m.hasOffsets = false + m.hasID = false + m.hasLID = false + m.hasIndex = false } func removeSdocsDel(m *fracManifest) { @@ -154,17 +181,20 @@ func removeDocsDel(m *fracManifest) { } func removeIndexTmp(m *fracManifest) { - if m.hasIndexTmp { - util.RemoveFile(m.basePath + consts.IndexTmpFileSuffix) - m.hasIndexTmp = false + for _, suffix := range []string{ + consts.IndexTmpFileSuffix, + consts.InfoTmpFileSuffix, + consts.TokenTmpFileSuffix, + consts.OffsetsTmpFileSuffix, + consts.IDTmpFileSuffix, + consts.LIDTmpFileSuffix, + } { + util.RemoveFile(m.basePath + suffix) } } func removeSdocsTmp(m *fracManifest) { - if m.hasSdocsTmp { - util.RemoveFile(m.basePath + consts.SdocsTmpFileSuffix) - m.hasSdocsTmp = false - } + util.RemoveFile(m.basePath + consts.SdocsTmpFileSuffix) } // analyzeFiles analyzes fraction files and groups them by fraction ID @@ -240,8 +270,7 @@ func cleanupRemoteFrac(m *fracManifest) { removeMeta(m) removeDocs(m) removeSdocs(m) - removeIndex(m) - removeIndexDel(m) + removeIndexFiles(m) } // cleanupSealedFrac cleans files for sealed fractions @@ -265,17 +294,22 @@ func cleanupTemporary(m *fracManifest) { // removeAllFiles completely removes all fraction files // Used for cleaning up partially deleted or corrupted fractions func removeAllFiles(basePath string) { - // Remove main files first, then deletion markers to preserve deletion intent - util.RemoveFile(basePath + consts.IndexFileSuffix) - util.RemoveFile(basePath + consts.DocsFileSuffix) - util.RemoveFile(basePath + consts.SdocsFileSuffix) - util.RemoveFile(basePath + consts.MetaFileSuffix) - - util.RemoveFile(basePath + consts.IndexDelFileSuffix) - util.RemoveFile(basePath + consts.DocsDelFileSuffix) - util.RemoveFile(basePath + consts.SdocsDelFileSuffix) - util.RemoveFile(basePath + consts.SdocsTmpFileSuffix) - util.RemoveFile(basePath + consts.IndexTmpFileSuffix) + for _, suffix := range []string{ + consts.DocsFileSuffix, consts.DocsDelFileSuffix, + consts.SdocsFileSuffix, consts.SdocsDelFileSuffix, consts.SdocsTmpFileSuffix, + consts.IndexFileSuffix, consts.IndexDelFileSuffix, consts.IndexTmpFileSuffix, + + consts.InfoFileSuffix, consts.InfoTmpFileSuffix, + consts.TokenFileSuffix, consts.TokenTmpFileSuffix, + consts.OffsetsFileSuffix, consts.OffsetsTmpFileSuffix, + consts.IDFileSuffix, consts.IDTmpFileSuffix, + consts.LIDFileSuffix, consts.LIDTmpFileSuffix, + + consts.MetaFileSuffix, + consts.WalFileSuffix, + } { + util.RemoveFile(basePath + suffix) + } } // parseFilePath extracts components from a fraction file path diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index e2915598..f823649d 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -55,7 +55,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active { ) } -func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *frac.Sealed { +func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info, isLegacy bool) *frac.Sealed { return frac.NewSealed( name, fp.readLimiter, @@ -63,6 +63,7 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *fra fp.cacheProvider.CreateDocBlockCache(), cachedInfo, // Preloaded meta information &fp.config.Fraction, + isLegacy, ) } @@ -77,7 +78,7 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *seale ) } -func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedInfo *common.Info) *frac.Remote { +func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedInfo *common.Info, isLegacy bool) *frac.Remote { return frac.NewRemote( ctx, name, @@ -87,6 +88,7 @@ func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedIn cachedInfo, &fp.config.Fraction, fp.s3cli, + isLegacy, ) } @@ -127,9 +129,11 @@ func (fp *fractionProvider) Offload(ctx context.Context, f *frac.Sealed) (*frac. if err != nil { return nil, err } + if !mustBeOffloaded { return nil, nil } + info := f.Info() - return fp.NewRemote(ctx, info.Path, info), nil + return fp.NewRemote(ctx, info.Path, info, f.IsLegacy), nil } diff --git a/fracmanager/loader.go b/fracmanager/loader.go index 6eb788ee..69ff7c02 100644 --- a/fracmanager/loader.go +++ b/fracmanager/loader.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/logger" ) @@ -136,9 +137,23 @@ func (l *Loader) discover(ctx context.Context) ([]*frac.Active, []*frac.Sealed, case fracStageActive: actives = append(actives, l.provider.NewActive(manifest.basePath)) case fracStageSealed: - locals = append(locals, l.loadSealed(manifest.basePath, loadedInfoCache)) + locals = append(locals, l.loadSealed(manifest, loadedInfoCache)) case fracStageRemote: - remotes = append(remotes, l.loadRemote(ctx, manifest.basePath, loadedInfoCache)) + // TODO(dkharms): Drop this compatibility check. + + indexName := filepath.Base(manifest.basePath) + consts.IndexFileSuffix + hasIndex, err := l.provider.s3cli.Exists(ctx, indexName) + if err != nil { + logger.Error( + "will skip fraction: cannot check existence of .index file", + zap.String("fraction", filepath.Base(manifest.basePath)), + zap.Error(err), + ) + continue + } + + manifest.hasIndex = hasIndex + remotes = append(remotes, l.loadRemote(ctx, manifest, loadedInfoCache)) default: logger.Error("unexpected fraction stage", zap.Any("manifest", manifest)) } @@ -153,21 +168,21 @@ func (l *Loader) discover(ctx context.Context) ([]*frac.Active, []*frac.Sealed, } // loadSealed loads a sealed fraction using cache -func (l *Loader) loadSealed(basePath string, loadedInfoCache *fracInfoCache) *frac.Sealed { - info, found := loadedInfoCache.Get(filepath.Base(basePath)) +func (l *Loader) loadSealed(manifest *fracManifest, loadedInfoCache *fracInfoCache) *frac.Sealed { + info, found := loadedInfoCache.Get(filepath.Base(manifest.basePath)) l.updateStats(found) - f := l.provider.NewSealed(basePath, info) + f := l.provider.NewSealed(manifest.basePath, info, manifest.hasIndex) l.infoCache.Add(f.Info()) return f } // loadRemote loads a remote fraction -func (l *Loader) loadRemote(ctx context.Context, basePath string, loadedInfoCache *fracInfoCache) *frac.Remote { - info, found := loadedInfoCache.Get(filepath.Base(basePath)) +func (l *Loader) loadRemote(ctx context.Context, manifest *fracManifest, loadedInfoCache *fracInfoCache) *frac.Remote { + info, found := loadedInfoCache.Get(filepath.Base(manifest.basePath)) l.updateStats(found) - f := l.provider.NewRemote(ctx, basePath, info) + f := l.provider.NewRemote(ctx, manifest.basePath, info, manifest.hasIndex) l.infoCache.Add(f.Info()) return f } diff --git a/indexer/processor.go b/indexer/processor.go index dbf7c106..9ca83938 100644 --- a/indexer/processor.go +++ b/indexer/processor.go @@ -13,7 +13,6 @@ import ( "go.uber.org/zap" "github.com/ozontech/seq-db/consts" - "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/tokenizer" diff --git a/seq/seq.go b/seq/seq.go index 6a5a0039..adae4265 100644 --- a/seq/seq.go +++ b/seq/seq.go @@ -10,14 +10,23 @@ import ( "github.com/ozontech/seq-db/util" ) +var ( + SystemMID MID = math.MaxUint64 + SystemRID RID = math.MaxUint64 + SystemID ID = ID{SystemMID, SystemRID} + SystemDocPos DocPos = DocPos(0) +) + type ID struct { MID MID RID RID } -type MID uint64 // nanoseconds part of ID -type RID uint64 // random part of ID -type LID uint32 // local id for a fraction +type ( + MID uint64 // nanoseconds part of ID + RID uint64 // random part of ID + LID uint32 // local id for a fraction +) func (m MID) Time() time.Time { nanosPerSecond := uint64(time.Second) @@ -100,7 +109,6 @@ func FromString(x string) (ID, error) { } rid, err := hex.DecodeString(x[17:]) - if err != nil { return id, err }