diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index ad981c07..e81775e7 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -1595,6 +1595,51 @@ func (s *Spec) GetCreateTableSql(tableName string) (string, error) { return createSql, nil } +func (s *Spec) ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error { + if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 { + log.Warnf("empty partition infos, skip modify partition property") + return nil + } + + dbName := utils.FormatKeywordName(s.Database) + destTableName = utils.FormatKeywordName(destTableName) + + var lastErr error + successCount := 0 + for _, partitionInfo := range batchModifyPartitionsInfo.Infos { + if partitionInfo.DataProperty == nil || partitionInfo.DataProperty.StorageMedium == "" { + log.Warnf("partition %d has no storage medium, skip modify partition property", partitionInfo.PartitionId) + continue + } + + sql := fmt.Sprintf("ALTER TABLE %s.%s MODIFY PARTITION %s SET (\"storage_medium\" = \"%s\")", + dbName, destTableName, utils.FormatKeywordName(partitionInfo.PartitionName), partitionInfo.DataProperty.StorageMedium) + + log.Infof("modify partition property sql: %s", sql) + if err := s.Exec(sql); err != nil { + errMsg := err.Error() + // Skip if partition not found (partition may have been dropped) + if strings.Contains(errMsg, "does not exist") || strings.Contains(errMsg, "not found") { + log.Warnf("partition %s not found, skip: %v", partitionInfo.PartitionName, err) + continue + } + // For other errors, record and continue to try remaining partitions + log.Warnf("modify partition %s property failed: %v", partitionInfo.PartitionName, err) + lastErr = err + } else { + successCount++ + } + } + + // Return error if any partition modification failed (except partition not found) + if lastErr != nil { + return xerror.Wrapf(lastErr, xerror.Normal, + "modify partition storage medium failed, success: %d, total: %d", + successCount, len(batchModifyPartitionsInfo.Infos)) + } + + return nil +} // Determine whether the error are network related, eg connection refused, connection reset, exposed from net packages. 
func isNetworkRelated(err error) bool { diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index a7cd32d4..4f33b0e2 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -66,6 +66,7 @@ type Specer interface { AddPartition(destTableName string, addPartition *record.AddPartition) error DropPartition(destTableName string, dropPartition *record.DropPartition) error RenamePartition(destTableName, oldPartition, newPartition string) error + ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error LightningIndexChange(tableAlias string, changes *record.ModifyTableAddOrDropInvertedIndices) error BuildIndex(tableAlias string, buildIndex *record.IndexChangeJob) error diff --git a/pkg/ccr/handle/create_table.go b/pkg/ccr/handle/create_table.go index 42b495c3..4282c368 100644 --- a/pkg/ccr/handle/create_table.go +++ b/pkg/ccr/handle/create_table.go @@ -1,6 +1,8 @@ package handle import ( + "fmt" + "regexp" "strings" "github.com/selectdb/ccr_syncer/pkg/ccr" @@ -18,6 +20,70 @@ type CreateTableHandle struct { IdempotentJobHandle[*record.CreateTable] } +// Check if error message indicates storage medium or capacity related issues +func isStorageMediumError(errMsg string) bool { + // Doris returns "Failed to find enough backend" for storage/capacity issues + return strings.Contains(strings.ToLower(errMsg), "failed to find enough backend") +} + +// Set specific property in CREATE TABLE SQL +func setPropertyInCreateTableSql(createSql string, key string, value string) string { + // Add property to PROPERTIES clause + pattern := `(?i)(PROPERTIES\s*\(\s*)` + replacement := fmt.Sprintf(`${1}"%s" = "%s", `, key, value) + createSql = regexp.MustCompile(pattern).ReplaceAllString(createSql, replacement) + + // Clean up trailing comma if PROPERTIES was empty + return ccr.FilterTailingCommaFromCreateTableSql(createSql) +} + +// Set specific storage_medium in CREATE TABLE SQL +func setStorageMediumInCreateTableSql(createSql string, medium string) string { + // Remove existing storage_medium first + createSql = ccr.FilterStorageMediumFromCreateTableSql(createSql) + return setPropertyInCreateTableSql(createSql, "storage_medium", medium) +} + +// Set specific medium_allocation_mode in CREATE TABLE SQL +func setMediumAllocationModeInCreateTableSql(createSql string, mode string) string { + // Remove existing medium_allocation_mode first + createSql = ccr.FilterMediumAllocationModeFromCreateTableSql(createSql) + return setPropertyInCreateTableSql(createSql, "medium_allocation_mode", mode) +} + +// Process CREATE TABLE SQL according to storage medium policy +func processCreateTableSqlByMediumPolicy(j *ccr.Job, createTable *record.CreateTable) { + storageMedium := j.StorageMedium + mediumAllocationMode := j.MediumAllocationMode + + // Process storage_medium + switch storageMedium { + case ccr.StorageMediumSameWithUpstream: + // Keep upstream storage_medium unchanged + log.Infof("using same_with_upstream storage medium, keeping original storage_medium") + + case ccr.StorageMediumHDD: + log.Infof("using hdd storage medium, setting storage_medium to hdd") + createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "hdd") + + case ccr.StorageMediumSSD: + log.Infof("using ssd storage medium, setting storage_medium to ssd") + createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "ssd") + + default: + log.Warnf("unknown storage medium: %s, falling back to filter storage_medium", storageMedium) + if ccr.FeatureFilterStorageMedium { + 
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql) + } + } + + // Process medium_allocation_mode from CCR job parameter + if mediumAllocationMode != "" { + log.Infof("setting medium_allocation_mode to %s", mediumAllocationMode) + createTable.Sql = setMediumAllocationModeInCreateTableSql(createTable.Sql, mediumAllocationMode) + } +} + func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *record.CreateTable) error { if j.SyncType != ccr.DBSync { return xerror.Errorf(xerror.Normal, "invalid sync type: %v", j.SyncType) @@ -68,9 +134,8 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec } } - if ccr.FeatureFilterStorageMedium { - createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql) - } + // Process SQL according to storage medium policy + processCreateTableSqlByMediumPolicy(j, createTable) createTable.Sql = ccr.FilterDynamicPartitionStoragePolicyFromCreateTableSql(createTable.Sql) if ccr.FeatureOverrideReplicationNum() && j.ReplicationNum > 0 { @@ -79,26 +144,49 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil { errMsg := err.Error() + + // Skip unsupported features if strings.Contains(errMsg, "Can not found function") { log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", errMsg) return nil - } else if strings.Contains(errMsg, "Can not find resource") { + } + if strings.Contains(errMsg, "Can not find resource") { log.Warnf("skip creating table/view for the resource is not supported yet: %s", errMsg) return nil - } else if createTable.IsCreateView() && strings.Contains(errMsg, "Unknown column") { + } + + // Trigger partial snapshot for recoverable errors + if createTable.IsCreateView() && strings.Contains(errMsg, "Unknown column") { log.Warnf("create view but the column is not found, trigger partial snapshot, commit seq: %d, msg: %s", commitSeq, errMsg) replace := false // new view no need to replace isView := true return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView) } - if len(createTable.TableName) > 0 && ccr.IsSessionVariableRequired(errMsg) { // ignore doris 2.0.3 + if len(createTable.TableName) > 0 && ccr.IsSessionVariableRequired(errMsg) { log.Infof("a session variable is required to create table %s, force partial snapshot, commit seq: %d, msg: %s", createTable.TableName, commitSeq, errMsg) replace := false // new table no need to replace isView := false return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView) } + + // Storage medium related error: pause job and require manual intervention + if isStorageMediumError(errMsg) { + log.Errorf("create table %s failed due to storage medium issue, job will be paused. "+ + "Current storage_medium=%s. Please check target cluster resources or update storage_medium via API. Error: %s", + createTable.TableName, j.StorageMedium, errMsg) + return xerror.Panicf(xerror.Normal, + "Create table failed: storage medium issue for table %s. "+ + "Current storage_medium=%s. Possible causes:\n"+ + "1. Storage medium (%s) not available on target cluster\n"+ + "2. Insufficient disk capacity\n"+ + "3. Replication number exceeds available BE nodes\n"+ + "Please check target cluster configuration or update storage_medium via /update_storage_medium API. 
"+ + "Original error: %s", + createTable.TableName, j.StorageMedium, j.StorageMedium, errMsg) + } + return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId) } diff --git a/pkg/ccr/handle/modify_partitions.go b/pkg/ccr/handle/modify_partitions.go new file mode 100644 index 00000000..a62cdbae --- /dev/null +++ b/pkg/ccr/handle/modify_partitions.go @@ -0,0 +1,105 @@ +package handle + +import ( + "strings" + + "github.com/selectdb/ccr_syncer/pkg/ccr" + "github.com/selectdb/ccr_syncer/pkg/ccr/record" + festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice" + "github.com/selectdb/ccr_syncer/pkg/xerror" + log "github.com/sirupsen/logrus" +) + +func init() { + ccr.RegisterJobHandle[*record.BatchModifyPartitionsInfo](festruct.TBinlogType_MODIFY_PARTITIONS, &ModifyPartitionsHandle{}) +} + +type ModifyPartitionsHandle struct { + // The modify partitions binlog is idempotent + IdempotentJobHandle[*record.BatchModifyPartitionsInfo] +} + +// Filter partitions that have storage medium changes and are not temporary partitions +func filterStorageMediumChanges(infos []*record.ModifyPartitionInfo) []*record.ModifyPartitionInfo { + filtered := make([]*record.ModifyPartitionInfo, 0) + for _, info := range infos { + // Skip temporary partitions (they are not synced) + if info.IsTempPartition { + log.Debugf("skip temporary partition %d", info.PartitionId) + continue + } + // Only process partitions with storage medium specified + if info.DataProperty != nil && info.DataProperty.StorageMedium != "" { + filtered = append(filtered, info) + } + } + return filtered +} + +func (h *ModifyPartitionsHandle) Handle(j *ccr.Job, commitSeq int64, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error { + // Skip if using fixed storage medium (hdd/ssd) + // Only process when using same_with_upstream storage medium + if j.StorageMedium == ccr.StorageMediumHDD || + j.StorageMedium == ccr.StorageMediumSSD { + log.Infof("skip modify partitions for storage_medium is fixed to %s", j.StorageMedium) + return nil + } + + // Safety check: ensure we have partition infos to process + if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 { + log.Warnf("batch modify partitions info is empty or nil, skip") + return nil + } + + // Filter to only process storage medium changes + filteredInfos := filterStorageMediumChanges(batchModifyPartitionsInfo.Infos) + if len(filteredInfos) == 0 { + log.Infof("no storage medium changes in modify partitions binlog, skip") + return nil + } + log.Infof("processing %d partition storage medium changes out of %d total modifications", + len(filteredInfos), len(batchModifyPartitionsInfo.Infos)) + + // Update to use filtered infos + batchModifyPartitionsInfo.Infos = filteredInfos + + // Get table ID from the first partition info (all partitions should belong to the same table) + tableId := batchModifyPartitionsInfo.GetTableId() + if tableId <= 0 { + log.Warnf("invalid table ID: %d, skip modify partitions", tableId) + return nil + } + + // Check if it's a materialized view table + if isAsyncMv, err := j.IsMaterializedViewTable(tableId); err != nil { + return err + } else if isAsyncMv { + log.Infof("skip modify partitions for materialized view table %d", tableId) + return nil + } + + // Get destination table name + destTableName, err := j.GetDestNameBySrcId(tableId) + if err != nil { + errMsg := err.Error() + // If table not found in mapping, it may not be synced yet or already dropped + if strings.Contains(errMsg, "not found") || 
strings.Contains(errMsg, "does not exist") { + log.Warnf("table %d not found in dest, skip modify partitions: %v", tableId, err) + return nil + } + // Other errors (network, etc.): return error to retry + return xerror.Wrapf(err, xerror.Normal, "failed to get dest table name for table %d", tableId) + } + + // Execute modify partition property + // Note: ModifyPartition only updates FE metadata, actual BE storage migration is async + // So this SQL won't fail due to backend resource issues + if err := j.Dest.ModifyPartitionProperty(destTableName, batchModifyPartitionsInfo); err != nil { + // Return error to let job framework retry (network issues, etc.) + return xerror.Wrapf(err, xerror.Normal, "modify partition storage medium failed for table %s", destTableName) + } + + log.Infof("successfully modified storage medium for %d partitions in table %s", + len(filteredInfos), destTableName) + return nil +} diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index b5f59ec8..b859130e 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -80,6 +80,7 @@ var ( featureSeperatedHandles bool featureEnableSnapshotCompress bool featureOverrideReplicationNumInternal bool + FeatureMediumAllocationMode bool flagBinlogBatchSize int64 @@ -127,6 +128,8 @@ func init() { "enable snapshot compress") flag.BoolVar(&featureOverrideReplicationNumInternal, "feature_override_replication_num", true, "enable override replication_num for downstream cluster") + flag.BoolVar(&FeatureMediumAllocationMode, "feature_medium_allocation_mode", true, + "enable medium allocation mode support for sync job") flag.Int64Var(&flagBinlogBatchSize, "binlog_batch_size", 16, "the max num of binlogs to get in a batch") } @@ -143,6 +146,19 @@ const ( TableSync SyncType = 1 ) +// Storage medium constants +const ( + StorageMediumHDD = "hdd" + StorageMediumSSD = "ssd" + StorageMediumSameWithUpstream = "same_with_upstream" +) + +// Medium allocation mode constants +const ( + MediumAllocationModeStrict = "strict" + MediumAllocationModeAdaptive = "adaptive" +) + func (s SyncType) String() string { switch s { case DBSync: @@ -223,6 +239,10 @@ type Job struct { destMeta Metaer `json:"-"` State JobState `json:"state"` Extra JobExtra `json:"extra"` + // Storage medium for backup/restore operations: "hdd", "ssd" or "same_with_upstream" + StorageMedium string `json:"storage_medium"` + // Medium allocation mode for backup/restore operations: "strict" or "adaptive" + MediumAllocationMode string `json:"medium_allocation_mode"` factory *Factory `json:"-"` @@ -254,7 +274,9 @@ type JobContext struct { ReuseBinlogLabel bool Factory *Factory // Replication number policy: -1 means inherit from upstream (default), >0 means fixed replica num, 0 is invalid - ReplicationNum int + ReplicationNum int + StorageMedium string + MediumAllocationMode string } // new job @@ -268,16 +290,48 @@ func NewJobFromService(name string, ctx context.Context) (*Job, error) { src := jobContext.Src dest := jobContext.Dest id := getJobId(name, src, dest) + + // Set default storage medium if not specified + storageMedium := jobContext.StorageMedium + log.Infof("NewJobFromService: received storage_medium=%s from JobContext", storageMedium) + if storageMedium == "" { + storageMedium = StorageMediumSameWithUpstream + log.Infof("NewJobFromService: storage_medium was empty, set to default=%s", storageMedium) + } + // Validate storage medium + if storageMedium != StorageMediumHDD && + storageMedium != StorageMediumSSD && + storageMedium != StorageMediumSameWithUpstream { + return nil, 
xerror.Errorf(xerror.Normal, "invalid storage medium: %s, must be %s, %s or %s", + storageMedium, StorageMediumHDD, StorageMediumSSD, StorageMediumSameWithUpstream) + } + + // Set default medium allocation mode if not specified + mediumAllocationMode := jobContext.MediumAllocationMode + log.Infof("NewJobFromService: received medium_allocation_mode=%s from JobContext", mediumAllocationMode) + if mediumAllocationMode == "" { + mediumAllocationMode = MediumAllocationModeAdaptive + log.Infof("NewJobFromService: medium_allocation_mode was empty, set to default=%s", mediumAllocationMode) + } + // Validate medium allocation mode + if mediumAllocationMode != MediumAllocationModeStrict && + mediumAllocationMode != MediumAllocationModeAdaptive { + return nil, xerror.Errorf(xerror.Normal, "invalid medium allocation mode: %s, must be %s or %s", + mediumAllocationMode, MediumAllocationModeStrict, MediumAllocationModeAdaptive) + } + job := &Job{ - Name: name, - Id: id, - Src: src, - ISrc: factory.NewSpecer(&src), - srcMeta: factory.NewMeta(&jobContext.Src), - Dest: dest, - IDest: factory.NewSpecer(&dest), - destMeta: factory.NewMeta(&jobContext.Dest), - State: JobRunning, + Name: name, + Id: id, + Src: src, + ISrc: factory.NewSpecer(&src), + srcMeta: factory.NewMeta(&jobContext.Src), + Dest: dest, + IDest: factory.NewSpecer(&dest), + destMeta: factory.NewMeta(&jobContext.Dest), + State: JobRunning, + StorageMedium: storageMedium, + MediumAllocationMode: mediumAllocationMode, Extra: JobExtra{ allowTableExists: jobContext.AllowTableExists, @@ -294,6 +348,7 @@ func NewJobFromService(name string, ctx context.Context) (*Job, error) { concurrencyManager: rpc.NewConcurrencyManager(), } + log.Infof("NewJobFromService: Job created with StorageMedium=%s, MediumAllocationMode=%s", job.StorageMedium, job.MediumAllocationMode) // set and validate replication number policy job.ReplicationNum = jobContext.ReplicationNum @@ -795,17 +850,24 @@ func (j *Job) partialSync() error { isForceReplace := featureRestoreReplaceDiffSchema && j.progress.PartialSyncData.IsView isAtomicRestore := featureAtomicRestore && isForceReplace + // Use job's storage medium and medium allocation mode + storageMedium := j.StorageMedium + mediumAllocationMode := j.MediumAllocationMode + log.Infof("partialSync: using StorageMedium=%s, MediumAllocationMode=%s", storageMedium, mediumAllocationMode) + restoreReq := rpc.RestoreSnapshotRequest{ TableRefs: tableRefs, SnapshotName: restoreSnapshotName, SnapshotResult: snapshotResp, // DO NOT drop exists tables and partitions - CleanPartitions: false, - CleanTables: false, - AtomicRestore: isAtomicRestore, - Compress: false, - ForceReplace: isForceReplace, + CleanPartitions: false, + CleanTables: false, + AtomicRestore: isAtomicRestore, + Compress: false, + ForceReplace: isForceReplace, + StorageMedium: storageMedium, + MediumAllocationMode: mediumAllocationMode, } // apply replication properties override if policy provided @@ -850,6 +912,23 @@ func (j *Job) partialSync() error { log.Warnf("force partial sync with replace, because the snapshot %s signature is not matched", restoreSnapshotName) return j.NewPartialSnapshot(tableId, table, nil, true, false) // only in partition sync. } else if err != nil { + // Check if it's a backend insufficient error - this requires manual intervention + errMsg := err.Error() + if isBackendInsufficientError(errMsg) { + log.Errorf("Restore failed: insufficient backend resources. 
Job storage_medium=%s, medium_allocation_mode=%s, Error: %s", + j.StorageMedium, j.MediumAllocationMode, errMsg) + // Stop the job immediately without retry, as this requires manual intervention + // Use Panicf to stop the job completely (not just return error and retry) + return xerror.Panicf(xerror.Normal, + "Restore failed: Insufficient backend resources. Possible causes:\n"+ + "1. Replication number exceeds available BE nodes\n"+ + "2. Storage medium (%s) not available - check storage_medium setting\n"+ + "3. Replication tag mismatch\n"+ + "4. Insufficient disk capacity on BE nodes\n"+ + "5. All BE nodes on same host\n"+ + "Please check target cluster configuration. Original error: %v", + j.StorageMedium, err) + } j.progress.NextSubVolatile(RestoreSnapshot, inMemoryData) return err } @@ -1263,6 +1342,10 @@ func (j *Job) fullSync() error { if featureRestoreReplaceDiffSchema { restoreReq.ForceReplace = true } + // Set storage medium and medium allocation mode + restoreReq.StorageMedium = j.StorageMedium + restoreReq.MediumAllocationMode = j.MediumAllocationMode + log.Infof("fullSync: setting StorageMedium=%s, MediumAllocationMode=%s", j.StorageMedium, j.MediumAllocationMode) restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq) if err != nil { return err @@ -1295,6 +1378,25 @@ func (j *Job) fullSync() error { for { restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName) + if err != nil { + // Check if it's a backend insufficient error first - this requires manual intervention + errMsg := err.Error() + if !errors.Is(err, base.ErrRestoreSignatureNotMatched) && isBackendInsufficientError(errMsg) { + log.Errorf("Restore failed: insufficient backend resources. Job storage_medium=%s, medium_allocation_mode=%s, Error: %s", + j.StorageMedium, j.MediumAllocationMode, errMsg) + // Stop the job immediately without retry, as this requires manual intervention + // Use Panicf to stop the job completely (not just return error and retry) + return xerror.Panicf(xerror.Normal, + "Restore failed: Insufficient backend resources. Possible causes:\n"+ + "1. Replication number exceeds available BE nodes\n"+ + "2. Storage medium (%s) not available - check storage_medium setting\n"+ + "3. Replication tag mismatch\n"+ + "4. Insufficient disk capacity on BE nodes\n"+ + "5. All BE nodes on same host\n"+ + "Please check target cluster configuration. Original error: %v", + j.StorageMedium, err) + } + } if err != nil && errors.Is(err, base.ErrRestoreSignatureNotMatched) { // We need rebuild the exists table. var tableName string @@ -3995,6 +4097,14 @@ func (j *Job) run() { log.Warnf("job sync failed, job: %s, err: %+v", j.Name, err) panicError = j.handleError(j.Name, err) + if panicError != nil { + // Pause the job when panic error occurs, so user can see the status change + log.Errorf("job %s encountered panic error: %+v", j.Name, panicError) + log.Errorf("job %s pausing automatically. 
Please check the error, fix it, then resume the job", j.Name) + if err := j.Pause(); err != nil { + log.Errorf("failed to pause job %s after panic error: %v", j.Name, err) + } + } } } } @@ -4709,6 +4819,12 @@ func ResetReplicationNumFromCreateTableSql(createSql string, replicationNum int) return FilterTailingCommaFromCreateTableSql(createSql) } +func FilterMediumAllocationModeFromCreateTableSql(createSql string) string { + pattern := `"medium_allocation_mode"\s*=\s*"[^"]*"(,\s*)?` + createSql = regexp.MustCompile(pattern).ReplaceAllString(createSql, "") + return FilterTailingCommaFromCreateTableSql(createSql) +} + func FilterDynamicPartitionStoragePolicyFromCreateTableSql(createSql string) string { // Two patterns: // - "dynamic_partition.storage_policy"="storage_policy", @@ -4730,3 +4846,65 @@ func getJobId(name string, src base.Spec, dest base.Spec) string { io.WriteString(h, dest.String()) return fmt.Sprintf("%x", h.Sum(nil)) } + +// isBackendInsufficientError checks if the error is "Failed to find enough backend" +// This error requires manual intervention and should not be retried, possible causes: +// 1. replication_num exceeds available BE nodes +// 2. storage medium (SSD/HDD) not available +// 3. replication tag mismatch +// 4. insufficient disk capacity +// 5. all BE nodes on same host +func isBackendInsufficientError(errMsg string) bool { + lowerMsg := strings.ToLower(errMsg) + // This error from Doris FE indicates backend resource issues that require manual intervention + return strings.Contains(lowerMsg, "failed to find enough backend") +} + +func (j *Job) UpdateMediumAllocationMode(mediumAllocationMode string) error { + defer j.raiseInterruptSignal()() + j.lock.Lock() + defer j.lock.Unlock() + + // Validate medium allocation mode + if mediumAllocationMode != MediumAllocationModeStrict && + mediumAllocationMode != MediumAllocationModeAdaptive { + return xerror.Errorf(xerror.Normal, "invalid medium allocation mode: %s, must be %s or %s", + mediumAllocationMode, MediumAllocationModeStrict, MediumAllocationModeAdaptive) + } + + oldMediumAllocationMode := j.MediumAllocationMode + j.MediumAllocationMode = mediumAllocationMode + + if err := j.persistJob(); err != nil { + j.MediumAllocationMode = oldMediumAllocationMode + return err + } + + log.Infof("update job %s medium allocation mode from %s to %s", j.Name, oldMediumAllocationMode, mediumAllocationMode) + return nil +} + +func (j *Job) UpdateStorageMedium(storageMedium string) error { + defer j.raiseInterruptSignal()() + j.lock.Lock() + defer j.lock.Unlock() + + // Validate storage medium + if storageMedium != StorageMediumHDD && + storageMedium != StorageMediumSSD && + storageMedium != StorageMediumSameWithUpstream { + return xerror.Errorf(xerror.Normal, "invalid storage medium: %s, must be %s, %s or %s", + storageMedium, StorageMediumHDD, StorageMediumSSD, StorageMediumSameWithUpstream) + } + + oldStorageMedium := j.StorageMedium + j.StorageMedium = storageMedium + + if err := j.persistJob(); err != nil { + j.StorageMedium = oldStorageMedium + return err + } + + log.Infof("update job %s storage medium from %s to %s", j.Name, oldStorageMedium, storageMedium) + return nil +} diff --git a/pkg/ccr/job_manager.go b/pkg/ccr/job_manager.go index 174e4ee4..02800986 100644 --- a/pkg/ccr/job_manager.go +++ b/pkg/ccr/job_manager.go @@ -277,3 +277,25 @@ func (jm *JobManager) SkipBinlog(jobName string, params SkipBinlogParams) error return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName) } } + +func (jm *JobManager) 
UpdateMediumAllocationMode(jobName string, mediumAllocationMode string) error { + jm.lock.Lock() + defer jm.lock.Unlock() + + if job, ok := jm.jobs[jobName]; ok { + return job.UpdateMediumAllocationMode(mediumAllocationMode) + } else { + return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName) + } +} + +func (jm *JobManager) UpdateStorageMedium(jobName string, storageMedium string) error { + jm.lock.Lock() + defer jm.lock.Unlock() + + if job, ok := jm.jobs[jobName]; ok { + return job.UpdateStorageMedium(storageMedium) + } else { + return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName) + } +} diff --git a/pkg/ccr/record/modify_partition_property.go b/pkg/ccr/record/modify_partition_property.go new file mode 100644 index 00000000..3d14e182 --- /dev/null +++ b/pkg/ccr/record/modify_partition_property.go @@ -0,0 +1,77 @@ +package record + +import ( + "encoding/json" + "fmt" + + "github.com/selectdb/ccr_syncer/pkg/xerror" +) + +type DataProperty struct { + StorageMedium string `json:"storageMedium"` + CooldownTimeMs int64 `json:"cooldownTimeMs"` + StoragePolicy string `json:"storagePolicy"` + IsMutable bool `json:"isMutable"` + StorageMediumSpecified bool `json:"storageMediumSpecified,omitempty"` +} + +type Tag struct { + Type string `json:"type"` + Value string `json:"value"` +} + +type ReplicaAllocation struct { + AllocMap map[string]int16 `json:"allocMap"` +} + +// ModifyPartitionInfo represents single partition modification info +type ModifyPartitionInfo struct { + DbId int64 `json:"dbId"` + TableId int64 `json:"tableId"` + PartitionId int64 `json:"partitionId"` + DataProperty *DataProperty `json:"dataProperty"` + ReplicationNum int16 `json:"replicationNum"` + IsInMemory bool `json:"isInMemory"` + ReplicaAlloc *ReplicaAllocation `json:"replicaAlloc"` + StoragePolicy string `json:"storagePolicy"` + TblProperties map[string]string `json:"tableProperties"` + PartitionName string `json:"partitionName,omitempty"` + IsTempPartition bool `json:"isTempPartition,omitempty"` +} + +type BatchModifyPartitionsInfo struct { + Infos []*ModifyPartitionInfo `json:"infos"` +} + +func (batchModifyPartitionsInfo *BatchModifyPartitionsInfo) Deserialize(data string) error { + err := json.Unmarshal([]byte(data), &batchModifyPartitionsInfo) + if err != nil { + return xerror.Wrap(err, xerror.Normal, "unmarshal batch modify partitions info error") + } + + if len(batchModifyPartitionsInfo.Infos) == 0 { + return xerror.Errorf(xerror.Normal, "modify partition infos is empty") + } + + return nil +} + +func NewBatchModifyPartitionsInfoFromJson(data string) (*BatchModifyPartitionsInfo, error) { + var batchModifyPartitionsInfo BatchModifyPartitionsInfo + if err := batchModifyPartitionsInfo.Deserialize(data); err != nil { + return nil, err + } + return &batchModifyPartitionsInfo, nil +} + +func (batchModifyPartitionsInfo *BatchModifyPartitionsInfo) String() string { + return fmt.Sprintf("BatchModifyPartitionsInfo: Infos count: %d", len(batchModifyPartitionsInfo.Infos)) +} + +// GetTableId implements Record interface by returning the first table ID in the batch +func (batchModifyPartitionsInfo *BatchModifyPartitionsInfo) GetTableId() int64 { + if len(batchModifyPartitionsInfo.Infos) == 0 { + return -1 + } + return batchModifyPartitionsInfo.Infos[0].TableId +} diff --git a/pkg/rpc/fe.go b/pkg/rpc/fe.go index 492336ba..af223e9a 100644 --- a/pkg/rpc/fe.go +++ b/pkg/rpc/fe.go @@ -99,6 +99,8 @@ type RestoreSnapshotRequest struct { CleanTables bool Compress bool ForceReplace bool + StorageMedium string + 
MediumAllocationMode string // PropertiesOverride allows caller to pass-through properties for restore job, // such as replication settings. If set, it will be preferred over defaults. PropertiesOverride map[string]string @@ -875,26 +877,28 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, restoreReq *RestoreS } req := &festruct.TRestoreSnapshotRequest{ - Table: &spec.Table, - LabelName: &restoreReq.SnapshotName, - RepoName: &repoName, - TableRefs: restoreReq.TableRefs, - Properties: properties, - Meta: meta, - JobInfo: jobInfo, - CleanTables: &restoreReq.CleanTables, - CleanPartitions: &restoreReq.CleanPartitions, - AtomicRestore: &restoreReq.AtomicRestore, - Compressed: utils.ThriftValueWrapper(restoreReq.Compress), - ForceReplace: &restoreReq.ForceReplace, + Table: &spec.Table, + LabelName: &restoreReq.SnapshotName, + RepoName: &repoName, + TableRefs: restoreReq.TableRefs, + Properties: properties, + Meta: meta, + JobInfo: jobInfo, + CleanTables: &restoreReq.CleanTables, + CleanPartitions: &restoreReq.CleanPartitions, + AtomicRestore: &restoreReq.AtomicRestore, + Compressed: utils.ThriftValueWrapper(restoreReq.Compress), + ForceReplace: &restoreReq.ForceReplace, + StorageMedium: utils.ThriftValueWrapper(restoreReq.StorageMedium), + MediumAllocationMode: utils.ThriftValueWrapper(restoreReq.MediumAllocationMode), } setAuthInfo(req, spec) // NOTE: ignore meta, because it's too large - log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t, compressed: %t, forceReplace: %t", + log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t, compressed: %t, forceReplace: %t, storageMedium: %s, mediumAllocationMode: %s", req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties, restoreReq.CleanTables, restoreReq.CleanPartitions, restoreReq.AtomicRestore, - req.GetCompressed(), restoreReq.ForceReplace) + req.GetCompressed(), restoreReq.ForceReplace, restoreReq.StorageMedium, restoreReq.MediumAllocationMode) if resp, err := client.RestoreSnapshot(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "RestoreSnapshot failed") diff --git a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go index 09b13aca..ab034fc5 100644 --- a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go @@ -54064,23 +54064,25 @@ func (p *TTableRef) Field3DeepEqual(src *string) bool { } type TRestoreSnapshotRequest struct { - Cluster *string `thrift:"cluster,1,optional" frugal:"1,optional,string" json:"cluster,omitempty"` - User *string `thrift:"user,2,optional" frugal:"2,optional,string" json:"user,omitempty"` - Passwd *string `thrift:"passwd,3,optional" frugal:"3,optional,string" json:"passwd,omitempty"` - Db *string `thrift:"db,4,optional" frugal:"4,optional,string" json:"db,omitempty"` - Table *string `thrift:"table,5,optional" frugal:"5,optional,string" json:"table,omitempty"` - Token *string `thrift:"token,6,optional" frugal:"6,optional,string" json:"token,omitempty"` - LabelName *string `thrift:"label_name,7,optional" frugal:"7,optional,string" json:"label_name,omitempty"` - RepoName *string `thrift:"repo_name,8,optional" frugal:"8,optional,string" json:"repo_name,omitempty"` - TableRefs []*TTableRef `thrift:"table_refs,9,optional" frugal:"9,optional,list" 
json:"table_refs,omitempty"` - Properties map[string]string `thrift:"properties,10,optional" frugal:"10,optional,map" json:"properties,omitempty"` - Meta []byte `thrift:"meta,11,optional" frugal:"11,optional,binary" json:"meta,omitempty"` - JobInfo []byte `thrift:"job_info,12,optional" frugal:"12,optional,binary" json:"job_info,omitempty"` - CleanTables *bool `thrift:"clean_tables,13,optional" frugal:"13,optional,bool" json:"clean_tables,omitempty"` - CleanPartitions *bool `thrift:"clean_partitions,14,optional" frugal:"14,optional,bool" json:"clean_partitions,omitempty"` - AtomicRestore *bool `thrift:"atomic_restore,15,optional" frugal:"15,optional,bool" json:"atomic_restore,omitempty"` - Compressed *bool `thrift:"compressed,16,optional" frugal:"16,optional,bool" json:"compressed,omitempty"` - ForceReplace *bool `thrift:"force_replace,17,optional" frugal:"17,optional,bool" json:"force_replace,omitempty"` + Cluster *string `thrift:"cluster,1,optional" frugal:"1,optional,string" json:"cluster,omitempty"` + User *string `thrift:"user,2,optional" frugal:"2,optional,string" json:"user,omitempty"` + Passwd *string `thrift:"passwd,3,optional" frugal:"3,optional,string" json:"passwd,omitempty"` + Db *string `thrift:"db,4,optional" frugal:"4,optional,string" json:"db,omitempty"` + Table *string `thrift:"table,5,optional" frugal:"5,optional,string" json:"table,omitempty"` + Token *string `thrift:"token,6,optional" frugal:"6,optional,string" json:"token,omitempty"` + LabelName *string `thrift:"label_name,7,optional" frugal:"7,optional,string" json:"label_name,omitempty"` + RepoName *string `thrift:"repo_name,8,optional" frugal:"8,optional,string" json:"repo_name,omitempty"` + TableRefs []*TTableRef `thrift:"table_refs,9,optional" frugal:"9,optional,list" json:"table_refs,omitempty"` + Properties map[string]string `thrift:"properties,10,optional" frugal:"10,optional,map" json:"properties,omitempty"` + Meta []byte `thrift:"meta,11,optional" frugal:"11,optional,binary" json:"meta,omitempty"` + JobInfo []byte `thrift:"job_info,12,optional" frugal:"12,optional,binary" json:"job_info,omitempty"` + CleanTables *bool `thrift:"clean_tables,13,optional" frugal:"13,optional,bool" json:"clean_tables,omitempty"` + CleanPartitions *bool `thrift:"clean_partitions,14,optional" frugal:"14,optional,bool" json:"clean_partitions,omitempty"` + AtomicRestore *bool `thrift:"atomic_restore,15,optional" frugal:"15,optional,bool" json:"atomic_restore,omitempty"` + Compressed *bool `thrift:"compressed,16,optional" frugal:"16,optional,bool" json:"compressed,omitempty"` + ForceReplace *bool `thrift:"force_replace,17,optional" frugal:"17,optional,bool" json:"force_replace,omitempty"` + StorageMedium *string `thrift:"storage_medium,18,optional" frugal:"18,optional,string" json:"storage_medium,omitempty"` + MediumAllocationMode *string `thrift:"medium_allocation_mode,19,optional" frugal:"19,optional,string" json:"medium_allocation_mode,omitempty"` } func NewTRestoreSnapshotRequest() *TRestoreSnapshotRequest { @@ -54242,6 +54244,24 @@ func (p *TRestoreSnapshotRequest) GetForceReplace() (v bool) { } return *p.ForceReplace } + +var TRestoreSnapshotRequest_StorageMedium_DEFAULT string + +func (p *TRestoreSnapshotRequest) GetStorageMedium() (v string) { + if !p.IsSetStorageMedium() { + return TRestoreSnapshotRequest_StorageMedium_DEFAULT + } + return *p.StorageMedium +} + +var TRestoreSnapshotRequest_MediumAllocationMode_DEFAULT string + +func (p *TRestoreSnapshotRequest) GetMediumAllocationMode() (v string) { + if 
!p.IsSetMediumAllocationMode() { + return TRestoreSnapshotRequest_MediumAllocationMode_DEFAULT + } + return *p.MediumAllocationMode +} func (p *TRestoreSnapshotRequest) SetCluster(val *string) { p.Cluster = val } @@ -54293,6 +54313,12 @@ func (p *TRestoreSnapshotRequest) SetCompressed(val *bool) { func (p *TRestoreSnapshotRequest) SetForceReplace(val *bool) { p.ForceReplace = val } +func (p *TRestoreSnapshotRequest) SetStorageMedium(val *string) { + p.StorageMedium = val +} +func (p *TRestoreSnapshotRequest) SetMediumAllocationMode(val *string) { + p.MediumAllocationMode = val +} var fieldIDToName_TRestoreSnapshotRequest = map[int16]string{ 1: "cluster", @@ -54312,6 +54338,8 @@ var fieldIDToName_TRestoreSnapshotRequest = map[int16]string{ 15: "atomic_restore", 16: "compressed", 17: "force_replace", + 18: "storage_medium", + 19: "medium_allocation_mode", } func (p *TRestoreSnapshotRequest) IsSetCluster() bool { @@ -54382,6 +54410,14 @@ func (p *TRestoreSnapshotRequest) IsSetForceReplace() bool { return p.ForceReplace != nil } +func (p *TRestoreSnapshotRequest) IsSetStorageMedium() bool { + return p.StorageMedium != nil +} + +func (p *TRestoreSnapshotRequest) IsSetMediumAllocationMode() bool { + return p.MediumAllocationMode != nil +} + func (p *TRestoreSnapshotRequest) Read(iprot thrift.TProtocol) (err error) { var fieldTypeId thrift.TType @@ -54537,6 +54573,22 @@ func (p *TRestoreSnapshotRequest) Read(iprot thrift.TProtocol) (err error) { } else if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError } + case 18: + if fieldTypeId == thrift.STRING { + if err = p.ReadField18(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } + case 19: + if fieldTypeId == thrift.STRING { + if err = p.ReadField19(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } default: if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError @@ -54783,6 +54835,28 @@ func (p *TRestoreSnapshotRequest) ReadField17(iprot thrift.TProtocol) error { p.ForceReplace = _field return nil } +func (p *TRestoreSnapshotRequest) ReadField18(iprot thrift.TProtocol) error { + + var _field *string + if v, err := iprot.ReadString(); err != nil { + return err + } else { + _field = &v + } + p.StorageMedium = _field + return nil +} +func (p *TRestoreSnapshotRequest) ReadField19(iprot thrift.TProtocol) error { + + var _field *string + if v, err := iprot.ReadString(); err != nil { + return err + } else { + _field = &v + } + p.MediumAllocationMode = _field + return nil +} func (p *TRestoreSnapshotRequest) Write(oprot thrift.TProtocol) (err error) { var fieldId int16 @@ -54858,6 +54932,14 @@ func (p *TRestoreSnapshotRequest) Write(oprot thrift.TProtocol) (err error) { fieldId = 17 goto WriteFieldError } + if err = p.writeField18(oprot); err != nil { + fieldId = 18 + goto WriteFieldError + } + if err = p.writeField19(oprot); err != nil { + fieldId = 19 + goto WriteFieldError + } } if err = oprot.WriteFieldStop(); err != nil { goto WriteFieldStopError @@ -55218,6 +55300,44 @@ WriteFieldEndError: return thrift.PrependError(fmt.Sprintf("%T write field 17 end error: ", p), err) } +func (p *TRestoreSnapshotRequest) writeField18(oprot thrift.TProtocol) (err error) { + if p.IsSetStorageMedium() { + if err = oprot.WriteFieldBegin("storage_medium", thrift.STRING, 18); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteString(*p.StorageMedium); err != nil { + return err 
+ } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 18 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 18 end error: ", p), err) +} + +func (p *TRestoreSnapshotRequest) writeField19(oprot thrift.TProtocol) (err error) { + if p.IsSetMediumAllocationMode() { + if err = oprot.WriteFieldBegin("medium_allocation_mode", thrift.STRING, 19); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteString(*p.MediumAllocationMode); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 19 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 19 end error: ", p), err) +} + func (p *TRestoreSnapshotRequest) String() string { if p == nil { return "" @@ -55283,6 +55403,12 @@ func (p *TRestoreSnapshotRequest) DeepEqual(ano *TRestoreSnapshotRequest) bool { if !p.Field17DeepEqual(ano.ForceReplace) { return false } + if !p.Field18DeepEqual(ano.StorageMedium) { + return false + } + if !p.Field19DeepEqual(ano.MediumAllocationMode) { + return false + } return true } @@ -55482,6 +55608,30 @@ func (p *TRestoreSnapshotRequest) Field17DeepEqual(src *bool) bool { } return true } +func (p *TRestoreSnapshotRequest) Field18DeepEqual(src *string) bool { + + if p.StorageMedium == src { + return true + } else if p.StorageMedium == nil || src == nil { + return false + } + if strings.Compare(*p.StorageMedium, *src) != 0 { + return false + } + return true +} +func (p *TRestoreSnapshotRequest) Field19DeepEqual(src *string) bool { + + if p.MediumAllocationMode == src { + return true + } else if p.MediumAllocationMode == nil || src == nil { + return false + } + if strings.Compare(*p.MediumAllocationMode, *src) != 0 { + return false + } + return true +} type TRestoreSnapshotResult_ struct { Status *status.TStatus `thrift:"status,1,optional" frugal:"1,optional,status.TStatus" json:"status,omitempty"` diff --git a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go index 9eae5c9f..6584cef3 100644 --- a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go @@ -39360,6 +39360,34 @@ func (p *TRestoreSnapshotRequest) FastRead(buf []byte) (int, error) { goto SkipFieldError } } + case 18: + if fieldTypeId == thrift.STRING { + l, err = p.FastReadField18(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } + case 19: + if fieldTypeId == thrift.STRING { + l, err = p.FastReadField19(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } default: l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) offset += l @@ -39659,6 +39687,32 @@ func (p *TRestoreSnapshotRequest) FastReadField17(buf []byte) (int, error) { return offset, nil } +func (p *TRestoreSnapshotRequest) FastReadField18(buf []byte) (int, error) { + offset := 0 + + if v, l, err := bthrift.Binary.ReadString(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + 
p.StorageMedium = &v + + } + return offset, nil +} + +func (p *TRestoreSnapshotRequest) FastReadField19(buf []byte) (int, error) { + offset := 0 + + if v, l, err := bthrift.Binary.ReadString(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + p.MediumAllocationMode = &v + + } + return offset, nil +} + // for compatibility func (p *TRestoreSnapshotRequest) FastWrite(buf []byte) int { return 0 @@ -39685,6 +39739,8 @@ func (p *TRestoreSnapshotRequest) FastWriteNocopy(buf []byte, binaryWriter bthri offset += p.fastWriteField10(buf[offset:], binaryWriter) offset += p.fastWriteField11(buf[offset:], binaryWriter) offset += p.fastWriteField12(buf[offset:], binaryWriter) + offset += p.fastWriteField18(buf[offset:], binaryWriter) + offset += p.fastWriteField19(buf[offset:], binaryWriter) } offset += bthrift.Binary.WriteFieldStop(buf[offset:]) offset += bthrift.Binary.WriteStructEnd(buf[offset:]) @@ -39712,6 +39768,8 @@ func (p *TRestoreSnapshotRequest) BLength() int { l += p.field15Length() l += p.field16Length() l += p.field17Length() + l += p.field18Length() + l += p.field19Length() } l += bthrift.Binary.FieldStopLength() l += bthrift.Binary.StructEndLength() @@ -39923,6 +39981,28 @@ func (p *TRestoreSnapshotRequest) fastWriteField17(buf []byte, binaryWriter bthr return offset } +func (p *TRestoreSnapshotRequest) fastWriteField18(buf []byte, binaryWriter bthrift.BinaryWriter) int { + offset := 0 + if p.IsSetStorageMedium() { + offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "storage_medium", thrift.STRING, 18) + offset += bthrift.Binary.WriteStringNocopy(buf[offset:], binaryWriter, *p.StorageMedium) + + offset += bthrift.Binary.WriteFieldEnd(buf[offset:]) + } + return offset +} + +func (p *TRestoreSnapshotRequest) fastWriteField19(buf []byte, binaryWriter bthrift.BinaryWriter) int { + offset := 0 + if p.IsSetMediumAllocationMode() { + offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "medium_allocation_mode", thrift.STRING, 19) + offset += bthrift.Binary.WriteStringNocopy(buf[offset:], binaryWriter, *p.MediumAllocationMode) + + offset += bthrift.Binary.WriteFieldEnd(buf[offset:]) + } + return offset +} + func (p *TRestoreSnapshotRequest) field1Length() int { l := 0 if p.IsSetCluster() { @@ -40120,6 +40200,28 @@ func (p *TRestoreSnapshotRequest) field17Length() int { return l } +func (p *TRestoreSnapshotRequest) field18Length() int { + l := 0 + if p.IsSetStorageMedium() { + l += bthrift.Binary.FieldBeginLength("storage_medium", thrift.STRING, 18) + l += bthrift.Binary.StringLengthNocopy(*p.StorageMedium) + + l += bthrift.Binary.FieldEndLength() + } + return l +} + +func (p *TRestoreSnapshotRequest) field19Length() int { + l := 0 + if p.IsSetMediumAllocationMode() { + l += bthrift.Binary.FieldBeginLength("medium_allocation_mode", thrift.STRING, 19) + l += bthrift.Binary.StringLengthNocopy(*p.MediumAllocationMode) + + l += bthrift.Binary.FieldEndLength() + } + return l +} + func (p *TRestoreSnapshotResult_) FastRead(buf []byte) (int, error) { var err error var offset int diff --git a/pkg/rpc/thrift/FrontendService.thrift b/pkg/rpc/thrift/FrontendService.thrift index 20a6131e..60459162 100644 --- a/pkg/rpc/thrift/FrontendService.thrift +++ b/pkg/rpc/thrift/FrontendService.thrift @@ -1221,6 +1221,8 @@ struct TRestoreSnapshotRequest { 15: optional bool atomic_restore 16: optional bool compressed; 17: optional bool force_replace + 18: optional string storage_medium + 19: optional string medium_allocation_mode } struct TRestoreSnapshotResult { diff --git 
a/pkg/service/http_service.go b/pkg/service/http_service.go index cc1279d3..63311cb0 100644 --- a/pkg/service/http_service.go +++ b/pkg/service/http_service.go @@ -105,6 +105,10 @@ type CreateCcrRequest struct { ReuseBinlogLabel bool `json:"reuse_binlog_label"` // replication_num: nil or -1 means inherit from upstream (default), >0 means fixed replica count, 0 is invalid ReplicationNum *int `json:"replication_num,omitempty"` + // Storage medium for backup/restore operations: "hdd", "ssd" or "same_with_upstream" + StorageMedium string `json:"storage_medium"` + // Medium allocation mode for backup/restore operations: "strict" or "adaptive" + MediumAllocationMode string `json:"medium_allocation_mode"` } // Stringer @@ -150,7 +154,10 @@ func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobMana Db: db, Factory: jobManager.GetFactory(), ReplicationNum: replicationNum, + StorageMedium: request.StorageMedium, + MediumAllocationMode: request.MediumAllocationMode, } + job, err := ccr.NewJobFromService(request.Name, ctx) if err != nil { return err @@ -1121,6 +1128,96 @@ func (s *HttpService) failpointHandler(w http.ResponseWriter, r *http.Request) { result = newSuccessResult() } +type UpdateMediumAllocationModeRequest struct { + CcrCommonRequest + MediumAllocationMode string `json:"medium_allocation_mode"` +} + +func (s *HttpService) updateMediumAllocationModeHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("update medium allocation mode") + + var result *defaultResult + defer func() { writeJson(w, result) }() + + // Parse the JSON request body + var request UpdateMediumAllocationModeRequest + err := json.NewDecoder(r.Body).Decode(&request) + if err != nil { + log.Warnf("update medium allocation mode failed: %+v", err) + result = newErrorResult(err.Error()) + return + } + + if request.Name == "" { + log.Warnf("update medium allocation mode failed: name is empty") + result = newErrorResult("name is empty") + return + } + + if request.MediumAllocationMode == "" { + log.Warnf("update medium allocation mode failed: medium_allocation_mode is empty") + result = newErrorResult("medium_allocation_mode is empty") + return + } + + if s.redirect(request.Name, w, r) { + return + } + + log.Infof("update medium allocation mode for job %s to %s", request.Name, request.MediumAllocationMode) + if err := s.jobManager.UpdateMediumAllocationMode(request.Name, request.MediumAllocationMode); err != nil { + log.Warnf("update medium allocation mode failed: %+v", err) + result = newErrorResult(err.Error()) + } else { + result = newSuccessResult() + } +} + +type UpdateStorageMediumRequest struct { + CcrCommonRequest + StorageMedium string `json:"storage_medium"` +} + +func (s *HttpService) updateStorageMediumHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("update storage medium") + + var result *defaultResult + defer func() { writeJson(w, result) }() + + // Parse the JSON request body + var request UpdateStorageMediumRequest + err := json.NewDecoder(r.Body).Decode(&request) + if err != nil { + log.Warnf("update storage medium failed: %+v", err) + result = newErrorResult(err.Error()) + return + } + + if request.Name == "" { + log.Warnf("update storage medium failed: name is empty") + result = newErrorResult("name is empty") + return + } + + if request.StorageMedium == "" { + log.Warnf("update storage medium failed: storage_medium is empty") + result = newErrorResult("storage_medium is empty") + return + } + + if s.redirect(request.Name, w, r) { + return + } + + 
log.Infof("update storage medium for job %s to %s", request.Name, request.StorageMedium) + if err := s.jobManager.UpdateStorageMedium(request.Name, request.StorageMedium); err != nil { + log.Warnf("update storage medium failed: %+v", err) + result = newErrorResult(err.Error()) + } else { + result = newSuccessResult() + } +} + func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/version", s.versionHandler) s.mux.HandleFunc("/create_ccr", s.createHandler) @@ -1141,6 +1238,8 @@ func (s *HttpService) RegisterHandlers() { s.mux.Handle("/metrics", xmetrics.GetHttpHandler()) s.mux.HandleFunc("/sync", s.syncHandler) s.mux.HandleFunc("/view", s.showJobStateHandler) + s.mux.HandleFunc("/update_medium_allocation_mode", s.updateMediumAllocationModeHandler) + s.mux.HandleFunc("/update_storage_medium", s.updateStorageMediumHandler) } func (s *HttpService) Start() error {