From d3a9874f6979a432fe78b01f285c6e5d5e49226a Mon Sep 17 00:00:00 2001 From: michaeloffner Date: Thu, 8 Aug 2024 14:12:11 +0200 Subject: [PATCH] make streamName optional for KinesisInfo --- pom.xml | 2 +- source/fld/function.fld | 5 +- .../aws/kinesis/function/KinesisInfo.java | 102 +++++++++++------- 3 files changed, 67 insertions(+), 42 deletions(-) diff --git a/pom.xml b/pom.xml index 1f47138..c720dc4 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.lucee kinesis-extension - 1.0.1.0-SNAPSHOT + 1.0.1.1-SNAPSHOT pom Kinesis Extension diff --git a/source/fld/function.fld b/source/fld/function.fld index fcc8846..1e9663e 100755 --- a/source/fld/function.fld +++ b/source/fld/function.fld @@ -198,8 +198,9 @@ streamName string - yes - The name of the AWS Kinesis stream from which records are to be fetched. This parameter is required to identify the target stream. + no + The name of the AWS Kinesis stream from which records are to be fetched. + If not defined the function will return info for all streams. accessKeyId diff --git a/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java b/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java index f4b3451..1585979 100644 --- a/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java +++ b/source/java/src/org/lucee/extension/aws/kinesis/function/KinesisInfo.java @@ -19,6 +19,8 @@ import software.amazon.awssdk.services.kinesis.model.DescribeLimitsResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; +import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; import software.amazon.awssdk.services.kinesis.model.Shard; public class KinesisInfo extends KinesisFunction { @@ -39,61 +41,83 @@ public static Struct call(PageContext pc, String streamName, String accessKeyId, try { Log log = pc.getConfig().getLog("application"); - KinesisClient client = AmazonKinesisClient.get(CommonUtil.toKinesisProps(pc, accessKeyId, secretAccessKey, host, location), toTimeout(timeout), log); - - Struct result = eng.getCreationUtil().createStruct(); - - // Describe the stream and print shard IDs - String exclusiveStartShardId = null; - Query qShards = eng.getCreationUtil().createQuery(new String[] { "shardId", "parentShardId", "adjacentParentShardId" }, 0, "shards"); - result.put("shards", qShards); - int row; - do { - // describeStream - DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder().streamName(streamName).exclusiveStartShardId(exclusiveStartShardId).build(); - DescribeStreamResponse describeStreamResponse = client.describeStream(describeStreamRequest); - List listShards = describeStreamResponse.streamDescription().shards(); - - for (Shard s: listShards) { - row = qShards.addRow(); - qShards.setAtEL("shardId", row, s.shardId()); - qShards.setAtEL("parentShardId", row, s.parentShardId()); - qShards.setAtEL("adjacentParentShardId", row, s.adjacentParentShardId()); - } - - if (describeStreamResponse.streamDescription().hasMoreShards()) { - exclusiveStartShardId = describeStreamResponse.streamDescription().shards().get(describeStreamResponse.streamDescription().shards().size() - 1).shardId(); - } - else { - exclusiveStartShardId = null; - } + if (!Util.isEmpty(streamName, true)) { + return getStream(eng, client, streamName); } - while (exclusiveStartShardId != null); + else { + return getStreams(eng, client); - // describeLimits - try { - DescribeLimitsResponse limitsResponse = client.describeLimits(); - result.put("shardLimit", limitsResponse.shardLimit()); - } - catch (AccessDeniedException ade) { - result.put("shardLimit", ade.getMessage()); - // if (throwOnAccessDenied) throw ade; } - return result; } catch (Exception e) { throw CommonUtil.toPageException(e); } } + private static Struct getStreams(CFMLEngine eng, KinesisClient client) throws PageException { + Struct result = eng.getCreationUtil().createStruct(); + + ListStreamsRequest listStreamsRequest = ListStreamsRequest.builder().build(); + ListStreamsResponse listStreamsResponse = client.listStreams(listStreamsRequest); + + List streamNames = listStreamsResponse.streamNames(); + + for (String sn: streamNames) { + result.setEL(sn, getStream(eng, client, sn)); + } + return result; + } + + private static Struct getStream(CFMLEngine eng, KinesisClient client, String streamName) throws PageException { + Struct result = eng.getCreationUtil().createStruct(); + + // Describe the stream and print shard IDs + String exclusiveStartShardId = null; + Query qShards = eng.getCreationUtil().createQuery(new String[] { "shardId", "parentShardId", "adjacentParentShardId" }, 0, "shards"); + result.put("shards", qShards); + int row; + do { + // describeStream + DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder().streamName(streamName).exclusiveStartShardId(exclusiveStartShardId).build(); + DescribeStreamResponse describeStreamResponse = client.describeStream(describeStreamRequest); + List listShards = describeStreamResponse.streamDescription().shards(); + + for (Shard s: listShards) { + row = qShards.addRow(); + qShards.setAtEL("shardId", row, s.shardId()); + qShards.setAtEL("parentShardId", row, s.parentShardId()); + qShards.setAtEL("adjacentParentShardId", row, s.adjacentParentShardId()); + } + + if (describeStreamResponse.streamDescription().hasMoreShards()) { + exclusiveStartShardId = describeStreamResponse.streamDescription().shards().get(describeStreamResponse.streamDescription().shards().size() - 1).shardId(); + } + else { + exclusiveStartShardId = null; + } + } + while (exclusiveStartShardId != null); + + // describeLimits + try { + DescribeLimitsResponse limitsResponse = client.describeLimits(); + result.put("shardLimit", limitsResponse.shardLimit()); + } + catch (AccessDeniedException ade) { + result.put("shardLimit", ade.getMessage()); + // if (throwOnAccessDenied) throw ade; + } + return result; + } + @Override public Object invoke(PageContext pc, Object[] args) throws PageException { CFMLEngine engine = CFMLEngineFactory.getInstance(); Cast cast = engine.getCastUtil(); - if (args.length < 1 || args.length > 6) throw engine.getExceptionUtil().createFunctionException(pc, "KinesisInfo", 1, 6, args.length); + if (args.length > 6) throw engine.getExceptionUtil().createFunctionException(pc, "KinesisInfo", 0, 6, args.length); // streamName String streamName = cast.toString(args[0]);