Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/mediorum/mediorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ func runMediorum(lc *lifecycle.Lifecycle, logger *zap.Logger, mediorumEnv string
logger.Warn("failed to parse OPENAUDIO_REPAIR_INTERVAL, using default 1h", zap.String("value", ri), zap.Error(err))
}
}
repairQmCidsUseListIndex := os.Getenv("OPENAUDIO_REPAIR_QM_CIDS_USE_LIST_INDEX") == "true"

config := server.MediorumConfig{
Self: registrar.Peer{
Host: httputil.RemoveTrailingSlash(strings.ToLower(nodeEndpoint)),
Expand Down Expand Up @@ -175,7 +173,6 @@ func runMediorum(lc *lifecycle.Lifecycle, logger *zap.Logger, mediorumEnv string
DeadHosts: []string{},
RepairEnabled: repairEnabled,
RepairInterval: repairInterval,
RepairQmCidsUseListIndex: repairQmCidsUseListIndex,
BlobStorageStreaming: os.Getenv("OPENAUDIO_BLOB_STORAGE_STREAMING") == "true",
}

Expand Down
43 changes: 20 additions & 23 deletions pkg/mediorum/server/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,23 @@ func (ss *MediorumServer) runRepair(ctx context.Context, tracker *RepairTracker)
}
}

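// Build a presence index from a single paginated bucket listing so that repair
// does not have to issue a HeadObject call per key (hundreds of thousands of calls otherwise).
// If the build fails, presenceIndex stays nil and the per-key attrs path is used instead.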
var presenceIndex *repairPresenceIndex
ss.logger.Info("building repair presence index from bucket listing")
indexStart := time.Now()
idx, err := ss.buildRepairPresenceIndex(ctx)
if err != nil {
ss.logger.Warn("failed to build presence index; falling back to per-key attrs", zap.Error(err))
tracker.Counters["qm_cids_list_index_build_fail"]++
} else {
presenceIndex = idx
tracker.Counters["qm_cids_list_index_entries"] = len(idx.entries)
ss.logger.Info("presence index built",
zap.Int("entries", len(idx.entries)),
zap.Duration("took", time.Since(indexStart)))
}

// scroll uploads and repair CIDs
// (later this can clean up "derivative" images if we make image resizing dynamic)
for {
Expand Down Expand Up @@ -195,14 +212,14 @@ func (ss *MediorumServer) runRepair(ctx context.Context, tracker *RepairTracker)
}

tracker.CursorUploadID = u.ID
ss.repairCid(ctx, u.OrigFileCID, u.PlacementHosts, tracker, nil)
ss.repairCid(ctx, u.OrigFileCID, u.PlacementHosts, tracker, presenceIndex)
// images are resized dynamically
// so only consider audio TranscodeResults for repair
if u.Template != JobTemplateAudio {
continue
}
for _, cid := range u.TranscodeResults {
ss.repairCid(ctx, cid, u.PlacementHosts, tracker, nil)
ss.repairCid(ctx, cid, u.PlacementHosts, tracker, presenceIndex)
}
}

Expand Down Expand Up @@ -242,33 +259,13 @@ func (ss *MediorumServer) runRepair(ctx context.Context, tracker *RepairTracker)
}

tracker.CursorPreviewCID = u.CID
ss.repairCid(ctx, u.CID, nil, tracker, nil)
ss.repairCid(ctx, u.CID, nil, tracker, presenceIndex)
}

tracker.Duration += time.Since(startIter)
saveTracker()
}

// optionally build a presence index from bucket.List to avoid per-key
// HeadObject calls during qm_cids cleanup. This replaces millions of
// HeadObject calls with a single ListObjects pagination.
var presenceIndex *repairPresenceIndex
if tracker.CleanupMode && ss.Config.RepairQmCidsUseListIndex {
ss.logger.Info("building qm_cids presence index from bucket listing")
indexStart := time.Now()
idx, err := ss.buildRepairPresenceIndex(ctx)
if err != nil {
ss.logger.Warn("failed to build presence index; falling back to per-key attrs", zap.Error(err))
tracker.Counters["qm_cids_list_index_build_fail"]++
} else {
presenceIndex = idx
tracker.Counters["qm_cids_list_index_entries"] = len(idx.entries)
ss.logger.Info("presence index built",
zap.Int("entries", len(idx.entries)),
zap.Duration("took", time.Since(indexStart)))
}
}

// scroll older qm_cids table and repair
for {
// abort if context is canceled
Expand Down
1 change: 0 additions & 1 deletion pkg/mediorum/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ type MediorumConfig struct {
DeadHosts []string
RepairEnabled bool `default:"true"`
RepairInterval time.Duration `default:"1h"`
RepairQmCidsUseListIndex bool

ProgrammableDistributionEnabled bool
BlobStorageStreaming bool
Expand Down
Loading