Skip to content

Commit

Permalink
[590] Add Iceberg Glue Catalog Sync implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kroushan-nit committed Dec 21, 2024
1 parent 6e09f5d commit d6ae25a
Show file tree
Hide file tree
Showing 21 changed files with 2,576 additions and 1 deletion.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.0</hadoop.version>
<hudi.version>0.14.0</hudi.version>
<aws.version>2.28.22</aws.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.8.0</maven-javadoc-plugin.version>
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ public class CatalogTableIdentifier {
* from the table name in storage.
*/
@NonNull String tableName;

@Override
public String toString() {
  // Rendered as "<databaseName>.<tableName>".
  return databaseName + "." + tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public enum ErrorCode {
UNSUPPORTED_SCHEMA_TYPE(10007),
UNSUPPORTED_FEATURE(10008),
PARSE_EXCEPTION(10009),
CATALOG_REFRESH_EXCEPTION(10010);
CATALOG_REFRESH_EXCEPTION(10010),
CATALOG_SYNC_GENERIC_EXCEPTION(10011);

private final int errorCode;

Expand Down
11 changes: 11 additions & 0 deletions xtable-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,23 @@
<scope>provided</scope>
</dependency>

<!-- AWS Glue dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
<version>${aws.version}</version>
</dependency>

<!-- Mockito -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<!-- Test-only library: keep it off the compile/runtime classpath, like mockito-core. -->
<scope>test</scope>
</dependency>

<!-- Junit -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import java.util.Map;

import org.apache.iceberg.TableProperties;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.storage.TableFormat;

/** Static helpers for resolving table-format-specific storage locations. */
public class TableFormatUtils {

  /** Static utility holder; not meant to be instantiated. */
  private TableFormatUtils() {}

  /**
   * Resolves the location of a table's data files for the given table format.
   *
   * @param tableFormat one of the {@link TableFormat} name constants
   * @param tableLocation base location of the table
   * @param properties table properties; only consulted for Iceberg tables, may be {@code null}
   * @return the Iceberg data location derived from {@code properties} (or {@code
   *     <tableLocation>/data} when none is configured) for Iceberg; {@code tableLocation} itself
   *     for Hudi
   * @throws NotSupportedException if {@code tableFormat} is neither Iceberg nor Hudi
   */
  public static String getTableDataLocation(
      String tableFormat, String tableLocation, Map<String, String> properties) {
    switch (tableFormat) {
      case TableFormat.ICEBERG:
        return getIcebergDataLocation(tableLocation, properties);
      case TableFormat.HUDI:
        return tableLocation;
      default:
        throw new NotSupportedException("Unsupported table format: " + tableFormat);
    }
  }

  /**
   * Get iceberg table data files location.
   *
   * <p>The first non-null value among the write-data-location, folder-storage-location and
   * object-store-path table properties wins, in that order; when none is set (or no properties
   * are supplied at all) the location defaults to {@code <tableLocation>/data}.
   */
  private static String getIcebergDataLocation(
      String tableLocation, Map<String, String> properties) {
    // A missing property map is treated like an empty one instead of failing with an NPE.
    if (properties != null) {
      String[] keysByPrecedence = {
        TableProperties.WRITE_DATA_LOCATION,
        TableProperties.WRITE_FOLDER_STORAGE_LOCATION,
        TableProperties.OBJECT_STORE_PATH
      };
      for (String key : keysByPrecedence) {
        String configuredLocation = properties.get(key);
        if (configuredLocation != null) {
          return configuredLocation;
        }
      }
    }
    return String.format("%s/data", tableLocation);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog.glue;

import java.util.Map;

import org.apache.commons.lang3.StringUtils;

import org.apache.xtable.reflection.ReflectionUtils;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.GlueClientBuilder;

/**
 * Factory class for creating and configuring instances of {@link GlueClient} with settings provided
 * by {@link GlueCatalogConfig}.
 *
 * <p>This factory is responsible for setting the AWS region and credentials for the Glue client. If
 * a custom credentials provider class is specified in {@code GlueCatalogConfig}, it will use
 * reflection to instantiate the provider; otherwise, it defaults to the standard AWS credentials
 * provider.
 */
public class DefaultGlueClientFactory extends GlueClientFactory {

  public DefaultGlueClientFactory(GlueCatalogConfig glueConfig) {
    super(glueConfig);
  }

  /**
   * Builds a {@link GlueClient} from the configured region and credentials.
   *
   * <p>The region is only set on the builder when one is configured. When a credentials provider
   * class is configured, its static {@code create(Map)} method is tried first with the configured
   * credential properties, falling back to the no-arg static {@code create()}; otherwise {@link
   * DefaultCredentialsProvider} is used.
   *
   * @return a newly built Glue client
   */
  public GlueClient getGlueClient() {
    GlueClientBuilder builder = GlueClient.builder();
    if (!StringUtils.isEmpty(glueConfig.getRegion())) {
      builder.region(Region.of(glueConfig.getRegion()));
    }

    builder.credentialsProvider(resolveCredentialsProvider());
    return builder.build();
  }

  /** Picks the configured custom credentials provider, or the AWS default when none is set. */
  private AwsCredentialsProvider resolveCredentialsProvider() {
    String className = glueConfig.getClientCredentialsProviderClass();
    if (StringUtils.isEmpty(className)) {
      return DefaultCredentialsProvider.create();
    }

    try {
      return ReflectionUtils.createInstanceOfClassFromStaticMethod(
          className,
          "create",
          new Class<?>[] {Map.class},
          new Object[] {glueConfig.getClientCredentialConfigs()});
    } catch (Exception mapFactoryFailure) {
      // Best-effort fallback for providers that only expose a no-arg create().
      try {
        return ReflectionUtils.createInstanceOfClassFromStaticMethod(className, "create");
      } catch (RuntimeException noArgFactoryFailure) {
        // Keep the first failure visible instead of silently dropping it.
        noArgFactoryFailure.addSuppressed(mapFactoryFailure);
        throw noArgFactoryFailure;
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog.glue;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Configurations for setting up Glue client and running Glue catalog operations */
@Getter
@EqualsAndHashCode
@ToString
public class GlueCatalogConfig {

private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

public static final String CLIENT_CREDENTIAL_PROVIDER_PREFIX =
"externalCatalog.glue.credentials.provider.";

@JsonProperty("externalCatalog.glue.catalogId")
private String catalogId;

@JsonProperty("externalCatalog.glue.region")
private String region;

@JsonProperty("externalCatalog.glue.credentialsProviderClass")
private String clientCredentialsProviderClass;

/**
* In case a credentialsProviderClass is configured and require additional properties for
* instantiation, those properties should start with {@link #CLIENT_CREDENTIAL_PROVIDER_PREFIX}.
*
* <p>For ex: if credentialsProviderClass requires `accessKey` and `secretAccessKey`, they should
* be configured using below keys:
* <li>externalCatalog.glue.credentials.provider.accessKey
* <li>externalCatalog.glue.credentials.provider.secretAccessKey
*/
@Setter private Map<String, String> clientCredentialConfigs;

/** Creates GlueCatalogConfig from given key-value map */
public static GlueCatalogConfig of(Map<String, String> properties) {
try {
GlueCatalogConfig glueCatalogConfig =
OBJECT_MAPPER.readValue(
OBJECT_MAPPER.writeValueAsString(properties), GlueCatalogConfig.class);
Map<String, String> clientCredentialProperties =
propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX);
glueCatalogConfig.setClientCredentialConfigs(clientCredentialProperties);
return glueCatalogConfig;
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private static Map<String, String> propertiesWithPrefix(
Map<String, String> properties, String prefix) {
if (properties == null || properties.isEmpty()) {
return Collections.emptyMap();
}

return properties.entrySet().stream()
.filter(e -> e.getKey().startsWith(prefix))
.collect(Collectors.toMap(e -> e.getKey().replaceFirst(prefix, ""), Map.Entry::getValue));
}
}
Loading

0 comments on commit d6ae25a

Please sign in to comment.