From d937310d091ca3cc6833d560f42e71f6e21d2d4c Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Mon, 16 Dec 2024 09:49:21 +0100 Subject: [PATCH] =?UTF-8?q?[FSTORE-1644]=20FlinkEngine=20configures=20Kafk?= =?UTF-8?q?a=20producer=20pointing=20to=20the=20l=E2=80=A6=20(#438)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [FSTORE-1644] FlinkEngine configures Kafka producer pointing to the local certificate path * Add missing dependencies --- java/flink/pom.xml | 22 +++ .../hsfs/flink/engine/FlinkEngine.java | 129 ++++++++++++++---- .../hsfs/flink/engine/TestFlinkEngine.java | 87 ++++++++++++ java/flink/src/test/resources/test_kstore.jks | Bin 0 -> 3961 bytes java/flink/src/test/resources/test_tstore.jks | Bin 0 -> 1460 bytes .../metadata/HopsworksInternalClient.java | 1 - 6 files changed, 211 insertions(+), 28 deletions(-) create mode 100644 java/flink/src/test/java/com/logicalclocks/hsfs/flink/engine/TestFlinkEngine.java create mode 100644 java/flink/src/test/resources/test_kstore.jks create mode 100644 java/flink/src/test/resources/test_tstore.jks diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 11564004f..c50848325 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -14,6 +14,9 @@ 1.17.1.0 2.13.4.2 + 1.79 + 14.0.1 + 4.5.6 @@ -88,5 +91,24 @@ ${fasterxml.version} + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + com.google.guava + guava + ${guava.version} + + + + org.bouncycastle + bcpkix-jdk18on + test + ${bouncycastle.version} + + diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java index aa6b07537..b8e43908e 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/FlinkEngine.java @@ -17,6 +17,7 @@ package com.logicalclocks.hsfs.flink.engine; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.FeatureStoreException; @@ -25,6 +26,8 @@ import com.logicalclocks.hsfs.flink.StreamFeatureGroup; import com.logicalclocks.hsfs.metadata.HopsworksInternalClient; +import com.logicalclocks.hsfs.metadata.StorageConnectorApi; +import com.twitter.chill.Base64; import lombok.Getter; import org.apache.avro.generic.GenericRecord; @@ -36,8 +39,16 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.FileUtils; +import org.apache.kafka.common.config.SslConfigs; +import java.io.FileInputStream; import java.io.IOException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.Certificate; +import java.security.cert.CertificateEncodingException; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -45,7 +56,7 @@ public class FlinkEngine extends EngineBase { private static FlinkEngine INSTANCE = null; - public static synchronized FlinkEngine getInstance() throws FeatureStoreException { + public static synchronized FlinkEngine getInstance() { if (INSTANCE == null) { INSTANCE = new FlinkEngine(); } @@ -55,38 +66,38 @@ public static synchronized FlinkEngine getInstance() throws FeatureStoreExceptio @Getter private StreamExecutionEnvironment streamExecutionEnvironment; - private FlinkEngine() throws FeatureStoreException { + private FlinkEngine() { streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); // Configure the streamExecutionEnvironment streamExecutionEnvironment.getConfig().enableObjectReuse(); } public DataStreamSink writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream dataStream, - Map writeOptions) throws FeatureStoreException, IOException { + Map writeOptions) throws FeatureStoreException, IOException { DataStream genericDataStream = (DataStream) dataStream; Properties properties = new Properties(); properties.putAll(getKafkaConfig(streamFeatureGroup, writeOptions)); KafkaSink sink = KafkaSink.builder() - .setBootstrapServers(properties.getProperty("bootstrap.servers")) - .setKafkaProducerConfig(properties) - .setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup)) - .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .build(); + .setBootstrapServers(properties.getProperty("bootstrap.servers")) + .setKafkaProducerConfig(properties) + .setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup)) + .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build(); Map complexFeatureSchemas = new HashMap<>(); - for (String featureName: streamFeatureGroup.getComplexFeatures()) { + for (String featureName : streamFeatureGroup.getComplexFeatures()) { complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName)); } DataStream avroRecordDataStream = - genericDataStream.map(new PojoToAvroRecord( - streamFeatureGroup.getDeserializedAvroSchema(), - streamFeatureGroup.getDeserializedEncodedAvroSchema(), - complexFeatureSchemas)) - .returns( - new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema()) - ); + genericDataStream.map(new PojoToAvroRecord( + streamFeatureGroup.getDeserializedAvroSchema(), + streamFeatureGroup.getDeserializedEncodedAvroSchema(), + complexFeatureSchemas)) + .returns( + new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema()) + ); return avroRecordDataStream.sinkTo(sink); } @@ -96,34 +107,98 @@ public String addFile(String filePath) throws IOException { if (Strings.isNullOrEmpty(filePath)) { return filePath; } - // this is used for unit testing - if (!filePath.startsWith("file://")) { - filePath = "hdfs://" + filePath; + + if (filePath.startsWith("hdfs://")) { + String targetPath = FileUtils.getCurrentWorkingDirectory().toString() + + filePath.substring(filePath.lastIndexOf("/")); + FileUtils.copy(new Path(filePath), new Path(targetPath), false); + + return targetPath; } - String targetPath = FileUtils.getCurrentWorkingDirectory().toString() - + filePath.substring(filePath.lastIndexOf("/")); - FileUtils.copy(new Path(filePath), new Path(targetPath), false); - return targetPath; + + return filePath; } @Override public Map getKafkaConfig(FeatureGroupBase featureGroup, Map writeOptions) - throws FeatureStoreException, IOException { + throws FeatureStoreException, IOException { boolean external = !(System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS) - || (writeOptions != null - && Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false")))); + || (writeOptions != null + && Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false")))); StorageConnector.KafkaConnector storageConnector = - storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external); + storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external); storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation())); storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation())); Map config = storageConnector.kafkaOptions(); + // To avoid distribution issues of the certificates across multiple pods/nodes + // here we are extracting the key/certificates from the JKS keyStore/trustStore and + // pass them in the configuration as PEM content + try { + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(new FileInputStream(storageConnector.getSslKeystoreLocation()), + storageConnector.getSslKeystorePassword().toCharArray()); + config.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, getKey(keyStore, storageConnector.getSslKeystorePassword())); + config.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, getCertificateChain(keyStore)); + config.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); + + KeyStore trustStore = KeyStore.getInstance("JKS"); + trustStore.load(new FileInputStream(storageConnector.getSslTruststoreLocation()), + storageConnector.getSslTruststorePassword().toCharArray()); + config.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, getRootCA(trustStore)); + config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + } catch (Exception ex) { + throw new IOException(ex); + } + + // Remove the keystore and truststore location from the properties otherwise + // the SSL engine will try to use them first. + config.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + config.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + config.remove(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); + config.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); + config.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); + if (writeOptions != null) { config.putAll(writeOptions); } config.put("enable.idempotence", "false"); return config; } + + private String getKey(KeyStore keyStore, String password) + throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException { + String keyAlias = keyStore.aliases().nextElement(); + return "-----BEGIN PRIVATE KEY-----\n" + + Base64.encodeBytes(keyStore.getKey(keyAlias, password.toCharArray()).getEncoded()) + + "\n-----END PRIVATE KEY-----"; + } + + private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, CertificateEncodingException { + String certificateAlias = keyStore.aliases().nextElement(); + Certificate[] certificateChain = keyStore.getCertificateChain(certificateAlias); + + StringBuilder certificateChainBuilder = new StringBuilder(); + for (Certificate certificate : certificateChain) { + certificateChainBuilder.append("-----BEGIN CERTIFICATE-----\n") + .append(Base64.encodeBytes(certificate.getEncoded())) + .append("\n-----END CERTIFICATE-----\n"); + } + + return certificateChainBuilder.toString(); + } + + private String getRootCA(KeyStore trustStore) throws KeyStoreException, CertificateEncodingException { + String rootCaAlias = trustStore.aliases().nextElement(); + return "-----BEGIN CERTIFICATE-----\n" + + Base64.encodeBytes(trustStore.getCertificate(rootCaAlias).getEncoded()) + + "\n-----END CERTIFICATE-----"; + } + + @VisibleForTesting + public void setStorageConnectorApi(StorageConnectorApi storageConnectorApi) { + this.storageConnectorApi = storageConnectorApi; + } } diff --git a/java/flink/src/test/java/com/logicalclocks/hsfs/flink/engine/TestFlinkEngine.java b/java/flink/src/test/java/com/logicalclocks/hsfs/flink/engine/TestFlinkEngine.java new file mode 100644 index 000000000..20540cbed --- /dev/null +++ b/java/flink/src/test/java/com/logicalclocks/hsfs/flink/engine/TestFlinkEngine.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024. Hopsworks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.logicalclocks.hsfs.flink.engine; + +import com.logicalclocks.hsfs.*; +import com.logicalclocks.hsfs.flink.FeatureStore; +import com.logicalclocks.hsfs.flink.StreamFeatureGroup; +import com.logicalclocks.hsfs.metadata.HopsworksClient; +import com.logicalclocks.hsfs.metadata.HopsworksHttpClient; +import com.logicalclocks.hsfs.metadata.StorageConnectorApi; +import org.apache.kafka.common.config.SslConfigs; +import org.bouncycastle.cert.X509CertificateHolder; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.bouncycastle.openssl.PEMParser; + +import java.io.IOException; +import java.io.StringReader; +import java.security.cert.CertificateException; +import java.util.HashMap; +import java.util.Map; + +public class TestFlinkEngine { + + @Test + public void testKafkaProperties_Certificates() throws IOException, FeatureStoreException, CertificateException { + // Arrange + HopsworksClient hopsworksClient = Mockito.mock(HopsworksClient.class); + hopsworksClient.setInstance(new HopsworksClient(Mockito.mock(HopsworksHttpClient.class), "host")); + + StorageConnector.KafkaConnector kafkaConnector = new StorageConnector.KafkaConnector(); + kafkaConnector.setSslKeystoreLocation(this.getClass().getResource("/test_kstore.jks").getPath()); + kafkaConnector.setSslKeystorePassword("O74K016I5UTB7YYPC6K6RXIM9F7LVPFW23FNK8WF3JEOO7Y607VCU7E7691UQ3CA"); + kafkaConnector.setSslTruststoreLocation(this.getClass().getResource("/test_tstore.jks").getPath()); + kafkaConnector.setSslTruststorePassword("O74K016I5UTB7YYPC6K6RXIM9F7LVPFW23FNK8WF3JEOO7Y607VCU7E7691UQ3CA"); + kafkaConnector.setSecurityProtocol(SecurityProtocol.SSL); + kafkaConnector.setSslEndpointIdentificationAlgorithm(SslEndpointIdentificationAlgorithm.EMPTY); + kafkaConnector.setExternalKafka(true); + + StorageConnectorApi storageConnectorApi = Mockito.mock(StorageConnectorApi.class); + Mockito.when(storageConnectorApi.getKafkaStorageConnector(Mockito.any(), Mockito.anyBoolean())) + .thenReturn(kafkaConnector); + FlinkEngine flinkEngine = FlinkEngine.getInstance(); + flinkEngine.setStorageConnectorApi(storageConnectorApi); + + StreamFeatureGroup featureGroup = new StreamFeatureGroup(); + featureGroup.setFeatureStore(new FeatureStore()); + + // Act + Map kafkaOptions = flinkEngine.getKafkaConfig(featureGroup, new HashMap<>()); + + // Assert + Assert.assertEquals("PEM", kafkaOptions.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); + Assert.assertEquals("PEM", kafkaOptions.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + + String keystoreChainPem = kafkaOptions.get(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG); + String trustStorePem = kafkaOptions.get(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG); + + try (PEMParser pemParser = new PEMParser(new StringReader(keystoreChainPem))) { + Assert.assertEquals("CN=FraudWorkshop__fabio000", + ((X509CertificateHolder) pemParser.readObject()).getSubject().toString()); + + Assert.assertEquals("C=SE,O=Hopsworks,OU=core,CN=HopsRootCA", + ((X509CertificateHolder) pemParser.readObject()).getIssuer().toString()); + } + + try (PEMParser pemParser = new PEMParser(new StringReader(trustStorePem))) { + Assert.assertEquals("C=SE,O=Hopsworks,OU=core,CN=HopsRootCA", + ((X509CertificateHolder) pemParser.readObject()).getSubject().toString()); + } + } +} diff --git a/java/flink/src/test/resources/test_kstore.jks b/java/flink/src/test/resources/test_kstore.jks new file mode 100644 index 0000000000000000000000000000000000000000..3f02a42ba97b4bc522dc9c1ce0f66336931a0022 GIT binary patch literal 3961 zcmb`|c{o&m-v{v7D0`MrVeEivhT8l z#AHd4U6C#8P5qwh`aSpk{PSGb{m1u@_c`BlopYVf=ll8`tsbp{Kp@D;0{>ntRxV~3 z3lFr5y{iq{*~G-k%-j|Y0Km!pXsX^7D-ehb1mIu?04+JC5IGnI7KG4)!DJv94z>_Z zHUG_mk>Me6-^7T{>pdH{P^_s-t4)1wtF$>lR24S&ElrTpm`&&U5Jms<6+^m`qs5-u zS9jj(f_deJ&8WLiKG_tGI(Za!&Rc424skY8aCE6pZ2PT`o zKbZU=^NcUqL28SpJxG8jWJcQilSUvmecGGQ+BD7)Hc%18Gcp)0S2(o{`XnT!$||9+ z6%UfjY~v^ETvIvw&Ot&{5i3`a=dY$T^FH99-B-c&=wSX6PvY`m;@f zJ!^1X;*>#R$I&-xwQ`b3Q~DKRW7*b(uv-FvFDa}OdnF2huN_!cESDoYb4W8Q{{y!NOy z>DhafgZ3jUfo`pbK_u&MsD5^r(h)bo#ry{b0~l`uNzV+9_f z_$kFJ&7b;4>=ldTk=@XefRkEKv+mh5T&=|q^4WJ@l?~1~5dtWaOKn3i0!L4P2Xl`kIB-O^}{z+NS90QH=&v6k5;`=a@EetHG z7{%RhQ3vx|OTxPdxVfj}CRslzO7~9QER_nhi0bMsf%`wM9qYJP+}9ZB(U+-PRVix* zr;1w?5n@kqFrveWSKX||1c)F@XI({ z=bn@9`d>zkOoD8$+iIj0xXuiG)QYCKl6|3$Dj`S+KgBjD9Z@j_Nq%ORR z@}3WEqX-g}%yX;)ql+lfmzh6a{20OifcZ!0{_Qsb_$)Kc%O-R(OeDI0#;i@mQpdEO)9k=z+>)2=0kSS2sLLE)kE!Da;O zfGXzI8y0e|<#q*HaVto$$Swt5IQ~%{e%EMHwL`4E?`2o|lYrbg4u<2PSP}LWs~hw? z+U8r!AzY!+p1e(NvBT++ceApLYV&YDZ@ZBnGCx8F?v1 z-qHjzwdu}t5vMsZH}6DULYB&1(SVIt*I^hj$j`Z*cB3eY*)73GLa~|Hu11Ras_pxl zk*Q|RBb_O6NTQvO^rtZ^9x#n-I2SFtQs z4Cv|4iHUsoP=JPrCT?E0Hp1PwrSdX0)2_6Y4_iMRw@IE_$hhSM#o zPHvVij+Pd-W^R^91%QKvMnnvO0InlMM5Pd-1}rq9r%~eX2yi1f0ggY9v)nq>T=zdT z2XJ8Se}3f&;7}Zx?nHJlB?JcsgAxfqwAVBglvlRhj=8n>;cD1cjA7Z3qZCs`v-iwbKre($AM{6^$3h<(g8hQ2AsH}X7 zxrthB{^|quV^Xc|KLq;<$>y3pu2o8MZHO!S;9o~a5bkAjfBAepH39xw<0aOy45^M# z`zCsoKS5urP;vzJ*tzwou8KuY>C0|d~ai8H;oY!u=_V_?{oAcYtB$a{2Mhj2nx))9hvIhc#N!nUY`8&QZ zhN5R*qtEonT_-xZ_Ad&u#ZZp%woG9_y(Dd2FZzlb2L*S^R9<=-Xr7BYGP@B!hz9khZv1$9PwEGXC0z|viVUGc4naUG@6Pw_S+62(F4NY24%$x(4;d2- zor9UZ8;;j_L_4DyZ#K8|oz)9asnW3Bf7_HyLd^(Bz6*%MVq`P3o^+vJPo>1VYrIf6 z{4ReG0xkFzVZ*#^8Z4Ysf|FjHpY8m9jvXcW)Y3CnO8Kqkxu3dt8 zi=T6(kh3dUZf%y|NL&*DJLf-N@Cjdan{eB6b}QfR3{1Rg{6x8`Pf{{R?ZZNTrP6sX z`pye=EU5j@7KU{D-F{+}L`>P?(zfHMZ@sh*gPPG$Fn8&K5=ONH9-taoTGamCID{5! z_|~1bs`tkI8d<)EA*G60p1TaPHgmT~QsRD-Dq6w}xKLMrEA|0FvhB;rybZCWN9T>T zzBVP_zFie4XQjM3^L4X5OHB8;a+L&fMr9m%?(VhV{CslpsE3nR-Xl3zuUQqa&N6kB zOIkHfskc@X8Hc?Rlx+^vAo%|E1#Vq+X?`$Q|50Osi~|{s{;Gk@*g0oDf|-S8qtAG- zddDJ5_HgD%##f$QO|$b+laKDqL~5J#75{~H;F;SptJr$$YR|hIQE}~<%*3p1n%-lp zmBM~;)jvtgWA$l1>29V@{w=)Sg6NWC^%*Lp8q;C9VD+P8{b=tDI}`bZ-Y_JbL}vx(OKn+K`ed$8<>F)IM!CYR)_N=W))lO% z=%z)VL@4TnkBeo8UziLCCd3SKK;Q4<^4Z#0U&I&O5 zGx0-BP@B;GJT_G&n0{eJo4|dN6~J)1;rAps3y`0T00@}uf6oJO@V_0Y-vYl9O7lq2 z%}b1<^aw#cr-w@C9OfrTFj(fX?IEA@4P2KN^x5@4i3kd2{F3ULfX7Yi&GIxjxlXK2i}wHIi20 zpX!9BrX4|8x`r`PbZ3Q?sA!}t{$iUrmMH`JWn%;`@i6i|^rCCXJ!V=-v9Dhj@7QPI zVz_f>H6Wx%uE&`;Z*r5RLq`L`kJkp`HZ3$_*op(Mh6LTa*`T;1mq>LOC`}~g-21N0 ztmYL?jkSK90YFB3(shH%voL-_f*GNf;?3PrHJEiYHf(5+-on=}WUD3{&h`Wu;# zUGIoCn0N9%ZPP6yuMkEynffn4X!2K_Q7XwLf&J_K$@MG#?bd~Pk}_WQ7U>N1n?HIl zw_Uxp5Ye5TSP*#&k}(dc37yh9YVFRg>C(bz*s{x6Bv3yt)wi`?H&tO%>(n9(4}&DS N*9FpsZkZ|E`4@&5^8 literal 0 HcmV?d00001 diff --git a/java/flink/src/test/resources/test_tstore.jks b/java/flink/src/test/resources/test_tstore.jks new file mode 100644 index 0000000000000000000000000000000000000000..c0e7effb621ce3a9be5a121de6222f4623c59922 GIT binary patch literal 1460 zcmezO_TO6u1_mY|W(3nb8Tkdp@kROhCGp9LK*7n}r{AP8utw;a8dx$gu;v>yv1S`I zF$XVTW@2PwVq`GjW#iOp^Jx3d%gD&h%3$DX$Zf#M#vIDRCd?Gmz;On9i8~q_eA0Qi(%`Y|LwNb?GlZyktgH^cV77OQ zjw}B%h4X9DTND;NJGe1lZ>!r(jvo@dGqgP{XZ!KR=xr0@_h^6H{d$U5<>ZrW7AA-D zQ`Jj6=dA5MWx=;Wvo`WAe`P{`;GVcuyJw!6(RaS0!F^j}%$iRPQ$EGssG4Q_#_h84 z^;a*}uC_l?Gudvh_Hn*9c5Wtc|N1q}`{a-KL#*JEPE108~&hF(~ z{C0KluG0&Z9kG+bG-K# z=HDx2JY85K*K9eW5qvAnKprHm%pzeR)_`3BYE}ZKL`DX+z~z3ETsj^rstCC*uRHxI zSV>Kn|D&Z^uikn8X@!D9)w$^(?5+xma<1yt zo23~3w7TczLfr*H3fb3}_3s|mFkN|X?Ods(w~72!$1C)A z=$m*=F9_J~H&v1=deQlY$$O;w)1T^H|LcCZm#b-RnAVq?b9-|qHEoJA;6B3Hdj0Y| z^Pho2o5G9kJ&=+YZZTowI-g!Xai`J7?j46S%k{jwk4;T@l3Vy}OT${vTVMF*3D&p# zt$DIA=K5BZxe_9E?pvDVj@SS2%F2KKIrBi4)z52Zg5SD{nN9fiP$E6k>7dD$ztUUP zkAi3tbaEvr}>=e-Fy1ef=@c%udB`1;QbPowajVn^198t-ZF$FW@2