From 2663cbb123677a76442c147d0eab5641df2beec0 Mon Sep 17 00:00:00 2001 From: Jaideep K Date: Sat, 7 Apr 2018 21:24:58 +0700 Subject: [PATCH 1/3] Redshift views :- - RedShift viewer interface - RedShift Views - Viewer interface to be implemented by a view class. - Test which calls the class. --- build.gradle | 4 + .../launcher/views/RedShiftViewer.java | 281 ++++++++++++++++++ .../plumbox/launcher/views/RedShiftViews.java | 53 ++++ .../oogway/plumbox/launcher/views/Viewer.java | 31 ++ .../launcher/views/RedShiftViewsTest.java | 53 ++++ 5 files changed, 422 insertions(+) create mode 100644 src/main/java/in/oogway/plumbox/launcher/views/RedShiftViewer.java create mode 100644 src/main/java/in/oogway/plumbox/launcher/views/RedShiftViews.java create mode 100644 src/main/java/in/oogway/plumbox/launcher/views/Viewer.java create mode 100644 src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java diff --git a/build.gradle b/build.gradle index 80d1041..ef6a336 100644 --- a/build.gradle +++ b/build.gradle @@ -71,6 +71,10 @@ dependencies { compile group: 'org.json', name: 'json', version: '20180130' compile 'com.google.code.gson:gson:2.8.2' compile group: 'commons-cli', name: 'commons-cli', version: '1.4' + compile 'org.apache.hadoop:hadoop-aws:2.7.3' + + compile group: 'com.databricks', name: 'spark-redshift_2.10', version: '2.0.1' + // JUnit Jupiter API and TestEngine implementation diff --git a/src/main/java/in/oogway/plumbox/launcher/views/RedShiftViewer.java b/src/main/java/in/oogway/plumbox/launcher/views/RedShiftViewer.java new file mode 100644 index 0000000..ec7b33b --- /dev/null +++ b/src/main/java/in/oogway/plumbox/launcher/views/RedShiftViewer.java @@ -0,0 +1,281 @@ +package in.oogway.plumbox.launcher.views; + +import in.oogway.plumbox.cache.TableTopSlicer; +import in.oogway.plumbox.config.JDBCConfig; +import in.oogway.plumbox.datalake.dataLakeProperties; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; + 
+import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Created by jaideep Khandelwal on 05/04/18. + */ +public interface RedShiftViewer extends Viewer, TableTopSlicer { + + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + + /** + * @return Data Type of the column for partitioning + */ + String partitionColumnDataType(); + + /* + * HashMap to convert HiveDataType to RedShift Data Type + */ + HashMap dataTypeMapping = new HashMap() {{ + put("STRING", "VARCHAR"); + put("LONG", "BIGINT"); + put("INTEGER", "INTEGER"); + put("DECIMAL","DECIMAL(38,2)"); + put("BOOLEAN", "BOOLEAN"); + put("TIMESTAMP", "TIMESTAMP"); + }}; + + /** + * @return S3 Path + */ + default dataLakeProperties dataLakeProperties() { + return new dataLakeProperties( + System.getProperty("storage_path"), + System.getProperty("top_dir"), + System.getProperty("storage_protocol") + ); + } + + /** + * @param fields dataframe fields to get the schema + * @return (columnname datatype, columnname datatype, ....) + */ + default String generateSQLSchema(StructField[] fields) { + return Arrays.stream(fields).map( + structField -> String.format("%s %s", structField.name(), + hiveToPgDataType(structField.dataType().typeName())) + ).collect(Collectors.joining(", ")); + } + + /** + * @param hiveDataType function to convert hive table to postgres datatype + * @return redShift compatible datatype + */ + default String hiveToPgDataType(String hiveDataType) { + String s = hiveDataType.toUpperCase(); + + if (s.startsWith("DECIMAL")) { + s = "DECIMAL"; + } + return dataTypeMapping.get(s); + } + + /** Generate a table which takes fields to be used for creating a view, + * this is independent of the dataframe. 
+ * @param ss Spark Session + * @param fields Struct fields that has schema of the view to be created + */ + @Override + default void generateView(SparkSession ss, StructField[] fields) { + + String path = dataLakeProperties().MakePath(destinationView()); + + if (checkIfTableExists()) { + return; + } + + String query = String.format("CREATE EXTERNAL TABLE %s.%s (%s) " + + "PARTITIONED BY (%s %s) " + + "ROW FORMAT DELIMITED " + + "FIELDS TERMINATED BY '\\t' "+ + "STORED AS %s " + + "LOCATION '%s'", getSchemaName(), destinationView(), generateSQLSchema(fields), + getPartitionColumn(), partitionColumnDataType(), storageFormat(), path + ); + + try { + PreparedStatement statement = jdbcConn(query); + statement.execute(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + /** Check ff the given external table already exists for a given schema. + * @return + */ + default Boolean checkIfTableExists() { + + String query = String.format("SELECT TRUE WHERE EXISTS (" + + "SELECT * FROM SVV_EXTERNAL_TABLES WHERE TABLENAME = '%s' AND SCHEMANAME='%s')", + destinationView(), + getSchemaName() + ); + + try { + PreparedStatement statement = jdbcConn(query); + ResultSet resultSet = statement.executeQuery(); + if (resultSet.next()) { + return true; + } + } catch (SQLException e) { + e.printStackTrace(); + } + + return false; + } + + /** + * Creates a JDBC connection and executes the query + * @throws SQLException + */ + default PreparedStatement jdbcConn(String query) throws SQLException { + + JDBCConfig writeConfig = getWriteConfig(); + + Properties props = new Properties(); + + //Uncomment the following line if using a keystore. 
+ props.setProperty("ssl", "true"); + props.setProperty("user", writeConfig.get().get("user")); + props.setProperty("password", writeConfig.get().get("password")); + + Connection connection = DriverManager.getConnection(writeConfig.get().get("url"), props); + + return connection.prepareStatement(query); + + } + + /** + * @return JDBC Write configuration + */ + default JDBCConfig getWriteConfig() { + + JDBCConfig jdbcConfig = new JDBCConfig(String.format("%s.%s", getSchemaName(), destinationView())); + jdbcConfig.set(new HashMap() {{ + put("dbtable", String.format("%s.%s", getSchemaName(), destinationView())); + put("url", System.getProperty("rs_url")); + put("driver", "com.amazon.redshift.jdbc.Driver"); + put("user",System.getProperty("rs_user")); + put("password",System.getProperty("rs_password")); + }}); + + return jdbcConfig; + } + + /** Add one partition to a view + * @param partition name of the partition to be added + * @param viewName table name + */ + @Override + default void addPartition(String partition, String viewName) { + + String path = dataLakeProperties().MakePath(String.format("%s/%s", viewName, partition)); + + partition = partition.replaceFirst("(?<==)(.+)", "'$1'"); + String query = String.format( + "ALTER TABLE %s.%s ADD IF NOT EXISTS PARTITION(%s) LOCATION '%s'", + getSchemaName(), + viewName, + partition, + path + ); + + try { + PreparedStatement statement = jdbcConn(query); + statement.execute(); + } catch (SQLException e) { + e.printStackTrace(); + } + + } + + /** Remove one particular partition from a view + * @param partition name of the partition to be removed + * @param viewName table name + */ + @Override + default void dropPartition(String partition, String viewName) { + + partition = partition.replaceFirst("(?<==)(.+)", "'$1'"); + + String query = String.format( + "ALTER TABLE %s.%s DROP PARTITION(%s)", + getSchemaName(), + viewName, + partition + ); + + try { + PreparedStatement statement = jdbcConn(query); + statement.execute(); + 
} catch (SQLException e) { + e.printStackTrace(); + } + } + + /** All the partitions that should be added to a view + * @param span Number of partitions to be added to view + * @return Set of proposed partitions + */ + @Override + default Set proposedPartitions(Integer span) { + /*Get a set of last 30 days from the current date + * */ + Calendar cal = Calendar.getInstance(); + + return getBackDays(span, cal, new HashSet()); + } + + /** Get current partitions for the view + * @param viewName Name of the view + * @return Active partitions to be added. + */ + @Override + default Set activePartitions(String viewName) { + String query = String.format("SELECT values FROM svv_external_partitions WHERE schemaname='%s' AND tablename='%s'", + getSchemaName(), + destinationView() + ); + + HashSet aP = new HashSet<>(); + + try { + PreparedStatement statement = jdbcConn(query); + ResultSet resultSet = statement.executeQuery(); + while (resultSet.next()) { + String value = resultSet.getString("values"); + + String partition = String.format( + "%s=%s", getPartitionColumn(), value.replaceFirst("\\[\"(.*?)\"\\]", "$1") + ); + + aP.add(partition); + } + } catch (SQLException e) { + e.printStackTrace(); + } + + return aP; + } + + /** Get the consecutive back days for a given span + * @param span number of days to check value + * @param cal Calendar instance to be used for getting dates as string in recursive function + * @param set Append dates to the set. 
+ * @return + */ + default Set getBackDays(int span, Calendar cal, Set set) { + if (span > 0) { + + set.add(String.format("%s=%s", getPartitionColumn(), dateFormat.format(cal.getTime()))); + + cal.add(Calendar.DATE, -1); + + return getBackDays(span - 1, cal, set); + + } else { + return set; + } + } +} \ No newline at end of file diff --git a/src/main/java/in/oogway/plumbox/launcher/views/RedShiftViews.java b/src/main/java/in/oogway/plumbox/launcher/views/RedShiftViews.java new file mode 100644 index 0000000..bdf6571 --- /dev/null +++ b/src/main/java/in/oogway/plumbox/launcher/views/RedShiftViews.java @@ -0,0 +1,53 @@ +package in.oogway.plumbox.launcher.views; + +import java.util.HashMap; + +/** + * Created by jaideep Khandelwal on 02/04/18. + * + * This is an implementation of the RedShift viewer to create a view in RedShift Spectrum + * + */ + + +public class RedShiftViews implements RedShiftViewer { + + @Override + public String destinationView() { + return "sample_table"; + } + + @Override + public String destinationWarmView() { + return null; + } + + @Override + public HashMap getViewsAndPartitionNumber() { + HashMap hm=new HashMap(); + hm.put(destinationView(), 30); + + return hm; + } + + @Override + public String getPartitionColumn() { + return "date"; + } + + @Override + public String storageFormat() { + return "parquet"; + } + + @Override + public String getSchemaName() { + return "spectrum"; + } + + @Override + public String partitionColumnDataType() { + return "TIMESTAMP"; + } + +} diff --git a/src/main/java/in/oogway/plumbox/launcher/views/Viewer.java b/src/main/java/in/oogway/plumbox/launcher/views/Viewer.java new file mode 100644 index 0000000..29a6efc --- /dev/null +++ b/src/main/java/in/oogway/plumbox/launcher/views/Viewer.java @@ -0,0 +1,31 @@ +package in.oogway.plumbox.launcher.views; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; + 
+/** + * Created by jaideep Khandelwal on 06/04/18. + */ +public interface Viewer { + + /** + * @return Column name on which partition has to be done + */ + String getPartitionColumn(); + + /** + * @return Format of the storage + */ + String storageFormat(); + + /** + * @return Name of the schema + */ + String getSchemaName(); + + + + void generateView(SparkSession ss, StructField[] fields); +} diff --git a/src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java b/src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java new file mode 100644 index 0000000..b1f4b76 --- /dev/null +++ b/src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java @@ -0,0 +1,53 @@ +package in.oogway.plumbox.launcher.views; + +import in.oogway.plumbox.cli.testing.LocalTester; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.types.StructType; +import org.junit.Before; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Created by jaideep Khandelwal on 06/04/18. 
+ */ +class RedShiftViewsTest extends LocalTester { + + @Before + void setUp() { + + localSession() + .sparkContext() + .hadoopConfiguration() + .set("fs.s3n.awsAccessKeyId",""); + + localSession() + .sparkContext() + .hadoopConfiguration() + .set("fs.s3n.awsSecretAccessKey",""); + + System.setProperty("rs_url", ""); + System.setProperty("rs_user", ""); + System.setProperty("rs_password", ""); + System.setProperty("storage_path", ""); + System.setProperty("storage_protocol", ""); + System.setProperty("top_dir", ""); + + } + + @Test + void testViewCreation() { + setUp(); + RedShiftViews redShiftViews = new RedShiftViews(); + + StructType schema = ExpressionEncoder.javaBean(ViewBean.class).schema(); + + redShiftViews.generateView(localSession(), schema.fields()); + + redShiftViews.rePopulate(); + } +} \ No newline at end of file From b118acaf52af1f5bf06abf9a7beafe5cabc52827 Mon Sep 17 00:00:00 2001 From: Jaideep K Date: Sat, 7 Apr 2018 21:25:59 +0700 Subject: [PATCH 2/3] Remove the extra lines --- .../launcher/views/RedShiftViewsTest.java | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java b/src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java index b1f4b76..2b1b480 100644 --- a/src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java +++ b/src/test/java/in/oogway/plumbox/launcher/views/RedShiftViewsTest.java @@ -1,16 +1,11 @@ package in.oogway.plumbox.launcher.views; import in.oogway.plumbox.cli.testing.LocalTester; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; import org.apache.spark.sql.types.StructType; import org.junit.Before; import org.junit.jupiter.api.Test; -import java.util.Arrays; - -import static org.junit.jupiter.api.Assertions.*; /** * Created by jaideep Khandelwal on 06/04/18. 
/**
 * Simple serializable bean used to derive a Spark schema in tests
 * (two columns: name and rollNo).
 *
 * Created by jaideep Khandelwal on 07/04/18.
 */
public class ViewBean implements Serializable {

    // Explicit version id so the serialized form stays stable across edits.
    private static final long serialVersionUID = 1L;

    private String name;
    private Integer rollNo;

    /** @return the roll number, or null when unset */
    public Integer getRollNo() {
        return rollNo;
    }

    public void setRollNo(Integer rollNo) {
        this.rollNo = rollNo;
    }

    /** @return the name, or null when unset */
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}