Redis发布订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

  • 下图展示了频道channel1,以及订阅这个频道的三个客户端——client2、client5和client1之间的关系:

  • 当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端:

发布订阅相关命令

1
2
3
4
5
6
PSUBSCRIBE pattern [pattern ...] # 订阅一个或多个符合给定模式的频道
PUBSUB subcommand [argument [argument ...]] # 查看订阅与发布系统状态
PUBLISH channel message # 将信息发送到指定的频道
PUNSUBSCRIBE [pattern [pattern ...]] # 退订所有给定模式的频道
SUBSCRIBE channel [channel ...] # 订阅给定的一个或多个频道的信息
UNSUBSCRIBE [channel [channel ...]] # 指退订给定的频道

测试发布订阅

  • 开启一个Redis客户端,订阅一个新闻频道。

    1
    2
    3
    4
    5
    6
    # 第一个客户端:订阅端
    127.0.0.1:6379> SUBSCRIBE news
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "news"
    3) (integer) 1
  • 开启另一个Redis客户端,在同一个频道news发布两次消息,订阅者就能接收到消息。

    1
    2
    3
    4
    5
    # 第二个客户端:发送端
    127.0.0.1:6379> PUBLISH news "hello world"
    (integer) 1
    127.0.0.1:6379> PUBLISH news "hello redis"
    (integer) 1
  • 此时第一个客户端接收到该频道的消息。

    1
    2
    3
    4
    5
    6
    7
    # 第一个客户端
    1) "message"
    2) "news"
    3) "hello world"
    1) "message"
    2) "news"
    3) "hello redis"

原理

  • 关于保存所有频道的订阅关系的指针定义在server.h文件中。

    1
    2
    3
    4
    5
    6
    struct redisServer {
    //字典的键为正在被订阅的频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端
    dict *pubsub_channels; /* Map channels to list of subscribed clients */
    //list列表, 存放订阅频道的客户端
    list *pubsub_patterns; /* A list of pubsub_patterns */
    };
  • 通过pubsub_channels字典,程序只要检查某个频道是否为字典的键,就可以知道该频道是否正在被客户端订阅;只要取出某个键的值,就可以得到所有订阅该频道的客户端的信息。

  • 客户端订阅频道相关代码在pubsub.c文件中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
    * 0 if the client was already subscribed to that channel. */
    int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    //先添加一个频道作为key,添加成功会返回DICT_OK值
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
    retval = 1;
    incrRefCount(channel);
    /* Add the client to the channel -> list of clients hash table */
    //通过key找出列表, 为null则先创建再添加订阅的客户端
    de = dictFind(server.pubsub_channels,channel);
    if (de == NULL) {
    clients = listCreate();
    dictAdd(server.pubsub_channels,channel,clients);
    incrRefCount(channel);
    } else {
    clients = dictGetVal(de);
    }
    //将客户端添加到末尾
    listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubSubscribed(c,channel);
    return retval;
    }
  • 频道发布消息的代码也在pubsub.c文件中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    /* Publish a message */
    int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    dictIterator *di;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    //根据key即频道,找出订阅者列表
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
    list *list = dictGetVal(de);
    listNode *ln;
    listIter li;

    listRewind(list,&li);
    //遍历链表发送消息
    while ((ln = listNext(&li)) != NULL) {
    client *c = ln->value;
    addReplyPubsubMessage(c,channel,message);
    receivers++;
    }
    }
    /* Send to clients listening to matching channels */
    di = dictGetIterator(server.pubsub_patterns);
    if (di) {
    channel = getDecodedObject(channel);
    while((de = dictNext(di)) != NULL) {
    robj *pattern = dictGetKey(de);
    list *clients = dictGetVal(de);
    if (!stringmatchlen((char*)pattern->ptr,
    sdslen(pattern->ptr),
    (char*)channel->ptr,
    sdslen(channel->ptr),0)) continue;

    listRewind(clients,&li);
    while ((ln = listNext(&li)) != NULL) {
    client *c = listNodeValue(ln);
    addReplyPubsubPatMessage(c,pattern,channel,message);
    receivers++;
    }
    }
    decrRefCount(channel);
    dictReleaseIterator(di);
    }
    return receivers;
    }