From b81de9baef2a77fdf6bd9d3a27b580bba4a56222 Mon Sep 17 00:00:00 2001 From: Alan Bryant Date: Thu, 5 Dec 2024 06:44:23 -0500 Subject: [PATCH] [BACKLOG-42827] - CRUD for Global objects in repository - RepositorySharedObjectsIO and test - fixed file-open-save for repositories (partial backport) - stopped saving or loading shared objects to the repository as part of transformations or jobs. - Still links steps to databases - Keep ObjectId in various Node-based memory structures, but don't store on disk in VfsSharedObjectsIO - Reset DefaultBowl managers on connect/disconnect from Repository --- .../org/pentaho/di/core/bowl/BaseBowl.java | 3 +- .../org/pentaho/di/core/bowl/DefaultBowl.java | 5 - .../di/core/database/DatabaseMeta.java | 6 +- .../RepositoryElementInterface.java | 21 +- .../di/shared/SharedObjectInterface.java | 2 + .../pentaho/di/shared/VfsSharedObjectsIO.java | 10 + .../org/pentaho/di/cluster/ClusterSchema.java | 4 +- .../org/pentaho/di/cluster/SlaveServer.java | 2 + .../pentaho/di/partition/PartitionSchema.java | 4 +- .../filerep/KettleFileRepository.java | 77 +----- .../di/shared/RepositorySharedObjectsIO.java | 239 +++++++++++++++++ .../java/org/pentaho/di/trans/TransMeta.java | 1 + .../shared/RepositorySharedObjectsIOTest.java | 248 ++++++++++++++++++ .../steps/loadsave/MemoryRepository.java | 176 +++++++++---- .../loadsave/MemoryRepositoryExtended.java | 112 ++++++++ .../repository/model/RepositoryDirectory.java | 9 +- .../pur/ISharedObjectsTransformer.java | 11 + .../di/repository/pur/JobDelegate.java | 35 +-- .../di/repository/pur/PurRepository.java | 13 +- .../di/repository/pur/TransDelegate.java | 56 +--- .../RepositoryConnectController.java | 4 +- .../java/org/pentaho/di/ui/spoon/Spoon.java | 100 +++---- 22 files changed, 839 insertions(+), 299 deletions(-) create mode 100644 engine/src/main/java/org/pentaho/di/shared/RepositorySharedObjectsIO.java create mode 100644 engine/src/test/java/org/pentaho/di/shared/RepositorySharedObjectsIOTest.java create mode 100644 engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepositoryExtended.java diff --git a/core/src/main/java/org/pentaho/di/core/bowl/BaseBowl.java b/core/src/main/java/org/pentaho/di/core/bowl/BaseBowl.java index 1f709336b5b0..815b79a1077e 100644 --- a/core/src/main/java/org/pentaho/di/core/bowl/BaseBowl.java +++ b/core/src/main/java/org/pentaho/di/core/bowl/BaseBowl.java @@ -61,8 +61,7 @@ public void addParentBowl( Bowl parent ) { parentBowls.add( parent ); } - // use with caution. For testing only. - @VisibleForTesting + // use with caution. public synchronized void clearManagers() { managerInstances.clear(); } diff --git a/core/src/main/java/org/pentaho/di/core/bowl/DefaultBowl.java b/core/src/main/java/org/pentaho/di/core/bowl/DefaultBowl.java index 3d02e80a37e3..e51759fc8786 100644 --- a/core/src/main/java/org/pentaho/di/core/bowl/DefaultBowl.java +++ b/core/src/main/java/org/pentaho/di/core/bowl/DefaultBowl.java @@ -95,11 +95,6 @@ public SharedObjectsIO getSharedObjectsIO() { return sharedObjectsIO; } - /** - * Set a specific metastore supplier for use by later calls to this class. Note that this will cause the - * ConnectionManager from this class and from ConnectionManager.getInstance() to return different instances. 
- */ - @VisibleForTesting public void setSharedObjectsIO( SharedObjectsIO sharedObjectsIO ) { this.sharedObjectsIO = sharedObjectsIO; } diff --git a/core/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java b/core/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java index fc6c17464bda..a3fd88e1c65c 100644 --- a/core/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java +++ b/core/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java @@ -3,7 +3,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2021 by Hitachi Vantara : http://www.pentaho.com + * Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -617,7 +617,6 @@ public Object clone() { public Object deepClone( boolean cloneUpdateFlag ) { DatabaseMeta databaseMeta = new DatabaseMeta(); databaseMeta.replaceMeta( this, cloneUpdateFlag ); - databaseMeta.setObjectId( null ); return databaseMeta; } @@ -1024,6 +1023,8 @@ public DatabaseMeta( Node con ) throws KettleXMLException { setReadOnly( Boolean.valueOf( XMLHandler.getTagValue( con, "read_only" ) ) ); + readObjectId( con ); + // Also, read the database attributes... Node attrsnode = XMLHandler.getSubNode( con, "attributes" ); if ( attrsnode != null ) { @@ -1075,6 +1076,7 @@ public String getXML() { retval.append( " " ).append( XMLHandler.addTagValue( "servername", getServername() ) ); retval.append( " " ).append( XMLHandler.addTagValue( "data_tablespace", getDataTablespace() ) ); retval.append( " " ).append( XMLHandler.addTagValue( "index_tablespace", getIndexTablespace() ) ); + appendObjectId( retval ); // only write the tag out if it is set to true if ( isReadOnly() ) { diff --git a/core/src/main/java/org/pentaho/di/repository/RepositoryElementInterface.java b/core/src/main/java/org/pentaho/di/repository/RepositoryElementInterface.java index 667c9947aac7..61d0a8e38384 100644 --- a/core/src/main/java/org/pentaho/di/repository/RepositoryElementInterface.java +++ b/core/src/main/java/org/pentaho/di/repository/RepositoryElementInterface.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com + * Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -22,6 +22,10 @@ package org.pentaho.di.repository; +import org.pentaho.di.core.xml.XMLHandler; +import org.pentaho.di.shared.SharedObjectInterface; +import org.w3c.dom.Node; + /** * A repository element is an object that can be saved or loaded from the repository. As such, we need to be able to * identify it. It needs a RepositoryDirectory, a name and an ID. 
@@ -102,4 +106,19 @@ public interface RepositoryElementInterface extends RepositoryObjectInterface {
    */
   public void setObjectRevision( ObjectRevision objectRevision );
 
+  default void appendObjectId( StringBuilder builder ) {
+    if ( getObjectId() != null ) {
+      builder.append( " " ).append( XMLHandler.addTagValue( SharedObjectInterface.OBJECT_ID,
+        getObjectId().toString() ) );
+    }
+  }
+
+  default void readObjectId( Node node ) {
+    String objectId = XMLHandler.getTagValue( node, SharedObjectInterface.OBJECT_ID );
+    if ( objectId != null ) {
+      setObjectId( new StringObjectId( objectId ) );
+    }
+  }
+
+
 }
diff --git a/core/src/main/java/org/pentaho/di/shared/SharedObjectInterface.java b/core/src/main/java/org/pentaho/di/shared/SharedObjectInterface.java
index 90d5aa9266bd..4ce1966755de 100644
--- a/core/src/main/java/org/pentaho/di/shared/SharedObjectInterface.java
+++ b/core/src/main/java/org/pentaho/di/shared/SharedObjectInterface.java
@@ -29,6 +29,8 @@
 
 public interface SharedObjectInterface {
 
+  static final String OBJECT_ID = "object_id";
+
   /**
    * @deprecated
    *
diff --git a/core/src/main/java/org/pentaho/di/shared/VfsSharedObjectsIO.java b/core/src/main/java/org/pentaho/di/shared/VfsSharedObjectsIO.java
index e0e3f1a6eefe..ed6a8c1d9255 100644
--- a/core/src/main/java/org/pentaho/di/shared/VfsSharedObjectsIO.java
+++ b/core/src/main/java/org/pentaho/di/shared/VfsSharedObjectsIO.java
@@ -193,6 +193,16 @@ private static String getDefaultSharedObjectFileLocation() {
   public void saveSharedObject( String type, String name, Node node ) throws KettleException {
     // Get the map for the type
     Map<String, Node> nodeMap = getNodesMapForType( type );
+
+    // strip out any Object IDs (iterate backwards: the NodeList is live and shrinks as children are removed)
+    NodeList children = node.getChildNodes();
+    for ( int i = children.getLength() - 1; i >= 0; i-- ) {
+      Node childNode = children.item( i );
+      if ( childNode.getNodeName().equalsIgnoreCase( SharedObjectInterface.OBJECT_ID ) ) {
+        node.removeChild( childNode );
+      }
+    }
+
     // Add or Update the map entry for this name
     nodeMap.put( name, node );
 
diff --git a/engine/src/main/java/org/pentaho/di/cluster/ClusterSchema.java b/engine/src/main/java/org/pentaho/di/cluster/ClusterSchema.java
index 0ed51dd8ad16..37514bc65f44 100644
--- a/engine/src/main/java/org/pentaho/di/cluster/ClusterSchema.java
+++ b/engine/src/main/java/org/pentaho/di/cluster/ClusterSchema.java
@@ -2,7 +2,7 @@
  *
  * Pentaho Data Integration
  *
- * Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
+ * Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
  *
  *******************************************************************************
  *
@@ -169,6 +169,7 @@ public String getXML() {
     xml.append( " " ).append( XMLHandler.addTagValue( "sockets_flush_interval", socketsFlushInterval ) );
     xml.append( " " ).append( XMLHandler.addTagValue( "sockets_compressed", socketsCompressed ) );
     xml.append( " " ).append( XMLHandler.addTagValue( "dynamic", dynamic ) );
+    appendObjectId( xml );
 
     xml.append( " " ).append( XMLHandler.openTag( "slaveservers" ) ).append( Const.CR );
     for ( int i = 0; i < slaveServers.size(); i++ ) {
@@ -190,6 +191,7 @@ public ClusterSchema( Node clusterSchemaNode, List<SlaveServer> referenceSlaveSe
     socketsCompressed = "Y".equalsIgnoreCase( XMLHandler.getTagValue( clusterSchemaNode, "sockets_compressed" ) );
     dynamic = "Y".equalsIgnoreCase( XMLHandler.getTagValue( clusterSchemaNode, "dynamic" ) );
 
+    readObjectId( clusterSchemaNode );
     Node slavesNode = XMLHandler.getSubNode( clusterSchemaNode, "slaveservers" );
     int nrSlaves = XMLHandler.countNodes( slavesNode, "name" );
     for
( int i = 0; i < nrSlaves; i++ ) { diff --git a/engine/src/main/java/org/pentaho/di/cluster/SlaveServer.java b/engine/src/main/java/org/pentaho/di/cluster/SlaveServer.java index f9f55f68de5c..101fe67075cc 100644 --- a/engine/src/main/java/org/pentaho/di/cluster/SlaveServer.java +++ b/engine/src/main/java/org/pentaho/di/cluster/SlaveServer.java @@ -246,6 +246,7 @@ public SlaveServer( Node slaveNode ) { this.master = "Y".equalsIgnoreCase( XMLHandler.getTagValue( slaveNode, "master" ) ); initializeVariablesFrom( null ); this.log = new LogChannel( this ); + readObjectId( slaveNode ); setSslMode( "Y".equalsIgnoreCase( XMLHandler.getTagValue( slaveNode, SSL_MODE_TAG ) ) ); Node sslConfig = XMLHandler.getSubNode( slaveNode, SslConfiguration.XML_TAG ); @@ -277,6 +278,7 @@ public String getXML() { xml.append( " " ).append( XMLHandler.addTagValue( "non_proxy_hosts", nonProxyHosts ) ); xml.append( " " ).append( XMLHandler.addTagValue( "master", master ) ); xml.append( " " ).append( XMLHandler.addTagValue( SSL_MODE_TAG, isSslMode(), false ) ); + appendObjectId( xml ); if ( sslConfig != null ) { xml.append( sslConfig.getXML() ); } diff --git a/engine/src/main/java/org/pentaho/di/partition/PartitionSchema.java b/engine/src/main/java/org/pentaho/di/partition/PartitionSchema.java index d2b479aa655a..623163935a9c 100644 --- a/engine/src/main/java/org/pentaho/di/partition/PartitionSchema.java +++ b/engine/src/main/java/org/pentaho/di/partition/PartitionSchema.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com + * Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -164,6 +164,7 @@ public String getXML() { xml .append( " " ).append( XMLHandler.addTagValue( "partitions_per_slave", numberOfPartitionsPerSlave ) ); + appendObjectId( xml ); xml.append( " " ).append( XMLHandler.closeTag( XML_TAG ) ).append( Const.CR ); return xml.toString(); @@ -179,6 +180,7 @@ public PartitionSchema( Node partitionSchemaNode ) { Node partitionNode = XMLHandler.getSubNodeByNr( partitionSchemaNode, "partition", i, false ); partitionIDs.add( XMLHandler.getTagValue( partitionNode, "id" ) ); } + readObjectId( partitionSchemaNode ); dynamicallyDefined = "Y".equalsIgnoreCase( XMLHandler.getTagValue( partitionSchemaNode, "dynamic" ) ); numberOfPartitionsPerSlave = XMLHandler.getTagValue( partitionSchemaNode, "partitions_per_slave" ); diff --git a/engine/src/main/java/org/pentaho/di/repository/filerep/KettleFileRepository.java b/engine/src/main/java/org/pentaho/di/repository/filerep/KettleFileRepository.java index 043268d9b62b..ade48fe468b0 100644 --- a/engine/src/main/java/org/pentaho/di/repository/filerep/KettleFileRepository.java +++ b/engine/src/main/java/org/pentaho/di/repository/filerep/KettleFileRepository.java @@ -958,7 +958,6 @@ public JobMeta loadJob( String jobname, RepositoryDirectoryInterface repdir, Pro jobMeta.setRepository( this ); jobMeta.setMetaStore( MetaStoreConst.getDefaultMetastore() ); - readDatabases( jobMeta, true ); jobMeta.clearChanged(); return jobMeta; @@ -1118,46 +1117,11 @@ public TransMeta loadTransformation( String transname, RepositoryDirectoryInterf transMeta.setName( transname ); transMeta.setObjectId( new StringObjectId( calcObjectId( repdir, transname, EXT_TRANSFORMATION ) ) ); - readDatabases( transMeta, true ); transMeta.clearChanged(); return transMeta; } - /** - * Read all the databases from the repository, insert into the 
has databases object, overwriting optionally - * - * @param TransMeta - * The transformation to load into. - * @param overWriteShared - * if an object with the same name exists, overwrite - * @throws KettleException - */ - public void readDatabases( AbstractMeta transMeta, boolean overWriteShared ) throws KettleException { - try { - ObjectId[] dbids = getDatabaseIDs( false ); - for ( int i = 0; i < dbids.length; i++ ) { - DatabaseMeta databaseMeta = loadDatabaseMeta( dbids[i], null ); // reads last version - if ( transMeta instanceof VariableSpace ) { - databaseMeta.shareVariablesWith( (VariableSpace) transMeta ); - } - - DatabaseMeta check = transMeta.findDatabase( databaseMeta.getName() ); // Check if there already is one in the - // transformation - if ( check == null || overWriteShared ) { // We only add, never overwrite database connections. - if ( databaseMeta.getName() != null ) { - transMeta.getDatabaseManagementInterface().add( databaseMeta ); - if ( !overWriteShared ) { - databaseMeta.setChanged( false ); - } - } - } - } - } catch ( KettleException e ) { - throw e; - } - } - public ValueMetaAndData loadValueMetaAndData( ObjectId id_value ) throws KettleException { return null; @@ -1191,49 +1155,12 @@ public void clearSharedObjectCache() { @Override public void readJobMetaSharedObjects( JobMeta jobMeta ) throws KettleException { - - // Then we read the databases etc... - // - for ( ObjectId id : getDatabaseIDs( false ) ) { - DatabaseMeta databaseMeta = loadDatabaseMeta( id, null ); // Load last version - databaseMeta.shareVariablesWith( jobMeta ); - jobMeta.getDatabaseManagementInterface().add( databaseMeta ); - } - - for ( ObjectId id : getSlaveIDs( false ) ) { - SlaveServer slaveServer = loadSlaveServer( id, null ); // Load last version - slaveServer.shareVariablesWith( jobMeta ); - jobMeta.addOrReplaceSlaveServer( slaveServer ); - } + // No-Op } @Override public void readTransSharedObjects( TransMeta transMeta ) throws KettleException { - - // Then we read the databases etc... - // - for ( ObjectId id : getDatabaseIDs( false ) ) { - DatabaseMeta databaseMeta = loadDatabaseMeta( id, null ); // Load last version - databaseMeta.shareVariablesWith( transMeta ); - transMeta.getDatabaseManagementInterface().add( databaseMeta ); - } - - for ( ObjectId id : getSlaveIDs( false ) ) { - SlaveServer slaveServer = loadSlaveServer( id, null ); // Load last version - slaveServer.shareVariablesWith( transMeta ); - transMeta.addOrReplaceSlaveServer( slaveServer ); - } - - for ( ObjectId id : getClusterIDs( false ) ) { - ClusterSchema clusterSchema = loadClusterSchema( id, transMeta.getSlaveServers(), null ); // Load last version - clusterSchema.shareVariablesWith( transMeta ); - transMeta.addOrReplaceClusterSchema( clusterSchema ); - } - - for ( ObjectId id : getPartitionSchemaIDs( false ) ) { - PartitionSchema partitionSchema = loadPartitionSchema( id, null ); // Load last version - transMeta.addOrReplacePartitionSchema( partitionSchema ); - } + // No-Op } private ObjectId renameObject( ObjectId id, RepositoryDirectoryInterface newDirectory, String newName, diff --git a/engine/src/main/java/org/pentaho/di/shared/RepositorySharedObjectsIO.java b/engine/src/main/java/org/pentaho/di/shared/RepositorySharedObjectsIO.java new file mode 100644 index 000000000000..11fd67d1ed22 --- /dev/null +++ b/engine/src/main/java/org/pentaho/di/shared/RepositorySharedObjectsIO.java @@ -0,0 +1,239 @@ +/*! + * Copyright 2024 Hitachi Vantara. All rights reserved. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.pentaho.di.shared;
+
+import org.pentaho.di.cluster.ClusterSchema;
+import org.pentaho.di.cluster.ClusterSchemaManagementInterface.SlaveServersSupplier;
+import org.pentaho.di.cluster.SlaveServer;
+import org.pentaho.di.core.Const;
+import org.pentaho.di.core.database.DatabaseMeta;
+import org.pentaho.di.core.exception.KettleException;
+import org.pentaho.di.partition.PartitionSchema;
+import org.pentaho.di.repository.ObjectId;
+import org.pentaho.di.repository.Repository;
+import org.pentaho.di.repository.RepositoryElementInterface;
+import org.pentaho.di.repository.RepositoryExtended;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.w3c.dom.Node;
+
+/**
+ * An implementation of SharedObjectsIO that is backed by a Repository.
+ *
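+ * A minimal usage sketch (the connected {@code repository} instance and the connection name are
+ * assumptions for illustration, not part of this class):
+ *
+ * <pre>
+ * SharedObjectsIO io = new RepositorySharedObjectsIO( repository, () -> repository.getSlaveServers() );
+ * Node connection = io.getSharedObject( SharedObjectType.CONNECTION.getName(), "my_connection" );
+ * io.saveSharedObject( SharedObjectType.CONNECTION.getName(), "my_connection", connection );
+ * </pre>
+ *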
+ * This class does not cache anything. Note that PurRepository does its own caching, but only through the
+ * RepositoryExtended interface, which this class makes use of.
+ *
+ */
+public class RepositorySharedObjectsIO implements SharedObjectsIO {
+
+  private final Repository repository;
+  private final SlaveServersSupplier slaveServerSupplier;
+
+  public RepositorySharedObjectsIO( Repository repository, SlaveServersSupplier slaveServerSupplier ) {
+    this.repository = Objects.requireNonNull( repository );
+    this.slaveServerSupplier = slaveServerSupplier;
+  }
+
+  @Override
+  public Map<String, Node> getSharedObjects( String type ) throws KettleException {
+    SharedObjectType objectType = SharedObjectType.valueOf( type.toUpperCase() );
+    List<? extends SharedObjectInterface> objects = null;
+    if ( repository instanceof RepositoryExtended ) {
+      // use the methods that support caching
+      RepositoryExtended extended = (RepositoryExtended) repository;
+      switch ( objectType ) {
+        case CONNECTION:
+          objects = extended.getConnections( true );
+          break;
+        case SLAVESERVER:
+          objects = extended.getSlaveServers( true );
+          break;
+        case PARTITIONSCHEMA:
+          objects = extended.getPartitions( true );
+          break;
+        case CLUSTERSCHEMA:
+          objects = extended.getClusters( true );
+          break;
+      }
+    } else {
+      switch ( objectType ) {
+        case CONNECTION:
+          objects = repository.readDatabases();
+          break;
+        case SLAVESERVER:
+          objects = repository.getSlaveServers();
+          break;
+        case PARTITIONSCHEMA:
+          ObjectId[] psids = repository.getPartitionSchemaIDs( false );
+          List<PartitionSchema> pss = new ArrayList<PartitionSchema>();
+          if ( psids != null ) {
+            for ( ObjectId id : psids ) {
+              pss.add( repository.loadPartitionSchema( id, null ) );
+            }
+          }
+          objects = pss;
+          break;
+        case CLUSTERSCHEMA:
+          ObjectId[] csids = repository.getClusterIDs( false );
+          List<ClusterSchema> css = new ArrayList<ClusterSchema>();
+          if ( csids != null ) {
+            List<SlaveServer> sss = slaveServerSupplier.get();
+            for ( ObjectId id : csids ) {
+              css.add( repository.loadClusterSchema( id, sss, null ) );
+            }
+          }
+          objects = css;
+          break;
+      }
+    }
+    if ( objects != null ) {
+      Map<String, Node> result = new HashMap<>();
+      for ( SharedObjectInterface object : objects ) {
+        result.put( object.getName(), object.toNode() );
+      }
+      return result;
+    }
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public Node getSharedObject( String type, String name ) throws KettleException {
+    SharedObjectInterface object = null;
+    ObjectId id = null;
+    SharedObjectType objectType = SharedObjectType.valueOf( type.toUpperCase() );
+    switch ( objectType ) {
+      case CONNECTION:
+        id = repository.getDatabaseID( name );
+        if ( id != null ) {
+          object = repository.loadDatabaseMeta( id, null );
+        }
+        break;
+      case SLAVESERVER:
+        id = repository.getSlaveID( name );
+        if ( id != null ) {
+          object = repository.loadSlaveServer( id, null );
+        }
+        break;
+      case PARTITIONSCHEMA:
+        id = repository.getPartitionSchemaID( name );
+        if ( id != null ) {
+          object = repository.loadPartitionSchema( id, null );
+        }
+        break;
+      case CLUSTERSCHEMA:
+        id = repository.getClusterID( name );
+        if ( id != null ) {
+          object = repository.loadClusterSchema( id, slaveServerSupplier.get(), null );
+        }
+        break;
+    }
+    if ( object != null ) {
+      return object.toNode();
+    }
+    return null;
+  }
+
+  @Override
+  public void clear( String type ) throws KettleException {
+    SharedObjectType objectType = SharedObjectType.valueOf( type.toUpperCase() );
+    ObjectId[] ids = null;
+    switch ( objectType ) {
+      case CONNECTION:
+        String[] names = repository.getDatabaseNames( false );
+        for ( String name : names ) {
+          repository.deleteDatabaseMeta( name );
+        }
+        break;
+
case SLAVESERVER: + ids = repository.getSlaveIDs( false ); + for ( ObjectId id : ids ) { + repository.deleteSlave( id ); + } + break; + case PARTITIONSCHEMA: + ids = repository.getPartitionSchemaIDs( false ); + for ( ObjectId id : ids ) { + repository.deletePartitionSchema( id ); + } + break; + case CLUSTERSCHEMA: + ids = repository.getClusterIDs( false ); + for ( ObjectId id : ids ) { + repository.deleteClusterSchema( id ); + } + break; + } + } + + @Override + public void delete( String type, String name ) throws KettleException { + SharedObjectType objectType = SharedObjectType.valueOf( type.toUpperCase() ); + ObjectId id; + switch ( objectType ) { + case CONNECTION: + repository.deleteDatabaseMeta( name ); + break; + case SLAVESERVER: + id = repository.getSlaveID( name ); + if ( id != null ) { + repository.deleteSlave( id ); + } + break; + case PARTITIONSCHEMA: + id = repository.getPartitionSchemaID( name ); + if ( id != null ) { + repository.deletePartitionSchema( id ); + } + break; + case CLUSTERSCHEMA: + id = repository.getClusterID( name ); + if ( id != null ) { + repository.deleteClusterSchema( id ); + } + break; + } + } + + @Override + public void saveSharedObject( String type, String name, Node node ) throws KettleException { + RepositoryElementInterface repoElement = null; + SharedObjectType objectType = SharedObjectType.valueOf( type.toUpperCase() ); + switch ( objectType ) { + case CONNECTION: + repoElement = new DatabaseMeta( node ); + break; + case SLAVESERVER: + repoElement = new SlaveServer( node ); + break; + case PARTITIONSCHEMA: + repoElement = new PartitionSchema( node ); + break; + case CLUSTERSCHEMA: + repoElement = new ClusterSchema( node, slaveServerSupplier.get() ); + break; + } + if ( repoElement != null ) { + repository.save( repoElement, Const.VERSION_COMMENT_EDIT_VERSION, null ); + } + } +} + diff --git a/engine/src/main/java/org/pentaho/di/trans/TransMeta.java b/engine/src/main/java/org/pentaho/di/trans/TransMeta.java index 61d69e7dbcf2..cc78a83a0a79 100644 --- a/engine/src/main/java/org/pentaho/di/trans/TransMeta.java +++ b/engine/src/main/java/org/pentaho/di/trans/TransMeta.java @@ -732,6 +732,7 @@ public void clear() { log = LogChannel.GENERAL; } + @Override protected void initializeNonLocalSharedObjects() { super.initializeNonLocalSharedObjects(); localClusterSchemaManager = diff --git a/engine/src/test/java/org/pentaho/di/shared/RepositorySharedObjectsIOTest.java b/engine/src/test/java/org/pentaho/di/shared/RepositorySharedObjectsIOTest.java new file mode 100644 index 000000000000..024d3d9304d9 --- /dev/null +++ b/engine/src/test/java/org/pentaho/di/shared/RepositorySharedObjectsIOTest.java @@ -0,0 +1,248 @@ +/*! + * Copyright 2024 Hitachi Vantara. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+package org.pentaho.di.shared;
+
+import org.pentaho.di.cluster.ClusterSchema;
+import org.pentaho.di.cluster.SlaveServer;
+import org.pentaho.di.core.database.DatabaseMeta;
+import org.pentaho.di.partition.PartitionSchema;
+import org.pentaho.di.repository.Repository;
+import org.pentaho.di.trans.steps.loadsave.MemoryRepository;
+import org.pentaho.di.trans.steps.loadsave.MemoryRepositoryExtended;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.w3c.dom.Node;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+@RunWith( Parameterized.class )
+public class RepositorySharedObjectsIOTest {
+
+  private MemoryRepository rep;
+  private RepositorySharedObjectsIO shared;
+
+  public RepositorySharedObjectsIOTest( MemoryRepository rep ) {
+    this.rep = rep;
+  }
+
+  /**
+   * Test both Repository APIs
+   */
+  @Parameterized.Parameters
+  public static List<Object[]> repositories() {
+    ArrayList<Object[]> reps = new ArrayList<>();
+    reps.add( new Object[] { new MemoryRepository() } );
+    reps.add( new Object[] { new MemoryRepositoryExtended() } );
+    return reps;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    shared = new RepositorySharedObjectsIO( rep, () -> Collections.emptyList() );
+  }
+
+  @Test
+  public void testSave() throws Exception {
+    DatabaseMeta db = new DatabaseMeta();
+    db.setName( "foo" );
+    db.setDBName( "bar" );
+
+    shared.saveSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName(), db.toNode() );
+
+    Node node = shared.getSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName() );
+    assertNotNull( node );
+    DatabaseMeta readDb = new DatabaseMeta( node );
+    assertNotNull( readDb.getObjectId() );
+
+    Map<String, Node> dbs = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CONNECTION.getName() );
+    assertNotNull( dbs );
+    assertEquals( 1, dbs.size() );
+    assertNotNull( dbs.get( "foo" ) );
+    readDb = new DatabaseMeta( dbs.get( "foo" ) );
+    assertNotNull( readDb.getObjectId() );
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    DatabaseMeta db = new DatabaseMeta();
+    db.setName( "foo" );
+    db.setDBName( "bar" );
+
+    shared.saveSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName(), db.toNode() );
+
+    Map<String, Node> dbs = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CONNECTION.getName() );
+    assertEquals( 1, dbs.size() );
+    Node createdNode = shared.getSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName() );
+    assertNotNull( createdNode );
+
+    DatabaseMeta afterCreate = new DatabaseMeta( createdNode );
+
+    db.setServername( "testing" );
+
+    // should replace the existing item.
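+    // (MemoryRepository.save() matches elements by class and name: an existing "foo" connection keeps
+    // its ObjectId, so the save below behaves as an update rather than a second insert)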
+    shared.saveSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName(), db.toNode() );
+
+    dbs = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CONNECTION.getName() );
+    assertEquals( 1, dbs.size() );
+    Node updatedNode = shared.getSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName() );
+    assertNotNull( updatedNode );
+
+    DatabaseMeta afterUpdate = new DatabaseMeta( updatedNode );
+
+    assertEquals( db.getDescription(), afterUpdate.getDescription() );
+
+    assertNull( afterCreate.getServername() );
+    assertEquals( "testing", afterUpdate.getServername() );
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    DatabaseMeta db = new DatabaseMeta();
+    db.setName( "foo" );
+    db.setDBName( "bar" );
+
+    shared.saveSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName(), db.toNode() );
+
+    Node node = shared.getSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName() );
+    assertNotNull( node );
+
+    Map<String, Node> dbs = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CONNECTION.getName() );
+    assertNotNull( dbs );
+    assertEquals( 1, dbs.size() );
+    assertNotNull( dbs.get( "foo" ) );
+
+    shared.delete( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), "foo" );
+
+    node = shared.getSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName() );
+    assertNull( node );
+
+    dbs = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CONNECTION.getName() );
+    assertNotNull( dbs );
+    assertEquals( 0, dbs.size() );
+  }
+
+  @Test
+  public void testOtherTypes() throws Exception {
+    //test the non-db types.
+    SlaveServer slaveServer = new SlaveServer();
+    slaveServer.setName( "slave 1" );
+
+    PartitionSchema partitionSchema = new PartitionSchema();
+    partitionSchema.setName( "pschema 1" );
+
+    ClusterSchema clusterSchema = new ClusterSchema();
+    clusterSchema.setName( "Cluster Schema 1" );
+    clusterSchema.setSlaveServers( Collections.singletonList( slaveServer ) );
+
+    shared = new RepositorySharedObjectsIO( rep, () -> Collections.singletonList( slaveServer ) );
+    shared.saveSharedObject( SharedObjectsIO.SharedObjectType.SLAVESERVER.getName(), slaveServer.getName(),
+      slaveServer.toNode() );
+    shared.saveSharedObject( SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName(), partitionSchema.getName(),
+      partitionSchema.toNode() );
+    shared.saveSharedObject( SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName(), clusterSchema.getName(),
+      clusterSchema.toNode() );
+
+    SlaveServer readSlaveServer = new SlaveServer( shared.getSharedObject(
+      SharedObjectsIO.SharedObjectType.SLAVESERVER.getName(), slaveServer.getName() ) );
+
+    PartitionSchema readPartitionSchema = new PartitionSchema( shared.getSharedObject(
+      SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName(), partitionSchema.getName() ) );
+
+    ClusterSchema readClusterSchema = new ClusterSchema( shared.getSharedObject(
+      SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName(), clusterSchema.getName() ),
+      Collections.singletonList( slaveServer ) );
+
+    assertNotNull( readSlaveServer );
+    assertNotNull( readSlaveServer.getObjectId() );
+    assertNotNull( readPartitionSchema );
+    assertNotNull( readPartitionSchema.getObjectId() );
+    assertNotNull( readClusterSchema );
+    assertEquals( 1, readClusterSchema.getSlaveServers().size() );
+    assertNotNull( readClusterSchema.getObjectId() );
+
+    Map<String, Node> readObjects = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.SLAVESERVER.getName() );
+    assertEquals( 1,
readObjects.size() ); + readSlaveServer = new SlaveServer( readObjects.get( slaveServer.getName() ) ); + assertNotNull( readSlaveServer ); + assertNotNull( readSlaveServer.getObjectId() ); + + readObjects = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName() ); + assertEquals( 1, readObjects.size() ); + readPartitionSchema = new PartitionSchema( readObjects.get( partitionSchema.getName() ) ); + assertNotNull( readPartitionSchema ); + assertNotNull( readPartitionSchema.getObjectId() ); + + readObjects = shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName() ); + assertEquals( 1, readObjects.size() ); + readClusterSchema = new ClusterSchema( readObjects.get( clusterSchema.getName() ), + Collections.singletonList( slaveServer ) ); + assertNotNull( readClusterSchema ); + assertNotNull( readClusterSchema.getObjectId() ); + + shared.delete( SharedObjectsIO.SharedObjectType.SLAVESERVER.getName(), slaveServer.getName() ); + shared.delete( SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName(), partitionSchema.getName() ); + shared.delete( SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName(), clusterSchema.getName() ); + + assertEquals( 0, shared.getSharedObjects( SharedObjectsIO.SharedObjectType.SLAVESERVER.getName() ).size() ); + assertEquals( 0, shared.getSharedObjects( SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName() ).size() ); + assertEquals( 0, shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName() ).size() ); + } + + @Test + public void testClear() throws Exception { + DatabaseMeta db = new DatabaseMeta(); + db.setName( "foo" ); + db.setDBName( "bar" ); + + SlaveServer slaveServer = new SlaveServer(); + slaveServer.setName( "slave 1" ); + + PartitionSchema partitionSchema = new PartitionSchema(); + partitionSchema.setName( "pschema 1" ); + + ClusterSchema clusterSchema = new ClusterSchema(); + clusterSchema.setName( "Cluster Schema 1" ); + clusterSchema.setSlaveServers( Collections.singletonList( slaveServer ) ); + + shared = new RepositorySharedObjectsIO( rep, () -> Collections.singletonList( slaveServer ) ); + shared.saveSharedObject( SharedObjectsIO.SharedObjectType.CONNECTION.getName(), db.getName(), db.toNode() ); + shared.saveSharedObject( SharedObjectsIO.SharedObjectType.SLAVESERVER.getName(), slaveServer.getName(), + slaveServer.toNode() ); + shared.saveSharedObject( SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName(), partitionSchema.getName(), + partitionSchema.toNode() ); + shared.saveSharedObject( SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName(), clusterSchema.getName(), + clusterSchema.toNode() ); + + shared.clear( SharedObjectsIO.SharedObjectType.CONNECTION.getName() ); + shared.clear( SharedObjectsIO.SharedObjectType.SLAVESERVER.getName() ); + shared.clear( SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName() ); + shared.clear( SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName() ); + + assertEquals( 0, shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CONNECTION.getName() ).size() ); + assertEquals( 0, shared.getSharedObjects( SharedObjectsIO.SharedObjectType.SLAVESERVER.getName() ).size() ); + assertEquals( 0, shared.getSharedObjects( SharedObjectsIO.SharedObjectType.PARTITIONSCHEMA.getName() ).size() ); + assertEquals( 0, shared.getSharedObjects( SharedObjectsIO.SharedObjectType.CLUSTERSCHEMA.getName() ).size() ); + } + +} diff --git a/engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepository.java 
b/engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepository.java
index 0b68b7b34ab0..7089334985bd 100644
--- a/engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepository.java
+++ b/engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepository.java
@@ -22,23 +22,14 @@
 package org.pentaho.di.trans.steps.loadsave;
 
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
 import org.pentaho.di.cluster.ClusterSchema;
 import org.pentaho.di.cluster.SlaveServer;
 import org.pentaho.di.core.Condition;
-import org.pentaho.di.core.ProgressMonitorListener;
 import org.pentaho.di.core.database.DatabaseMeta;
 import org.pentaho.di.core.exception.KettleException;
 import org.pentaho.di.core.exception.KettleSecurityException;
 import org.pentaho.di.core.logging.LogChannelInterface;
+import org.pentaho.di.core.ProgressMonitorListener;
 import org.pentaho.di.job.JobMeta;
 import org.pentaho.di.partition.PartitionSchema;
 import org.pentaho.di.repository.AbstractRepository;
@@ -53,6 +44,7 @@
 import org.pentaho.di.repository.RepositoryElementMetaInterface;
 import org.pentaho.di.repository.RepositoryMeta;
 import org.pentaho.di.repository.RepositoryObject;
+import org.pentaho.di.repository.RepositoryObjectInterface;
 import org.pentaho.di.repository.RepositoryObjectType;
 import org.pentaho.di.repository.RepositorySecurityManager;
 import org.pentaho.di.repository.RepositorySecurityProvider;
@@ -61,11 +53,24 @@
 import org.pentaho.di.trans.TransMeta;
 import org.pentaho.metastore.api.IMetaStore;
 
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.UUID;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
 public class MemoryRepository extends AbstractRepository {
-  private final Map<ObjectId, Map<Integer, Map<String, String>>> stepAttributeMap =
+  protected final Map<ObjectId, Map<Integer, Map<String, String>>> stepAttributeMap =
       new HashMap<ObjectId, Map<Integer, Map<String, String>>>();
-  private final Map<ObjectId, Map<Integer, Map<String, String>>> jobAttributeMap =
+  protected final Map<ObjectId, Map<Integer, Map<String, String>>> jobAttributeMap =
       new HashMap<ObjectId, Map<Integer, Map<String, String>>>();
+  protected final Map<ObjectId, RepositoryElementInterface> elements = new HashMap<>();
 
   public MemoryRepository() {
@@ -228,17 +233,33 @@ public ObjectId getJobId( String name, RepositoryDirectoryInterface repositoryDi
   @Override
   public void save( RepositoryElementInterface repositoryElement, String versionComment,
     ProgressMonitorListener monitor, boolean overwrite ) throws KettleException {
-    // TODO Auto-generated method stub
-
+    save( repositoryElement );
   }
 
   @Override
   public void save( RepositoryElementInterface repositoryElement, String versionComment, Calendar versionDate,
     ProgressMonitorListener monitor, boolean overwrite ) throws KettleException {
-    // TODO Auto-generated method stub
+    save( repositoryElement );
+  }
 
+  private void save( RepositoryElementInterface repositoryElement ) {
+    if ( repositoryElement.getObjectId() == null ) {
+      RepositoryElementInterface existingElement = findExisting( repositoryElement );
+      if ( existingElement != null ) {
+        repositoryElement.setObjectId( existingElement.getObjectId() );
+      } else {
+        repositoryElement.setObjectId( new StringObjectId( UUID.randomUUID().toString() ) );
+      }
+    }
+    elements.put( repositoryElement.getObjectId(), repositoryElement );
   }
 
+  private RepositoryElementInterface findExisting(
RepositoryElementInterface candidate ) { + return elements.values().stream() + .filter( e -> e.getClass().equals( candidate.getClass() ) && candidate.getName().equals( e.getName() ) ) + .findAny().orElse( null ); + } + @Override public RepositoryDirectoryInterface getDefaultSaveDirectory( RepositoryElementInterface repositoryElement ) throws KettleException { @@ -334,136 +355,180 @@ public void deleteJob( ObjectId id_job ) throws KettleException { @Override public DatabaseMeta loadDatabaseMeta( ObjectId id_database, String revision ) throws KettleException { - // TODO Auto-generated method stub + RepositoryElementInterface element = elements.get( id_database ); + if ( element instanceof DatabaseMeta ) { + return (DatabaseMeta) element; + } return null; } @Override public void deleteDatabaseMeta( String databaseName ) throws KettleException { - // TODO Auto-generated method stub - + ObjectId id = getDatabaseID( databaseName ); + if ( id != null ) { + elements.remove( id ); + } } @Override public ObjectId[] getDatabaseIDs( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof DatabaseMeta ) + .map( RepositoryElementInterface::getObjectId ) + .toArray( ObjectId[]::new ); } @Override public String[] getDatabaseNames( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof DatabaseMeta ) + .map( RepositoryElementInterface::getName ) + .toArray( String[]::new ); } @Override public List readDatabases() throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream().filter( e -> e instanceof DatabaseMeta ) + .map( e -> (DatabaseMeta) e ).collect( Collectors.toList() ); } @Override public ObjectId getDatabaseID( String name ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof DatabaseMeta && name.equals( e.getName() ) ) + .map( RepositoryElementInterface::getObjectId ) + .findAny().orElse( null ); } @Override public ClusterSchema loadClusterSchema( ObjectId id_cluster_schema, List slaveServers, String versionLabel ) throws KettleException { - // TODO Auto-generated method stub + RepositoryElementInterface element = elements.get( id_cluster_schema ); + if ( element instanceof ClusterSchema ) { + return (ClusterSchema) element; + } return null; } @Override public ObjectId[] getClusterIDs( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof ClusterSchema ) + .map( RepositoryElementInterface::getObjectId ) + .toArray( ObjectId[]::new ); } @Override public String[] getClusterNames( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof ClusterSchema ) + .map( RepositoryElementInterface::getName ) + .toArray( String[]::new ); } @Override public ObjectId getClusterID( String name ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof ClusterSchema && name.equals( e.getName() ) ) + .map( RepositoryElementInterface::getObjectId ) + .findAny().orElse( null ); } @Override public void deleteClusterSchema( ObjectId id_cluster ) throws 
KettleException { - // TODO Auto-generated method stub - + Object value = elements.get( id_cluster ); + if ( value instanceof ClusterSchema ) { + elements.remove( id_cluster ); + } } @Override public SlaveServer loadSlaveServer( ObjectId id_slave_server, String versionLabel ) throws KettleException { - // TODO Auto-generated method stub + RepositoryElementInterface element = elements.get( id_slave_server ); + if ( element instanceof SlaveServer ) { + return (SlaveServer) element; + } return null; } @Override public ObjectId[] getSlaveIDs( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof SlaveServer ) + .map( RepositoryElementInterface::getObjectId ) + .toArray( ObjectId[]::new ); } @Override public String[] getSlaveNames( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof SlaveServer ) + .map( RepositoryElementInterface::getName ) + .toArray( String[]::new ); } @Override public List getSlaveServers() throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream().filter( e -> e instanceof SlaveServer ) + .map( e -> (SlaveServer) e ).collect( Collectors.toList() ); } @Override public ObjectId getSlaveID( String name ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof SlaveServer && name.equals( e.getName() ) ) + .map( RepositoryElementInterface::getObjectId ) + .findAny().orElse( null ); } @Override public void deleteSlave( ObjectId id_slave ) throws KettleException { - // TODO Auto-generated method stub - + Object value = elements.get( id_slave ); + if ( value instanceof SlaveServer ) { + elements.remove( id_slave ); + } } @Override public PartitionSchema loadPartitionSchema( ObjectId id_partition_schema, String versionLabel ) throws KettleException { - // TODO Auto-generated method stub + RepositoryElementInterface element = elements.get( id_partition_schema ); + if ( element instanceof PartitionSchema ) { + return (PartitionSchema) element; + } return null; } @Override public ObjectId[] getPartitionSchemaIDs( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof PartitionSchema ) + .map( RepositoryElementInterface::getObjectId ) + .toArray( ObjectId[]::new ); } @Override public String[] getPartitionSchemaNames( boolean includeDeleted ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof PartitionSchema ) + .map( RepositoryElementInterface::getName ) + .toArray( String[]::new ); } @Override public ObjectId getPartitionSchemaID( String name ) throws KettleException { - // TODO Auto-generated method stub - return null; + return elements.values().stream() + .filter( e -> e instanceof PartitionSchema && name.equals( e.getName() ) ) + .map( RepositoryElementInterface::getObjectId ) + .findAny().orElse( null ); } @Override public void deletePartitionSchema( ObjectId id_partition_schema ) throws KettleException { - // TODO Auto-generated method stub - + Object value = elements.get( id_partition_schema ); + if ( value instanceof PartitionSchema ) { + elements.remove( id_partition_schema ); + } } @Override @@ -805,4 +870,5 @@ 
public boolean getJobEntryAttributeBoolean( ObjectId id_jobentry, int nr, String
     throws KettleException {
     return "Y".equalsIgnoreCase( getJobAttribute( id_jobentry, nr, code, def ? "Y" : "N" ) );
   }
+
 }
diff --git a/engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepositoryExtended.java b/engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepositoryExtended.java
new file mode 100644
index 000000000000..f1b9be86edf2
--- /dev/null
+++ b/engine/src/test/java/org/pentaho/di/trans/steps/loadsave/MemoryRepositoryExtended.java
@@ -0,0 +1,112 @@
+/*! ******************************************************************************
+ *
+ * Pentaho Data Integration
+ *
+ * Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
+ *
+ *******************************************************************************
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+package org.pentaho.di.trans.steps.loadsave;
+
+import org.pentaho.di.cluster.ClusterSchema;
+import org.pentaho.di.cluster.SlaveServer;
+import org.pentaho.di.core.database.DatabaseMeta;
+import org.pentaho.di.core.exception.KettleException;
+import org.pentaho.di.partition.PartitionSchema;
+import org.pentaho.di.repository.ObjectId;
+import org.pentaho.di.repository.RepositoryDirectoryInterface;
+import org.pentaho.di.repository.RepositoryExtended;
+import org.pentaho.di.repository.RepositoryObjectInterface;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.json.simple.parser.ParseException;
+
+/**
+ * Used for testing Extended Repository methods
+ *
+ */
+public class MemoryRepositoryExtended extends MemoryRepository implements RepositoryExtended {
+  public MemoryRepositoryExtended() {
+    super();
+  }
+
+  public MemoryRepositoryExtended( String json ) throws ParseException {
+    super( json );
+  }
+
+  @Override
+  @Deprecated
+  public RepositoryDirectoryInterface loadRepositoryDirectoryTree( boolean eager ) throws KettleException {
+    return null;
+  }
+
+  @Override
+  public RepositoryDirectoryInterface loadRepositoryDirectoryTree(
+    String path,
+    String filter,
+    int depth,
+    boolean showHidden,
+    boolean includeEmptyFolder,
+    boolean includeAcls )
+    throws KettleException {
+    return null;
+  }
+
+  @Override
+  public ObjectId renameRepositoryDirectory( final ObjectId dirId, final RepositoryDirectoryInterface newParent,
+    final String newName, final boolean renameHomeDirectories )
+    throws KettleException {
+    return null;
+  }
+
+  @Override
+  public void deleteRepositoryDirectory( final RepositoryDirectoryInterface dir, final boolean deleteHomeDirectories )
+    throws KettleException {
+  }
+
+  @Override
+  public List<RepositoryObjectInterface> getChildren( String path, String filter ) {
+    return null;
+  }
+
+  @Override
+  public List<DatabaseMeta> getConnections( boolean cached ) throws KettleException {
+    return elements.values().stream().filter( e -> e instanceof DatabaseMeta )
+      .map( e -> (DatabaseMeta) e ).collect( Collectors.toList() );
+  }
+
+  @Override
+  public List<SlaveServer> getSlaveServers( boolean cached ) throws KettleException {
+    return elements.values().stream().filter( e -> e instanceof SlaveServer )
+      .map( e -> (SlaveServer) e ).collect( Collectors.toList() );
+  }
+
+  @Override
+  public List<PartitionSchema> getPartitions( boolean cached ) throws KettleException {
+    return elements.values().stream().filter( e -> e instanceof PartitionSchema )
+      .map( e -> (PartitionSchema) e ).collect( Collectors.toList() );
+  }
+
+  @Override
+  public List<ClusterSchema> getClusters( boolean cached ) throws KettleException {
+    return elements.values().stream().filter( e -> e instanceof ClusterSchema )
+      .map( e -> (ClusterSchema) e ).collect( Collectors.toList() );
+  }
+
+
+}
diff --git a/plugins/file-open-save-new/core/src/main/java/org/pentaho/di/plugins/fileopensave/providers/repository/model/RepositoryDirectory.java b/plugins/file-open-save-new/core/src/main/java/org/pentaho/di/plugins/fileopensave/providers/repository/model/RepositoryDirectory.java
index 16bb68ae1e7e..ef63142b0991 100644
--- a/plugins/file-open-save-new/core/src/main/java/org/pentaho/di/plugins/fileopensave/providers/repository/model/RepositoryDirectory.java
+++ b/plugins/file-open-save-new/core/src/main/java/org/pentaho/di/plugins/fileopensave/providers/repository/model/RepositoryDirectory.java
@@ -2,7 +2,7 @@
  *
  * Pentaho Data Integration
  *
- * Copyright (C) 2017-2023 by Hitachi Vantara : http://www.pentaho.com
+ * Copyright (C) 2017-2024 by Hitachi Vantara : http://www.pentaho.com
  *
  *******************************************************************************
  *
@@ -27,6 +27,8 @@
 import org.pentaho.di.plugins.fileopensave.api.providers.Directory;
 import org.pentaho.di.plugins.fileopensave.providers.repository.RepositoryFileProvider;
 
+import java.util.Objects;
+
 /**
  * Created by bmorrise on 5/16/17.
  */
@@ -51,6 +53,10 @@ public static RepositoryDirectory build( String parentPath,
     repositoryDirectory.setParent( parentPath );
     repositoryDirectory.setName( repositoryDirectoryInterface.getName() );
-    repositoryDirectory.setPath( parentPath == null ? repositoryDirectoryInterface.getPath() : parentPath + "/" + repositoryDirectoryInterface.getName() );
+    if ( Objects.equals( parentPath, "/" ) || parentPath == null ) {
+      repositoryDirectory.setPath( repositoryDirectoryInterface.getPath() );
+    } else {
+      repositoryDirectory.setPath( parentPath + "/" + repositoryDirectoryInterface.getName() );
+    }
     repositoryDirectory.setObjectId( repositoryDirectoryInterface.getObjectId().getId() );
     repositoryDirectory.setHidden( !repositoryDirectoryInterface.isVisible() );
     repositoryDirectory.setRoot( RepositoryFileProvider.NAME );
diff --git a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/ISharedObjectsTransformer.java b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/ISharedObjectsTransformer.java
index 063c8193fa37..f5bb3a151cf3 100644
--- a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/ISharedObjectsTransformer.java
+++ b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/ISharedObjectsTransformer.java
@@ -26,10 +26,21 @@
 
 import org.pentaho.di.shared.SharedObjects;
 
 public interface ISharedObjectsTransformer extends ITransformer {
+
+  /**
+   * @deprecated Shared Objects from the Repository are now accessed through a Bowl; they are no longer loaded into or
+   * saved from the Meta.
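+   * Equivalent reads are available through a bowl-backed SharedObjectsIO, e.g. (a sketch; assumes the
+   * bowl has been wired to the repository the way Spoon.setRepository() now does):
+   * {@code DefaultBowl.getInstance().getSharedObjectsIO().getSharedObject( type, name )}.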
+   */
+  @Deprecated
   void loadSharedObjects( final RepositoryElementInterface element,
       final Map<RepositoryObjectType, List<? extends SharedObjectInterface>> sharedObjectsByType )
     throws KettleException;
 
+  /**
+   * @deprecated Shared Objects from the Repository are now accessed through a Bowl; they are no longer loaded into or
+   * saved from the Meta.
+   */
+  @Deprecated
   void saveSharedObjects( final RepositoryElementInterface element, final String versionComment )
     throws KettleException;
 }
diff --git a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/JobDelegate.java b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/JobDelegate.java
index 55496a8270c5..55ac5c948a1f 100644
--- a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/JobDelegate.java
+++ b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/JobDelegate.java
@@ -180,43 +180,18 @@ public JobDelegate( final Repository repo, final IUnifiedRepository pur ) {
 
   // ~ Methods =========================================================================================================
 
   @SuppressWarnings( "unchecked" )
   @Override
+  @Deprecated // Shared Object reads should now go through a SharedObjectsIO
   public void loadSharedObjects( final RepositoryElementInterface element,
       final Map<RepositoryObjectType, List<? extends SharedObjectInterface>> sharedObjectsByType )
     throws KettleException {
-    JobMeta jobMeta = (JobMeta) element;
-
-    // Repository objects take priority so let's overwrite them...
-    //
-    readDatabases( jobMeta, true, (List<DatabaseMeta>) sharedObjectsByType.get( RepositoryObjectType.DATABASE ) );
-    readSlaves( jobMeta, true, (List<SlaveServer>) sharedObjectsByType.get( RepositoryObjectType.SLAVE_SERVER ) );
+    // NO-OP
   }
 
+  @Override
+  @Deprecated // Shared Object writes should now go through a SharedObjectsIO
   public void saveSharedObjects( final RepositoryElementInterface element, final String versionComment )
     throws KettleException {
-    JobMeta jobMeta = (JobMeta) element;
-    // Now store the databases in the job.
- // Only store if the database has actually changed or doesn't have an object ID (imported) - // - for ( DatabaseMeta databaseMeta : jobMeta.getDatabases() ) { - if ( databaseMeta.hasChanged() || databaseMeta.getObjectId() == null ) { - if ( databaseMeta.getObjectId() == null - || unifiedRepositoryConnectionAclService.hasAccess( databaseMeta.getObjectId(), - RepositoryFilePermission.WRITE ) ) { - repo.save( databaseMeta, versionComment, null ); - } else { - log.logError( BaseMessages.getString( PKG, "PurRepository.ERROR_0004_DATABASE_UPDATE_ACCESS_DENIED", - databaseMeta.getName() ) ); - } - } - } - - // Store the slave server - // - for ( SlaveServer slaveServer : jobMeta.getSlaveServers() ) { - if ( slaveServer.hasChanged() || slaveServer.getObjectId() == null ) { - repo.save( slaveServer, versionComment, null ); - } - } + // NO-OP } public RepositoryElementInterface dataNodeToElement( final DataNode rootNode ) throws KettleException { diff --git a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/PurRepository.java b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/PurRepository.java index 8f3efad9a3c6..46a29bd2c6f0 100644 --- a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/PurRepository.java +++ b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/PurRepository.java @@ -2032,12 +2032,12 @@ private Map> deepCop @Override public void readJobMetaSharedObjects( final JobMeta jobMeta ) throws KettleException { - jobDelegate.loadSharedObjects( jobMeta, loadAndCacheSharedObjects( true ) ); + // NO-OP } @Override public void readTransSharedObjects( final TransMeta transMeta ) throws KettleException { - transDelegate.loadSharedObjects( transMeta, loadAndCacheSharedObjects( true ) ); + // NO-OP } @Override @@ -2503,7 +2503,6 @@ private TransMeta buildTransMeta( final RepositoryFile file, final RepositoryDir transMeta.setRepository( this ); transMeta.setRepositoryDirectory( parentDir ); transMeta.setMetaStore( MetaStoreConst.getDefaultMetastore() ); - readTransSharedObjects( transMeta ); // This should read from the local cache transDelegate.dataNodeToElement( data.getNode(), transMeta ); transMeta.clearChanged(); return transMeta; @@ -2618,7 +2617,6 @@ private JobMeta buildJobMeta( final RepositoryFile file, final RepositoryDirecto jobMeta.setRepository( this ); jobMeta.setRepositoryDirectory( parentDir ); jobMeta.setMetaStore( MetaStoreConst.getDefaultMetastore() ); - readJobMetaSharedObjects( jobMeta ); // This should read from the local cache jobDelegate.dataNodeToElement( data.getNode(), jobMeta ); jobMeta.clearChanged(); return jobMeta; @@ -3211,7 +3209,6 @@ public JobMeta loadJob( ObjectId idJob, String versionLabel ) throws KettleExcep jobMeta.setMetaStore( MetaStoreConst.getDefaultMetastore() ); // inject metastore - readJobMetaSharedObjects( jobMeta ); // Additional obfuscation through obscurity jobMeta.setRepositoryLock( unifiedRepositoryLockService.getLock( file ) ); @@ -3255,8 +3252,6 @@ public TransMeta loadTransformation( ObjectId idTransformation, String versionLa transMeta.setRepositoryLock( unifiedRepositoryLockService.getLock( file ) ); transMeta.setMetaStore( MetaStoreConst.getDefaultMetastore() ); // inject metastore - readTransSharedObjects( transMeta ); - transDelegate.dataNodeToElement( pur.getDataAtVersionForRead( idTransformation.getId(), versionLabel, NodeRepositoryFileData.class ).getNode(), transMeta ); @@ -3423,10 +3418,6 @@ protected void saveTransOrJob( ISharedObjectsTransformer objectTransformer, Repo 
           element.getName() + element.getRepositoryElementType().getExtension() ) );
     }
 
-    if ( saveSharedObjects ) {
-      objectTransformer.saveSharedObjects( element, versionComment );
-    }
-
     ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.BeforeSaveToRepository.id, element );
 
     final boolean isUpdate = ( element.getObjectId() != null && element.getObjectId().getId() != null );
diff --git a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/TransDelegate.java b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/TransDelegate.java
index 9229187f07af..c2ce6fe228bf 100644
--- a/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/TransDelegate.java
+++ b/plugins/pur/core/src/main/java/org/pentaho/di/repository/pur/TransDelegate.java
@@ -856,18 +856,11 @@ private void saveTransformationDetails( final DataNode rootNode, final TransMeta
 
   @SuppressWarnings( "unchecked" )
   @Override
+  @Deprecated // Shared Object reads should now go through a SharedObjectsIO
   public void loadSharedObjects( final RepositoryElementInterface element,
       final Map<RepositoryObjectType, List<? extends SharedObjectInterface>> sharedObjectsByType )
     throws KettleException {
-    TransMeta transMeta = (TransMeta) element;
-
-    // Repository objects take priority so let's overwrite them...
-    //
-    readDatabases( transMeta, true, (List<DatabaseMeta>) sharedObjectsByType.get( RepositoryObjectType.DATABASE ) );
-    readPartitionSchemas( transMeta, true, (List<PartitionSchema>) sharedObjectsByType
-        .get( RepositoryObjectType.PARTITION_SCHEMA ) );
-    readSlaves( transMeta, true, (List<SlaveServer>) sharedObjectsByType.get( RepositoryObjectType.SLAVE_SERVER ) );
-    readClusters( transMeta, true, (List<ClusterSchema>) sharedObjectsByType.get( RepositoryObjectType.CLUSTER_SCHEMA ) );
+    // NO-OP
   }
 
   /**
@@ -961,51 +954,10 @@ protected void readSlaves( TransMeta transMeta, boolean overWriteShared, List<SlaveServer>
diff --git a/ui/src/main/java/org/pentaho/di/ui/spoon/Spoon.java b/ui/src/main/java/org/pentaho/di/ui/spoon/Spoon.java
--- a/ui/src/main/java/org/pentaho/di/ui/spoon/Spoon.java
+++ b/ui/src/main/java/org/pentaho/di/ui/spoon/Spoon.java
       jobMeta.setSlaveServers( new ArrayList<SlaveServer>() );
-      // Read them from the new repository.
-      if ( repository != null ) {
-        repository.readJobMetaSharedObjects( jobMeta );
-      }
-
       // Then we need to re-match the databases at save time...
       for ( DatabaseMeta oldDatabase : oldDatabases ) {
         DatabaseMeta newDatabase = DatabaseMeta.findDatabase( jobMeta.getDatabases(), oldDatabase.getName() );
@@ -4273,10 +4269,6 @@ private void loadSessionInformation( Repository repository, boolean saveOldDatab
       transMeta.setSlaveServers( new ArrayList<SlaveServer>() );
       transMeta.setClusterSchemas( new ArrayList<ClusterSchema>() );
 
-      // Read them from the new repository.
-      if ( repository != null ) {
-        repository.readTransSharedObjects( transMeta );
-      }
 
       // Then we need to re-match the databases at save time...
       for ( DatabaseMeta oldDatabase : oldDatabases ) {
@@ -4337,14 +4329,6 @@ private void loadSessionInformation( Repository repository, boolean saveOldDatab
   public void clearSharedObjectCache() throws KettleException {
     if ( rep != null ) {
       rep.clearSharedObjectCache();
-      TransMeta transMeta = getActiveTransformation();
-      if ( transMeta != null ) {
-        rep.readTransSharedObjects( transMeta );
-      }
-      JobMeta jobMeta = getActiveJob();
-      if ( jobMeta != null ) {
-        rep.readJobMetaSharedObjects( jobMeta );
-      }
     }
   }
 
@@ -4570,7 +4554,7 @@ public void closeRepository() {
     setShellText();
     SpoonPluginManager.getInstance().notifyLifecycleListeners( SpoonLifeCycleEvent.REPOSITORY_DISCONNECTED );
     enableMenus();
-    updateTreeForActiveAbstractMetas();
+    forceRefreshTree();
     clearRepositoryDirectory();
   }
 }
@@ -5354,9 +5338,6 @@ public void newTransFile() {
     transMeta.setMetaStore( metaStoreSupplier.get() );
 
     try {
-      if ( rep != null ) {
-        rep.readTransSharedObjects( transMeta );
-      }
       transMeta.importFromMetaStore();
       transMeta.clearChanged();
     } catch ( Exception e ) {
@@ -5409,10 +5390,6 @@ public void newJobFile() {
     jobMeta.setMetaStore( metaStoreSupplier.get() );
 
     try {
-      // TODO: MAKE LIKE TRANS
-      if ( rep != null ) {
-        rep.readJobMetaSharedObjects( jobMeta );
-      }
       jobMeta.importFromMetaStore();
     } catch ( Exception e ) {
       new ErrorDialog(
@@ -5508,17 +5485,6 @@ public void setJobMetaVariables( JobMeta jobMeta ) {
    * functionality seems covered in loadSessionInformation
    */
  public void loadRepositoryObjects( TransMeta transMeta ) {
-    // Load common database info from active repository...
-    if ( rep != null ) {
-      try {
-        rep.readTransSharedObjects( transMeta );
-      } catch ( Exception e ) {
-        new ErrorDialog(
-          shell, BaseMessages.getString( PKG, "Spoon.Error.UnableToLoadSharedObjects.Title" ), BaseMessages
-            .getString( PKG, "Spoon.Error.UnableToLoadSharedObjects.Message" ), e );
-      }
-
-    }
   }
 
   public boolean promptForSave() throws KettleException {
@@ -6928,6 +6894,7 @@ public void handleRepositoryLost( KettleRepositoryLostException e ) {
     SpoonPluginManager.getInstance().notifyLifecycleListeners( SpoonLifeCycleEvent.REPOSITORY_DISCONNECTED );
     setShellText();
     enableMenus();
+    forceRefreshTree();
   }
 
   private void warnRepositoryLost( KettleRepositoryLostException e ) {
@@ -9189,7 +9156,12 @@ public Repository getRepository() {
   public void setRepository( Repository rep ) {
     this.rep = rep;
+    if ( rep != null ) {
+      DefaultBowl.getInstance().setSharedObjectsIO( new RepositorySharedObjectsIO( rep, () ->
+        getExecutionBowl().getManager( SlaveServerManagementInterface.class ).getAll() ) );
+      DefaultBowl.getInstance().clearManagers();
+
       this.repositoryName = rep.getName();
       List<LastUsedFile> lastUsedFiles = getLastUsedRepoFiles();
       if ( !Utils.isEmpty( lastUsedFiles ) ) {
@@ -9198,14 +9170,18 @@ public void setRepository( Repository rep ) {
         lastFileOpenedProvider = ProviderFilterType.REPOSITORY.toString();
       }
     } else {
+      // will be generated on the next call
+      DefaultBowl.getInstance().setSharedObjectsIO( null );
+      DefaultBowl.getInstance().clearManagers();
+
       this.repositoryName = null;
       lastFileOpened = props.getLastUsedLocalFile();
       // we don't know what the provider is.
       lastFileOpenedProvider = null;
     }
 
-    if ( rep != null ) {
-      this.capabilities = rep.getRepositoryMeta().getRepositoryCapabilities();
-    }
+    if ( rep != null ) {
+      this.capabilities = rep.getRepositoryMeta().getRepositoryCapabilities();
+    }
 
     ConnectionManager.getInstance().reset();
 
     // Registering the UI Support classes
@@ -9644,28 +9620,31 @@ public void displayDbDependancies() {
     try {
-      final DatabaseMeta databaseMeta = (DatabaseMeta) selectionObject;
-      String[] jobList = rep.getJobsUsingDatabase( databaseMeta.getObjectId() );
-      String[] transList = rep.getTransformationsUsingDatabase( databaseMeta.getObjectId() );
-      if ( jobList.length == 0 && transList.length == 0 ) {
-        MessageBox box = new MessageBox( shell, SWT.ICON_INFORMATION | SWT.OK );
-        box.setText( "Connection dependencies" );
-        box.setMessage( "This connection is not used by a job nor a transformation." );
-        box.open();
-      } else {
-        for ( String aJobList : jobList ) {
-          if ( aJobList != null ) {
-            createTreeItem( parent, aJobList, GUIResource.getInstance().getImageJobGraph() );
+      withDatabase( ( dbManager, databaseMeta ) -> {
+        SpoonTreeLeveledSelection leveledSelection = (SpoonTreeLeveledSelection) selectionObject;
+
+        String[] jobList = rep.getJobsUsingDatabase( databaseMeta.getObjectId() );
+        String[] transList = rep.getTransformationsUsingDatabase( databaseMeta.getObjectId() );
+        if ( jobList.length == 0 && transList.length == 0 ) {
+          MessageBox box = new MessageBox( shell, SWT.ICON_INFORMATION | SWT.OK );
+          box.setText( "Connection dependencies" );
+          box.setMessage( "This connection is not used by a job nor a transformation." );
+          box.open();
+        } else {
+          for ( String aJobList : jobList ) {
+            if ( aJobList != null ) {
+              createTreeItem( parent, aJobList, GUIResource.getInstance().getImageJobGraph() );
+            }
           }
-        }
-        for ( String aTransList : transList ) {
-          if ( aTransList != null ) {
-            createTreeItem( parent, aTransList, GUIResource.getInstance().getImageTransGraph() );
+          for ( String aTransList : transList ) {
+            if ( aTransList != null ) {
+              createTreeItem( parent, aTransList, GUIResource.getInstance().getImageTransGraph() );
+            }
           }
+          parent.setExpanded( true );
         }
-        parent.setExpanded( true );
-      }
+      } );
     } catch ( Exception e ) {
       new ErrorDialog( shell, "Error", "Error getting dependencies! :", e );
     }
@@ -9834,9 +9813,8 @@ public void setBowl( Bowl bowl ) {
   }
 
   /**
-   * Retrieves the Bowl for the Management context. This Bowl should be used for write operations. This Bowl will only
-   * return objects directly owned by the particular context. It will not include objects owned by the global context.
-   * Use DefaultBowl to access the global context.
+   * Retrieves the Bowl for the Execution context. This Bowl should be used for runtime operations. This Bowl will
+   * return objects from all Spoon-wide Levels with appropriate overrides.
    *
    * @return Bowl The Bowl that should be used during execution.
    */
@@ -9845,7 +9823,7 @@ public Bowl getExecutionBowl() {
   }
 
   /**
-   * Sets the Bowl for the management context. This Bowl should be used for write operations.
+   * Sets the Bowl for the Execution context. This Bowl should be used for runtime operations.
    *
    */
   public void setExecutionBowl( Bowl bowl ) {
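
Note for consumers of the deprecated read paths above: with readTransSharedObjects() and readJobMetaSharedObjects() reduced to NO-OPs, shared objects are resolved through a Bowl rather than copied into each TransMeta or JobMeta. The sketch below is illustrative and not part of this patch; it assumes a DatabaseManagementInterface manager analogous to the SlaveServerManagementInterface.getAll() call used in setRepository(), and that DefaultBowl delegates to RepositorySharedObjectsIO once a repository is connected.

import java.util.List;

import org.pentaho.di.core.bowl.Bowl;
import org.pentaho.di.core.bowl.DefaultBowl;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.shared.DatabaseManagementInterface;

public class BowlSharedObjectsExample {

  // Replaces the old pattern of rep.readTransSharedObjects( transMeta ) followed by
  // transMeta.getDatabases(): ask the Bowl for a manager and read from it directly.
  public static List<DatabaseMeta> listSharedDatabases( Bowl bowl ) throws KettleException {
    DatabaseManagementInterface dbManager = bowl.getManager( DatabaseManagementInterface.class );
    return dbManager.getAll(); // repository-backed when RepositorySharedObjectsIO is installed
  }

  public static void main( String[] args ) throws KettleException {
    for ( DatabaseMeta databaseMeta : listSharedDatabases( DefaultBowl.getInstance() ) ) {
      System.out.println( databaseMeta.getName() );
    }
  }
}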
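
The setRepository() hunks implement a swap-and-reset protocol on DefaultBowl: install a repository-backed SharedObjectsIO on connect, null it out on disconnect so the default implementation is regenerated lazily, and clear cached managers in both cases. A condensed sketch of that protocol, using only calls that appear in the patch; the Spoon.getInstance() access path and the package locations of the imports are assumptions.

import org.pentaho.di.core.bowl.DefaultBowl;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.shared.RepositorySharedObjectsIO;
import org.pentaho.di.shared.SlaveServerManagementInterface;
import org.pentaho.di.ui.spoon.Spoon;

public class RepositoryBowlWiring {

  // On connect: route DefaultBowl's shared-object I/O to the repository, then drop
  // managers that were built against the previous (VFS-backed) SharedObjectsIO.
  public static void onConnect( Repository repository ) {
    DefaultBowl.getInstance().setSharedObjectsIO( new RepositorySharedObjectsIO( repository, () ->
      Spoon.getInstance().getExecutionBowl().getManager( SlaveServerManagementInterface.class ).getAll() ) );
    DefaultBowl.getInstance().clearManagers();
  }

  // On disconnect: clear the IO so the default is generated on the next call, and
  // again invalidate any cached managers.
  public static void onDisconnect() {
    DefaultBowl.getInstance().setSharedObjectsIO( null );
    DefaultBowl.getInstance().clearManagers();
  }
}

The clearManagers() call is why BaseBowl's "use with caution" comment matters: any manager handed out before the swap still points at the old SharedObjectsIO.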
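
The displayDbDependancies() rewrite stops casting selectionObject to DatabaseMeta and instead routes through a withDatabase( ( dbManager, databaseMeta ) -> ... ) helper. That helper's definition lies outside the hunks shown here; the following is a hypothetical reduction of its shape within Spoon.java, where the DatabaseAction interface, getManagementBowl(), and getSelectedDatabaseName() are all invented for illustration.

@FunctionalInterface
interface DatabaseAction {
  void run( DatabaseManagementInterface dbManager, DatabaseMeta databaseMeta ) throws KettleException;
}

// Hypothetical: resolve the manager and the selected DatabaseMeta once, hand both
// to the action, and surface failures through the usual ErrorDialog.
void withDatabase( DatabaseAction action ) {
  try {
    DatabaseManagementInterface dbManager =
      getManagementBowl().getManager( DatabaseManagementInterface.class );
    DatabaseMeta databaseMeta = dbManager.get( getSelectedDatabaseName() ); // invented helper
    action.run( dbManager, databaseMeta );
  } catch ( KettleException e ) {
    new ErrorDialog( shell, "Error", "Error resolving database connection", e );
  }
}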