Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic increment/add #265

Open
wants to merge 2 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ tsdb_SRC := \
src/tsd/LogsRpc.java \
src/tsd/PipelineFactory.java \
src/tsd/PutDataPointRpc.java \
src/tsd/IncDataPointRpc.java \
src/tsd/RpcHandler.java \
src/tsd/StaticFileRpc.java \
src/tsd/TelnetRpc.java \
Expand Down
87 changes: 78 additions & 9 deletions src/core/TSDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
import java.util.List;
import java.util.Map;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import com.stumbleupon.async.DeferredGroupException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.opentsdb.stats.Histogram;
import net.opentsdb.stats.StatsCollector;
import net.opentsdb.uid.UniqueId;

import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.Bytes;
import org.hbase.async.ClientStats;
import org.hbase.async.DeleteRequest;
Expand All @@ -32,10 +30,12 @@
import org.hbase.async.HBaseException;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.opentsdb.uid.UniqueId;
import net.opentsdb.stats.Histogram;
import net.opentsdb.stats.StatsCollector;
import com.stumbleupon.async.Callback;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid re-arranging imports if possible. Just clutters PRs.

import com.stumbleupon.async.Deferred;
import com.stumbleupon.async.DeferredGroupException;

/**
* Thread-safe implementation of the TSDB client.
Expand Down Expand Up @@ -351,6 +351,75 @@ private Deferred<Object> addPointInternal(final String metric,
return client.put(point);
}


/**
* Increments a single integer value data point in the TSDB.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need TSDB style formatting

*
* @param metric
* A non-empty string.
* @param timestamp
* The timestamp associated with the value.
* @param value
* The value of the data point.
* @param tags
* The tags on this series. This map must be non-empty.
* @return A deferred object that indicates the completion of the request.
* The {@link Long} has not special meaning and can be {@code null}
* (think of it as {@code Deferred<Void>}). But you probably want to
* attach at least an errback to this {@code Deferred} to handle
* failures.
* @throws IllegalArgumentException
* if the timestamp is less than or equal to the previous
* timestamp added or 0 for the first timestamp, or if the
* difference with the previous timestamp is too large.
* @throws IllegalArgumentException
* if the metric name is empty or contains illegal characters.
* @throws IllegalArgumentException
* if the tags list is empty or one of the elements contains
* illegal characters.
* @throws HBaseException
* (deferred) if there was a problem while persisting data.
*/
public Deferred<Long> incPoint(final String metric, final long timestamp,
final long value, final Map<String, String> tags) {
final short flags = 0x7; // An int stored on 8 bytes.
return incPointInternal(metric, timestamp, value, tags, flags);
}

private Deferred<Long> incPointInternal(final String metric,
final long timestamp, final long value,
final Map<String, String> tags, final short flags) {
if ((timestamp & 0xFFFFFFFF00000000L) != 0) {
// => timestamp < 0 || timestamp > Integer.MAX_VALUE
throw new IllegalArgumentException((timestamp < 0 ? "negative "
: "bad")
+ " timestamp="
+ timestamp
+ " when trying to add value="
+ value
+ '/'
+ flags
+ " to metric=" + metric + ", tags=" + tags);
}

IncomingDataPoints.checkMetricAndTags(metric, tags);
final byte[] row = IncomingDataPoints
.rowKeyTemplate(this, metric, tags);
final long base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
Bytes.setInt(row, (int) base_time, metrics.width());

//I guess that these points should never be scheduled for compaction.
//scheduleForCompaction(row, (int) base_time);

final short qualifier = (short) ((timestamp - base_time) << Const.FLAG_BITS | flags);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to support qualifier prefixes and milliseconds.

final AtomicIncrementRequest point = new AtomicIncrementRequest(table,
row, FAMILY, Bytes.fromShort(qualifier), value);
// TODO(tsuna): Add a callback to time the latency of HBase and store
// the
// timing in a moving Histogram (once we have a class for this).
return client.atomicIncrement(point, true);
}

