Skip to content

Commit

Permalink
Merge branch 'main' into FSTORE-1628-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored Dec 16, 2024
2 parents 1aac88f + d937310 commit da9ad2a
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 28 deletions.
22 changes: 22 additions & 0 deletions java/flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
<properties>
<flink.version>1.17.1.0</flink.version>
<fasterxml.version>2.13.4.2</fasterxml.version>
<bouncycastle.version>1.79</bouncycastle.version>
<guava.version>14.0.1</guava.version>
<httpclient.version>4.5.6</httpclient.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -88,5 +91,24 @@
<version>${fasterxml.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<scope>test</scope>
<version>${bouncycastle.version}</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.logicalclocks.hsfs.flink.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
Expand All @@ -25,6 +26,8 @@
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;

import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.twitter.chill.Base64;
import lombok.Getter;

import org.apache.avro.generic.GenericRecord;
Expand All @@ -36,16 +39,24 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FileUtils;
import org.apache.kafka.common.config.SslConfigs;

import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkEngine extends EngineBase {
private static FlinkEngine INSTANCE = null;

public static synchronized FlinkEngine getInstance() throws FeatureStoreException {
public static synchronized FlinkEngine getInstance() {
if (INSTANCE == null) {
INSTANCE = new FlinkEngine();
}
Expand All @@ -55,38 +66,38 @@ public static synchronized FlinkEngine getInstance() throws FeatureStoreExceptio
@Getter
private StreamExecutionEnvironment streamExecutionEnvironment;

private FlinkEngine() throws FeatureStoreException {
private FlinkEngine() {
streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the streamExecutionEnvironment
streamExecutionEnvironment.getConfig().enableObjectReuse();
}

public DataStreamSink<?> writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream<?> dataStream,
Map<String, String> writeOptions) throws FeatureStoreException, IOException {
Map<String, String> writeOptions) throws FeatureStoreException, IOException {

DataStream<Object> genericDataStream = (DataStream<Object>) dataStream;
Properties properties = new Properties();
properties.putAll(getKafkaConfig(streamFeatureGroup, writeOptions));

KafkaSink<GenericRecord> sink = KafkaSink.<GenericRecord>builder()
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setKafkaProducerConfig(properties)
.setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup))
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setKafkaProducerConfig(properties)
.setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup))
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
Map<String, String> complexFeatureSchemas = new HashMap<>();
for (String featureName: streamFeatureGroup.getComplexFeatures()) {
for (String featureName : streamFeatureGroup.getComplexFeatures()) {
complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName));
}

DataStream<GenericRecord> avroRecordDataStream =
genericDataStream.map(new PojoToAvroRecord(
streamFeatureGroup.getDeserializedAvroSchema(),
streamFeatureGroup.getDeserializedEncodedAvroSchema(),
complexFeatureSchemas))
.returns(
new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema())
);
genericDataStream.map(new PojoToAvroRecord(
streamFeatureGroup.getDeserializedAvroSchema(),
streamFeatureGroup.getDeserializedEncodedAvroSchema(),
complexFeatureSchemas))
.returns(
new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema())
);

return avroRecordDataStream.sinkTo(sink);
}
Expand All @@ -96,34 +107,98 @@ public String addFile(String filePath) throws IOException {
if (Strings.isNullOrEmpty(filePath)) {
return filePath;
}
// this is used for unit testing
if (!filePath.startsWith("file://")) {
filePath = "hdfs://" + filePath;

if (filePath.startsWith("hdfs://")) {
String targetPath = FileUtils.getCurrentWorkingDirectory().toString()
+ filePath.substring(filePath.lastIndexOf("/"));
FileUtils.copy(new Path(filePath), new Path(targetPath), false);

return targetPath;
}
String targetPath = FileUtils.getCurrentWorkingDirectory().toString()
+ filePath.substring(filePath.lastIndexOf("/"));
FileUtils.copy(new Path(filePath), new Path(targetPath), false);
return targetPath;

return filePath;
}

@Override
public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
throws FeatureStoreException, IOException {
boolean external = !(System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)
|| (writeOptions != null
&& Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false"))));
|| (writeOptions != null
&& Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false"))));

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.kafkaOptions();

// To avoid distribution issues of the certificates across multiple pods/nodes
// here we are extracting the key/certificates from the JKS keyStore/trustStore and
// pass them in the configuration as PEM content
try {
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(new FileInputStream(storageConnector.getSslKeystoreLocation()),
storageConnector.getSslKeystorePassword().toCharArray());
config.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, getKey(keyStore, storageConnector.getSslKeystorePassword()));
config.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, getCertificateChain(keyStore));
config.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");

KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(storageConnector.getSslTruststoreLocation()),
storageConnector.getSslTruststorePassword().toCharArray());
config.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, getRootCA(trustStore));
config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
} catch (Exception ex) {
throw new IOException(ex);
}

// Remove the keystore and truststore location from the properties otherwise
// the SSL engine will try to use them first.
config.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
config.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
config.remove(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
config.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
config.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);

if (writeOptions != null) {
config.putAll(writeOptions);
}
config.put("enable.idempotence", "false");
return config;
}

