diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java index 3d2b35d4f3d..e6b615aa644 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java @@ -23,8 +23,8 @@ import org.apache.heron.api.bolt.IBasicBolt; /** - * The interface for streamlet basic operators. It is used to support existing user bolts - * extended from IBasicBolt only. It shouldn't be used to create streamlet operators. + * The interface for streamlet operators. It can be used to create + * operators based on existing Bolts (subclasses of IBasicBolt). */ public interface IStreamletBasicOperator extends IBasicBolt { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java index 820f8e18bd3..24b989c26b5 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java @@ -23,7 +23,8 @@ import org.apache.heron.api.bolt.IRichBolt; /** - * The interface for streamlet operators. + * The interface for custom operators: it can be used to create + * operators based on existing Bolts (subclasses of IRichBolt). */ public interface IStreamletOperator extends IRichBolt { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java index 0e9097aca37..677b34ef964 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java @@ -526,7 +526,8 @@ public Streamlet applyOperator(IStreamletBasicOperator operator) { */ @Override public Streamlet applyOperator(IStreamletWindowOperator operator) { - CustomWindowStreamlet customStreamlet = new CustomWindowStreamlet<>(this, operator); + CustomWindowStreamlet customStreamlet = + new CustomWindowStreamlet<>(this, operator); addChild(customStreamlet); return customStreamlet; } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java index 96283e0caa5..a199d6f5320 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java @@ -47,7 +47,7 @@ * to implement more advanced features. * 2. Use it in Streamlet * .... - * .perform(new MyOperator()) + * .applyOperator(new MyOperator()) * .... */ public abstract class CustomOperator extends StreamletOperator { @@ -68,6 +68,7 @@ public abstract class CustomOperator extends StreamletOperator { * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc. * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object. */ + @Override public void prepare(Map heronConf, TopologyContext context, OutputCollector collector) { diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java index 5f7158381b7..dadc743ac03 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java @@ -32,8 +32,8 @@ import org.apache.heron.common.basics.ByteAmount; import org.apache.heron.resource.TestBasicBolt; import org.apache.heron.resource.TestBolt; -import org.apache.heron.resource.TestWindowBolt; import org.apache.heron.resource.TestCustomOperator; +import org.apache.heron.resource.TestWindowBolt; import org.apache.heron.streamlet.Config; import org.apache.heron.streamlet.Context; import org.apache.heron.streamlet.IStreamletBasicOperator; @@ -47,7 +47,6 @@ import org.apache.heron.streamlet.impl.streamlets.CustomBasicStreamlet; import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; import org.apache.heron.streamlet.impl.streamlets.CustomWindowStreamlet; -import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet; import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet; import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet; @@ -268,23 +267,6 @@ public void testCustomStreamlet() throws Exception { assertEquals(supplierStreamlet.getChildren().get(0), streamlet); } - private class MyBoltOperator extends TestBolt implements IStreamletOperator { - } - - @Test - @SuppressWarnings("unchecked") - public void testCustomStreamletFromBolt() throws Exception { - Streamlet baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); - Streamlet streamlet = baseStreamlet.setNumPartitions(20). - applyOperator(new MyBoltOperator()); - assertTrue(streamlet instanceof CustomStreamlet); - CustomStreamlet mStreamlet = (CustomStreamlet) streamlet; - assertEquals(20, mStreamlet.getNumPartitions()); - SupplierStreamlet supplierStreamlet = (SupplierStreamlet) baseStreamlet; - assertEquals(supplierStreamlet.getChildren().size(), 1); - assertEquals(supplierStreamlet.getChildren().get(0), streamlet); - } - @Test @SuppressWarnings("unchecked") public void testSimpleBuild() throws Exception {