Skip to content
109 changes: 71 additions & 38 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"hash/fnv"
"os"
"strings"
"sync"
"time"

"github.com/alecthomas/units"
"go.uber.org/zap"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/token"
Expand All @@ -21,7 +23,7 @@ import (

// Launch as:
//
// > go run ./cmd/index_analyzer/... ./data/*.index | tee ~/report.txt
// > go run ./cmd/index_analyzer/... ./data/*.info | tee ~/report.txt
func main() {
if len(os.Args) < 2 {
fmt.Println("No args")
Expand Down Expand Up @@ -73,45 +75,80 @@ func getCacheMaintainer() (*fracmanager.CacheMaintainer, func()) {
}
}

// basePath strips any known index suffix to return the fraction base path.
// Paths without a recognized suffix are returned unchanged.
func basePath(path string) string {
	knownSuffixes := []string{
		consts.InfoFileSuffix,
		consts.TokenFileSuffix,
		consts.OffsetsFileSuffix,
		consts.IDFileSuffix,
		consts.LIDFileSuffix,
	}
	for _, s := range knownSuffixes {
		if strings.HasSuffix(path, s) {
			return strings.TrimSuffix(path, s)
		}
	}
	return path
}

func openFile(path string) *os.File {
f, err := os.Open(path)
if err != nil {
panic(err)
}
return f
}

func analyzeIndex(
path string,
cm *fracmanager.CacheMaintainer,
reader *storage.ReadLimiter,
rl *storage.ReadLimiter,
mergedTokensUniq map[string]map[string]int,
allTokensValuesUniq map[string]int,
) Stats {
base := basePath(path)
indexCache := cm.CreateIndexCache()

// Open per-section files.
infoFile := openFile(base + consts.InfoFileSuffix)
tokenFile := openFile(base + consts.TokenFileSuffix)
lidFile := openFile(base + consts.LIDFileSuffix)
defer infoFile.Close()
defer tokenFile.Close()
defer lidFile.Close()

infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry)
tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry)
lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry)

// --- Info ---
var blockIndex uint32
cache := cm.CreateIndexCache()

f, err := os.Open(path)
infoData, _, err := infoReader.ReadIndexBlock(0, nil)
if err != nil {
panic(err)
logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err))
}
var b sealed.BlockInfo
if err := b.Unpack(infoData); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}
docsCount := int(b.Info.DocsTotal)

indexReader := storage.NewIndexReader(reader, f.Name(), f, cache.Registry)

readBlock := func() []byte {
data, _, err := indexReader.ReadIndexBlock(blockIndex, nil)
// --- Tokens (.token file) ---
// Token blocks start at index 0, followed by an empty separator, then token table blocks.
blockIndex = 0
readTokenBlock := func() []byte {
data, _, err := tokenReader.ReadIndexBlock(blockIndex, nil)
blockIndex++
if err != nil {
logger.Fatal("error reading block", zap.String("file", f.Name()), zap.Error(err))
logger.Fatal("error reading token block", zap.String("file", tokenFile.Name()), zap.Error(err))
}
return data
}

// load info
var b sealed.BlockInfo
if err := b.Unpack(readBlock()); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}

docsCount := int(b.Info.DocsTotal)

// load tokens
tokens := [][]byte{}
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
data := readTokenBlock()
if len(data) == 0 { // empty block - section separator
break
}
block := token.Block{}
Expand All @@ -123,11 +160,10 @@ func analyzeIndex(
}
}

// load tokens table
tokenTableBlocks := []token.TableBlock{}
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
data := readTokenBlock()
if len(data) == 0 { // empty block - section separator
break
}
block := token.TableBlock{}
Expand All @@ -136,28 +172,25 @@ func analyzeIndex(
}
tokenTable := token.TableFromBlocks(tokenTableBlocks)

// skip position
blockIndex++

// skip IDS
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
break
// --- LIDs (.lid file) ---
blockIndex = 0
readLIDBlock := func() []byte {
data, _, err := lidReader.ReadIndexBlock(blockIndex, nil)
blockIndex++
if err != nil {
logger.Fatal("error reading lid block", zap.String("file", lidFile.Name()), zap.Error(err))
}
blockIndex++ // skip RID
blockIndex++ // skip Param
return data
}

// load LIDs
tid := 0
lidsTotal := 0
lidsUniq := map[[16]byte]int{}
lidsLens := make([]int, len(tokens))
tokenLIDs := []uint32{}
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
data := readLIDBlock()
if len(data) == 0 { // empty block - section separator
break
}

Expand Down
18 changes: 18 additions & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,26 @@ const (
SdocsTmpFileSuffix = "._sdocs"
SdocsDelFileSuffix = ".sdocs.del"

InfoFileSuffix = ".info"
InfoTmpFileSuffix = "._info"

TokenFileSuffix = ".tokens"
TokenTmpFileSuffix = "._tokens"

OffsetsFileSuffix = ".offsets"
OffsetsTmpFileSuffix = "._offsets"

IDFileSuffix = ".ids"
IDTmpFileSuffix = "._ids"

LIDFileSuffix = ".lids"
LIDTmpFileSuffix = "._lids"

// IndexFileSuffix is the legacy single-file index format (pre-split).
IndexFileSuffix = ".index"
IndexTmpFileSuffix = "._index"
// TODO(dkharms): [IndexDelFileSuffix] is actually not necessary.
// We can remove it in the future releases.
IndexDelFileSuffix = ".index.del"

RemoteFractionSuffix = ".remote"
Expand Down
22 changes: 5 additions & 17 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package frac
import (
"context"
"io"
"math"
"os"
"path/filepath"
"sync"
Expand All @@ -26,9 +25,7 @@ import (
"github.com/ozontech/seq-db/util"
)

var (
_ Fraction = (*Active)(nil)
)
var _ Fraction = (*Active)(nil)

type Active struct {
Config *Config
Expand Down Expand Up @@ -61,16 +58,6 @@ type Active struct {
indexer *ActiveIndexer
}

const (
systemMID = math.MaxUint64
systemRID = math.MaxUint64
)

var systemSeqID = seq.ID{
MID: systemMID,
RID: systemRID,
}

func NewActive(
baseFileName string,
activeIndexer *ActiveIndexer,
Expand Down Expand Up @@ -109,8 +96,8 @@ func NewActive(
}

// use of 0 as keys in maps is prohibited – it's system key, so add first element
f.MIDs.Append(systemMID)
f.RIDs.Append(systemRID)
f.MIDs.Append(uint64(seq.SystemMID))
f.RIDs.Append(uint64(seq.SystemRID))

logger.Info("active fraction created", zap.String("fraction", baseFileName))

Expand All @@ -121,7 +108,8 @@ func mustOpenMetaWriter(
baseFileName string,
readLimiter *storage.ReadLimiter,
docsFile *os.File,
docsStats os.FileInfo) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
docsStats os.FileInfo,
) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
legacyMetaFileName := baseFileName + consts.MetaFileSuffix

if _, err := os.Stat(legacyMetaFileName); err == nil {
Expand Down
Loading
Loading