Skip to content

Commit

Permalink
[BACKLOG-42827] - CRUD for Global objects in repository
Browse files Browse the repository at this point in the history
- RepositorySharedObjectsIO and test
- fixed file-open-save for repositories (partial backport)
- stopped saving shared objects to, or loading them from, the repository as
  part of transformations or jobs.
- Still links steps to databases
- Keep ObjectId in various Node-based memory structures, but don't store on
  disk in VfsSharedObjectsIO
- Reset DefaultBowl managers on connect/disconnect from Repository
  • Loading branch information
abryant-hv committed Dec 10, 2024
1 parent 9551344 commit 211a34d
Show file tree
Hide file tree
Showing 22 changed files with 832 additions and 299 deletions.
3 changes: 1 addition & 2 deletions core/src/main/java/org/pentaho/di/core/bowl/BaseBowl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ public void addParentBowl( Bowl parent ) {
parentBowls.add( parent );
}

// use with caution. For testing only.
@VisibleForTesting
// use with caution.
// Drops every cached manager instance held by this bowl. Use with caution:
// any state the discarded managers held is lost, and callers that cached a
// manager reference will keep using a stale instance.
public synchronized void clearManagers() {
managerInstances.clear();
}
Expand Down
5 changes: 0 additions & 5 deletions core/src/main/java/org/pentaho/di/core/bowl/DefaultBowl.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public SharedObjectsIO getSharedObjectsIO() {
return sharedObjectsIO;
}

/**
* Set a specific metastore supplier for use by later calls to this class. Note that this will cause the
* ConnectionManager from this class and from ConnectionManager.getInstance() to return different instances.
*/
@VisibleForTesting
/**
 * Replaces the SharedObjectsIO used by this bowl for subsequent shared-object reads and writes.
 * NOTE(review): presumably intended for testing and repository connect/disconnect — callers that
 * already obtained the previous instance are not updated; confirm intended production use.
 *
 * @param sharedObjectsIO the new shared-objects IO implementation to use
 */
