Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion ydb/core/backup/common/metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ const std::optional<std::vector<TChangefeedMetadata>>& TMetadata::GetChangefeeds
return Changefeeds;
}

void TMetadata::AddIndex(const TIndexMetadata& index) {
if (!Indexes) {
Indexes.emplace();
}
Indexes->push_back(index);
}

const std::optional<std::vector<TIndexMetadata>>& TMetadata::GetIndexes() const {
return Indexes;
}

void TMetadata::SetEnablePermissions(bool enablePermissions) {
EnablePermissions = enablePermissions;
}
Expand Down Expand Up @@ -75,9 +86,19 @@ TString TMetadata::Serialize() const {
changefeeds.AppendValue(std::move(changefeedMap));
}
}
// We always serialize changefeeds in order to list them explicitly during import
m["changefeeds"] = changefeeds;

NJson::TJsonArray indexes;
if (Indexes) {
for (const auto& index : *Indexes) {
NJson::TJsonMap indexMap;
indexMap["export_prefix"] = index.ExportPrefix;
indexMap["impl_table_prefix"] = index.ImplTablePrefix;
indexes.AppendValue(std::move(indexMap));
}
}
m["indexes"] = indexes;

return NJson::WriteJson(&m, false);
}

Expand Down Expand Up @@ -108,6 +129,17 @@ TMetadata TMetadata::Deserialize(const TString& metadata) {
}
}

if (json.Has("indexes")) {
result.Indexes.emplace();
const NJson::TJsonValue& indexes = json["indexes"];
for (const NJson::TJsonValue& index : indexes.GetArray()) {
result.AddIndex({
.ExportPrefix = index["export_prefix"].GetString(),
.ImplTablePrefix = index["impl_table_prefix"].GetString(),
});
}
}

return result;
}

Expand Down
15 changes: 15 additions & 0 deletions ydb/core/backup/common/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ struct TChangefeedMetadata {
TString Name;
};

struct TIndexMetadata {
TString ExportPrefix;
TString ImplTablePrefix;
};

