Skip to content

Commit

Permalink
chore: minor refactor (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Dec 26, 2023
1 parent ad75b23 commit c0c1d0d
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
.addColumn("field2", SemanticType.Field, DataType.Float64) //
.build();

Table myMetric3Rows = Table.from(myMetric3Schema);
Table myMetric4Rows = Table.from(myMetric4Schema);
Table myMetric3 = Table.from(myMetric3Schema);
Table myMetric4 = Table.from(myMetric4Schema);

for (int i = 0; i < 10; i++) {
String tag1v = "tag_value_1_" + i;
Expand All @@ -80,7 +80,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
BigDecimal field3 = new BigDecimal(i);
int field4 = i + 1;

myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
}

for (int i = 0; i < 10; i++) {
Expand All @@ -90,17 +90,17 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
Date field1 = Calendar.getInstance().getTime();
double field2 = i + 0.1;

myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2);
myMetric4.addRow(tag1v, tag2v, ts, field1, field2);
}

StreamWriter<Table, WriteOk> writer = greptimeDB.streamWriter();

// write data into stream
writer.write(myMetric3Rows);
writer.write(myMetric4Rows);
writer.write(myMetric3);
writer.write(myMetric4);

// delete the first 5 rows
writer.write(myMetric3Rows.subRange(0, 5), WriteOp.Delete);
writer.write(myMetric3.subRange(0, 5), WriteOp.Delete);

// complete the stream
CompletableFuture<WriteOk> future = writer.completed();
Expand Down
16 changes: 8 additions & 8 deletions ingester-example/src/main/java/io/greptime/WriteQuickStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
.addColumn("field2", SemanticType.Field, DataType.Float64) //
.build();

Table myMetric3Rows = Table.from(myMetric3Schema);
Table myMetric4Rows = Table.from(myMetric4Schema);
Table myMetric3 = Table.from(myMetric3Schema);
Table myMetric4 = Table.from(myMetric4Schema);

for (int i = 0; i < 10; i++) {
String tag1v = "tag_value_1_" + i;
Expand All @@ -85,7 +85,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
BigDecimal field3 = new BigDecimal(i);
int field4 = i + 1;

myMetric3Rows.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
myMetric3.addRow(tag1v, tag2v, tag3v, ts, field1, field2, field3, field4);
}

for (int i = 0; i < 10; i++) {
Expand All @@ -95,15 +95,15 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
Date field1 = Calendar.getInstance().getTime();
double field2 = i + 0.1;

myMetric4Rows.addRow(tag1v, tag2v, ts, field1, field2);
myMetric4.addRow(tag1v, tag2v, ts, field1, field2);
}

Collection<Table> rows = Arrays.asList(myMetric3Rows, myMetric4Rows);
Collection<Table> tables = Arrays.asList(myMetric3, myMetric4);

// For performance reasons, the SDK is designed to be purely asynchronous.
// The return value is a future object. If you want to immediately obtain
// the result, you can call `future.get()`.
CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(rows);
CompletableFuture<Result<WriteOk, Err>> future = greptimeDB.write(tables);

Result<WriteOk, Err> result = future.get();

Expand All @@ -113,8 +113,8 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
LOG.error("Failed to write: {}", result.getErr());
}

List<Table> delete_pojos = Arrays.asList(myMetric3Rows.subRange(0, 5), myMetric4Rows.subRange(0, 5));
Result<WriteOk, Err> deletes = greptimeDB.write(delete_pojos, WriteOp.Delete).get();
List<Table> delete_objs = Arrays.asList(myMetric3.subRange(0, 5), myMetric4.subRange(0, 5));
Result<WriteOk, Err> deletes = greptimeDB.write(delete_objs, WriteOp.Delete).get();

