diff --git a/Makefile.am b/Makefile.am index b2a8dcd65a..7cd0ecc112 100644 --- a/Makefile.am +++ b/Makefile.am @@ -69,6 +69,7 @@ tsdb_SRC := \ src/tsd/LogsRpc.java \ src/tsd/PipelineFactory.java \ src/tsd/PutDataPointRpc.java \ + src/tsd/IncDataPointRpc.java \ src/tsd/RpcHandler.java \ src/tsd/StaticFileRpc.java \ src/tsd/TelnetRpc.java \ diff --git a/src/core/TSDB.java b/src/core/TSDB.java index b6a3f41f12..be7a0195f0 100644 --- a/src/core/TSDB.java +++ b/src/core/TSDB.java @@ -17,13 +17,11 @@ import java.util.List; import java.util.Map; -import com.stumbleupon.async.Callback; -import com.stumbleupon.async.Deferred; -import com.stumbleupon.async.DeferredGroupException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import net.opentsdb.stats.Histogram; +import net.opentsdb.stats.StatsCollector; +import net.opentsdb.uid.UniqueId; +import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.Bytes; import org.hbase.async.ClientStats; import org.hbase.async.DeleteRequest; @@ -32,10 +30,12 @@ import org.hbase.async.HBaseException; import org.hbase.async.KeyValue; import org.hbase.async.PutRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import net.opentsdb.uid.UniqueId; -import net.opentsdb.stats.Histogram; -import net.opentsdb.stats.StatsCollector; +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; +import com.stumbleupon.async.DeferredGroupException; /** * Thread-safe implementation of the TSDB client. @@ -351,6 +351,75 @@ private Deferred addPointInternal(final String metric, return client.put(point); } + + /** + * Increments a single integer value data point in the TSDB. + * + * @param metric + * A non-empty string. + * @param timestamp + * The timestamp associated with the value. + * @param value + * The value of the data point. + * @param tags + * The tags on this series. This map must be non-empty. + * @return A deferred object that indicates the completion of the request. + * The {@link Long} has not special meaning and can be {@code null} + * (think of it as {@code Deferred}). But you probably want to + * attach at least an errback to this {@code Deferred} to handle + * failures. + * @throws IllegalArgumentException + * if the timestamp is less than or equal to the previous + * timestamp added or 0 for the first timestamp, or if the + * difference with the previous timestamp is too large. + * @throws IllegalArgumentException + * if the metric name is empty or contains illegal characters. + * @throws IllegalArgumentException + * if the tags list is empty or one of the elements contains + * illegal characters. + * @throws HBaseException + * (deferred) if there was a problem while persisting data. + */ + public Deferred incPoint(final String metric, final long timestamp, + final long value, final Map tags) { + final short flags = 0x7; // An int stored on 8 bytes. + return incPointInternal(metric, timestamp, value, tags, flags); + } + + private Deferred incPointInternal(final String metric, + final long timestamp, final long value, + final Map tags, final short flags) { + if ((timestamp & 0xFFFFFFFF00000000L) != 0) { + // => timestamp < 0 || timestamp > Integer.MAX_VALUE + throw new IllegalArgumentException((timestamp < 0 ? "negative " + : "bad") + + " timestamp=" + + timestamp + + " when trying to add value=" + + value + + '/' + + flags + + " to metric=" + metric + ", tags=" + tags); + } + + IncomingDataPoints.checkMetricAndTags(metric, tags); + final byte[] row = IncomingDataPoints + .rowKeyTemplate(this, metric, tags); + final long base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN)); + Bytes.setInt(row, (int) base_time, metrics.width()); + + //I guess that these points should never be scheduled for compaction. + //scheduleForCompaction(row, (int) base_time); + + final short qualifier = (short) ((timestamp - base_time) << Const.FLAG_BITS | flags); + final AtomicIncrementRequest point = new AtomicIncrementRequest(table, + row, FAMILY, Bytes.fromShort(qualifier), value); + // TODO(tsuna): Add a callback to time the latency of HBase and store + // the + // timing in a moving Histogram (once we have a class for this). + return client.atomicIncrement(point, true); + } + /** * Forces a flush of any un-committed in memory data. *

diff --git a/src/tsd/IncDataPointRpc.java b/src/tsd/IncDataPointRpc.java new file mode 100644 index 0000000000..9b37737778 --- /dev/null +++ b/src/tsd/IncDataPointRpc.java @@ -0,0 +1,135 @@ +package net.opentsdb.tsd; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +import net.opentsdb.core.TSDB; +import net.opentsdb.core.Tags; +import net.opentsdb.stats.StatsCollector; +import net.opentsdb.uid.NoSuchUniqueName; + +import org.jboss.netty.channel.Channel; + +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; + +/** + * Increment/add datapoint rpc. + * + */ +public class IncDataPointRpc implements TelnetRpc { + + private static final AtomicLong requests = new AtomicLong(); + private static final AtomicLong hbase_errors = new AtomicLong(); + private static final AtomicLong invalid_values = new AtomicLong(); + private static final AtomicLong illegal_arguments = new AtomicLong(); + private static final AtomicLong unknown_metrics = new AtomicLong(); + + @Override + public Deferred execute(final TSDB tsdb, final Channel chan, + final String[] cmd) { + + requests.incrementAndGet(); + String errmsg = null; + try { + final class PutErrback implements Callback { + public Exception call(final Exception arg) { + if (chan.isConnected()) { + chan.write("inc: HBase error: " + arg.getMessage() + + '\n'); + } + hbase_errors.incrementAndGet(); + return arg; + } + + public String toString() { + return "report error to channel"; + } + } + return importDataPoint(tsdb, cmd).addErrback(new PutErrback()); + } catch (NumberFormatException x) { + errmsg = "inc: invalid value: " + x.getMessage() + '\n'; + invalid_values.incrementAndGet(); + } catch (IllegalArgumentException x) { + errmsg = "inc: illegal argument: " + x.getMessage() + '\n'; + illegal_arguments.incrementAndGet(); + } catch (NoSuchUniqueName x) { + errmsg = "inc: unknown metric: " + x.getMessage() + '\n'; + unknown_metrics.incrementAndGet(); + } + if (errmsg != null && chan.isConnected()) { + chan.write(errmsg); + } + return Deferred.fromResult(null); + + } + + /** + * Collects the stats and metrics tracked by this instance. + * + * @param collector + * The collector to use. + */ + public static void collectStats(final StatsCollector collector) { + collector.record("rpc.received", requests, "type=inc"); + collector.record("rpc.errors", hbase_errors, "type=hbase_errors"); + collector.record("rpc.errors", invalid_values, "type=invalid_values"); + collector.record("rpc.errors", illegal_arguments, + "type=illegal_arguments"); + collector.record("rpc.errors", unknown_metrics, "type=unknown_metrics"); + } + + /** + * Increments a single data point. + * + * @param tsdb + * The TSDB to increment the data point into. + * @param words + * The words describing the data point to import, in the + * following format: {@code [metric, timestamp, value, ..tags..]} + * @return A deferred Long that indicates the completion of the request. + * @throws NumberFormatException + * if the timestamp or value is invalid. + * @throws IllegalArgumentException + * if any other argument is invalid. + * @throws net.opentsdb.uid.NoSuchUniqueName + * if the metric isn't registered. + */ + private Deferred importDataPoint(final TSDB tsdb, + final String[] words) { + + words[0] = null; // Ditch the "inc". + + + if (words.length < 5) { // Need at least: metric timestamp value tag + // ^ 5 and not 4 because words[0] is "inc". + throw new IllegalArgumentException("not enough arguments" + + " (need least 4, got " + (words.length - 1) + ')'); + } + final String metric = words[1]; + if (metric.length() <= 0) { + throw new IllegalArgumentException("empty metric name"); + } + final long timestamp = Tags.parseLong(words[2]); + if (timestamp <= 0) { + throw new IllegalArgumentException("invalid timestamp: " + + timestamp); + } + final String value = words[3]; + if (value.length() <= 0) { + throw new IllegalArgumentException("empty value"); + } + final HashMap tags = new HashMap(); + for (int i = 4; i < words.length; i++) { + if (!words[i].isEmpty()) { + Tags.parse(tags, words[i]); + } + } + if (value.indexOf('.') < 0) { // integer value + return tsdb.incPoint(metric, timestamp, Tags.parseLong(value), tags); + } else { // floating point value + throw new IllegalArgumentException("Submitted value for increment does not work: " + timestamp); + } + } + +} diff --git a/src/tsd/RpcHandler.java b/src/tsd/RpcHandler.java index 1bb955bcb4..56f9cb621e 100644 --- a/src/tsd/RpcHandler.java +++ b/src/tsd/RpcHandler.java @@ -18,22 +18,21 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import com.stumbleupon.async.Callback; -import com.stumbleupon.async.Deferred; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import net.opentsdb.BuildData; +import net.opentsdb.core.Aggregators; +import net.opentsdb.core.TSDB; +import net.opentsdb.stats.StatsCollector; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.http.HttpRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import net.opentsdb.BuildData; -import net.opentsdb.core.Aggregators; -import net.opentsdb.core.TSDB; -import net.opentsdb.stats.StatsCollector; +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; /** * Stateless handler for RPCs (telnet-style or HTTP). @@ -94,6 +93,7 @@ public RpcHandler(final TSDB tsdb) { telnet_commands.put("exit", new Exit()); telnet_commands.put("help", new Help()); telnet_commands.put("put", new PutDataPointRpc()); + telnet_commands.put("add", new IncDataPointRpc()); http_commands.put("", new HomePage()); http_commands.put("aggregators", new ListAggregators()); diff --git a/src/tsd/TelnetRpc.java b/src/tsd/TelnetRpc.java index c2fabce7f4..fe8512b607 100644 --- a/src/tsd/TelnetRpc.java +++ b/src/tsd/TelnetRpc.java @@ -28,6 +28,6 @@ interface TelnetRpc { * @param command The command received, split. * @return A deferred result. */ - Deferred execute(TSDB tsdb, Channel chan, String[] command); + Deferred execute(TSDB tsdb, Channel chan, String[] command); }