Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKLOG-42827] - CRUD for Global objects in repository #9750

Open
wants to merge 1 commit into
base: project-profile
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions core/src/main/java/org/pentaho/di/core/bowl/BaseBowl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ public void addParentBowl( Bowl parent ) {
parentBowls.add( parent );
}

// use with caution. For testing only.
@VisibleForTesting
// use with caution.
public synchronized void clearManagers() {
managerInstances.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public SharedObjectsIO getSharedObjectsIO() {
return sharedObjectsIO;
}

/**
* Set a specific metastore supplier for use by later calls to this class. Note that this will cause the
* ConnectionManager from this class and from ConnectionManager.getInstance() to return different instances.
*/
@VisibleForTesting
public void setSharedObjectsIO( SharedObjectsIO sharedObjectsIO ) {
this.sharedObjectsIO = sharedObjectsIO;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2021 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -617,7 +617,6 @@ public Object clone() {
public Object deepClone( boolean cloneUpdateFlag ) {
DatabaseMeta databaseMeta = new DatabaseMeta();
databaseMeta.replaceMeta( this, cloneUpdateFlag );
databaseMeta.setObjectId( null );
return databaseMeta;
}

Expand Down Expand Up @@ -1024,6 +1023,8 @@ public DatabaseMeta( Node con ) throws KettleXMLException {

setReadOnly( Boolean.valueOf( XMLHandler.getTagValue( con, "read_only" ) ) );

readObjectId( con );

// Also, read the database attributes...
Node attrsnode = XMLHandler.getSubNode( con, "attributes" );
if ( attrsnode != null ) {
Expand Down Expand Up @@ -1075,6 +1076,7 @@ public String getXML() {
retval.append( " " ).append( XMLHandler.addTagValue( "servername", getServername() ) );
retval.append( " " ).append( XMLHandler.addTagValue( "data_tablespace", getDataTablespace() ) );
retval.append( " " ).append( XMLHandler.addTagValue( "index_tablespace", getIndexTablespace() ) );
appendObjectId( retval );

// only write the tag out if it is set to true
if ( isReadOnly() ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -22,6 +22,10 @@

package org.pentaho.di.repository;

import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.shared.SharedObjectInterface;
import org.w3c.dom.Node;

/**
* A repository element is an object that can be saved or loaded from the repository. As such, we need to be able to
* identify it. It needs a RepositoryDirectory, a name and an ID.
Expand Down Expand Up @@ -102,4 +106,19 @@ public interface RepositoryElementInterface extends RepositoryObjectInterface {
*/
public void setObjectRevision( ObjectRevision objectRevision );

default void appendObjectId( StringBuilder builder ) {
if ( getObjectId() != null ) {
builder.append( " " ).append( XMLHandler.addTagValue( SharedObjectInterface.OBJECT_ID,
getObjectId().toString() ) );
}
}

default void readObjectId( Node node ) {
String objectId = XMLHandler.getTagValue( node, SharedObjectInterface.OBJECT_ID );
if ( objectId != null ) {
setObjectId( new StringObjectId( objectId ) );
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

public interface SharedObjectInterface<T extends SharedObjectInterface> {

static final String OBJECT_ID = "object_id";

/**
* @deprecated
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ private static String getDefaultSharedObjectFileLocation() {
public void saveSharedObject( String type, String name, Node node ) throws KettleException {
// Get the map for the type
Map<String, Node> nodeMap = getNodesMapForType( type );

// strip out any Object IDs
NodeList children = node.getChildNodes();
for ( int i = 0; i < children.getLength(); i++ ) {
Node childNode = children.item( i );
if ( childNode.getNodeName().equalsIgnoreCase( SharedObjectInterface.OBJECT_ID ) ) {
node.removeChild( childNode );
}
}

// Add or Update the map entry for this name
nodeMap.put( name, node );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -169,6 +169,7 @@ public String getXML() {
xml.append( " " ).append( XMLHandler.addTagValue( "sockets_flush_interval", socketsFlushInterval ) );
xml.append( " " ).append( XMLHandler.addTagValue( "sockets_compressed", socketsCompressed ) );
xml.append( " " ).append( XMLHandler.addTagValue( "dynamic", dynamic ) );
appendObjectId( xml );

xml.append( " " ).append( XMLHandler.openTag( "slaveservers" ) ).append( Const.CR );
for ( int i = 0; i < slaveServers.size(); i++ ) {
Expand All @@ -190,6 +191,7 @@ public ClusterSchema( Node clusterSchemaNode, List<SlaveServer> referenceSlaveSe
socketsCompressed = "Y".equalsIgnoreCase( XMLHandler.getTagValue( clusterSchemaNode, "sockets_compressed" ) );
dynamic = "Y".equalsIgnoreCase( XMLHandler.getTagValue( clusterSchemaNode, "dynamic" ) );

readObjectId( clusterSchemaNode );
Node slavesNode = XMLHandler.getSubNode( clusterSchemaNode, "slaveservers" );
int nrSlaves = XMLHandler.countNodes( slavesNode, "name" );
for ( int i = 0; i < nrSlaves; i++ ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ public SlaveServer( Node slaveNode ) {
this.master = "Y".equalsIgnoreCase( XMLHandler.getTagValue( slaveNode, "master" ) );
initializeVariablesFrom( null );
this.log = new LogChannel( this );
readObjectId( slaveNode );

setSslMode( "Y".equalsIgnoreCase( XMLHandler.getTagValue( slaveNode, SSL_MODE_TAG ) ) );
Node sslConfig = XMLHandler.getSubNode( slaveNode, SslConfiguration.XML_TAG );
Expand Down Expand Up @@ -277,6 +278,7 @@ public String getXML() {
xml.append( " " ).append( XMLHandler.addTagValue( "non_proxy_hosts", nonProxyHosts ) );
xml.append( " " ).append( XMLHandler.addTagValue( "master", master ) );
xml.append( " " ).append( XMLHandler.addTagValue( SSL_MODE_TAG, isSslMode(), false ) );
appendObjectId( xml );
if ( sslConfig != null ) {
xml.append( sslConfig.getXML() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -164,6 +164,7 @@ public String getXML() {
xml
.append( " " ).append(
XMLHandler.addTagValue( "partitions_per_slave", numberOfPartitionsPerSlave ) );
appendObjectId( xml );

xml.append( " " ).append( XMLHandler.closeTag( XML_TAG ) ).append( Const.CR );
return xml.toString();
Expand All @@ -179,6 +180,7 @@ public PartitionSchema( Node partitionSchemaNode ) {
Node partitionNode = XMLHandler.getSubNodeByNr( partitionSchemaNode, "partition", i, false );
partitionIDs.add( XMLHandler.getTagValue( partitionNode, "id" ) );
}
readObjectId( partitionSchemaNode );

dynamicallyDefined = "Y".equalsIgnoreCase( XMLHandler.getTagValue( partitionSchemaNode, "dynamic" ) );
numberOfPartitionsPerSlave = XMLHandler.getTagValue( partitionSchemaNode, "partitions_per_slave" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,6 @@ public JobMeta loadJob( String jobname, RepositoryDirectoryInterface repdir, Pro
jobMeta.setRepository( this );
jobMeta.setMetaStore( MetaStoreConst.getDefaultMetastore() );

readDatabases( jobMeta, true );
abryant-hv marked this conversation as resolved.
Show resolved Hide resolved
jobMeta.clearChanged();

return jobMeta;
Expand Down Expand Up @@ -1118,46 +1117,11 @@ public TransMeta loadTransformation( String transname, RepositoryDirectoryInterf
transMeta.setName( transname );
transMeta.setObjectId( new StringObjectId( calcObjectId( repdir, transname, EXT_TRANSFORMATION ) ) );

readDatabases( transMeta, true );
transMeta.clearChanged();

return transMeta;
}

/**
* Read all the databases from the repository, insert into the has databases object, overwriting optionally
*
* @param TransMeta
* The transformation to load into.
* @param overWriteShared
* if an object with the same name exists, overwrite
* @throws KettleException
*/
public void readDatabases( AbstractMeta transMeta, boolean overWriteShared ) throws KettleException {
try {
ObjectId[] dbids = getDatabaseIDs( false );
for ( int i = 0; i < dbids.length; i++ ) {
DatabaseMeta databaseMeta = loadDatabaseMeta( dbids[i], null ); // reads last version
if ( transMeta instanceof VariableSpace ) {
databaseMeta.shareVariablesWith( (VariableSpace) transMeta );
}

DatabaseMeta check = transMeta.findDatabase( databaseMeta.getName() ); // Check if there already is one in the
// transformation
if ( check == null || overWriteShared ) { // We only add, never overwrite database connections.
if ( databaseMeta.getName() != null ) {
transMeta.getDatabaseManagementInterface().add( databaseMeta );
if ( !overWriteShared ) {
databaseMeta.setChanged( false );
}
}
}
}
} catch ( KettleException e ) {
throw e;
}
}

public ValueMetaAndData loadValueMetaAndData( ObjectId id_value ) throws KettleException {

return null;
Expand Down Expand Up @@ -1191,49 +1155,12 @@ public void clearSharedObjectCache() {

@Override
public void readJobMetaSharedObjects( JobMeta jobMeta ) throws KettleException {

// Then we read the databases etc...
//
for ( ObjectId id : getDatabaseIDs( false ) ) {
DatabaseMeta databaseMeta = loadDatabaseMeta( id, null ); // Load last version
databaseMeta.shareVariablesWith( jobMeta );
jobMeta.getDatabaseManagementInterface().add( databaseMeta );
}

for ( ObjectId id : getSlaveIDs( false ) ) {
SlaveServer slaveServer = loadSlaveServer( id, null ); // Load last version
slaveServer.shareVariablesWith( jobMeta );
jobMeta.addOrReplaceSlaveServer( slaveServer );
}
// No-Op
}

@Override
public void readTransSharedObjects( TransMeta transMeta ) throws KettleException {

// Then we read the databases etc...
//
for ( ObjectId id : getDatabaseIDs( false ) ) {
DatabaseMeta databaseMeta = loadDatabaseMeta( id, null ); // Load last version
databaseMeta.shareVariablesWith( transMeta );
transMeta.getDatabaseManagementInterface().add( databaseMeta );
}

for ( ObjectId id : getSlaveIDs( false ) ) {
SlaveServer slaveServer = loadSlaveServer( id, null ); // Load last version
slaveServer.shareVariablesWith( transMeta );
transMeta.addOrReplaceSlaveServer( slaveServer );
}

for ( ObjectId id : getClusterIDs( false ) ) {
ClusterSchema clusterSchema = loadClusterSchema( id, transMeta.getSlaveServers(), null ); // Load last version
clusterSchema.shareVariablesWith( transMeta );
transMeta.addOrReplaceClusterSchema( clusterSchema );
}

for ( ObjectId id : getPartitionSchemaIDs( false ) ) {
PartitionSchema partitionSchema = loadPartitionSchema( id, null ); // Load last version
transMeta.addOrReplacePartitionSchema( partitionSchema );
}
// No-Op
}

private ObjectId renameObject( ObjectId id, RepositoryDirectoryInterface newDirectory, String newName,
Expand Down
Loading