Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new hexpired notification for HFE #2

Open
wants to merge 12 commits into
base: unstable
Choose a base branch
from
77 changes: 50 additions & 27 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -5271,10 +5271,21 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {

/* Handle XX and NX */
if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) {
int isHashDeleted;
int exists = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted);
/* hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
int hfeFlags = HFE_LAZY_AVOID_HASH_DEL; /* Avoid invalidate the key */

/*
* The hash might contain expired fields. If we lazily delete expired
* field and the command was sent with XX flag, the operation could
* fail and leave the hash empty, which the caller might not expect.
* To prevent unexpected behavior, we avoid lazy deletion in this case
* yet let the operation fail. Note that moduleDelKeyIfEmpty()
* below won't delete the hash if it left with single expired key
* because hash counts blindly expired fields as well.
*/
if (flags & REDISMODULE_HASH_XX)
hfeFlags |= HFE_LAZY_AVOID_FIELD_DEL;

int exists = hashTypeExists(key->db, key->value, field->ptr, hfeFlags, NULL);
if (((flags & REDISMODULE_HASH_XX) && !exists) ||
((flags & REDISMODULE_HASH_NX) && exists))
{
Expand Down Expand Up @@ -5357,6 +5368,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {
* RedisModule_FreeString(), or by enabling automatic memory management.
*/
int RM_HashGet(RedisModuleKey *key, int flags, ...) {
int hfeFlags = HFE_LAZY_AVOID_FIELD_DEL | HFE_LAZY_AVOID_HASH_DEL;
va_list ap;
if (key->value && key->value->type != OBJ_HASH) return REDISMODULE_ERR;

Expand All @@ -5378,21 +5390,16 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) {
if (flags & REDISMODULE_HASH_EXISTS) {
existsptr = va_arg(ap,int*);
if (key->value) {
int isHashDeleted;
*existsptr = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted);
/* hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
*existsptr = hashTypeExists(key->db, key->value, field->ptr, hfeFlags, NULL);
} else {
*existsptr = 0;
}
} else {
int isHashDeleted;
valueptr = va_arg(ap,RedisModuleString**);
if (key->value) {
*valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr, &isHashDeleted);
*valueptr = hashTypeGetValueObject(key->db, key->value, field->ptr,
hfeFlags, NULL);

/* Currently hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);
Expand Down Expand Up @@ -11080,6 +11087,11 @@ static void moduleScanKeyCallback(void *privdata, const dictEntry *de) {
value = NULL;
} else if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);

/* If field is expired, then ignore */
if (hfieldIsExpired(key))
return;

field = createStringObject(key, hfieldlen(key));
value = createStringObject(val, sdslen(val));
} else if (o->type == OBJ_ZSET) {
Expand Down Expand Up @@ -11189,9 +11201,8 @@ int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleSc
ret = 0;
} else if (o->type == OBJ_ZSET || o->type == OBJ_HASH) {
unsigned char *lp, *p;
unsigned char *vstr;
unsigned int vlen;
long long vll;
/* is hash with expiry on fields, then lp tuples are [field][value][expire] */
int hfe = o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_LISTPACK_EX;

if (o->type == OBJ_HASH)
lp = hashTypeListpackGetLp(o);
Expand All @@ -11200,19 +11211,32 @@ int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleSc

p = lpSeek(lp,0);
while(p) {
vstr = lpGetValue(p,&vlen,&vll);
robj *field = (vstr != NULL) ?
createStringObject((char*)vstr,vlen) :
createStringObjectFromLongLongWithSds(vll);
long long vllField, vllValue, vllExpire;
unsigned int lenField, lenValue;
unsigned char *pField, *pValue;

pField = lpGetValue(p,&lenField,&vllField);
p = lpNext(lp,p);
vstr = lpGetValue(p,&vlen,&vll);
robj *value = (vstr != NULL) ?
createStringObject((char*)vstr,vlen) :
createStringObjectFromLongLongWithSds(vll);
fn(key, field, value, privdata);
pValue = lpGetValue(p,&lenValue,&vllValue);
p = lpNext(lp,p);
if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_LISTPACK_EX)
p = lpNext(lp, p); /* Skip expire time */

if (hfe) {
serverAssert(lpGetIntegerValue(p, &vllExpire));
p = lpNext(lp, p);

/* Skip expired fields */
if (hashTypeIsExpired(o, vllExpire))
continue;
}

robj *value = (pValue != NULL) ?
createStringObject((char*)pValue,lenValue) :
createStringObjectFromLongLongWithSds(vllValue);

robj *field = (pField != NULL) ?
createStringObject((char*)pField,lenField) :
createStringObjectFromLongLongWithSds(vllField);
fn(key, field, value, privdata);

decrRefCount(field);
decrRefCount(value);
Expand All @@ -11225,7 +11249,6 @@ int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleSc
return ret;
}


/* --------------------------------------------------------------------------
* ## Module fork API
* -------------------------------------------------------------------------- */
Expand Down
11 changes: 9 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3191,9 +3191,16 @@ typedef struct dictExpireMetadata {
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0

/* Hash field lazy expiration flags. Used by core hashTypeGetValue() and its callers */
#define HFE_LAZY_EXPIRE (0) /* Delete expired field, and if last field also the hash */
#define HFE_LAZY_AVOID_FIELD_DEL (1<<0) /* Avoid deleting expired field */
#define HFE_LAZY_AVOID_HASH_DEL (1<<1) /* Avoid deleting hash if the field is the last one */
#define HFE_LAZY_NO_NOTIFICATION (1<<2) /* Do not send notification, used when multiple fields
* may expire and only one notification is desired. */

void hashTypeConvert(robj *o, int enc, ebuckets *hexpires);
void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end);
int hashTypeExists(redisDb *db, robj *o, sds key, int *isHashDeleted);
int hashTypeExists(redisDb *db, robj *o, sds key, int hfeFlags, int *isHashDeleted);
int hashTypeDelete(robj *o, void *key, int isSdsField);
unsigned long hashTypeLength(const robj *o, int subtractExpiredFields);
hashTypeIterator *hashTypeInitIterator(robj *subject);
Expand All @@ -3210,7 +3217,7 @@ void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr,
unsigned int *vlen, long long *vll, uint64_t *expireTime);
sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what);
hfield hashTypeCurrentObjectNewHfield(hashTypeIterator *hi);
robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted);
robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int hfeFlags, int *isHashDeleted);
int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags);
robj *hashTypeDup(robj *o, sds newkey, uint64_t *minHashExpire);
uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o);
Expand Down
2 changes: 1 addition & 1 deletion src/sort.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
/* Retrieve value from hash by the field name. The returned object
* is a new object with refcount already incremented. */
int isHashDeleted;
o = hashTypeGetValueObject(db, o, fieldobj->ptr, &isHashDeleted);
o = hashTypeGetValueObject(db, o, fieldobj->ptr, HFE_LAZY_EXPIRE, &isHashDeleted);

if (isHashDeleted)
goto noobj;
Expand Down
Loading