Skip to content

Commit

Permalink
Merge branch 'apache/590-CatalogSync'
Browse files Browse the repository at this point in the history
  • Loading branch information
kroushan-nit committed Dec 21, 2024
2 parents a4ed741 + 6e09f5d commit 77e54bd
Show file tree
Hide file tree
Showing 17 changed files with 146 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,41 @@
import lombok.NonNull;
import lombok.Value;

/** Defines the configuration for an external catalog. */
/**
* Defines the configuration for an external catalog, user needs to populate at-least one of {@link
* ExternalCatalogConfig#catalogType} or {@link ExternalCatalogConfig#catalogSyncClientImpl}
*/
@Value
@Builder
public class ExternalCatalogConfig implements CatalogConfig {
/** The name of the catalog, it also acts as a unique identifier for each catalog */
@NonNull String catalogName;
public class ExternalCatalogConfig {
/**
* A user-defined unique identifier for the catalog, allows user to sync table to multiple
* catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2
*/
@NonNull String catalogId;

/**
* The type of the catalog. If the catalogType implementation exists in XTable, the implementation
* class will be inferred.
*/
String catalogType;

/** The implementation class path for the catalog */
@NonNull String catalogImpl;
/**
* (Optional) A fully qualified class name that implements the interface for {@link
* org.apache.xtable.spi.sync.CatalogSyncClient}, it can be used if the implementation for
* catalogType doesn't exist in XTable.
*/
String catalogSyncClientImpl;

/**
* (Optional) A fully qualified class name that implements the interface for {@link
* org.apache.xtable.spi.extractor.CatalogConversionSource} it can be used if the implementation
* for catalogType doesn't exist in XTable.
*/
String catalogConversionSourceImpl;

/**
* The properties for each catalog, used for providing any custom behaviour during catalog sync
*/
@NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap();
@NonNull @Builder.Default Map<String, String> catalogProperties = Collections.emptyMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Builder
public class TargetCatalogConfig {
/**
* The tableIdentifiers(databaseName, tableName) that will be used when syncing {@link
* The tableIdentifiers(catalogName.databaseName.tableName) that will be used when syncing {@link
* TargetTable} to the catalog.
*/
@NonNull CatalogTableIdentifier catalogTableIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,29 @@
import lombok.NonNull;
import lombok.Value;

/** This class represents the unique identifier for a table in a catalog. */
/**
* An internal representation of a fully qualified table identifier within a catalog. The naming
* convention varies across different catalogs but many of them follow a three level hierarchy, few
* examples can be found below.
*
* <ul>
* <li>1. catalog.database.table
* <li>2. catalog.schema.table
* <li>3. database.schema.table
* </ul>
*
* We have selected the first naming convention and will interoperate among other catalogs following
* a different convention.
*/
@Value
@Builder
public class CatalogTableIdentifier {
/**
* The top level hierarchy/namespace for organizing tables. Each catalog can have multiple
* databases/schemas. This is an optional field as many catalogs have a "default" catalog whose
* name varies depending on the catalogType.
*/
String catalogName;
/**
* Catalogs have the ability to group tables logically, databaseName is the identifier for such
* logical classification. The alternate names for this field include namespace, schemaName etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public static class SyncStatus {
@Value
@Builder
public static class CatalogSyncStatus {
// Catalog Identifier.
String catalogName;
// A user defined unique catalog identifier.
String catalogId;
// Status code
SyncStatusCode statusCode;
// errorDetails if any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import org.apache.xtable.model.catalog.CatalogTableIdentifier;

/**
* A client for converting the table with tableIdentifier {@link CatalogTableIdentifier} in {@link
* org.apache.xtable.conversion.SourceCatalog} to SourceTable object. {@link SourceTable} can be
* used by downstream consumers for syncing it to multiple {@link
* org.apache.xtable.conversion.TargetTable}
* A client for converting the table with tableIdentifier {@link CatalogTableIdentifier} in source
* catalog to SourceTable object {@link SourceTable}, can be used by downstream consumers for
* syncing it to multiple {@link org.apache.xtable.conversion.TargetTable}
*/
public interface CatalogConversionSource {
/** Returns the source table object present in the catalog. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public SyncResult syncTable(
catalogSyncClient.getClass().getName());
results.add(
getCatalogSyncFailureStatus(
catalogSyncClient.getCatalogName(), catalogSyncClient.getClass().getName(), e));
catalogSyncClient.getCatalogId(), catalogSyncClient.getClass().getName(), e));
}
}));
return SyncResult.builder()
Expand Down Expand Up @@ -109,15 +109,15 @@ private <TABLE> CatalogSyncStatus syncCatalog(
catalogSyncClient.refreshTable(table, catalogTable, tableIdentifier);
}
return CatalogSyncStatus.builder()
.catalogName(catalogSyncClient.getCatalogName())
.catalogId(catalogSyncClient.getCatalogId())
.statusCode(SyncResult.SyncStatusCode.SUCCESS)
.build();
}

private CatalogSyncStatus getCatalogSyncFailureStatus(
String catalogName, String catalogImpl, Exception e) {
String catalogId, String catalogImpl, Exception e) {
return CatalogSyncStatus.builder()
.catalogName(catalogName)
.catalogId(catalogId)
.statusCode(SyncResult.SyncStatusCode.ERROR)
.errorDetails(
SyncResult.ErrorDetails.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
*/
public interface CatalogSyncClient<TABLE> extends AutoCloseable {
/**
* Returns a unique identifier for the catalog, allows user to sync table to multiple catalogs of
* the same type eg: HMS catalogs with url1, HMS catalog with url2.
* Returns the user-defined unique identifier for the catalog, allows user to sync table to
* multiple catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2.
*/
String getCatalogName();
String getCatalogId();

/** Returns the storage location of the table synced to the catalog. */
String getStorageLocation(TABLE table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void testSyncTable() {
when(mockClient2.getStorageLocation(any())).thenReturn("/tmp/test");
when(mockClient3.getStorageLocation(any())).thenReturn("/tmp/test");

when(mockClient4.getCatalogName()).thenReturn("catalogName4");
when(mockClient4.getCatalogId()).thenReturn("catalogId4");

Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static CatalogConversionFactory getInstance() {
public static CatalogConversionSource createCatalogConversionSource(
ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {
return ReflectionUtils.createInstanceOfClass(
sourceCatalogConfig.getCatalogImpl(), sourceCatalogConfig, configuration);
sourceCatalogConfig.getCatalogConversionSourceImpl(), sourceCatalogConfig, configuration);
}

/**
Expand All @@ -53,8 +53,11 @@ public static CatalogConversionSource createCatalogConversionSource(
* @param configuration hadoop configuration
*/
public CatalogSyncClient createCatalogSyncClient(
ExternalCatalogConfig targetCatalogConfig, Configuration configuration) {
ExternalCatalogConfig targetCatalogConfig, String tableFormat, Configuration configuration) {
return ReflectionUtils.createInstanceOfClass(
targetCatalogConfig.getCatalogImpl(), targetCatalogConfig, configuration);
targetCatalogConfig.getCatalogSyncClientImpl(),
targetCatalogConfig,
tableFormat,
configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
public class ExternalCatalogConfigFactory {

public static ExternalCatalogConfig fromCatalogType(
String catalogType, String catalogName, Map<String, String> properties) {
String catalogType, String catalogId, Map<String, String> properties) {
// TODO: Choose existing implementation based on catalogType.
String catalogImpl = "";
String catalogSyncClientImpl = "";
String catalogConversionSourceImpl = "";
return ExternalCatalogConfig.builder()
.catalogImpl(catalogImpl)
.catalogName(catalogName)
.catalogOptions(properties)
.catalogType(catalogType)
.catalogSyncClientImpl(catalogSyncClientImpl)
.catalogConversionSourceImpl(catalogConversionSourceImpl)
.catalogId(catalogId)
.catalogProperties(properties)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public Map<String, SyncResult> syncTableAcrossCatalogs(
TargetCatalogConfig::getCatalogTableIdentifier,
targetCatalog ->
catalogConversionFactory.createCatalogSyncClient(
targetCatalog.getCatalogConfig(), conf)));
targetCatalog.getCatalogConfig(),
targetTable.getFormatName(),
conf)));
catalogSyncResults.put(
targetTable.getFormatName(),
syncCatalogsForTargetTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* Iceberg requires a catalog to perform any operation, if no catalog is provided the default
* catalog (HadoopCatalog or storage based catalog) is used. For syncing iceberg to multiple
* catalogs, you can use {@link org.apache.xtable.catalog.ExternalCatalogConfig} instead which
* catalogs, you can use {@link org.apache.xtable.conversion.ExternalCatalogConfig} instead which
* allows syncing the latest version of iceberg metadata to multiple catalogs.
*/
@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,24 @@
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import org.apache.xtable.spi.sync.CatalogSyncClient;
import org.apache.xtable.testutil.ITTestUtils.TestCatalogImpl;
import org.apache.xtable.testutil.ITTestUtils.TestCatalogConversionSourceImpl;
import org.apache.xtable.testutil.ITTestUtils.TestCatalogSyncImpl;

class TestCatalogConversionFactory {

@Test
void createSourceForConfig() {
ExternalCatalogConfig sourceCatalog =
ExternalCatalogConfig.builder()
.catalogName("catalogName")
.catalogImpl(TestCatalogImpl.class.getName())
.catalogOptions(Collections.emptyMap())
.catalogId("catalogId")
.catalogConversionSourceImpl(TestCatalogConversionSourceImpl.class.getName())
.catalogProperties(Collections.emptyMap())
.build();
CatalogConversionSource catalogConversionSource =
CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, new Configuration());
assertEquals(catalogConversionSource.getClass().getName(), TestCatalogImpl.class.getName());
assertEquals(
catalogConversionSource.getClass().getName(),
TestCatalogConversionSourceImpl.class.getName());
}

@Test
Expand All @@ -53,9 +56,9 @@ void createForCatalog() {
TargetCatalogConfig.builder()
.catalogConfig(
ExternalCatalogConfig.builder()
.catalogName("catalogName")
.catalogImpl(TestCatalogImpl.class.getName())
.catalogOptions(Collections.emptyMap())
.catalogId("catalogId")
.catalogSyncClientImpl(TestCatalogSyncImpl.class.getName())
.catalogProperties(Collections.emptyMap())
.build())
.catalogTableIdentifier(
CatalogTableIdentifier.builder()
Expand All @@ -65,7 +68,8 @@ void createForCatalog() {
.build();
CatalogSyncClient catalogSyncClient =
CatalogConversionFactory.getInstance()
.createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), new Configuration());
assertEquals(catalogSyncClient.getClass().getName(), TestCatalogImpl.class.getName());
.createCatalogSyncClient(
targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new Configuration());
assertEquals(catalogSyncClient.getClass().getName(), TestCatalogSyncImpl.class.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,10 @@ void testNoTableFormatConversionWithMultipleCatalogSync() {
.thenReturn(tableFormatSyncResults);
// Mocks for catalogSync.
when(mockCatalogConversionFactory.createCatalogSyncClient(
targetCatalogs.get(0).getCatalogConfig(), mockConf))
eq(targetCatalogs.get(0).getCatalogConfig()), any(), eq(mockConf)))
.thenReturn(mockCatalogSyncClient1);
when(mockCatalogConversionFactory.createCatalogSyncClient(
targetCatalogs.get(1).getCatalogConfig(), mockConf))
eq(targetCatalogs.get(1).getCatalogConfig()), any(), eq(mockConf)))
.thenReturn(mockCatalogSyncClient2);
when(catalogSync.syncTable(
eq(
Expand Down Expand Up @@ -589,9 +589,9 @@ private TargetCatalogConfig getTargetCatalog(String suffix) {
return TargetCatalogConfig.builder()
.catalogConfig(
ExternalCatalogConfig.builder()
.catalogName("catalogName-" + suffix)
.catalogImpl("catalogImpl-" + suffix)
.catalogOptions(Collections.emptyMap())
.catalogId("catalogId-" + suffix)
.catalogSyncClientImpl("catalogImpl-" + suffix)
.catalogProperties(Collections.emptyMap())
.build())
.catalogTableIdentifier(
CatalogTableIdentifier.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,13 @@ public static void validateTable(
Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields());
}

public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient {
public static class TestCatalogSyncImpl implements CatalogSyncClient {

public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {}
public TestCatalogSyncImpl(
ExternalCatalogConfig catalogConfig, String tableFormat, Configuration hadoopConf) {}

@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
return SourceTable.builder()
.name("source_table_name")
.basePath("file://base_path/v1/")
.formatName("ICEBERG")
.build();
}

@Override
public String getCatalogName() {
public String getCatalogId() {
return null;
}

Expand Down Expand Up @@ -103,4 +95,18 @@ public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifie
@Override
public void close() throws Exception {}
}

public static class TestCatalogConversionSourceImpl implements CatalogConversionSource {
public TestCatalogConversionSourceImpl(
ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {}

@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
return SourceTable.builder()
.name("source_table_name")
.basePath("file://base_path/v1/")
.formatName("ICEBERG")
.build();
}
}
}
Loading

0 comments on commit 77e54bd

Please sign in to comment.