if (deletes.isOk()) {
LOG.info("Delete result: {}", result.getOk());
Expand Down
8 changes: 4 additions & 4 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public StreamWriter<List<?>, WriteOk> streamWriterPOJOs(int maxPointsPerSecond,
return new StreamWriter<List<?>, WriteOk>() {
@Override
public StreamWriter<List<?>, WriteOk> write(List<?> val, WriteOp writeOp) {
Table rows = pojoMapper.toTableData(val);
delegate.write(rows, writeOp);
Table table = pojoMapper.toTableData(val);
delegate.write(table, writeOp);
return this;
}

Expand All @@ -165,9 +165,9 @@ public CompletableFuture<WriteOk> completed() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx) {
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp, Context ctx) {
ensureInitialized();
return this.writeClient.write(rows, writeOp, attachCtx(ctx));
return this.writeClient.write(tables, writeOp, attachCtx(ctx));
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions ingester-protocol/src/main/java/io/greptime/Write.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,26 @@ public interface Write {
/**
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows) {
return write(rows, WriteOp.Insert, Context.newDefault());
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables) {
return write(tables, WriteOp.Insert, Context.newDefault());
}

/**
* @see #write(Collection, WriteOp, Context)
*/
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp) {
return write(rows, writeOp, Context.newDefault());
default CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp) {
return write(tables, writeOp, Context.newDefault());
}

/**
* Write multi tables multi rows data to database.
* Write multi tables multi tables data to database.
*
* @param rows rows with multi tables
* @param tables rows with multi tables
* @param writeOp write operation(insert or delete)
* @param ctx invoke context
* @return write result
*/
CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx);
CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp, Context ctx);

/**
* @see #streamWriter(int, Context)
Expand Down
52 changes: 25 additions & 27 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.greptime.models.WriteOk;
import io.greptime.models.Table;
import io.greptime.models.TableHelper;
import io.greptime.models.WriteTable;
import io.greptime.models.WriteTables;
import io.greptime.options.WriteOptions;
import io.greptime.rpc.Context;
import io.greptime.rpc.Observer;
Expand Down Expand Up @@ -79,13 +79,13 @@ public void shutdownGracefully() {
}

@Override
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> rows, WriteOp writeOp, Context ctx) {
Ensures.ensureNonNull(rows, "null `rows`");
Ensures.ensure(!rows.isEmpty(), "empty `rows`");
public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> tables, WriteOp writeOp, Context ctx) {
Ensures.ensureNonNull(tables, "null `tables`");
Ensures.ensure(!tables.isEmpty(), "empty `tables`");

long startCall = Clock.defaultClock().getTick();

return this.writeLimiter.acquireAndDo(rows, () -> write0(rows, writeOp, ctx, 0).whenCompleteAsync((r, e) -> {
WriteTables writeTables = new WriteTables(tables, writeOp);
return this.writeLimiter.acquireAndDo(tables, () -> write0(writeTables, ctx, 0).whenCompleteAsync((r, e) -> {
InnerMetricHelper.writeQps().mark();
if (r != null) {
if (Util.isRwLogging()) {
Expand Down Expand Up @@ -118,11 +118,11 @@ public StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond, Context
.thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) {

@Override
public StreamWriter<Table, WriteOk> write(Table rows, WriteOp writeOp) {
public StreamWriter<Table, WriteOk> write(Table table, WriteOp writeOp) {
if (respFuture.isCompletedExceptionally()) {
respFuture.getNow(null); // throw the exception now
}
return super.write(rows, writeOp); // may wait
return super.write(table, writeOp); // may wait
}

@Override
Expand All @@ -133,11 +133,11 @@ public CompletableFuture<WriteOk> completed() {
}).join();
}

private CompletableFuture<Result<WriteOk, Err>> write0(Collection<Table> rows, WriteOp writeOp, Context ctx, int retries) {
private CompletableFuture<Result<WriteOk, Err>> write0(WriteTables writeTables, Context ctx, int retries) {
InnerMetricHelper.writeByRetries(retries).mark();

return this.routerClient.route()
.thenComposeAsync(endpoint -> writeTo(endpoint, rows, writeOp, ctx, retries), this.asyncPool)
.thenComposeAsync(endpoint -> writeTo(endpoint, writeTables, ctx, retries), this.asyncPool)
.thenComposeAsync(r -> {
if (r.isOk()) {
LOG.debug("Success to write to {}, ok={}.", Keys.DB_NAME, r.getOk());
Expand All @@ -155,14 +155,14 @@ private CompletableFuture<Result<WriteOk, Err>> write0(Collection<Table> rows, W
return Util.completedCf(r);
}

return write0(rows, writeOp, ctx, retries + 1);
return write0(writeTables, ctx, retries + 1);
}, this.asyncPool);
}

private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<Table> rows, WriteOp writeOp, Context ctx, int retries) {
private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, WriteTables writeTables, Context ctx, int retries) {
String database = this.opts.getDatabase();
AuthInfo authInfo = this.opts.getAuthInfo();
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(writeTables, database, authInfo);
ctx.with("retries", retries);

CompletableFuture<Database.GreptimeResponse> future = this.routerClient.invoke(endpoint, req, ctx);
Expand All @@ -175,12 +175,12 @@ private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Colle
int affectedRows = resp.getAffectedRows().getValue();
return WriteOk.ok(affectedRows, 0).mapToResult();
} else {
return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint, rows).mapToResult();
return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint).mapToResult();
}
}, this.asyncPool);
}

private Observer<WriteTable> streamWriteTo(Endpoint endpoint, Context ctx, Observer<WriteOk> respObserver) {
private Observer<WriteTables> streamWriteTo(Endpoint endpoint, Context ctx, Observer<WriteOk> respObserver) {
Observer<Database.GreptimeRequest> rpcObserver =
this.routerClient.invokeClientStreaming(endpoint, Database.GreptimeRequest.getDefaultInstance(), ctx,
new Observer<Database.GreptimeResponse>() {
Expand All @@ -207,15 +207,13 @@ public void onCompleted() {
}
});

return new Observer<WriteTable>() {
return new Observer<WriteTables>() {

@Override
public void onNext(WriteTable writeTable) {
Table rows = writeTable.getRows();
WriteOp writeOp = writeTable.getWriteOp();
public void onNext(WriteTables writeTables) {
String database = WriteClient.this.opts.getDatabase();
AuthInfo authInfo = WriteClient.this.opts.getAuthInfo();
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(rows, writeOp, database, authInfo);
Database.GreptimeRequest req = TableHelper.toGreptimeRequest(writeTables, database, authInfo);
rpcObserver.onNext(req);
}

Expand Down Expand Up @@ -317,17 +315,17 @@ public Result<WriteOk, Err> rejected(Collection<Table> in, RejectedState state)
state.acquirePermits(), //
state.maxPermits(), //
state.availablePermits());
return Result.err(Err.writeErr(Result.FLOW_CONTROL, new LimitedException(errMsg), null, in));
return Result.err(Err.writeErr(Result.FLOW_CONTROL, new LimitedException(errMsg), null));
}
}

@SuppressWarnings("UnstableApiUsage")
static abstract class RateLimitingStreamWriter implements StreamWriter<Table, WriteOk> {

private final Observer<WriteTable> observer;
private final Observer<WriteTables> observer;
private final RateLimiter rateLimiter;

RateLimitingStreamWriter(Observer<WriteTable> observer, double permitsPerSecond) {
RateLimitingStreamWriter(Observer<WriteTables> observer, double permitsPerSecond) {
this.observer = observer;
if (permitsPerSecond > 0) {
this.rateLimiter = RateLimiter.create(permitsPerSecond);
Expand All @@ -337,14 +335,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter<Table, Wr
}

@Override
public StreamWriter<Table, WriteOk> write(Table rows, WriteOp writeOp) {
Ensures.ensureNonNull(rows, "null `rows`");
public StreamWriter<Table, WriteOk> write(Table table, WriteOp writeOp) {
Ensures.ensureNonNull(table, "null `table`");

if (this.rateLimiter != null) {
double timeSpent = this.rateLimiter.acquire(rows.pointCount());
double timeSpent = this.rateLimiter.acquire(table.pointCount());
InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) timeSpent);
}
this.observer.onNext(new WriteTable(rows, writeOp));
this.observer.onNext(new WriteTables(table, writeOp));
return this;
}
}
Expand Down
41 changes: 1 addition & 40 deletions ingester-protocol/src/main/java/io/greptime/models/Err.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ public class Err {
private Throwable error;
// the server address where the error occurred
private Endpoint errTo;
// the data of wrote failed, can be used to retry
private Collection<Table> rowsFailed;
// the QL failed to query
private String failedQl;

/**
* Returns the error code.
Expand All @@ -56,20 +52,6 @@ public Endpoint getErrTo() {
return errTo;
}

/**
* Returns the data of wrote failed, can be used to retry.
*/
public Collection<Table> getRowsFailed() {
return rowsFailed;
}

/**
* Returns the QL failed to query.
*/
public String getFailedQl() {
return failedQl;
}

/**
* Returns a {@link Result} containing this error.
*/
Expand All @@ -83,7 +65,6 @@ public String toString() {
"code=" + code + //
", error='" + error + '\'' + //
", errTo=" + errTo + //
", failedQl=" + failedQl + //
'}';
}

Expand All @@ -93,33 +74,13 @@ public String toString() {
* @param code the error code
* @param error the error
* @param errTo the server address where the error occurred
* @param rowsFailed the data of wrote failed, can be used to retry
* @return a new {@link Err} for write error
*/
public static Err writeErr(int code, Throwable error, Endpoint errTo, Collection<Table> rowsFailed) {
Err err = new Err();
err.code = code;
err.error = error;
err.errTo = errTo;
err.rowsFailed = rowsFailed;
return err;
}

/**
* Creates a new {@link Err} for query error.
*
* @param code the error code
* @param error the error
* @param errTo the server address where the error occurred
* @param failedQl the QL failed to query
* @return a new {@link Err} for query error
*/
public static Err queryErr(int code, Throwable error, Endpoint errTo, String failedQl) {
public static Err writeErr(int code, Throwable error, Endpoint errTo) {
Err err = new Err();
err.code = code;
err.error = error;
err.errTo = errTo;
err.failedQl = failedQl;
return err;
}
}
Loading

0 comments on commit c0c1d0d

Please sign in to comment.