public void setSharedObjectsIO( SharedObjectsIO sharedObjectsIO ) {
this.sharedObjectsIO = sharedObjectsIO;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2021 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -617,7 +617,6 @@ public Object clone() {
/**
 * Creates a deep copy of this DatabaseMeta by copying all metadata into a fresh instance.
 *
 * @param cloneUpdateFlag passed through to replaceMeta() — presumably controls whether the
 *          "changed" flag is copied as well; TODO confirm against replaceMeta()
 * @return a new DatabaseMeta carrying this object's metadata; its ObjectId is reset to null so
 *         the clone is not mistaken for the repository-stored original
 */
public Object deepClone( boolean cloneUpdateFlag ) {
DatabaseMeta databaseMeta = new DatabaseMeta();
databaseMeta.replaceMeta( this, cloneUpdateFlag );
databaseMeta.setObjectId( null );
return databaseMeta;
}

Expand Down Expand Up @@ -1024,6 +1023,8 @@ public DatabaseMeta( Node con ) throws KettleXMLException {

setReadOnly( Boolean.valueOf( XMLHandler.getTagValue( con, "read_only" ) ) );

readObjectId( con );

// Also, read the database attributes...
Node attrsnode = XMLHandler.getSubNode( con, "attributes" );
if ( attrsnode != null ) {
Expand Down Expand Up @@ -1075,6 +1076,7 @@ public String getXML() {
retval.append( " " ).append( XMLHandler.addTagValue( "servername", getServername() ) );
retval.append( " " ).append( XMLHandler.addTagValue( "data_tablespace", getDataTablespace() ) );
retval.append( " " ).append( XMLHandler.addTagValue( "index_tablespace", getIndexTablespace() ) );
appendObjectId( retval );

// only write the tag out if it is set to true
if ( isReadOnly() ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -22,6 +22,10 @@

package org.pentaho.di.repository;

import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.shared.SharedObjectInterface;
import org.w3c.dom.Node;

/**
* A repository element is an object that can be saved or loaded from the repository. As such, we need to be able to
* identify it. It needs a RepositoryDirectory, a name and an ID.
Expand Down Expand Up @@ -102,4 +106,19 @@ public interface RepositoryElementInterface extends RepositoryObjectInterface {
*/
public void setObjectRevision( ObjectRevision objectRevision );

/**
 * Appends this element's ObjectId to the given XML builder as an OBJECT_ID tag.
 * Does nothing when no ObjectId has been assigned.
 *
 * @param builder the XML output being assembled
 */
default void appendObjectId( StringBuilder builder ) {
ObjectId objectId = getObjectId();
if ( objectId != null ) {
String tag = XMLHandler.addTagValue( SharedObjectInterface.OBJECT_ID, objectId.toString() );
builder.append( " " ).append( tag );
}
}

/**
 * Restores this element's ObjectId from the OBJECT_ID tag of the given XML node.
 * When the tag is absent, the current ObjectId is left untouched.
 *
 * @param node the XML node this element was serialized to
 */
default void readObjectId( Node node ) {
String idText = XMLHandler.getTagValue( node, SharedObjectInterface.OBJECT_ID );
if ( idText == null ) {
return;
}
setObjectId( new StringObjectId( idText ) );
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

public interface SharedObjectInterface<T extends SharedObjectInterface> {

static final String OBJECT_ID = "object_id";

/**
* @deprecated
*
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/pentaho/di/shared/VfsSharedObjectsIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ private static String getDefaultSharedObjectFileLocation() {
public void saveSharedObject( String type, String name, Node node ) throws KettleException {
// Get the map for the type
Map<String, Node> nodeMap = getNodesMapForType( type );

// strip out any Object IDs
NodeList children = node.getChildNodes();
for ( int i = 0; i < children.getLength(); i++ ) {
Node childNode = children.item( i );
if ( childNode.getNodeName().equalsIgnoreCase( SharedObjectInterface.OBJECT_ID ) ) {
node.removeChild( childNode );
}
}

// Add or Update the map entry for this name
nodeMap.put( name, node );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -169,6 +169,7 @@ public String getXML() {
xml.append( " " ).append( XMLHandler.addTagValue( "sockets_flush_interval", socketsFlushInterval ) );
xml.append( " " ).append( XMLHandler.addTagValue( "sockets_compressed", socketsCompressed ) );
xml.append( " " ).append( XMLHandler.addTagValue( "dynamic", dynamic ) );
appendObjectId( xml );

xml.append( " " ).append( XMLHandler.openTag( "slaveservers" ) ).append( Const.CR );
for ( int i = 0; i < slaveServers.size(); i++ ) {
Expand All @@ -190,6 +191,7 @@ public ClusterSchema( Node clusterSchemaNode, List<SlaveServer> referenceSlaveSe
socketsCompressed = "Y".equalsIgnoreCase( XMLHandler.getTagValue( clusterSchemaNode, "sockets_compressed" ) );
dynamic = "Y".equalsIgnoreCase( XMLHandler.getTagValue( clusterSchemaNode, "dynamic" ) );

readObjectId( clusterSchemaNode );
Node slavesNode = XMLHandler.getSubNode( clusterSchemaNode, "slaveservers" );
int nrSlaves = XMLHandler.countNodes( slavesNode, "name" );
for ( int i = 0; i < nrSlaves; i++ ) {
Expand Down
2 changes: 2 additions & 0 deletions engine/src/main/java/org/pentaho/di/cluster/SlaveServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ public SlaveServer( Node slaveNode ) {
this.master = "Y".equalsIgnoreCase( XMLHandler.getTagValue( slaveNode, "master" ) );
initializeVariablesFrom( null );
this.log = new LogChannel( this );
readObjectId( slaveNode );

setSslMode( "Y".equalsIgnoreCase( XMLHandler.getTagValue( slaveNode, SSL_MODE_TAG ) ) );
Node sslConfig = XMLHandler.getSubNode( slaveNode, SslConfiguration.XML_TAG );
Expand Down Expand Up @@ -277,6 +278,7 @@ public String getXML() {
xml.append( " " ).append( XMLHandler.addTagValue( "non_proxy_hosts", nonProxyHosts ) );
xml.append( " " ).append( XMLHandler.addTagValue( "master", master ) );
xml.append( " " ).append( XMLHandler.addTagValue( SSL_MODE_TAG, isSslMode(), false ) );
appendObjectId( xml );
if ( sslConfig != null ) {
xml.append( sslConfig.getXML() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -164,6 +164,7 @@ public String getXML() {
xml
.append( " " ).append(
XMLHandler.addTagValue( "partitions_per_slave", numberOfPartitionsPerSlave ) );
appendObjectId( xml );

xml.append( " " ).append( XMLHandler.closeTag( XML_TAG ) ).append( Const.CR );
return xml.toString();
Expand All @@ -179,6 +180,7 @@ public PartitionSchema( Node partitionSchemaNode ) {
Node partitionNode = XMLHandler.getSubNodeByNr( partitionSchemaNode, "partition", i, false );
partitionIDs.add( XMLHandler.getTagValue( partitionNode, "id" ) );
}
readObjectId( partitionSchemaNode );

dynamicallyDefined = "Y".equalsIgnoreCase( XMLHandler.getTagValue( partitionSchemaNode, "dynamic" ) );
numberOfPartitionsPerSlave = XMLHandler.getTagValue( partitionSchemaNode, "partitions_per_slave" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,6 @@ public JobMeta loadJob( String jobname, RepositoryDirectoryInterface repdir, Pro
jobMeta.setRepository( this );
jobMeta.setMetaStore( MetaStoreConst.getDefaultMetastore() );

readDatabases( jobMeta, true );
jobMeta.clearChanged();

return jobMeta;
Expand Down Expand Up @@ -1118,46 +1117,11 @@ public TransMeta loadTransformation( String transname, RepositoryDirectoryInterf
transMeta.setName( transname );
transMeta.setObjectId( new StringObjectId( calcObjectId( repdir, transname, EXT_TRANSFORMATION ) ) );

readDatabases( transMeta, true );
transMeta.clearChanged();

return transMeta;
}

/**
 * Reads all database connections from the repository and adds them to the given meta's
 * database management interface, optionally overwriting same-named connections.
 *
 * @param transMeta
 *          the transformation or job meta to load the database connections into
 * @param overWriteShared
 *          if true, a repository connection is added even when one with the same name already
 *          exists; if false, existing connections are kept and a freshly loaded copy is
 *          marked unchanged
 * @throws KettleException
 *           if loading a database connection from the repository fails
 */
public void readDatabases( AbstractMeta transMeta, boolean overWriteShared ) throws KettleException {
ObjectId[] dbids = getDatabaseIDs( false );
for ( ObjectId dbid : dbids ) {
DatabaseMeta databaseMeta = loadDatabaseMeta( dbid, null ); // reads last version
if ( transMeta instanceof VariableSpace ) {
databaseMeta.shareVariablesWith( (VariableSpace) transMeta );
}
// Check whether a connection with this name is already present in the target meta.
DatabaseMeta check = transMeta.findDatabase( databaseMeta.getName() );
if ( ( check == null || overWriteShared ) && databaseMeta.getName() != null ) {
transMeta.getDatabaseManagementInterface().add( databaseMeta );
if ( !overWriteShared ) {
// Keep best-effort loads from flagging the meta as modified.
databaseMeta.setChanged( false );
}
}
}
}

public ValueMetaAndData loadValueMetaAndData( ObjectId id_value ) throws KettleException {

return null;
Expand Down Expand Up @@ -1191,49 +1155,12 @@ public void clearSharedObjectCache() {

@Override
public void readJobMetaSharedObjects( JobMeta jobMeta ) throws KettleException {

// Then we read the databases etc...
//
for ( ObjectId id : getDatabaseIDs( false ) ) {
DatabaseMeta databaseMeta = loadDatabaseMeta( id, null ); // Load last version
databaseMeta.shareVariablesWith( jobMeta );
jobMeta.getDatabaseManagementInterface().add( databaseMeta );
}

for ( ObjectId id : getSlaveIDs( false ) ) {
SlaveServer slaveServer = loadSlaveServer( id, null ); // Load last version
slaveServer.shareVariablesWith( jobMeta );
jobMeta.addOrReplaceSlaveServer( slaveServer );
}
// No-Op
}

@Override
public void readTransSharedObjects( TransMeta transMeta ) throws KettleException {

// Then we read the databases etc...
//
for ( ObjectId id : getDatabaseIDs( false ) ) {
DatabaseMeta databaseMeta = loadDatabaseMeta( id, null ); // Load last version
databaseMeta.shareVariablesWith( transMeta );
transMeta.getDatabaseManagementInterface().add( databaseMeta );
}

for ( ObjectId id : getSlaveIDs( false ) ) {
SlaveServer slaveServer = loadSlaveServer( id, null ); // Load last version
slaveServer.shareVariablesWith( transMeta );
transMeta.addOrReplaceSlaveServer( slaveServer );
}

for ( ObjectId id : getClusterIDs( false ) ) {
ClusterSchema clusterSchema = loadClusterSchema( id, transMeta.getSlaveServers(), null ); // Load last version
clusterSchema.shareVariablesWith( transMeta );
transMeta.addOrReplaceClusterSchema( clusterSchema );
}

for ( ObjectId id : getPartitionSchemaIDs( false ) ) {
PartitionSchema partitionSchema = loadPartitionSchema( id, null ); // Load last version
transMeta.addOrReplacePartitionSchema( partitionSchema );
}
// No-Op
}

private ObjectId renameObject( ObjectId id, RepositoryDirectoryInterface newDirectory, String newName,
Expand Down
Loading

0 comments on commit 211a34d

Please sign in to comment.