Skip to content

Commit

Permalink
[improve](extension) improve kettle plugin and add some testcase (apa…
Browse files Browse the repository at this point in the history
…che#44324)

### What problem does this PR solve?

Improve the Kettle plugin and add some test cases
  • Loading branch information
JNSimba authored Nov 20, 2024
1 parent 070489c commit 61a49ad
Show file tree
Hide file tree
Showing 8 changed files with 623 additions and 4 deletions.
6 changes: 6 additions & 0 deletions extension/kettle/impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ under the License.
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<scope>test</scope>
</dependency>

<!--log-->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.pentaho.di.trans.steps.dorisstreamloader;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.ValueMetaInterface;
Expand Down Expand Up @@ -172,4 +173,9 @@ public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {

super.dispose( smi, sdi );
}

/**
 * Exposes the underlying {@link DorisBatchStreamLoad} so unit tests can
 * inspect the loader's state; not intended for production callers.
 */
@VisibleForTesting
public DorisBatchStreamLoad getStreamLoad() {
    return this.streamLoad;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.injection.AfterInjection;
import org.pentaho.di.core.injection.Injection;
import org.pentaho.di.core.injection.InjectionSupported;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.i18n.BaseMessages;
Expand Down Expand Up @@ -341,7 +343,7 @@ public void setDeletable(boolean deletable) {
this.deletable = deletable;
}

public String[] getFieldTable() {
// NOTE(review): returns the internal array without a defensive copy, so callers
// can mutate this step's state — confirm whether that is intentional.
public String[] getFieldTable() {
return fieldTable;
}

Expand Down Expand Up @@ -379,4 +381,18 @@ public String toString() {
", fieldStream=" + Arrays.toString(fieldStream) +
'}';
}

/**
 * Metadata injection can leave {@code fieldTable} and {@code fieldStream} with
 * different lengths. Normalize {@code fieldStream} to the length of
 * {@code fieldTable} so the step behaves the same as when configured via the UI.
 */
@AfterInjection
public void afterInjectionSynchronization() {
// No target fields configured — nothing to synchronize against.
if (fieldTable == null || fieldTable.length == 0) {
return;
}
String[][] normalized = Utils.normalizeArrays(fieldTable.length, fieldStream);
fieldStream = normalized[0];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ private synchronized boolean doFlush(
}

private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
if (bufferMap.isEmpty()) {
// bufferMap may have been flushed by other threads
return false;
}
if (null == bufferKey) {
boolean flush = false;
for (String key : bufferMap.keySet()) {
Expand All @@ -275,7 +279,7 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
} else if (bufferMap.containsKey(bufferKey)) {
flushBuffer(bufferKey);
} else {
throw new DorisRuntimeException("buffer not found for key: " + bufferKey);
log.logDetailed("buffer not found for key: {}, may be already flushed.", bufferKey);
}
if (waitUtilDone) {
waitAsyncLoadFinish();
Expand Down Expand Up @@ -311,7 +315,9 @@ private void checkFlushException() {
}

private void waitAsyncLoadFinish() {
for (int i = 0; i < FLUSH_QUEUE_SIZE + 1; i++) {
// Because the flush thread will drainTo once after polling is completed
// if queue_size is 2, at least 4 empty queues must be consumed to ensure that flush has been completed
for (int i = 0; i < FLUSH_QUEUE_SIZE * 2; i++) {
BatchRecordBuffer empty = new BatchRecordBuffer();
putRecordToFlushQueue(empty);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DorisBatchStreamLoadTest {
@Ignore
public void testStreamLoad() throws Exception {
DorisOptions options = DorisOptions.builder()
.withFenodes("10.16.10.6:28737")
.withFenodes("127.0.0.1:8030")
.withDatabase("test")
.withTable("test_flink_c")
.withUsername("root")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.pentaho.di.trans.steps.dorisstreamloader;

import org.junit.Assert;
import org.junit.Test;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaBigNumber;
import org.pentaho.di.core.row.value.ValueMetaBinary;
import org.pentaho.di.core.row.value.ValueMetaBoolean;
import org.pentaho.di.core.row.value.ValueMetaDate;
import org.pentaho.di.core.row.value.ValueMetaInteger;
import org.pentaho.di.core.row.value.ValueMetaInternetAddress;
import org.pentaho.di.core.row.value.ValueMetaNumber;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.core.row.value.ValueMetaTimestamp;
import org.pentaho.di.trans.steps.dorisstreamloader.serializer.DorisRecordSerializer;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import javax.mail.internet.InternetAddress;

import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.CSV;

/** Unit test covering CSV row serialization in {@link DorisRecordSerializer}. */
public class DorisRecordSerializerTest {

    /**
     * Serializes one row containing every supported Kettle value type and
     * checks the resulting comma-delimited CSV line.
     */
    @Test
    public void testSerialize() throws Exception {
        // One ValueMeta per supported column type, in column order.
        ValueMetaInterface[] formatMeta = {
                new ValueMetaBoolean("boolean"),
                new ValueMetaInteger("integer"),
                new ValueMetaNumber("number"),
                new ValueMetaBigNumber("bignumber"),
                new ValueMetaDate("date"),
                new ValueMetaTimestamp("timestamp"),
                new ValueMetaBinary("binary"),
                new ValueMetaString("string"),
                new ValueMetaInternetAddress("address"),
        };

        String[] fieldNames = {
                "c_boolean", "c_integer", "c_number", "c_bignumber", "c_date",
                "c_timestamp", "c_binary", "c_string", "c_internetAddress"
        };

        DorisRecordSerializer serializer = DorisRecordSerializer.builder()
                .setType(CSV)
                .setFieldNames(fieldNames)
                .setFormatMeta(formatMeta)
                .setFieldDelimiter(",")
                .setLogChannelInterface(new LogChannel())
                .setDeletable(false)
                .build();

        Object[] row = {
                true,
                10L,
                123.23,
                new BigDecimal("123456789.1234"),
                new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-01"),
                Timestamp.valueOf("2024-01-01 10:11:22.123"),
                "binary",
                "string",
                new InternetAddress("127.0.0.1")
        };

        String expected =
                "true,10,123.23,123456789.1234,2024-01-01,2024-01-01 10:11:22.123,binary,string,127.0.0.1";
        String actual = serializer.buildCSVString(row, formatMeta.length);
        Assert.assertEquals(expected, actual);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.pentaho.di.trans.steps.dorisstreamloader;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.junit.rules.RestorePDIEngineEnvironment;
import org.pentaho.di.trans.steps.loadsave.LoadSaveTester;
import org.pentaho.di.trans.steps.loadsave.validator.ArrayLoadSaveValidator;
import org.pentaho.di.trans.steps.loadsave.validator.FieldLoadSaveValidator;
import org.pentaho.di.trans.steps.loadsave.validator.IntLoadSaveValidator;
import org.pentaho.di.trans.steps.loadsave.validator.LongLoadSaveValidator;
import org.pentaho.di.trans.steps.loadsave.validator.StringLoadSaveValidator;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Tests for {@link DorisStreamLoaderMeta}: attribute save/load round-trip and
 * field-array synchronization after metadata injection (PDI-16559).
 */
public class DorisStreamLoaderMetaTest {

    @ClassRule
    public static RestorePDIEngineEnvironment env = new RestorePDIEngineEnvironment();

    /** Verifies every meta attribute survives an XML/repository save-load round trip. */
    @Test
    public void testRoundTrip() throws KettleException {
        List<String> attributes =
            Arrays.asList( "fenodes", "database", "table", "username", "password",
                "streamLoadProp", "bufferFlushMaxRows", "bufferFlushMaxBytes", "maxRetries",
                "deletable", "stream_name", "field_name");

        Map<String, String> getterMap = new HashMap<>();
        getterMap.put( "fenodes", "getFenodes" );
        getterMap.put( "database", "getDatabase" );
        getterMap.put( "table", "getTable" );
        getterMap.put( "username", "getUsername" );
        getterMap.put( "password", "getPassword" );
        getterMap.put( "streamLoadProp", "getStreamLoadProp" );
        getterMap.put( "bufferFlushMaxRows", "getBufferFlushMaxRows" );
        getterMap.put( "bufferFlushMaxBytes", "getBufferFlushMaxBytes" );
        getterMap.put( "maxRetries", "getMaxRetries" );
        getterMap.put( "deletable", "isDeletable" );
        getterMap.put( "stream_name", "getFieldTable" );
        getterMap.put( "field_name", "getFieldStream" );

        Map<String, String> setterMap = new HashMap<>();
        setterMap.put( "fenodes", "setFenodes" );
        setterMap.put( "database", "setDatabase" );
        setterMap.put( "table", "setTable" );
        setterMap.put( "username", "setUsername" );
        setterMap.put( "password", "setPassword" );
        setterMap.put( "streamLoadProp", "setStreamLoadProp" );
        setterMap.put( "bufferFlushMaxRows", "setBufferFlushMaxRows" );
        setterMap.put( "bufferFlushMaxBytes", "setBufferFlushMaxBytes" );
        setterMap.put( "maxRetries", "setMaxRetries" );
        setterMap.put( "deletable", "setDeletable" );
        setterMap.put( "stream_name", "setFieldTable" );
        setterMap.put( "field_name", "setFieldStream" );

        Map<String, FieldLoadSaveValidator<?>> fieldLoadSaveValidatorAttributeMap = new HashMap<>();
        FieldLoadSaveValidator<String[]> stringArrayLoadSaveValidator =
            new ArrayLoadSaveValidator<>( new StringLoadSaveValidator(), 25 );

        fieldLoadSaveValidatorAttributeMap.put("maxRetries", new IntLoadSaveValidator( Integer.MAX_VALUE ));
        fieldLoadSaveValidatorAttributeMap.put("bufferFlushMaxRows", new LongLoadSaveValidator());
        fieldLoadSaveValidatorAttributeMap.put("bufferFlushMaxBytes", new LongLoadSaveValidator());
        fieldLoadSaveValidatorAttributeMap.put("streamLoadProp", new StringLoadSaveValidator());
        fieldLoadSaveValidatorAttributeMap.put("stream_name", stringArrayLoadSaveValidator );
        fieldLoadSaveValidatorAttributeMap.put("field_name", stringArrayLoadSaveValidator );

        LoadSaveTester loadSaveTester =
            new LoadSaveTester( DorisStreamLoaderMeta.class, attributes, getterMap, setterMap,
                fieldLoadSaveValidatorAttributeMap, new HashMap<String, FieldLoadSaveValidator<?>>() );

        loadSaveTester.testSerialization();
    }

    /**
     * PDI-16559: after metadata injection the field arrays may have different
     * lengths; {@code afterInjectionSynchronization()} must pad them so that
     * {@code getXML()} no longer fails.
     */
    @Test
    public void testPDI16559() throws Exception {
        DorisStreamLoaderMeta streamLoader = new DorisStreamLoaderMeta();
        streamLoader.setFieldTable( new String[] { "table1", "table2", "table3" } );
        streamLoader.setFieldStream( new String[] { "stream1" } );
        streamLoader.setTable( "test_table" );

        try {
            // fieldStream is shorter than fieldTable, so serialization must fail.
            streamLoader.getXML();
            Assert.fail( "Before calling afterInjectionSynchronization, should have thrown an ArrayIndexOOB" );
        } catch ( Exception expected ) {
            // Expected: the arrays are out of sync until synchronization runs.
        }

        streamLoader.afterInjectionSynchronization();
        // Must now serialize without throwing.
        streamLoader.getXML();

        Assert.assertEquals( streamLoader.getFieldTable().length, streamLoader.getFieldStream().length );
    }
}
Loading

0 comments on commit 61a49ad

Please sign in to comment.