Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 19, 2024
1 parent ea325d7 commit a723014
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ public class ConversionConfig {
// One or more targets to sync the table metadata to
List<TargetTable> targetTables;
// Each target table can be synced to multiple target catalogs; this is a map from
// targetTableIdentifier to target catalogs.
Map<String, List<TargetCatalogConfig>> targetCatalogs;
// targetTable to target catalogs.
Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs;
// The mode, incremental or snapshot
SyncMode syncMode;

@Builder
ConversionConfig(
@NonNull SourceTable sourceTable,
List<TargetTable> targetTables,
Map<String, List<TargetCatalogConfig>> targetCatalogs,
Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs,
SyncMode syncMode) {
this.sourceTable = sourceTable;
this.targetTables = targetTables;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,4 @@ public TargetTable(
this.metadataRetention =
metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention;
}

/**
 * Builds an identifier for this target table in the form {@code <sanitizedBasePath>#<formatName>}.
 *
 * @return the sanitized base path and the format name joined by {@code #}
 */
public String getId() {
  final String sanitizedBasePath = sanitizeBasePath(this.basePath);
  return String.format("%s#%s", sanitizedBasePath, formatName);
}
}
20 changes: 20 additions & 0 deletions xtable-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,24 @@
<scope>test</scope>
</dependency>
</dependencies>

<!-- Runs the maven-jar-plugin test-jar goal so this module's test classes are packaged
     as an additional test-jar artifact. -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<!-- Binds the test-jar goal to the test-compile phase. -->
<phase>test-compile</phase>
</execution>
</executions>
<configuration>
<!-- NOTE(review): presumably overrides a skip setting inherited from a parent POM; confirm. -->
<skip>false</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public Map<String, SyncResult> syncTableAcrossCatalogs(
Map<String, SyncResult> catalogSyncResults = new HashMap<>();
for (TargetTable targetTable : config.getTargetTables()) {
Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
config.getTargetCatalogs().get(targetTable.getId()).stream()
config.getTargetCatalogs().get(targetTable).stream()
.collect(
Collectors.toMap(
TargetCatalogConfig::getCatalogTableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@
import org.junit.jupiter.api.Test;

import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.conversion.TargetCatalogConfig;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import org.apache.xtable.spi.sync.CatalogSyncClient;
import org.apache.xtable.testutil.ITTestUtils.TestCatalogImpl;

class TestCatalogConversionFactory {

Expand Down Expand Up @@ -69,54 +68,4 @@ void createForCatalog() {
.createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), new Configuration());
assertEquals(catalogSyncClient.getClass().getName(), TestCatalogImpl.class.getName());
}

/**
 * No-op catalog used as a test double. It implements both {@link CatalogSyncClient} and {@link
 * CatalogConversionSource}; every method is a stub that returns {@code null} or {@code false}, or
 * does nothing.
 */
public static class TestCatalogImpl
implements CatalogSyncClient<Object>, CatalogConversionSource {

// Both arguments are ignored. NOTE(review): this two-argument signature is presumably the one
// the factory under test uses to instantiate the class; confirm.
public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {}

// Stub: no source table is resolved.
@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
return null;
}

// Stub: no catalog name is provided.
@Override
public String getCatalogName() {
return null;
}

// Stub: no storage location is provided.
@Override
public String getStorageDescriptorLocation(Object o) {
return null;
}

// Stub: always reports that the database does not exist.
@Override
public boolean hasDatabase(String databaseName) {
return false;
}

@Override
public void createDatabase(String databaseName) {}

// Stub: always reports that the table is absent.
@Override
public Object getTable(CatalogTableIdentifier tableIdentifier) {
return null;
}

@Override
public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}

@Override
public void refreshTable(
InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {}

@Override
public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}

@Override
public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}

// Nothing to release.
@Override
public void close() throws Exception {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -579,7 +580,7 @@ private ConversionConfig getTableSyncConfig(
.targetTables(targetTables)
.targetCatalogs(
targetTables.stream()
.collect(Collectors.toMap(TargetTable::getId, k -> targetCatalogs)))
.collect(Collectors.toMap(Function.identity(), k -> targetCatalogs)))
.syncMode(syncMode)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;

import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import org.apache.xtable.spi.sync.CatalogSyncClient;

public class ITTestUtils {

Expand All @@ -44,4 +50,57 @@ public static void validateTable(
Assertions.assertEquals(basePath, internalTable.getBasePath());
Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields());
}

/**
 * Catalog test double that acts as both a {@link CatalogConversionSource} and a {@link
 * CatalogSyncClient}.
 *
 * <p>{@link #getSourceTable} returns a fixed ICEBERG source table; every other method is a stub
 * that returns {@code null} or {@code false}, or does nothing. The sync-client interface is
 * parameterized with {@code Object} rather than used as a raw type, so the compiler checks the
 * overridden signatures against the interface.
 */
public static class TestCatalogImpl
    implements CatalogConversionSource, CatalogSyncClient<Object> {

  /**
   * Creates the test catalog. Both arguments are ignored.
   *
   * @param catalogConfig catalog configuration (unused)
   * @param hadoopConf Hadoop configuration (unused)
   */
  public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {}

  /** Returns the same hard-coded source table regardless of {@code tableIdentifier}. */
  @Override
  public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
    return SourceTable.builder()
        .name("source_table_name")
        .basePath("file://base_path/v1/")
        .formatName("ICEBERG")
        .build();
  }

  /** Stub: no catalog name is provided. */
  @Override
  public String getCatalogName() {
    return null;
  }

  /** Stub: no storage location is provided. */
  @Override
  public String getStorageLocation(Object o) {
    return null;
  }

  /** Stub: always reports that the database does not exist. */
  @Override
  public boolean hasDatabase(String databaseName) {
    return false;
  }

  /** Stub: does nothing. */
  @Override
  public void createDatabase(String databaseName) {}

  /** Stub: always reports that the table is absent. */
  @Override
  public Object getTable(CatalogTableIdentifier tableIdentifier) {
    return null;
  }

  /** Stub: does nothing. */
  @Override
  public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}

  /** Stub: does nothing. */
  @Override
  public void refreshTable(
      InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {}

  /** Stub: does nothing. */
  @Override
  public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}

  /** Stub: does nothing. */
  @Override
  public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}

  /** Nothing to release. */
  @Override
  public void close() throws Exception {}
}
}
9 changes: 9 additions & 0 deletions xtable-utilities/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@
<version>${project.version}</version>
</dependency>

