Redis的bitmap底层实现是通过一个char数组来表示,每个元素对应8个bit位。这个数组可以看作是一个字符串,其中每个字符都是一个字节,而每个字节又对应8个bit位。这种底层实现方式使得bitmap在存储空间上非常高效。
c/* SETBIT key offset bitvalue */
void setbitCommand(client *c) {
robj *o;
char *err = "bit is not an integer or out of range";
uint64_t bitoffset;
ssize_t byte, bit;
int byteval, bitval;
long on;
// 从参数中获取位的偏移量
if (getBitOffsetFromArgument(c, c->argv[2], &bitoffset, 0, 0) != C_OK)
return;
// 从对象中获取设置或清除的位的值
if (getLongFromObjectOrReply(c, c->argv[3], &on, err) != C_OK)
return;
// 位只能设置或清除
if (on & ~1) {
addReplyError(c, err);
return;
}
int dirty;
// 查找字符串对象并进行必要的调整
if ((o = lookupStringForBitCommand(c, bitoffset, &dirty)) == NULL)
return;
// 获取当前值
byte = bitoffset >> 3;
byteval = ((uint8_t *)o->ptr)[byte];
bit = 7 - (bitoffset & 0x7);
bitval = byteval & (1 << bit);
// 判断是否需要更新
if (dirty || (!!bitval != on)) {
// 更新字节的位值
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
((uint8_t *)o->ptr)[byte] = byteval;
// 发送信号和通知
signalModifiedKey(c, c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
server.dirty++;
}
// 返回原始值
addReply(c, bitval ? shared.cone : shared.czero);
}
在这段代码中,首先通过getBitOffsetFromArgument函数获取偏移量bitoffset,然后通过getLongFromObjectOrReply函数获取要设置的值on。接下来,通过lookupStringForBitCommand函数获取bitmap对应的字符串对象o。
然后,根据偏移量bitoffset计算出对应的字节位置byte,获取该字节的值byteval。接着,根据bitoffset计算出在字节中的bit位位置bit。通过位运算,将byteval中的bit位值取出,保存在bitval中。
然后,根据要设置的值on,将bit位的值设置为on,并更新byteval的值。最后,将更新后的byteval写回到bitmap的字符串对象o中。
最后,通过signalModifiedKey函数和notifyKeyspaceEvent函数通知其他模块对该key的修改,并增加server.dirty计数器。最后,根据bitval的值,返回相应的回复给客户端。
c/* GETBIT key offset */
void getbitCommand(client *c) {
robj *o;
char llbuf[32];
uint64_t bitoffset;
size_t byte, bit;
size_t bitval = 0;
// 从参数中获取位的偏移量
if (getBitOffsetFromArgument(c, c->argv[2], &bitoffset, 0, 0) != C_OK)
return;
// 查找并检查 key 对应的字符串对象
if ((o = lookupKeyReadOrReply(c, c->argv[1], shared.czero)) == NULL ||
checkType(c, o, OBJ_STRING))
return;
// 计算字节和位的索引
byte = bitoffset >> 3;
bit = 7 - (bitoffset & 0x7);
// 根据对象类型获取位的值
if (sdsEncodedObject(o)) {
if (byte < sdslen(o->ptr))
bitval = ((uint8_t *)o->ptr)[byte] & (1 << bit);
} else {
// 从 long long 型对象中获取位的值
if (byte < (size_t)ll2string(llbuf, sizeof(llbuf), (long)o->ptr))
bitval = llbuf[byte] & (1 << bit);
}
// 返回位的值
addReply(c, bitval ? shared.cone : shared.czero);
}
c/* BITCOUNT key [start end [BIT|BYTE]] */
void bitcountCommand(client *c) {
robj *o;
long long start, end;
long strlen;
unsigned char *p;
char llbuf[LONG_STR_SIZE];
int isbit = 0;
unsigned char first_byte_neg_mask = 0, last_byte_neg_mask = 0;
/* 解析 start/end 范围,如果有的话 */
if (c->argc == 4 || c->argc == 5) {
if (getLongLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK)
return;
if (getLongLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)
return;
if (c->argc == 5) {
if (!strcasecmp(c->argv[4]->ptr, "bit"))
isbit = 1;
else if (!strcasecmp(c->argv[4]->ptr, "byte"))
isbit = 0;
else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}
/* 查找并检查类型 */
o = lookupKeyRead(c->db, c->argv[1]);
if (checkType(c, o, OBJ_STRING))
return;
p = getObjectReadOnlyString(o, &strlen, llbuf);
long long totlen = strlen;
/* 确保不会溢出 */
serverAssert(totlen <= LLONG_MAX >> 3);
/* 转换负索引 */
if (start < 0 && end < 0 && start > end) {
addReply(c, shared.czero);
return;
}
if (isbit)
totlen <<= 3;
if (start < 0)
start = totlen + start;
if (end < 0)
end = totlen + end;
if (start < 0)
start = 0;
if (end < 0)
end = 0;
if (end >= totlen)
end = totlen - 1;
if (isbit && start <= end) {
/* 在将位偏移转换为字节偏移之前,创建边缘的负面掩码 */
first_byte_neg_mask = ~((1 << (8 - (start & 7))) - 1) & 0xFF;
last_byte_neg_mask = (1 << (7 - (end & 7))) - 1;
start >>= 3;
end >>= 3;
}
} else if (c->argc == 2) {
/* 查找并检查类型 */
o = lookupKeyRead(c->db, c->argv[1]);
if (checkType(c, o, OBJ_STRING))
return;
p = getObjectReadOnlyString(o, &strlen, llbuf);
/* 整个字符串 */
start = 0;
end = strlen - 1;
} else {
/* 语法错误 */
addReplyErrorObject(c, shared.syntaxerr);
return;
}
/* 不存在的键返回 0 */
if (o == NULL) {
addReply(c, shared.czero);
return;
}
/* 前提条件:end >= 0 && end < strlen,因此 zero 只有在 start > end 的情况下才会返回。 */
if (start > end) {
addReply(c, shared.czero);
} else {
long bytes = (long)(end - start + 1);
long long count = redisPopcount(p + start, bytes);
if (first_byte_neg_mask != 0 || last_byte_neg_mask != 0) {
unsigned char firstlast[2] = {0, 0};
/* 我们可能会计算超出范围的第一个字节和最后一个字节的位。
* 因此,我们需要将它们减去。这里我们使用一个技巧。我们将范围内的位设置为零。
* 因此,这些位将不会被排除。*/
if (first_byte_neg_mask != 0)
firstlast[0] = p[start] & first_byte_neg_mask;
if (last_byte_neg_mask != 0)
firstlast[1] = p[end] & last_byte_neg_mask;
count -= redisPopcount(firstlast, 2);
}
addReplyLongLong(c, count);
}
}
/* 计算二进制数组 's' 中设置的位数,数组长度为 'count' 字节。
* 此函数的实现要求适用于长度最多为 512 MB 或更多的输入字符串(server.proto_max_bulk_len) */
long long redisPopcount(void *s, long count) {
long long bits = 0;
unsigned char *p = s;
uint32_t *p4;
static const unsigned char bitsinbyte[256] = {
0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8
};
/* 计算不与 32 位对齐的初始字节。*/
while ((unsigned long)p & 3 && count) {
bits += bitsinbyte[*p++];
count--;
}
/* 每次计算 28 个字节中的位数 */
p4 = (uint32_t *)p;
while (count >= 28) {
uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;
aux1 = *p4++;
aux2 = *p4++;
aux3 = *p4++;
aux4 = *p4++;
aux5 = *p4++;
aux6 = *p4++;
aux7 = *p4++;
count -= 28;
aux1 = aux1 - ((aux1 >> 1) & 0x55555555);
aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333);
aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +
((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) +
((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) +
((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) +
((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) +
((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
((aux7 + (aux7 >> 4)) & 0x0F0F0F0F)) *
0x01010101) >>
24;
}
/* 计算剩余的字节。*/
p = (unsigned char *)p4;
while (count--)
bits += bitsinbyte[*p++];
return bits;
}
参考:
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!