class TMetadata {
public:
TMetadata() = default;
Expand All @@ -52,9 +57,13 @@ class TMetadata {
void SetVersion(ui64 version);
bool HasVersion() const;
ui64 GetVersion() const;

void AddChangefeed(const TChangefeedMetadata& changefeed);
const std::optional<std::vector<TChangefeedMetadata>>& GetChangefeeds() const;

void AddIndex(const TIndexMetadata& index);
const std::optional<std::vector<TIndexMetadata>>& GetIndexes() const;

void SetEnablePermissions(bool enablePermissions = true);
bool HasEnablePermissions() const;
bool GetEnablePermissions() const;
Expand All @@ -74,6 +83,12 @@ class TMetadata {
// [...]: The export must have all changefeeds listed here
std::optional<std::vector<TChangefeedMetadata>> Changefeeds;

// Indexes:
// Undefined (previous versions): we don't know if we see the export with _materialized_ indexes or without them, so list suitable S3 files to find out all materialized indexes
// []: The export has no materialized indexes
// [...]: The export must have all materialized indexes listed here
std::optional<std::vector<TIndexMetadata>> Indexes;

// EnablePermissions:
// Undefined (previous versions): we don't know if we see the export with permissions or without them, so check S3 for the permissions file existence
// 0: The export has no permissions
Expand Down
33 changes: 32 additions & 1 deletion ydb/core/base/table_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,30 @@ TString InvalidIndexType(NKikimrSchemeOp::EIndexType indexType) {
return TStringBuilder() << "Invalid index type " << static_cast<int>(indexType);
}

std::optional<NKikimrSchemeOp::EIndexType> TryConvertIndexType(Ydb::Table::TableIndex::TypeCase type) {
switch (type) {
case Ydb::Table::TableIndex::TypeCase::TYPE_NOT_SET:
case Ydb::Table::TableIndex::TypeCase::kGlobalIndex:
return NKikimrSchemeOp::EIndexTypeGlobal;
case Ydb::Table::TableIndex::TypeCase::kGlobalAsyncIndex:
return NKikimrSchemeOp::EIndexTypeGlobalAsync;
case Ydb::Table::TableIndex::TypeCase::kGlobalUniqueIndex:
return NKikimrSchemeOp::EIndexTypeGlobalUnique;
case Ydb::Table::TableIndex::TypeCase::kGlobalVectorKmeansTreeIndex:
return NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree;
case Ydb::Table::TableIndex::TypeCase::kGlobalFulltextIndex:
return NKikimrSchemeOp::EIndexTypeGlobalFulltext;
default:
return std::nullopt;
}
}

NKikimrSchemeOp::EIndexType ConvertIndexType(Ydb::Table::TableIndex::TypeCase type) {
const auto result = TryConvertIndexType(type);
Y_ENSURE(result);
return *result;
}

bool IsCompatibleIndex(NKikimrSchemeOp::EIndexType indexType, const TTableColumns& table, const TIndexColumns& index, TString& explain) {
if (const auto* broken = IsContains(table.Keys, table.Columns)) {
explain = TStringBuilder()
Expand Down Expand Up @@ -232,7 +256,11 @@ bool DoesIndexSupportTTL(NKikimrSchemeOp::EIndexType indexType) {
}
}

std::span<const std::string_view> GetImplTables(NKikimrSchemeOp::EIndexType indexType, std::span<const TString> indexKeys) {
std::span<const std::string_view> GetImplTables(
NKikimrSchemeOp::EIndexType indexType,
std::span<const TString> indexKeys,
std::optional<Ydb::Table::FulltextIndexSettings::Layout> layout)
{
switch (indexType) {
case NKikimrSchemeOp::EIndexTypeGlobal:
case NKikimrSchemeOp::EIndexTypeGlobalAsync:
Expand All @@ -244,6 +272,9 @@ std::span<const std::string_view> GetImplTables(NKikimrSchemeOp::EIndexType inde
} else {
return PrefixedGlobalKMeansTreeImplTables;
}
case NKikimrSchemeOp::EIndexTypeGlobalFulltext:
Y_ENSURE(layout);
return GetFulltextImplTables(*layout);
default:
Y_ENSURE(false, InvalidIndexType(indexType));
}
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/base/table_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <util/generic/string.h>
#include <util/string/builder.h>

#include <optional>
#include <span>
#include <string_view>

Expand Down Expand Up @@ -47,8 +48,13 @@ bool DoesIndexSupportTTL(NKikimrSchemeOp::EIndexType indexType);

NKikimrSchemeOp::EIndexType GetIndexType(const NKikimrSchemeOp::TIndexCreationConfig& indexCreation);
TString InvalidIndexType(NKikimrSchemeOp::EIndexType indexType);
std::optional<NKikimrSchemeOp::EIndexType> TryConvertIndexType(Ydb::Table::TableIndex::TypeCase type);
NKikimrSchemeOp::EIndexType ConvertIndexType(Ydb::Table::TableIndex::TypeCase type);

std::span<const std::string_view> GetImplTables(NKikimrSchemeOp::EIndexType indexType, std::span<const TString> indexKeys);
std::span<const std::string_view> GetImplTables(
NKikimrSchemeOp::EIndexType indexType,
std::span<const TString> indexKeys,
std::optional<Ydb::Table::FulltextIndexSettings::Layout> layout = std::nullopt);
std::span<const std::string_view> GetFulltextImplTables(Ydb::Table::FulltextIndexSettings::Layout layout);
bool IsImplTable(std::string_view tableName);
bool IsBuildImplTable(std::string_view tableName);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,5 @@ message TFeatureFlags {
optional bool EnableShuttingDownNodeState = 218 [default = false];
optional bool EnableStreamingQueriesCounters = 219 [default = false];
optional bool UseYdsTopicStorageMetering = 220 [default = false];
}
optional bool EnableIndexMaterialization = 221 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableTopicMessageLevelParallelism)
FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForReplication)
FEATURE_FLAG_SETTER(EnableAccessToIndexImplTables)
FEATURE_FLAG_SETTER(EnableIndexMaterialization)

#undef FEATURE_FLAG_SETTER
};
Expand Down
31 changes: 31 additions & 0 deletions ydb/core/tx/datashard/export_s3_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "extstorage_usage_config.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/base/table_index.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/backup/common/checksum.h>
Expand Down Expand Up @@ -978,6 +979,36 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
}
}

if (scheme) {
int idx = changefeeds.size() + 1;
for (const auto& index : scheme->indexes()) {
const auto indexType = NTableIndex::ConvertIndexType(index.type_case());
const TVector<TString> indexColumns(index.index_columns().begin(), index.index_columns().end());
std::optional<Ydb::Table::FulltextIndexSettings::Layout> layout;
if (indexType == NKikimrSchemeOp::EIndexTypeGlobalFulltext) {
const auto& settings = index.global_fulltext_index().fulltext_settings();
layout = settings.has_layout() ? settings.layout() : Ydb::Table::FulltextIndexSettings::LAYOUT_UNSPECIFIED;
}

for (const auto& implTable : NTableIndex::GetImplTables(indexType, indexColumns, layout)) {
const TString implTablePrefix = TStringBuilder() << index.name() << "/" << implTable;
TString exportPrefix;
if (encrypted) {
std::stringstream prefix;
prefix << std::setfill('0') << std::setw(3) << std::right << idx++;
exportPrefix = prefix.str();
} else {
exportPrefix = implTablePrefix;
}

metadata.AddIndex(TIndexMetadata{
.ExportPrefix = exportPrefix,
.ImplTablePrefix = implTablePrefix,
});
}
}
}

auto permissions = (Task.GetEnablePermissions() && Task.GetShardNum() == 0)
? GenYdbPermissions(Task.GetTable())
: Nothing();
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4457,6 +4457,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
exportInfo->EndTime = TInstant::Seconds(rowset.GetValueOrDefault<Schema::Exports::EndTime>());
exportInfo->EnableChecksums = rowset.GetValueOrDefault<Schema::Exports::EnableChecksums>(false);
exportInfo->EnablePermissions = rowset.GetValueOrDefault<Schema::Exports::EnablePermissions>(false);
exportInfo->MaterializeIndexes = rowset.GetValueOrDefault<Schema::Exports::MaterializeIndexes>(false);

if (rowset.HaveValue<Schema::Exports::ExportMetadata>()) {
exportInfo->ExportMetadata = rowset.GetValue<Schema::Exports::ExportMetadata>();
Expand Down Expand Up @@ -4506,6 +4507,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.SourcePathId.OwnerId = rowset.GetValueOrDefault<Schema::ExportItems::SourceOwnerPathId>(selfId);
item.SourcePathId.LocalPathId = rowset.GetValue<Schema::ExportItems::SourcePathId>();
item.SourcePathType = rowset.GetValue<Schema::ExportItems::SourcePathType>();
item.ParentIdx = rowset.GetValueOrDefault<Schema::ExportItems::ParentIndex>(Max<ui32>());

item.State = static_cast<TExportInfo::EState>(rowset.GetValue<Schema::ExportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ExportItems::BackupTxId>(InvalidTxId);
Expand Down Expand Up @@ -4651,6 +4653,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Issue = rowset.GetValueOrDefault<Schema::ImportItems::Issue>(TString());
item.SrcPrefix = rowset.GetValueOrDefault<Schema::ImportItems::SrcPrefix>(TString());
item.SrcPath = rowset.GetValueOrDefault<Schema::ImportItems::SrcPath>(TString());

item.ParentIdx = rowset.GetValueOrDefault<Schema::ImportItems::ParentIndex>(Max<ui32>());
if (item.ParentIdx != Max<ui32>()) {
Y_VERIFY_S(item.ParentIdx < importInfo->Items.size(), "Invalid item's index"
<< ": importId# " << importId
<< ", itemIdx# " << item.ParentIdx);

TImportInfo::TItem& parentItem = importInfo->Items[item.ParentIdx];
parentItem.ChildItems.push_back(itemIdx);
}

if (rowset.HaveValue<Schema::ImportItems::EncryptionIV>()) {
item.ExportItemIV = NBackup::TEncryptionIV::FromBinaryString(rowset.GetValue<Schema::ImportItems::EncryptionIV>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ class TBackupRestoreOperationBase: public TSubOperation {

const TString& parentPath = Transaction.GetWorkingDir();
const TString name = TKind::GetTableName(Transaction);
const bool internal = Transaction.HasInternal() && Transaction.GetInternal();

LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
TKind::Name() << " Propose"
Expand Down Expand Up @@ -618,9 +619,13 @@ class TBackupRestoreOperationBase: public TSubOperation {
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.NotUnderOperation()
.IsCommonSensePath() //forbid alter impl index tables
.CanBackupTable(); //forbid backup table with indexes
.NotUnderOperation();

if (!internal) {
checks
.IsCommonSensePath() // forbid alter impl index tables
.CanBackupTable(); // forbid backup table with indexes
}

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ static std::optional<NKikimrSchemeOp::TModifyScheme> CreateIndexTask(NKikimr::NS

auto operation = scheme.MutableCreateTableIndex();
operation->SetName(dst.LeafName());

operation->SetType(indexInfo->Type);
operation->SetState(indexInfo->State);

for (const auto& keyName: indexInfo->IndexKeys) {
*operation->MutableKeyColumnNames()->Add() = keyName;
Expand Down Expand Up @@ -269,6 +269,7 @@ bool CreateConsistentCopyTables(
TStringBuilder{} << "Consistent copy table doesn't support table with index type " << indexInfo->Type)};
return false;
}
scheme->SetInternal(tx.GetInternal());
result.push_back(CreateNewTableIndex(NextPartId(nextId, result), *scheme));

for (const auto& [srcImplTableName, srcImplTablePathId] : srcIndexPath.Base()->GetChildren()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,7 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
auto operation = schema.MutableCreateTableIndex();
operation->SetName(name);
operation->SetType(indexInfo->Type);
operation->SetState(indexInfo->State);
for (const auto& keyName: indexInfo->IndexKeys) {
*operation->MutableKeyColumnNames()->Add() = keyName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ class TCreateTableIndex: public TSubOperation {
.NotDeleted()
.NotUnderDeleting()
.IsCommonSensePath()
.IsTable()
.NotBackupTable();
.IsTable();

if (!internal) {
checks.NotAsyncReplicaTable();
Expand All @@ -151,6 +150,8 @@ class TCreateTableIndex: public TSubOperation {
checks
.IsUnderCreating(NKikimrScheme::StatusNameConflict)
.IsUnderTheSameOperation(OperationId.GetTxId()); //allow only as part of creating base table
} else {
checks.NotBackupTable(); // allow to create backup table with index, but not to build index on a backup table
}

if (!checks) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo& e
NIceDb::TUpdate<Schema::Exports::Items>(exportInfo.Items.size()),
NIceDb::TUpdate<Schema::Exports::EnableChecksums>(exportInfo.EnableChecksums),
NIceDb::TUpdate<Schema::Exports::EnablePermissions>(exportInfo.EnablePermissions),
NIceDb::TUpdate<Schema::Exports::MaterializeIndexes>(exportInfo.MaterializeIndexes),
NIceDb::TUpdate<Schema::Exports::PeerName>(exportInfo.PeerName),
NIceDb::TUpdate<Schema::Exports::SanitizedToken>(exportInfo.SanitizedToken)
);
Expand All @@ -171,7 +172,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo& e
NIceDb::TUpdate<Schema::ExportItems::SourceOwnerPathId>(item.SourcePathId.OwnerId),
NIceDb::TUpdate<Schema::ExportItems::SourcePathId>(item.SourcePathId.LocalPathId),
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State)),
NIceDb::TUpdate<Schema::ExportItems::SourcePathType>(item.SourcePathType)
NIceDb::TUpdate<Schema::ExportItems::SourcePathType>(item.SourcePathType),
NIceDb::TUpdate<Schema::ExportItems::ParentIndex>(item.ParentIdx)
);
}
}
Expand Down
Loading
Loading