Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new hexpired notification for HFE #2

Open
wants to merge 12 commits into
base: unstable
Choose a base branch
from
37 changes: 23 additions & 14 deletions src/t_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -771,15 +771,17 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs
propagateHashFieldDeletion(db, key, field, sdslen(field));

/* If the field is the last one in the hash, then the hash will be deleted */
robj *keyObj = createStringObject(key, sdslen(key));
if (hashTypeLength(o, 0) == 0) {
robj *keyObj = createStringObject(key, sdslen(key));
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id);
dbDelete(db,keyObj);
decrRefCount(keyObj);
return GETF_EXPIRED_HASH;
res = GETF_EXPIRED_HASH;
} else {
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", keyObj, db->id);
res = GETF_EXPIRED;
}

return GETF_EXPIRED;
decrRefCount(keyObj);
return res;
}

/* Like hashTypeGetValue() but returns a Redis object, which is useful for
Expand Down Expand Up @@ -1126,7 +1128,8 @@ void hashTypeSetExDone(HashTypeSetEx *ex) {
if (ex->c) {
server.dirty += ex->fieldDeleted + ex->fieldUpdated;
signalModifiedKey(ex->c, ex->db, ex->key);
notifyKeyspaceEvent(NOTIFY_HASH, "hexpire", ex->key, ex->db->id);
notifyKeyspaceEvent(NOTIFY_HASH, ex->fieldDeleted ? "hexpired" : "hexpire",
ex->key, ex->db->id);
}
if (ex->fieldDeleted && hashTypeLength(ex->hashObj, 0) == 0) {
dbDelete(ex->db,ex->key);
Expand Down Expand Up @@ -1845,10 +1848,11 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) {
ActiveExpireCtx *activeExpireCtx = (ActiveExpireCtx *) ctx;
sds keystr = NULL;
ExpireInfo info = {0};
ExpireAction ret = ACT_STOP_ACTIVE_EXP;

/* If no more quota left for this callback, stop */
if (activeExpireCtx->fieldsToExpireQuota == 0)
return ACT_STOP_ACTIVE_EXP;
return ret;

if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) {
info = (ExpireInfo){
Expand Down Expand Up @@ -1885,22 +1889,27 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) {
activeExpireCtx->fieldsToExpireQuota -= info.itemsExpired;

/* If hash has no more fields to expire, remove it from HFE DB */
robj *key = createStringObject(keystr, sdslen(keystr));
if (info.nextExpireTime == EB_EXPIRE_TIME_INVALID) {
if (hashTypeLength(hashObj, 0) == 0) {
robj *key = createStringObject(keystr, sdslen(keystr));
dbDelete(activeExpireCtx->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key, activeExpireCtx->db->id);
server.dirty++;
signalModifiedKey(NULL, &server.db[0], key);
decrRefCount(key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,activeExpireCtx->db->id);
} else {
notifyKeyspaceEvent(NOTIFY_HASH,"hexpired",key,activeExpireCtx->db->id);
}
return ACT_REMOVE_EXP_ITEM;
ret = ACT_REMOVE_EXP_ITEM;
} else {
/* Hash has more fields to expire. Update next expiration time of the hash
* and indicate to add it back to global HFE DS */
ebSetMetaExpTime(hashGetExpireMeta(hashObj), info.nextExpireTime);
return ACT_UPDATE_EXP_ITEM;
notifyKeyspaceEvent(NOTIFY_HASH,"hexpired",key,activeExpireCtx->db->id);
ret = ACT_UPDATE_EXP_ITEM;
}

server.dirty++;
signalModifiedKey(NULL, activeExpireCtx->db, key);
decrRefCount(key);
return ret;
}

/* Return the next/minimum expiry time of the hash-field. This is useful if a
Expand Down
24 changes: 22 additions & 2 deletions tests/unit/pubsub.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -360,21 +360,41 @@ start_server {tags {"pubsub network"}} {
r del myhash
set rd1 [redis_deferring_client]
assert_equal {1} [psubscribe $rd1 *]
r hmset myhash yes 1 no 0
r hmset myhash yes 1 no 0 f1 1 f2 2
r hincrby myhash yes 10
r hexpire myhash 999999 FIELDS 1 yes
r hexpireat myhash [expr {[clock seconds] + 999999}] NX FIELDS 1 no
r hpexpire myhash 999999 FIELDS 1 yes
r hpersist myhash FIELDS 1 yes
r hpexpire myhash 0 FIELDS 1 yes
assert_encoding $type myhash
assert_equal "pmessage * __keyspace@${db}__:myhash hset" [$rd1 read]
assert_equal "pmessage * __keyspace@${db}__:myhash hincrby" [$rd1 read]
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
assert_equal "pmessage * __keyspace@${db}__:myhash hpersist" [$rd1 read]
assert_equal "pmessage * __keyspace@${db}__:myhash hexpired" [$rd1 read]

# Test that we will get `hexpired` notification when
# a hash field is removed by active expire.
r hpexpire myhash 10 FIELDS 1 f1
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
after 100 ;# Wait for active expire
assert_equal "pmessage * __keyspace@${db}__:myhash hexpired" [$rd1 read]

# Test that we will get `hexpired` notification when
# a hash field is removed by lazy active.
r debug set-active-expire 0
r hpexpire myhash 10 FIELDS 1 f2
assert_equal "pmessage * __keyspace@${db}__:myhash hexpire" [$rd1 read]
after 20
r hstrlen myhash f2 ;# Trigger lazy expire
assert_equal "pmessage * __keyspace@${db}__:myhash hexpired" [$rd1 read]
r debug set-active-expire 1

$rd1 close
}
} {0} {needs:debug}
} ;# foreach

test "Keyspace notifications: stream events test" {
Expand Down