Hiredis 实现 Redis 流水线

Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。
我们熟悉的 Python 版本的 Redis 客户端 redis-py 提供了 StrictPipeline 对象来实现流水线,使用起来很是方便,具体用法可以参考文章《Redis 事务学习笔记》。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。

Hiredis 提供redisCommand()函数来向 Redis 服务端发送命令,redisCommand()函数的原型如下:

1
void *redisCommand(redisContext *c, const char *format, ...);

redisCommand()执行后,返回一个redisReply *指针,指向redisReply结构体,该结构体包含了返回的结果信息。
redisCommand()函数是阻塞的(是指使用阻塞版的redisContext对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。
redisCommand()函数的使用示例如下所示,完整的代码和编译可以参考文章《Redis C 语言客户端 hiredis 的使用》

1
2
3
4
5
6
7
redisReply *reply;
reply = redisCommand(conn, "SET %s %s", "foo", "bar");
freeReplyObject(reply);
reply = redisCommand(conn, "GET %s", "foo");
printf("%s\n", reply->str);
freeReplyObject(reply);

如果我们需要向 Redis 服务端发送多次命令,如果都是使用redisCommand()函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了redisAppendCommand()函数来实现流水线的命令发送方案。

1
int redisAppendCommand(redisContext *c, const char *format, ...);

redisAppendCommand()函数执行成功时返回REDIS_OK,失败时返回REDIS_ERR

1
2
#define REDIS_ERR -1
#define REDIS_OK 0

redisCommand()函数一样,redisAppendCommand()函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以redisCommand()函数为例说明。
redisAppendCommand()函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到redisContext对象中。那么,redisContext对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了redisGetReply()函数来将缓存的命令发送出去的功能。redisGetReply()函数的处理过程如下:

  1. 查看结果缓冲区是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
  2. 命令缓冲区的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回

上面我们提到的redisCommand()函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用redisAppendCommand()函数,然后再调用redisGetReply()函数实现的。

说到这里,hiredis 实现流水线的过程就很清晰了。无论redisCommand()函数还是redisAppendCommand()函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand()函数会马上发送命令然后取得返回结果,而redisAppendCommand()函数则在调用redisGetReply()函数才将所有命令一次性发送,并取得第一个命令的返回结果。

下面是使用redisAppendCommand()函数实现流水线方案的示例。

1
2
3
4
5
6
7
redisReply *reply;
redisAppendCommand(context,"SET foo bar");
redisAppendCommand(context,"GET foo");
redisGetReply(context,&reply); // SET命令的返回
freeReplyObject(reply);
redisGetReply(context,&reply); // GET命令的返回
freeReplyObject(reply);

值得注意的是,调用redisAppendCommand()函数的次数需要与调用redisGetReply()的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。

1
2
3
4
5
// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
redisAppendCommand(conn, "get t");
// 本来想取得 set a ddd 的返回,却获取了 get t 的返回
reply = redisCommand(conn, "set a ddd");
printf("set a res: %s\n", reply->str);

输出的结果将会是get t命令的返回,而不是set a ddd命令的返回。

参考资料

  1. https://github.com/redis/hiredis
  2. https://gist.github.com/dspezia/1893378
  3. http://www.leoox.com/?p=316
  4. http://www.redis.cn/topics/pipelining.html

附:示例程序 testhiredis.c

编译:

gcc -o testhiredis testhiredis.c -L/usr/local/lib -lhiredis

执行:

./testhiredis

输出:

bar
res: OK
res: b
watch res: OK
res: OK, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: (null), num: 3, type: 2
set a res: tt

源程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <stdio.h>
#include <hiredis/hiredis.h>
int main() {
// 阻塞 redisContext
redisContext *conn = redisConnect("127.0.0.1", 6379);
if (conn != NULL && conn->err) {
printf("connection error: %s\n", conn->errstr);
return 0;
}
// 使用 redisCommand 发送命令并获取返回
redisReply *reply;
reply = redisCommand(conn, "SET %s %s", "foo", "bar");
freeReplyObject(reply);
reply = redisCommand(conn, "GET %s", "foo");
printf("%s\n", reply->str);
freeReplyObject(reply);
// 使用 redisAppendCommand 实现流水线
redisAppendCommand(conn, "set a b");
redisAppendCommand(conn,"get a");
int r = redisGetReply(conn, (void **)&reply);
if (r == REDIS_ERR) {
printf("ERROR\n");
}
printf("res: %s\n", reply->str);
freeReplyObject(reply);
r = redisGetReply(conn, (void **)&reply);
if (r == REDIS_ERR) {
printf("ERROR\n");
}
printf("res: %s\n", reply->str);
freeReplyObject(reply);
// 使用 watch 命令监控键 a
reply = redisCommand(conn, "watch a");
printf("watch res: %s\n", reply->str);
freeReplyObject(reply);
// 事务流水线,总共5个命令
redisAppendCommand(conn, "multi");
redisAppendCommand(conn, "get foo");
redisAppendCommand(conn, "set t tt");
redisAppendCommand(conn, "set a aa");
redisAppendCommand(conn, "exec");
for (int i = 0; i < 5; ++i) {
r = redisGetReply(conn, (void **)&reply);
if (r == REDIS_ERR) {
printf("ERROR\n");
}
printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type);
freeReplyObject(reply);
}
// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
redisAppendCommand(conn, "get t");
// 本来想取得 set a ddd 的返回,却获取了 get t 的返回
reply = redisCommand(conn, "set a ddd");
printf("set a res: %s\n", reply->str);
redisFree(conn);
return 0;
}