/**
* Forces a flush of any un-committed in memory data.
* <p>
Expand Down
135 changes: 135 additions & 0 deletions src/tsd/IncDataPointRpc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package net.opentsdb.tsd;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Styling and would need to support HTTP writes


import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;

import net.opentsdb.core.TSDB;
import net.opentsdb.core.Tags;
import net.opentsdb.stats.StatsCollector;
import net.opentsdb.uid.NoSuchUniqueName;

import org.jboss.netty.channel.Channel;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

/**
* Increment/add datapoint rpc.
*
*/
public class IncDataPointRpc implements TelnetRpc {

private static final AtomicLong requests = new AtomicLong();
private static final AtomicLong hbase_errors = new AtomicLong();
private static final AtomicLong invalid_values = new AtomicLong();
private static final AtomicLong illegal_arguments = new AtomicLong();
private static final AtomicLong unknown_metrics = new AtomicLong();

@Override
public Deferred<? extends Object> execute(final TSDB tsdb, final Channel chan,
final String[] cmd) {

requests.incrementAndGet();
String errmsg = null;
try {
final class PutErrback implements Callback<Exception, Exception> {
public Exception call(final Exception arg) {
if (chan.isConnected()) {
chan.write("inc: HBase error: " + arg.getMessage()
+ '\n');
}
hbase_errors.incrementAndGet();
return arg;
}

public String toString() {
return "report error to channel";
}
}
return importDataPoint(tsdb, cmd).addErrback(new PutErrback());
} catch (NumberFormatException x) {
errmsg = "inc: invalid value: " + x.getMessage() + '\n';
invalid_values.incrementAndGet();
} catch (IllegalArgumentException x) {
errmsg = "inc: illegal argument: " + x.getMessage() + '\n';
illegal_arguments.incrementAndGet();
} catch (NoSuchUniqueName x) {
errmsg = "inc: unknown metric: " + x.getMessage() + '\n';
unknown_metrics.incrementAndGet();
}
if (errmsg != null && chan.isConnected()) {
chan.write(errmsg);
}
return Deferred.fromResult(null);

}

/**
* Collects the stats and metrics tracked by this instance.
*
* @param collector
* The collector to use.
*/
public static void collectStats(final StatsCollector collector) {
collector.record("rpc.received", requests, "type=inc");
collector.record("rpc.errors", hbase_errors, "type=hbase_errors");
collector.record("rpc.errors", invalid_values, "type=invalid_values");
collector.record("rpc.errors", illegal_arguments,
"type=illegal_arguments");
collector.record("rpc.errors", unknown_metrics, "type=unknown_metrics");
}

/**
* Increments a single data point.
*
* @param tsdb
* The TSDB to increment the data point into.
* @param words
* The words describing the data point to import, in the
* following format: {@code [metric, timestamp, value, ..tags..]}
* @return A deferred Long that indicates the completion of the request.
* @throws NumberFormatException
* if the timestamp or value is invalid.
* @throws IllegalArgumentException
* if any other argument is invalid.
* @throws net.opentsdb.uid.NoSuchUniqueName
* if the metric isn't registered.
*/
private Deferred<? extends Object> importDataPoint(final TSDB tsdb,
final String[] words) {

words[0] = null; // Ditch the "inc".


if (words.length < 5) { // Need at least: metric timestamp value tag
// ^ 5 and not 4 because words[0] is "inc".
throw new IllegalArgumentException("not enough arguments"
+ " (need least 4, got " + (words.length - 1) + ')');
}
final String metric = words[1];
if (metric.length() <= 0) {
throw new IllegalArgumentException("empty metric name");
}
final long timestamp = Tags.parseLong(words[2]);
if (timestamp <= 0) {
throw new IllegalArgumentException("invalid timestamp: "
+ timestamp);
}
final String value = words[3];
if (value.length() <= 0) {
throw new IllegalArgumentException("empty value");
}
final HashMap<String, String> tags = new HashMap<String, String>();
for (int i = 4; i < words.length; i++) {
if (!words[i].isEmpty()) {
Tags.parse(tags, words[i]);
}
}
if (value.indexOf('.') < 0) { // integer value
return tsdb.incPoint(metric, timestamp, Tags.parseLong(value), tags);
} else { // floating point value
throw new IllegalArgumentException("Submitted value for increment does not work: " + timestamp);
}
}

}
18 changes: 9 additions & 9 deletions src/tsd/RpcHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.opentsdb.BuildData;
import net.opentsdb.core.Aggregators;
import net.opentsdb.core.TSDB;
import net.opentsdb.stats.StatsCollector;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.opentsdb.BuildData;
import net.opentsdb.core.Aggregators;
import net.opentsdb.core.TSDB;
import net.opentsdb.stats.StatsCollector;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

/**
* Stateless handler for RPCs (telnet-style or HTTP).
Expand Down Expand Up @@ -94,6 +93,7 @@ public RpcHandler(final TSDB tsdb) {
telnet_commands.put("exit", new Exit());
telnet_commands.put("help", new Help());
telnet_commands.put("put", new PutDataPointRpc());
telnet_commands.put("add", new IncDataPointRpc());

http_commands.put("", new HomePage());
http_commands.put("aggregators", new ListAggregators());
Expand Down
2 changes: 1 addition & 1 deletion src/tsd/TelnetRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ interface TelnetRpc {
* @param command The command received, split.
* @return A deferred result.
*/
Deferred<Object> execute(TSDB tsdb, Channel chan, String[] command);
Deferred<? extends Object> execute(TSDB tsdb, Channel chan, String[] command);

}