Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Introduced randomized tokens through the Matcher. TokenLayer is remov…
Browse files Browse the repository at this point in the history
…ed. TOKEN_SIZE_LIMIT allows to limit the number of token bytes.
  • Loading branch information
Matthias Kovatsch committed Oct 6, 2015
1 parent 5ee58e4 commit c25deaa
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,21 @@ protected CoapObserveRelation(Request request, Endpoint endpoint) {
public void reregister() {
if (!request.isCanceled()) {
Request refresh = Request.newGet();
refresh.setOptions(request.getOptions());
// make sure Observe is set and zero
refresh.setObserve();
// use same Token
refresh.setToken(request.getToken());
refresh.setDestination(request.getDestination());
refresh.setDestinationPort(request.getDestinationPort());
// use same Token
refresh.setToken(request.getToken());
// copy options, but set Observe to zero
refresh.setOptions(request.getOptions());
refresh.setObserve();

// use same message observers
for (MessageObserver mo : request.getMessageObservers()) {
refresh.addMessageObserver(mo);
}

endpoint.sendRequest(refresh);

// update request in observe handle for correct cancellation
this.request = refresh;
// reset orderer to accept any sequence number since server might have rebooted
Expand All @@ -104,17 +107,21 @@ public void reregister() {
*/
public void proactiveCancel() {
Request cancel = Request.newGet();
cancel.setDestination(request.getDestination());
cancel.setDestinationPort(request.getDestinationPort());
// use same Token
cancel.setToken(request.getToken());
// copy options, but set Observe to cancel
cancel.setOptions(request.getOptions());
cancel.setObserveCancel();
// use same Token
cancel.setToken(request.getToken());
cancel.setDestination(request.getDestination());
cancel.setDestinationPort(request.getDestinationPort());

// dispatch final response to the same message observers
for (MessageObserver mo: request.getMessageObservers())
for (MessageObserver mo: request.getMessageObservers()) {
cancel.addMessageObserver(mo);
}

endpoint.sendRequest(cancel);

// cancel old ongoing request
request.cancel();
setCanceled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ public String toString() {
*/
public static EmptyMessage newACK(Message message) {
EmptyMessage ack = new EmptyMessage(Type.ACK);
ack.setMID(message.getMID());
ack.setToken(new byte[0]);
ack.setDestination(message.getSource());
ack.setDestinationPort(message.getSourcePort());
ack.setMID(message.getMID());
return ack;
}

Expand All @@ -86,10 +85,9 @@ public static EmptyMessage newACK(Message message) {
*/
public static EmptyMessage newRST(Message message) {
EmptyMessage rst = new EmptyMessage(Type.RST);
rst.setMID(message.getMID());
rst.setToken(new byte[0]);
rst.setDestination(message.getSource());
rst.setDestinationPort(message.getSourcePort());
rst.setMID(message.getMID());
return rst;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package org.eclipse.californium.core.coap;

import org.eclipse.californium.core.coap.CoAP.ResponseCode;
import org.eclipse.californium.core.network.Matcher;
import org.eclipse.californium.core.network.stack.ReliabilityLayer;

/**
Expand All @@ -44,8 +45,9 @@ public class Response extends Message {
/**
* Creates a response to the specified request with the specified response
* code. The destination address of the response is the source address of
* the request. The response has the same token as the request.
* the request.
* Type and MID are usually set automatically by the {@link ReliabilityLayer}.
* The token is set automatically by the {@link Matcher}.
*
* @param request
* the request
Expand All @@ -57,7 +59,6 @@ public static Response createResponse(Request request, ResponseCode code) {
Response response = new Response(code);
response.setDestination(request.getSource());
response.setDestinationPort(request.getSourcePort());
response.setToken(request.getToken());
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.eclipse.californium.core.network.stack.CoapStack;
import org.eclipse.californium.core.network.stack.ObserveLayer;
import org.eclipse.californium.core.network.stack.ReliabilityLayer;
import org.eclipse.californium.core.network.stack.TokenLayer;
import org.eclipse.californium.core.server.MessageDeliverer;
import org.eclipse.californium.elements.Connector;
import org.eclipse.californium.elements.RawData;
Expand Down Expand Up @@ -79,33 +78,31 @@
* |
* * A
* +-Endpoint--+-A---------+
* | v A |
* | v A |
* | +---------v-+-------+ |
* | | Stack Top | |
* | +-------------------+ |
* | | {@link TokenLayer} | |
* | +-------------------+ |
* | v A |
* | v A |
* | +---------v-+-------+ |
* | | Stack Top | |
* | +-------------------+ |
* | | {@link ObserveLayer} | |
* | +-------------------+ |
* | +-------------------+ |
* | | {@link BlockwiseLayer} | |
* | +-------------------+ |
* | +-------------------+ |
* | | {@link ReliabilityLayer} | |
* | +-------------------+ |
* | | Stack Bottom | |
* | +--------+-+--------+ |
* | v A |
* | v A |
* | +-------------------+ |
* | | Stack Bottom | |
* | +--------+-+--------+ |
* | v A |
* | v A |
* | {@link Matcher} |
* | v A |
* | v A |
* | {@link MessageInterceptor} |
* | v A |
* | v A |
* | +--------v-+--------+ |
* | v A |
* | v A |
* | +--------v-+--------+ |
* +-| {@link Connector} |-+
* +--------+-A--------+
* v A
* v A
* +--------+-A--------+
* v A
* v A
* (Network)
* </pre>
* <p>
Expand Down Expand Up @@ -580,9 +577,10 @@ private void receiveMessage(RawData raw) {
if (!parser.isReply()) {
// manually build RST from raw information
EmptyMessage rst = new EmptyMessage(Type.RST);
rst.setMID(parser.getMID());
rst.setToken(new byte[0]);
rst.setDestination(raw.getAddress());
rst.setDestinationPort(raw.getPort());
rst.setMID(parser.getMID());
for (MessageInterceptor interceptor:interceptors)
interceptor.sendEmptyMessage(rst);
connector.send(serializer.serialize(rst));
Expand Down Expand Up @@ -679,6 +677,8 @@ private void receiveMessage(RawData raw) {

private void reject(Message message) {
EmptyMessage rst = EmptyMessage.newRST(message);
// sending directly through connector, not stack, thus set token
rst.setToken(new byte[0]);

for (MessageInterceptor interceptor:interceptors)
interceptor.sendEmptyMessage(rst);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.network.stack.BlockwiseLayer;
import org.eclipse.californium.core.network.stack.BlockwiseStatus;
import org.eclipse.californium.core.network.stack.CoapStack;
import org.eclipse.californium.core.observe.ObserveRelation;
import org.eclipse.californium.core.server.resources.CoapExchange;

Expand Down Expand Up @@ -358,6 +359,10 @@ public boolean isTimedOut() {
return timedOut;
}

/**
* This method also cleans up the Matcher state by eventually calling the
* exchange observer.
*/
public void setTimedOut() {
this.timedOut = true;
// clean up
Expand Down Expand Up @@ -420,12 +425,31 @@ public boolean isComplete() {
return complete;
}

/**
* Call this method to trigger a clean-up in the Matcher through its
* ExchangeObserverImpl. Usually, it is called automatically when reaching
* the StackTopAdapter in the {@link CoapStack}, when timing out, when
* rejecting a response, or when sending the (last) response.
*/
public void setComplete() {
this.complete = true;
ExchangeObserver obs = this.observer;
if (obs != null)
obs.completed(this);
}

/**
* This method is only needed when the same {@link Exchange} instance uses
* different tokens during its lifetime, e.g., when using a different token
* for retrieving the rest of a blockwise notification (when not altered,
* Californium reuses the same token for this).
* See {@link BlockwiseLayer} for an example use case.
*/
public void completeCurrentRequest() {
ExchangeObserver obs = this.observer;
if (obs != null)
obs.completed(this);
}

public long getTimestamp() {
return timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ public class Matcher {
private boolean started;
private ExchangeObserver exchangeObserver = new ExchangeObserverImpl();

/** The executor. */
/* the executor, by default the one of the protocol stage */
private ScheduledExecutorService executor;

// TODO: Make per endpoint
/* managing the MID per endpoint requires remote endpoint management */
private AtomicInteger currendMID;

/* limit the token size to save bytes in closed environments */
private int tokenSizeLimit;

private ConcurrentHashMap<KeyMID, Exchange> exchangesByMID; // for all
private ConcurrentHashMap<KeyToken, Exchange> exchangesByToken; // for outgoing
private ConcurrentHashMap<KeyUri, Exchange> ongoingExchanges; // for blockwise
Expand All @@ -63,7 +66,7 @@ public class Matcher {
private Deduplicator deduplicator;
// Idea: Only store acks/rsts and not the whole exchange. Responses should be sent CON.

/** Health status output */
/* Health status output */
private Level healthStatusLevel;
private int healthStatusInterval; // seconds

Expand All @@ -76,12 +79,17 @@ public Matcher(NetworkConfig config) {
DeduplicatorFactory factory = DeduplicatorFactory.getDeduplicatorFactory();
this.deduplicator = factory.createDeduplicator(config);

if (config.getBoolean(NetworkConfig.Keys.USE_RANDOM_MID_START)) {
boolean randomMID = config.getBoolean(NetworkConfig.Keys.USE_RANDOM_MID_START);
if (randomMID) {
currendMID = new AtomicInteger(new Random().nextInt(1<<16));
} else {
currendMID = new AtomicInteger(0);
}

tokenSizeLimit = config.getInt(NetworkConfig.Keys.TOKEN_SIZE_LIMIT);

LOGGER.config("Matcher uses USE_RANDOM_MID_START="+randomMID+" and TOKEN_SIZE_LIMIT="+tokenSizeLimit);

healthStatusLevel = Level.parse(config.getString(NetworkConfig.Keys.HEALTH_STATUS_PRINT_LEVEL));
healthStatusInterval = config.getInt(NetworkConfig.Keys.HEALTH_STATUS_INTERVAL);
}
Expand Down Expand Up @@ -121,19 +129,31 @@ public synchronized void setExecutor(ScheduledExecutorService executor) {

public void sendRequest(Exchange exchange, Request request) {

if (request.getMID() == Message.NONE)
// ensure MID is set
if (request.getMID() == Message.NONE) {
request.setMID(currendMID.getAndIncrement()%(1<<16));

/*
* The request is a CON or NON and must be prepared for these responses
* - CON => ACK / RST / ACK+response / CON+response / NON+response
* - NON => RST / CON+response / NON+response
* If this request goes lost, we do not get anything back.
*/

// the MID is from the local namespace -- use blank address
}
// request MID is from the local namespace -- use blank address
KeyMID idByMID = new KeyMID(request.getMID(), null, 0);
KeyToken idByToken = new KeyToken(request.getToken());

// ensure Token is set
KeyToken idByToken;
if (request.getToken() == null) {
byte[] token;
do {
token = createNewToken();
idByToken = new KeyToken(token);
} while (exchangesByToken.get(idByToken) != null);

request.setToken(token);

} else {
idByToken = new KeyToken(request.getToken());
// ongoing requests may reuse token
if (!(request.getOptions().hasBlock1() || request.getOptions().hasBlock2() || request.getOptions().hasObserve()) && exchangesByToken.get(idByToken) != null) {
LOGGER.warning("Manual token overrides existing open request: "+idByToken);
}
}

exchange.setObserver(exchangeObserver);

Expand All @@ -145,19 +165,13 @@ public void sendRequest(Exchange exchange, Request request) {

public void sendResponse(Exchange exchange, Response response) {

// ensure MID is set
if (response.getMID() == Message.NONE) {
response.setMID(currendMID.getAndIncrement()%(1<<16));
}

/*
* The response is a CON or NON or ACK and must be prepared for these
* - CON => ACK / RST // we only care to stop retransmission
* - NON => RST // we only care for observe
* - ACK => nothing!
* If this response goes lost, we must be prepared to get the same
* CON/NON request with same MID again. We then find the corresponding
* exchange and the ReliabilityLayer resends this response.
*/
// ensure Token is set
response.setToken(exchange.getCurrentRequest().getToken());

// If this is a CON notification we now can forget all previous NON notifications
if (response.getType() == Type.CON || response.getType() == Type.ACK) {
Expand Down Expand Up @@ -200,6 +214,9 @@ public void sendResponse(Exchange exchange, Response response) {

public void sendEmptyMessage(Exchange exchange, EmptyMessage message) {

// ensure Token is set
message.setToken(new byte[0]);

if (message.getType() == Type.RST && exchange != null) {
// We have rejected the request or response
exchange.setComplete();
Expand Down Expand Up @@ -320,8 +337,8 @@ public Exchange receiveResponse(Response response) {
response.setDuplicate(true);
} else {
idByMID = new KeyMID(exchange.getCurrentRequest().getMID(), null, 0);
if (LOGGER.isLoggable(Level.FINE)) LOGGER.fine("Exchange got response: Cleaning up "+idByMID);
exchangesByMID.remove(idByMID);
if (LOGGER.isLoggable(Level.FINE)) LOGGER.fine("Closed open request with "+idByMID);
}

if (response.getType() == Type.ACK && exchange.getCurrentRequest().getMID() != response.getMID()) {
Expand Down Expand Up @@ -384,6 +401,22 @@ private void removeNotificatoinsOf(ObserveRelation relation) {
}
}

/**
* Creates a new token that is never the empty token (i.e., always 1-8 bytes).
* @return the new token
*/
private byte[] createNewToken() {

Random random = new Random();

// random length between 1 and tokenSizeLimit
byte[] token = new byte[random.nextInt(tokenSizeLimit)+1];
// random value
random.nextBytes(token);

return token;
}

private class ExchangeObserverImpl implements ExchangeObserver {

@Override
Expand Down
Loading

0 comments on commit c25deaa

Please sign in to comment.