diff --git a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java index ea76efe..38c5b05 100644 --- a/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/StreamWriteQuickStart.java @@ -67,8 +67,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc .addColumn("field2", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3Rows = Table.from(myMetric3Schema); - Table myMetric4Rows = Table.from(myMetric4Schema); + Table myMetric3 = Table.from(myMetric3Schema); + Table myMetric4 = Table.from(myMetric4Schema); for (int i = 0; i < 10; i++) { String tag1v = "tag_value_1_" + i; @@ -80,7 +80,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc BigDecimal field3 = new BigDecimal(i); int field4 = i + 1; - myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); } for (int i = 0; i < 10; i++) { @@ -90,17 +90,17 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc Date field1 = Calendar.getInstance().getTime(); double field2 = i + 0.1; - myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2); + myMetric4.addRow(tag1v, tag2v, ts, field1, field2); } StreamWriter writer = greptimeDB.streamWriter(); // write data into stream - writer.write(myMetric3Rows); - writer.write(myMetric4Rows); + writer.write(myMetric3); + writer.write(myMetric4); // delete the first 5 rows - writer.write(myMetric3Rows.subRange(0, 5), WriteOp.Delete); + writer.write(myMetric3.subRange(0, 5), WriteOp.Delete); // complete the stream CompletableFuture future = writer.completed(); diff --git a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java index 7f03ffa..6b6d6d0 100644 --- a/ingester-example/src/main/java/io/greptime/WriteQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/WriteQuickStart.java @@ -72,8 +72,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc .addColumn("field2", SemanticType.Field, DataType.Float64) // .build(); - Table myMetric3Rows = Table.from(myMetric3Schema); - Table myMetric4Rows = Table.from(myMetric4Schema); + Table myMetric3 = Table.from(myMetric3Schema); + Table myMetric4 = Table.from(myMetric4Schema); for (int i = 0; i < 10; i++) { String tag1v = "tag_value_1_" + i; @@ -85,7 +85,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc BigDecimal field3 = new BigDecimal(i); int field4 = i + 1; - myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); + myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4); } for (int i = 0; i < 10; i++) { @@ -95,15 +95,15 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc Date field1 = Calendar.getInstance().getTime(); double field2 = i + 0.1; - myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2); + myMetric4.addRow(tag1v, tag2v, ts, field1, field2); } - Collection rows = Arrays.asList(myMetric3Rows, myMetric4Rows); + Collection
tables = Arrays.asList(myMetric3, myMetric4); // For performance reasons, the SDK is designed to be purely asynchronous. // The return value is a future object. If you want to immediately obtain // the result, you can call `future.get()`. - CompletableFuture> future = greptimeDB.write(rows); + CompletableFuture> future = greptimeDB.write(tables); Result result = future.get(); @@ -113,8 +113,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc LOG.error("Failed to write: {}", result.getErr()); } - List
delete_pojos = Arrays.asList(myMetric3Rows.subRange(0, 5), myMetric4Rows.subRange(0, 5)); - Result deletes = greptimeDB.write(delete_pojos, WriteOp.Delete).get(); + List
delete_objs = Arrays.asList(myMetric3.subRange(0, 5), myMetric4.subRange(0, 5)); + Result deletes = greptimeDB.write(delete_objs, WriteOp.Delete).get(); if (deletes.isOk()) { LOG.info("Delete result: {}", result.getOk()); diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index fbfd718..550b1d3 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -152,8 +152,8 @@ public StreamWriter, WriteOk> streamWriterPOJOs(int maxPointsPerSecond, return new StreamWriter, WriteOk>() { @Override public StreamWriter, WriteOk> write(List val, WriteOp writeOp) { - Table rows = pojoMapper.toTableData(val); - delegate.write(rows, writeOp); + Table table = pojoMapper.toTableData(val); + delegate.write(table, writeOp); return this; } @@ -165,9 +165,9 @@ public CompletableFuture completed() { } @Override - public CompletableFuture> write(Collection
rows, WriteOp writeOp, Context ctx) { + public CompletableFuture> write(Collection
tables, WriteOp writeOp, Context ctx) { ensureInitialized(); - return this.writeClient.write(rows, writeOp, attachCtx(ctx)); + return this.writeClient.write(tables, writeOp, attachCtx(ctx)); } @Override diff --git a/ingester-protocol/src/main/java/io/greptime/Write.java b/ingester-protocol/src/main/java/io/greptime/Write.java index 3a8b12e..abe95a6 100644 --- a/ingester-protocol/src/main/java/io/greptime/Write.java +++ b/ingester-protocol/src/main/java/io/greptime/Write.java @@ -34,26 +34,26 @@ public interface Write { /** * @see #write(Collection, WriteOp, Context) */ - default CompletableFuture> write(Collection
rows) { - return write(rows, WriteOp.Insert, Context.newDefault()); + default CompletableFuture> write(Collection
tables) { + return write(tables, WriteOp.Insert, Context.newDefault()); } /** * @see #write(Collection, WriteOp, Context) */ - default CompletableFuture> write(Collection
rows, WriteOp writeOp) { - return write(rows, writeOp, Context.newDefault()); + default CompletableFuture> write(Collection
tables, WriteOp writeOp) { + return write(tables, writeOp, Context.newDefault()); } /** - * Write multi tables multi rows data to database. + * Write multi tables multi tables data to database. * - * @param rows rows with multi tables + * @param tables rows with multi tables * @param writeOp write operation(insert or delete) * @param ctx invoke context * @return write result */ - CompletableFuture> write(Collection
rows, WriteOp writeOp, Context ctx); + CompletableFuture> write(Collection
tables, WriteOp writeOp, Context ctx); /** * @see #streamWriter(int, Context) diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index 3b20cfe..7e7700b 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -37,7 +37,7 @@ import io.greptime.models.WriteOk; import io.greptime.models.Table; import io.greptime.models.TableHelper; -import io.greptime.models.WriteTable; +import io.greptime.models.WriteTables; import io.greptime.options.WriteOptions; import io.greptime.rpc.Context; import io.greptime.rpc.Observer; @@ -79,13 +79,13 @@ public void shutdownGracefully() { } @Override - public CompletableFuture> write(Collection
rows, WriteOp writeOp, Context ctx) { - Ensures.ensureNonNull(rows, "null `rows`"); - Ensures.ensure(!rows.isEmpty(), "empty `rows`"); + public CompletableFuture> write(Collection
tables, WriteOp writeOp, Context ctx) { + Ensures.ensureNonNull(tables, "null `tables`"); + Ensures.ensure(!tables.isEmpty(), "empty `tables`"); long startCall = Clock.defaultClock().getTick(); - - return this.writeLimiter.acquireAndDo(rows, () -> write0(rows, writeOp, ctx, 0).whenCompleteAsync((r, e) -> { + WriteTables writeTables = new WriteTables(tables, writeOp); + return this.writeLimiter.acquireAndDo(tables, () -> write0(writeTables, ctx, 0).whenCompleteAsync((r, e) -> { InnerMetricHelper.writeQps().mark(); if (r != null) { if (Util.isRwLogging()) { @@ -118,11 +118,11 @@ public StreamWriter streamWriter(int maxPointsPerSecond, Context .thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) { @Override - public StreamWriter write(Table rows, WriteOp writeOp) { + public StreamWriter write(Table table, WriteOp writeOp) { if (respFuture.isCompletedExceptionally()) { respFuture.getNow(null); // throw the exception now } - return super.write(rows, writeOp); // may wait + return super.write(table, writeOp); // may wait } @Override @@ -133,11 +133,11 @@ public CompletableFuture completed() { }).join(); } - private CompletableFuture> write0(Collection
rows, WriteOp writeOp, Context ctx, int retries) { + private CompletableFuture> write0(WriteTables writeTables, Context ctx, int retries) { InnerMetricHelper.writeByRetries(retries).mark(); return this.routerClient.route() - .thenComposeAsync(endpoint -> writeTo(endpoint, rows, writeOp, ctx, retries), this.asyncPool) + .thenComposeAsync(endpoint -> writeTo(endpoint, writeTables, ctx, retries), this.asyncPool) .thenComposeAsync(r -> { if (r.isOk()) { LOG.debug("Success to write to {}, ok={}.", Keys.DB_NAME, r.getOk()); @@ -155,14 +155,14 @@ private CompletableFuture> write0(Collection
rows, W return Util.completedCf(r); } - return write0(rows, writeOp, ctx, retries + 1); + return write0(writeTables, ctx, retries + 1); }, this.asyncPool); } - private CompletableFuture> writeTo(Endpoint endpoint, Collection
rows, WriteOp writeOp, Context ctx, int retries) { + private CompletableFuture> writeTo(Endpoint endpoint, WriteTables writeTables, Context ctx, int retries) { String database = this.opts.getDatabase(); AuthInfo authInfo = this.opts.getAuthInfo(); - Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo); + Database.GreptimeRequest req = TableHelper.toGreptimeRequest(writeTables, database, authInfo); ctx.with("retries", retries); CompletableFuture future = this.routerClient.invoke(endpoint, req, ctx); @@ -175,12 +175,12 @@ private CompletableFuture> writeTo(Endpoint endpoint, Colle int affectedRows = resp.getAffectedRows().getValue(); return WriteOk.ok(affectedRows, 0).mapToResult(); } else { - return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint, rows).mapToResult(); + return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint).mapToResult(); } }, this.asyncPool); } - private Observer streamWriteTo(Endpoint endpoint, Context ctx, Observer respObserver) { + private Observer streamWriteTo(Endpoint endpoint, Context ctx, Observer respObserver) { Observer rpcObserver = this.routerClient.invokeClientStreaming(endpoint, Database.GreptimeRequest.getDefaultInstance(), ctx, new Observer() { @@ -207,15 +207,13 @@ public void onCompleted() { } }); - return new Observer() { + return new Observer() { @Override - public void onNext(WriteTable writeTable) { - Table rows = writeTable.getRows(); - WriteOp writeOp = writeTable.getWriteOp(); + public void onNext(WriteTables writeTables) { String database = WriteClient.this.opts.getDatabase(); AuthInfo authInfo = WriteClient.this.opts.getAuthInfo(); - Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo); + Database.GreptimeRequest req = TableHelper.toGreptimeRequest(writeTables, database, authInfo); rpcObserver.onNext(req); } @@ -317,17 +315,17 @@ public Result rejected(Collection
in, RejectedState state) state.acquirePermits(), // state.maxPermits(), // state.availablePermits()); - return Result.err(Err.writeErr(Result.FLOW_CONTROL, new LimitedException(errMsg), null, in)); + return Result.err(Err.writeErr(Result.FLOW_CONTROL, new LimitedException(errMsg), null)); } } @SuppressWarnings("UnstableApiUsage") static abstract class RateLimitingStreamWriter implements StreamWriter { - private final Observer observer; + private final Observer observer; private final RateLimiter rateLimiter; - RateLimitingStreamWriter(Observer observer, double permitsPerSecond) { + RateLimitingStreamWriter(Observer observer, double permitsPerSecond) { this.observer = observer; if (permitsPerSecond > 0) { this.rateLimiter = RateLimiter.create(permitsPerSecond); @@ -337,14 +335,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter write(Table rows, WriteOp writeOp) { - Ensures.ensureNonNull(rows, "null `rows`"); + public StreamWriter write(Table table, WriteOp writeOp) { + Ensures.ensureNonNull(table, "null `table`"); if (this.rateLimiter != null) { - double timeSpent = this.rateLimiter.acquire(rows.pointCount()); + double timeSpent = this.rateLimiter.acquire(table.pointCount()); InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent); } - this.observer.onNext(new WriteTable(rows, writeOp)); + this.observer.onNext(new WriteTables(table, writeOp)); return this; } } diff --git a/ingester-protocol/src/main/java/io/greptime/models/Err.java b/ingester-protocol/src/main/java/io/greptime/models/Err.java index c4cdbfe..28edd7b 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Err.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Err.java @@ -30,10 +30,6 @@ public class Err { private Throwable error; // the server address where the error occurred private Endpoint errTo; - // the data of wrote failed, can be used to retry - private Collection
rowsFailed; - // the QL failed to query - private String failedQl; /** * Returns the error code. @@ -56,20 +52,6 @@ public Endpoint getErrTo() { return errTo; } - /** - * Returns the data of wrote failed, can be used to retry. - */ - public Collection
getRowsFailed() { - return rowsFailed; - } - - /** - * Returns the QL failed to query. - */ - public String getFailedQl() { - return failedQl; - } - /** * Returns a {@link Result} containing this error. */ @@ -83,7 +65,6 @@ public String toString() { "code=" + code + // ", error='" + error + '\'' + // ", errTo=" + errTo + // - ", failedQl=" + failedQl + // '}'; } @@ -93,33 +74,13 @@ public String toString() { * @param code the error code * @param error the error * @param errTo the server address where the error occurred - * @param rowsFailed the data of wrote failed, can be used to retry * @return a new {@link Err} for write error */ - public static Err writeErr(int code, Throwable error, Endpoint errTo, Collection
rowsFailed) { - Err err = new Err(); - err.code = code; - err.error = error; - err.errTo = errTo; - err.rowsFailed = rowsFailed; - return err; - } - - /** - * Creates a new {@link Err} for query error. - * - * @param code the error code - * @param error the error - * @param errTo the server address where the error occurred - * @param failedQl the QL failed to query - * @return a new {@link Err} for query error - */ - public static Err queryErr(int code, Throwable error, Endpoint errTo, String failedQl) { + public static Err writeErr(int code, Throwable error, Endpoint errTo) { Err err = new Err(); err.code = code; err.error = error; err.errTo = errTo; - err.failedQl = failedQl; return err; } } diff --git a/ingester-protocol/src/main/java/io/greptime/models/Table.java b/ingester-protocol/src/main/java/io/greptime/models/Table.java index 8f3681d..af1b34d 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Table.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Table.java @@ -109,18 +109,18 @@ public Table build() { Ensures.ensure(columnCount == dataTypes.size(), "Column names size not equal to data types size"); Ensures.ensure(columnCount == dataTypeExtensions.size(), "Column names size not equal to data type extensions size"); - return buildRow(tableName, columnCount, columnNames, semanticTypes, dataTypes, dataTypeExtensions); + return buildTable(tableName, columnCount, columnNames, semanticTypes, dataTypes, dataTypeExtensions); } - private static Table buildRow(String tableName, // - int columnCount, // - List columnNames, // - List semanticTypes, // - List dataTypes, // - List dataTypeExtensions) { - RowBasedTable rows = new RowBasedTable(); - rows.tableName = tableName; - rows.columnSchemas = new ArrayList<>(columnCount); + private static Table buildTable(String tableName, // + int columnCount, // + List columnNames, // + List semanticTypes, // + List dataTypes, // + List dataTypeExtensions) { + RowBasedTable table = new RowBasedTable(); + table.tableName = tableName; + table.columnSchemas = new ArrayList<>(columnCount); for (int i = 0; i < columnCount; i++) { RowData.ColumnSchema.Builder builder = RowData.ColumnSchema.newBuilder(); @@ -128,9 +128,9 @@ private static Table buildRow(String tableName, // .setSemanticType(semanticTypes.get(i)) // .setDatatype(dataTypes.get(i)) // .setDatatypeExtension(dataTypeExtensions.get(i)); - rows.columnSchemas.add(builder.build()); + table.columnSchemas.add(builder.build()); } - return rows; + return table; } } diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java index 14f4295..df26a1e 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableHelper.java @@ -26,17 +26,7 @@ */ public class TableHelper { - public static Database.GreptimeRequest toGreptimeRequest(Table rows, // - WriteOp writeOp, // - String database, // - AuthInfo authInfo) { - return toGreptimeRequest(Collections.singleton(rows), writeOp, database, authInfo); - } - - public static Database.GreptimeRequest toGreptimeRequest(Collection
rows, // - WriteOp writeOp, // - String database, // - AuthInfo authInfo) { + public static Database.GreptimeRequest toGreptimeRequest(WriteTables writeTables, String database, AuthInfo authInfo) { Common.RequestHeader.Builder headerBuilder = Common.RequestHeader.newBuilder(); if (database != null) { headerBuilder.setDbname(database); @@ -45,11 +35,14 @@ public static Database.GreptimeRequest toGreptimeRequest(Collection
rows, headerBuilder.setAuthorization(authInfo.into()); } + Collection
tables = writeTables.getTables(); + WriteOp writeOp = writeTables.getWriteOp(); + switch (writeOp) { case Insert: Database.RowInsertRequests.Builder insertBuilder = Database.RowInsertRequests.newBuilder(); - for (Table r : rows) { - insertBuilder.addInserts(r.intoRowInsertRequest()); + for (Table t : tables) { + insertBuilder.addInserts(t.intoRowInsertRequest()); } return Database.GreptimeRequest.newBuilder() // .setHeader(headerBuilder.build()) // @@ -57,8 +50,8 @@ public static Database.GreptimeRequest toGreptimeRequest(Collection
rows, .build(); case Delete: Database.RowDeleteRequests.Builder deleteBuilder = Database.RowDeleteRequests.newBuilder(); - for (Table r : rows) { - deleteBuilder.addDeletes(r.intoRowDeleteRequest()); + for (Table t : tables) { + deleteBuilder.addDeletes(t.intoRowDeleteRequest()); } return Database.GreptimeRequest.newBuilder() // .setHeader(headerBuilder.build()) // diff --git a/ingester-protocol/src/main/java/io/greptime/models/WriteTable.java b/ingester-protocol/src/main/java/io/greptime/models/WriteTables.java similarity index 67% rename from ingester-protocol/src/main/java/io/greptime/models/WriteTable.java rename to ingester-protocol/src/main/java/io/greptime/models/WriteTables.java index 4a0eb3a..9cae922 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/WriteTable.java +++ b/ingester-protocol/src/main/java/io/greptime/models/WriteTables.java @@ -16,21 +16,27 @@ package io.greptime.models; import io.greptime.WriteOp; +import java.util.Collection; +import java.util.Collections; /** * @author jiachun.fjc */ -public class WriteTable { - private final Table rows; +public class WriteTables { + private final Collection
tables; private final WriteOp writeOp; - public WriteTable(Table rows, WriteOp writeOp) { - this.rows = rows; + public WriteTables(Table table, WriteOp writeOp) { + this(Collections.singleton(table), writeOp); + } + + public WriteTables(Collection
tables, WriteOp writeOp) { + this.tables = tables; this.writeOp = writeOp; } - public Table getRows() { - return rows; + public Collection
getTables() { + return tables; } public WriteOp getWriteOp() { diff --git a/ingester-protocol/src/test/java/io/greptime/TestUtil.java b/ingester-protocol/src/test/java/io/greptime/TestUtil.java index b60e918..174f848 100644 --- a/ingester-protocol/src/test/java/io/greptime/TestUtil.java +++ b/ingester-protocol/src/test/java/io/greptime/TestUtil.java @@ -34,10 +34,10 @@ public static Collection
testTable(String tableName, int rowCount) { .addColumn("cpu", SemanticType.Field, DataType.Float64) // .build(); - Table rows = Table.from(tableSchema); + Table table = Table.from(tableSchema); for (int i = 0; i < rowCount; i++) { - rows.addRow("127.0.0.1", System.currentTimeMillis(), i); + table.addRow("127.0.0.1", System.currentTimeMillis(), i); } - return Collections.singleton(rows); + return Collections.singleton(table); } } diff --git a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java index e0beaff..3f4b3da 100644 --- a/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java +++ b/ingester-protocol/src/test/java/io/greptime/WriteClientTest.java @@ -88,12 +88,12 @@ public void testWriteSuccess() throws ExecutionException, InterruptedException { .addColumn("field16", Field, DataType.TimestampMillisecond) // .addColumn("field17", Field, DataType.TimestampNanosecond) // .build(); - Table rows = Table.from(schema); + Table table = Table.from(schema); long ts = System.currentTimeMillis(); - rows.addRow("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L); - rows.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15); - rows.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15); + table.addRow("tag1", ts, 1, 2, 3, 4L, 5, 6, 7, 8L, 0.9F, 0.10D, true, new byte[0], 11, 12L, 13L, 14L, 15L); + table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 0.9, 0.10, false, new byte[0], 11, 12, 13, 14, 15); + table.addRow("tag1", ts, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, false, new byte[] {0, 1}, 11, 12, 13, 14, 15); Endpoint addr = Endpoint.parse("127.0.0.1:8081"); Database.GreptimeResponse response = Database.GreptimeResponse.newBuilder() // @@ -105,7 +105,7 @@ public void testWriteSuccess() throws ExecutionException, InterruptedException { Mockito.when(this.routerClient.invoke(Mockito.eq(addr), Mockito.any(), Mockito.any())) // .thenReturn(Util.completedCf(response)); - Result res = this.writeClient.write(Collections.singleton(rows)).get(); + Result res = this.writeClient.write(Collections.singleton(table)).get(); Assert.assertTrue(res.isOk()); Assert.assertEquals(3, res.getOk().getSuccess()); diff --git a/ingester-protocol/src/test/java/io/greptime/models/ErrTest.java b/ingester-protocol/src/test/java/io/greptime/models/ErrTest.java index f3edbbf..93d108d 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/ErrTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/ErrTest.java @@ -26,22 +26,10 @@ public class ErrTest { @Test public void testWriteErr() { - Err err = Err.writeErr(300, new IllegalStateException("test_err"), Endpoint.of("127.0.0.1", 8081), null); + Err err = Err.writeErr(300, new IllegalStateException("test_err"), Endpoint.of("127.0.0.1", 8081)); Assert.assertFalse(err.mapToResult().isOk()); Assert.assertEquals("test_err", err.getError().getMessage()); Assert.assertEquals("127.0.0.1:8081", err.getErrTo().toString()); - Assert.assertNull(err.getRowsFailed()); - } - - @Test - public void testQueryErr() { - IllegalStateException error = new IllegalStateException("test_err"); - Err err = Err.queryErr(300, error, Endpoint.of("127.0.0.1", 8081), "select * from test"); - - Assert.assertFalse(err.mapToResult().isOk()); - Assert.assertEquals("test_err", err.getError().getMessage()); - Assert.assertEquals("127.0.0.1:8081", err.getErrTo().toString()); - Assert.assertEquals(err.getFailedQl(), "select * from test"); } } diff --git a/ingester-protocol/src/test/java/io/greptime/models/ResultTest.java b/ingester-protocol/src/test/java/io/greptime/models/ResultTest.java index 8e39eca..8021984 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/ResultTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/ResultTest.java @@ -29,7 +29,7 @@ public void testMap() { final Result r2 = r1.map(WriteOk::getSuccess); Assert.assertEquals(2, r2.getOk().intValue()); - final Result r5 = Result.err(Err.writeErr(400, null, null, null)); + final Result r5 = Result.err(Err.writeErr(400, null, null)); final Result r6 = r5.map(WriteOk::getSuccess); Assert.assertFalse(r6.isOk()); } @@ -40,7 +40,7 @@ public void testMapOr() { final Integer r2 = r1.mapOr(-1, WriteOk::getSuccess); Assert.assertEquals(2, r2.intValue()); - final Result r3 = Result.err(Err.writeErr(400, null, null, null)); + final Result r3 = Result.err(Err.writeErr(400, null, null)); final Integer r4 = r3.mapOr(-1, WriteOk::getSuccess); Assert.assertEquals(-1, r4.intValue()); } @@ -51,7 +51,7 @@ public void testMapOrElse() { final Integer r2 = r1.mapOrElse(err -> -1, WriteOk::getSuccess); Assert.assertEquals(2, r2.intValue()); - final Result r3 = Result.err(Err.writeErr(400, null, null, null)); + final Result r3 = Result.err(Err.writeErr(400, null, null)); final Integer r4 = r3.mapOrElse(err -> -1, WriteOk::getSuccess); Assert.assertEquals(-1, r4.intValue()); } @@ -63,7 +63,7 @@ public void testMapErr() { Assert.assertEquals(2, r2.getOk().getSuccess()); IllegalStateException error = new IllegalStateException("error test"); - final Result r3 = Result.err(Err.writeErr(400, error, null, null)); + final Result r3 = Result.err(Err.writeErr(400, error, null)); final Result r4 = r3.mapErr(Err::getError); Assert.assertEquals("error test", r4.getErr().getMessage()); } @@ -77,7 +77,7 @@ public void testAndThen() { }); Assert.assertEquals(3, r2.getOk().getSuccess()); - final Result r3 = Result.err(Err.writeErr(400, null, null, null)); + final Result r3 = Result.err(Err.writeErr(400, null, null)); final Result r4 = r3.andThen(writeOk -> { WriteOk newOne = WriteOk.ok(writeOk.getSuccess() + 1, 0); return newOne.mapToResult(); @@ -91,7 +91,7 @@ public void testOrElse() { final Result r2 = r1.orElse(err -> Result.ok(WriteOk.ok(0, 0))); Assert.assertEquals(2, r2.getOk().getSuccess()); - final Result r3 = Result.err(Err.writeErr(400, null, null, null)); + final Result r3 = Result.err(Err.writeErr(400, null, null)); final Result r4 = r3.orElse(err -> Result.ok(WriteOk.ok(0, 0))); Assert.assertEquals(0, r4.getOk().getSuccess()); } @@ -102,7 +102,7 @@ public void testUnwrapOr() { final WriteOk r2 = r1.unwrapOr(WriteOk.emptyOk()); Assert.assertEquals(2, r2.getSuccess()); - final Result r3 = Result.err(Err.writeErr(400, null, null, null)); + final Result r3 = Result.err(Err.writeErr(400, null, null)); final WriteOk r4 = r3.unwrapOr(WriteOk.emptyOk()); Assert.assertEquals(0, r4.getSuccess()); } @@ -113,7 +113,7 @@ public void testUnwrapOrElse() { final WriteOk r2 = r1.unwrapOrElse(err -> WriteOk.emptyOk()); Assert.assertEquals(2, r2.getSuccess()); - final Result r3 = Result.err(Err.writeErr(400, null, null, null)); + final Result r3 = Result.err(Err.writeErr(400, null, null)); final WriteOk r4 = r3.unwrapOrElse(err -> WriteOk.emptyOk()); Assert.assertEquals(0, r4.getSuccess()); } diff --git a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java index ed0a6d4..1d8cc11 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/TableTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/TableTest.java @@ -32,13 +32,13 @@ public void testTableNonNull() { .addColumn("col3", SemanticType.Field, DataType.Int32) // .build(); - Table.RowBasedTable rows = (Table.RowBasedTable) Table.from(schema); - rows.addRow("1", "11", 111) // + Table.RowBasedTable table = (Table.RowBasedTable) Table.from(schema); + table.addRow("1", "11", 111) // .addRow("2", "22", 222) // .addRow("3", "33", 333); - Assert.assertEquals(3, rows.rowCount()); - RowData.Rows rawRows = rows.into(); + Assert.assertEquals(3, table.rowCount()); + RowData.Rows rawRows = table.into(); Assert.assertEquals(111, rawRows.getRows(0).getValues(2).getI32Value()); Assert.assertEquals(222, rawRows.getRows(1).getValues(2).getI32Value()); Assert.assertEquals(333, rawRows.getRows(2).getValues(2).getI32Value()); @@ -52,13 +52,13 @@ public void testTableSomeNull() { .addColumn("col3", SemanticType.Field, DataType.Int32) // .build(); - Table.RowBasedTable rows = (Table.RowBasedTable) Table.from(schema); - rows.addRow("1", "11", 111) // + Table.RowBasedTable table = (Table.RowBasedTable) Table.from(schema); + table.addRow("1", "11", 111) // .addRow("2", null, 222) // .addRow("3", "33", null); - Assert.assertEquals(3, rows.rowCount()); - RowData.Rows rawRows = rows.into(); + Assert.assertEquals(3, table.rowCount()); + RowData.Rows rawRows = table.into(); Assert.assertEquals(111, rawRows.getRows(0).getValues(2).getI32Value()); Assert.assertEquals(222, rawRows.getRows(1).getValues(2).getI32Value()); Assert.assertFalse(rawRows.getRows(2).getValues(2).hasI32Value());