private String getKey(KeyStore keyStore, String password)
throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException {
String keyAlias = keyStore.aliases().nextElement();
return "-----BEGIN PRIVATE KEY-----\n"
+ Base64.encodeBytes(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded())
+ "\n-----END PRIVATE KEY-----";
}

private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, CertificateEncodingException {
String certificateAlias = keyStore.aliases().nextElement();
Certificate[] certificateChain = keyStore.getCertificateChain(certificateAlias);

StringBuilder certificateChainBuilder = new StringBuilder();
for (Certificate certificate : certificateChain) {
certificateChainBuilder.append("-----BEGIN CERTIFICATE-----\n")
.append(Base64.encodeBytes(certificate.getEncoded()))
.append("\n-----END CERTIFICATE-----\n");
}

return certificateChainBuilder.toString();
}

private String getRootCA(KeyStore trustStore) throws KeyStoreException, CertificateEncodingException {
String rootCaAlias = trustStore.aliases().nextElement();
return "-----BEGIN CERTIFICATE-----\n"
+ Base64.encodeBytes(trustStore.getCertificate(rootCaAlias).getEncoded())
+ "\n-----END CERTIFICATE-----";
}

@VisibleForTesting
public void setStorageConnectorApi(StorageConnectorApi storageConnectorApi) {
this.storageConnectorApi = storageConnectorApi;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2024. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.logicalclocks.hsfs.flink.engine;

import com.logicalclocks.hsfs.*;
import com.logicalclocks.hsfs.flink.FeatureStore;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import org.apache.kafka.common.config.SslConfigs;
import org.bouncycastle.cert.X509CertificateHolder;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.bouncycastle.openssl.PEMParser;

import java.io.IOException;
import java.io.StringReader;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.Map;

public class TestFlinkEngine {

@Test
public void testKafkaProperties_Certificates() throws IOException, FeatureStoreException, CertificateException {
// Arrange
HopsworksClient hopsworksClient = Mockito.mock(HopsworksClient.class);
hopsworksClient.setInstance(new HopsworksClient(Mockito.mock(HopsworksHttpClient.class), "host"));

StorageConnector.KafkaConnector kafkaConnector = new StorageConnector.KafkaConnector();
kafkaConnector.setSslKeystoreLocation(this.getClass().getResource("/test_kstore.jks").getPath());
kafkaConnector.setSslKeystorePassword("O74K016I5UTB7YYPC6K6RXIM9F7LVPFW23FNK8WF3JEOO7Y607VCU7E7691UQ3CA");
kafkaConnector.setSslTruststoreLocation(this.getClass().getResource("/test_tstore.jks").getPath());
kafkaConnector.setSslTruststorePassword("O74K016I5UTB7YYPC6K6RXIM9F7LVPFW23FNK8WF3JEOO7Y607VCU7E7691UQ3CA");
kafkaConnector.setSecurityProtocol(SecurityProtocol.SSL);
kafkaConnector.setSslEndpointIdentificationAlgorithm(SslEndpointIdentificationAlgorithm.EMPTY);
kafkaConnector.setExternalKafka(true);

StorageConnectorApi storageConnectorApi = Mockito.mock(StorageConnectorApi.class);
Mockito.when(storageConnectorApi.getKafkaStorageConnector(Mockito.any(), Mockito.anyBoolean()))
.thenReturn(kafkaConnector);
FlinkEngine flinkEngine = FlinkEngine.getInstance();
flinkEngine.setStorageConnectorApi(storageConnectorApi);

StreamFeatureGroup featureGroup = new StreamFeatureGroup();
featureGroup.setFeatureStore(new FeatureStore());

// Act
Map<String, String> kafkaOptions = flinkEngine.getKafkaConfig(featureGroup, new HashMap<>());

// Assert
Assert.assertEquals("PEM", kafkaOptions.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
Assert.assertEquals("PEM", kafkaOptions.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));

String keystoreChainPem = kafkaOptions.get(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
String trustStorePem = kafkaOptions.get(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);

try (PEMParser pemParser = new PEMParser(new StringReader(keystoreChainPem))) {
Assert.assertEquals("CN=FraudWorkshop__fabio000",
((X509CertificateHolder) pemParser.readObject()).getSubject().toString());

Assert.assertEquals("C=SE,O=Hopsworks,OU=core,CN=HopsRootCA",
((X509CertificateHolder) pemParser.readObject()).getIssuer().toString());
}

try (PEMParser pemParser = new PEMParser(new StringReader(trustStorePem))) {
Assert.assertEquals("C=SE,O=Hopsworks,OU=core,CN=HopsRootCA",
((X509CertificateHolder) pemParser.readObject()).getSubject().toString());
}
}
}
Binary file added java/flink/src/test/resources/test_kstore.jks
Binary file not shown.
Binary file added java/flink/src/test/resources/test_tstore.jks
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.logicalclocks.hsfs.metadata;

import com.logicalclocks.hsfs.FeatureStoreException;
import jdk.nashorn.internal.runtime.regexp.joni.exception.InternalException;
import lombok.Getter;
import lombok.Setter;
import org.apache.http.HttpHeaders;
Expand Down

0 comments on commit da9ad2a

Please sign in to comment.