<!-- test classes of xtable-core (its test-jar), on the test classpath only -->
<dependency>
<groupId>org.apache.xtable</groupId>
<artifactId>xtable-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- command line arg parsing -->
<dependency>
<groupId>commons-cli</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public static void main(String[] args) throws Exception {
dataset.getSourceCatalogTableIdentifier().getCatalogTableIdentifier());
}
List<TargetTable> targetTables = new ArrayList<>();
Map<String, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
for (TargetTableIdentifier targetCatalogTableIdentifier :
dataset.getTargetCatalogTableIdentifiers()) {
TargetTable targetTable =
Expand All @@ -172,11 +172,11 @@ public static void main(String[] args) throws Exception {
.formatName(targetCatalogTableIdentifier.getTableFormat())
.build();
targetTables.add(targetTable);
if (!targetCatalogs.containsKey(targetTable.getId())) {
targetCatalogs.put(targetTable.getId(), new ArrayList<>());
if (!targetCatalogs.containsKey(targetTable)) {
targetCatalogs.put(targetTable, new ArrayList<>());
}
targetCatalogs
.get(targetTable.getId())
.get(targetTable)
.add(
TargetCatalogConfig.builder()
.catalogTableIdentifier(
Expand Down Expand Up @@ -248,45 +248,86 @@ static Map<String, ConversionSourceProvider> getConversionSourceProviders(

@Data
public static class DatasetConfig {
/**
* Configuration of the source catalog from which XTable will read. It must contain all the
* necessary connection and access details for describing and listing tables.
*/
private Catalog sourceCatalog;
/**
* Defines the configuration of one or more target catalogs, to which XTable will write or update
* tables. Unlike the source, these catalogs must be writable.
*/
private List<Catalog> targetCatalogs;
/** A list of datasets that specify how a source table maps to one or more target tables. */
private List<Dataset> datasets;

/** Configuration for catalog. */
@Data
public static class Catalog {
/** A unique name for the catalog. */
private String catalogName;
/**
* The type of the catalog. This might be a specific type understood by XTable, such as Hive,
* Glue, etc.
*/
private String catalogType;
/**
* (Optional) The fully qualified name of a class that implements the interfaces for
* CatalogSyncClient. It can be used if an implementation for catalogType doesn't exist in
* XTable.
*/
private String catalogImpl;
/**
* A collection of configs used to configure access or connection properties for the catalog.
*/
private Map<String, String> catalogProperties;
}

@Data
public static class StorageIdentifier {
String tableFormat;
String tableBasePath;
String tableDataPath;
String tableName;
String partitionSpec;
String namespace;
public static class Dataset {
/** Identifies the source table in sourceCatalog. */
private SourceTableIdentifier sourceCatalogTableIdentifier;
/** A list of one or more targets that this source table should be written to. */
private List<TargetTableIdentifier> targetCatalogTableIdentifiers;
}

@Data
public static class SourceTableIdentifier {
/** Specifies the table identifier in the source catalog. */
CatalogTableIdentifier catalogTableIdentifier;
/**
* (Optional) Provides direct storage details such as a table's base path (like an S3
* location) and the partition specification. This allows reading from a source even if it is
* not strictly registered in a catalog, as long as the format and location are known.
*/
StorageIdentifier storageIdentifier;
}

@Data
public static class TargetTableIdentifier {
/** Name of the target catalog where the table will be created or updated. */
String catalogName;
/**
* The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be
* stored at the target.
*/
String tableFormat;
/** Specifies the table identifier in the target catalog. */
CatalogTableIdentifier catalogTableIdentifier;
}

/**
* Storage-level configuration for a table. This is an optional field in {@link
* SourceTableIdentifier}.
*/
@Data
public static class Dataset {
private SourceTableIdentifier sourceCatalogTableIdentifier;
private List<TargetTableIdentifier> targetCatalogTableIdentifiers;
public static class StorageIdentifier {
String tableFormat;
String tableBasePath;
String tableDataPath;
String tableName;
String partitionSpec;
String namespace;
}
}
}
Loading

0 comments on commit a723014

Please sign in to comment.