-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Atomic increment/add #265
base: next
Are you sure you want to change the base?
Atomic increment/add #265
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,11 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
|
||
import com.stumbleupon.async.Callback; | ||
import com.stumbleupon.async.Deferred; | ||
import com.stumbleupon.async.DeferredGroupException; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import net.opentsdb.stats.Histogram; | ||
import net.opentsdb.stats.StatsCollector; | ||
import net.opentsdb.uid.UniqueId; | ||
|
||
import org.hbase.async.AtomicIncrementRequest; | ||
import org.hbase.async.Bytes; | ||
import org.hbase.async.ClientStats; | ||
import org.hbase.async.DeleteRequest; | ||
|
@@ -32,10 +30,12 @@ | |
import org.hbase.async.HBaseException; | ||
import org.hbase.async.KeyValue; | ||
import org.hbase.async.PutRequest; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import net.opentsdb.uid.UniqueId; | ||
import net.opentsdb.stats.Histogram; | ||
import net.opentsdb.stats.StatsCollector; | ||
import com.stumbleupon.async.Callback; | ||
import com.stumbleupon.async.Deferred; | ||
import com.stumbleupon.async.DeferredGroupException; | ||
|
||
/** | ||
* Thread-safe implementation of the TSDB client. | ||
|
@@ -351,6 +351,75 @@ private Deferred<Object> addPointInternal(final String metric, | |
return client.put(point); | ||
} | ||
|
||
|
||
/** | ||
* Increments a single integer value data point in the TSDB. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need TSDB style formatting |
||
* | ||
* @param metric | ||
* A non-empty string. | ||
* @param timestamp | ||
* The timestamp associated with the value. | ||
* @param value | ||
* The value of the data point. | ||
* @param tags | ||
* The tags on this series. This map must be non-empty. | ||
* @return A deferred object that indicates the completion of the request. | ||
* The {@link Long} has not special meaning and can be {@code null} | ||
* (think of it as {@code Deferred<Void>}). But you probably want to | ||
* attach at least an errback to this {@code Deferred} to handle | ||
* failures. | ||
* @throws IllegalArgumentException | ||
* if the timestamp is less than or equal to the previous | ||
* timestamp added or 0 for the first timestamp, or if the | ||
* difference with the previous timestamp is too large. | ||
* @throws IllegalArgumentException | ||
* if the metric name is empty or contains illegal characters. | ||
* @throws IllegalArgumentException | ||
* if the tags list is empty or one of the elements contains | ||
* illegal characters. | ||
* @throws HBaseException | ||
* (deferred) if there was a problem while persisting data. | ||
*/ | ||
public Deferred<Long> incPoint(final String metric, final long timestamp, | ||
final long value, final Map<String, String> tags) { | ||
final short flags = 0x7; // An int stored on 8 bytes. | ||
return incPointInternal(metric, timestamp, value, tags, flags); | ||
} | ||
|
||
private Deferred<Long> incPointInternal(final String metric, | ||
final long timestamp, final long value, | ||
final Map<String, String> tags, final short flags) { | ||
if ((timestamp & 0xFFFFFFFF00000000L) != 0) { | ||
// => timestamp < 0 || timestamp > Integer.MAX_VALUE | ||
throw new IllegalArgumentException((timestamp < 0 ? "negative " | ||
: "bad") | ||
+ " timestamp=" | ||
+ timestamp | ||
+ " when trying to add value=" | ||
+ value | ||
+ '/' | ||
+ flags | ||
+ " to metric=" + metric + ", tags=" + tags); | ||
} | ||
|
||
IncomingDataPoints.checkMetricAndTags(metric, tags); | ||
final byte[] row = IncomingDataPoints | ||
.rowKeyTemplate(this, metric, tags); | ||
final long base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN)); | ||
Bytes.setInt(row, (int) base_time, metrics.width()); | ||
|
||
//I guess that these points should never be scheduled for compaction. | ||
//scheduleForCompaction(row, (int) base_time); | ||
|
||
final short qualifier = (short) ((timestamp - base_time) << Const.FLAG_BITS | flags); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs to support qualifier prefixes and milliseconds. |
||
final AtomicIncrementRequest point = new AtomicIncrementRequest(table, | ||
row, FAMILY, Bytes.fromShort(qualifier), value); | ||
// TODO(tsuna): Add a callback to time the latency of HBase and store | ||
// the | ||
// timing in a moving Histogram (once we have a class for this). | ||
return client.atomicIncrement(point, true); | ||
} | ||
|
||
/** | ||
* Forces a flush of any un-committed in memory data. | ||
* <p> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package net.opentsdb.tsd; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Styling and would need to support HTTP writes |
||
|
||
import java.util.HashMap; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
import net.opentsdb.core.TSDB; | ||
import net.opentsdb.core.Tags; | ||
import net.opentsdb.stats.StatsCollector; | ||
import net.opentsdb.uid.NoSuchUniqueName; | ||
|
||
import org.jboss.netty.channel.Channel; | ||
|
||
import com.stumbleupon.async.Callback; | ||
import com.stumbleupon.async.Deferred; | ||
|
||
/** | ||
* Increment/add datapoint rpc. | ||
* | ||
*/ | ||
public class IncDataPointRpc implements TelnetRpc { | ||
|
||
private static final AtomicLong requests = new AtomicLong(); | ||
private static final AtomicLong hbase_errors = new AtomicLong(); | ||
private static final AtomicLong invalid_values = new AtomicLong(); | ||
private static final AtomicLong illegal_arguments = new AtomicLong(); | ||
private static final AtomicLong unknown_metrics = new AtomicLong(); | ||
|
||
@Override | ||
public Deferred<? extends Object> execute(final TSDB tsdb, final Channel chan, | ||
final String[] cmd) { | ||
|
||
requests.incrementAndGet(); | ||
String errmsg = null; | ||
try { | ||
final class PutErrback implements Callback<Exception, Exception> { | ||
public Exception call(final Exception arg) { | ||
if (chan.isConnected()) { | ||
chan.write("inc: HBase error: " + arg.getMessage() | ||
+ '\n'); | ||
} | ||
hbase_errors.incrementAndGet(); | ||
return arg; | ||
} | ||
|
||
public String toString() { | ||
return "report error to channel"; | ||
} | ||
} | ||
return importDataPoint(tsdb, cmd).addErrback(new PutErrback()); | ||
} catch (NumberFormatException x) { | ||
errmsg = "inc: invalid value: " + x.getMessage() + '\n'; | ||
invalid_values.incrementAndGet(); | ||
} catch (IllegalArgumentException x) { | ||
errmsg = "inc: illegal argument: " + x.getMessage() + '\n'; | ||
illegal_arguments.incrementAndGet(); | ||
} catch (NoSuchUniqueName x) { | ||
errmsg = "inc: unknown metric: " + x.getMessage() + '\n'; | ||
unknown_metrics.incrementAndGet(); | ||
} | ||
if (errmsg != null && chan.isConnected()) { | ||
chan.write(errmsg); | ||
} | ||
return Deferred.fromResult(null); | ||
|
||
} | ||
|
||
/** | ||
* Collects the stats and metrics tracked by this instance. | ||
* | ||
* @param collector | ||
* The collector to use. | ||
*/ | ||
public static void collectStats(final StatsCollector collector) { | ||
collector.record("rpc.received", requests, "type=inc"); | ||
collector.record("rpc.errors", hbase_errors, "type=hbase_errors"); | ||
collector.record("rpc.errors", invalid_values, "type=invalid_values"); | ||
collector.record("rpc.errors", illegal_arguments, | ||
"type=illegal_arguments"); | ||
collector.record("rpc.errors", unknown_metrics, "type=unknown_metrics"); | ||
} | ||
|
||
/** | ||
* Increments a single data point. | ||
* | ||
* @param tsdb | ||
* The TSDB to increment the data point into. | ||
* @param words | ||
* The words describing the data point to import, in the | ||
* following format: {@code [metric, timestamp, value, ..tags..]} | ||
* @return A deferred Long that indicates the completion of the request. | ||
* @throws NumberFormatException | ||
* if the timestamp or value is invalid. | ||
* @throws IllegalArgumentException | ||
* if any other argument is invalid. | ||
* @throws net.opentsdb.uid.NoSuchUniqueName | ||
* if the metric isn't registered. | ||
*/ | ||
private Deferred<? extends Object> importDataPoint(final TSDB tsdb, | ||
final String[] words) { | ||
|
||
words[0] = null; // Ditch the "inc". | ||
|
||
|
||
if (words.length < 5) { // Need at least: metric timestamp value tag | ||
// ^ 5 and not 4 because words[0] is "inc". | ||
throw new IllegalArgumentException("not enough arguments" | ||
+ " (need least 4, got " + (words.length - 1) + ')'); | ||
} | ||
final String metric = words[1]; | ||
if (metric.length() <= 0) { | ||
throw new IllegalArgumentException("empty metric name"); | ||
} | ||
final long timestamp = Tags.parseLong(words[2]); | ||
if (timestamp <= 0) { | ||
throw new IllegalArgumentException("invalid timestamp: " | ||
+ timestamp); | ||
} | ||
final String value = words[3]; | ||
if (value.length() <= 0) { | ||
throw new IllegalArgumentException("empty value"); | ||
} | ||
final HashMap<String, String> tags = new HashMap<String, String>(); | ||
for (int i = 4; i < words.length; i++) { | ||
if (!words[i].isEmpty()) { | ||
Tags.parse(tags, words[i]); | ||
} | ||
} | ||
if (value.indexOf('.') < 0) { // integer value | ||
return tsdb.incPoint(metric, timestamp, Tags.parseLong(value), tags); | ||
} else { // floating point value | ||
throw new IllegalArgumentException("Submitted value for increment does not work: " + timestamp); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid re-arranging imports if possible. Just clutters PRs.