背景:

监测平台的下游需要设计一套客户端管理和任务调度的系统来Hold住大量实时在线客户端(android和pc),监测平台业务层根据用户配置的任务周期和属性,周期性或即时性的下发任务到当前在线的客户端,客户端与服务器使用长连接方式,另外需要考虑整套系统的扩展和容灾能力,在发挥最大单机性能的前提下提供方便快速的横向扩展能力;

需求:

0、能够实时查看和更新客户端的在线(上/下线)情况;
1、能够根据客户端id实时下发指定任务到指定id的客户端;
2、能够根据通配符/属性标签组合实时筛选出符合条件的在线客户端并下发指定任务;
3、能够快速扩容(指提供快速方便的横向扩容方法且不影响系统的业务逻辑);
4、容灾冗余(指同构模块机器当机不影响系统业务运行);

设计:

因此将整个系统设计成以下5个模块层:客户端接入层,客户端数据持久层,客户端索引节点层,任务调度层,api层;
整体架构如下图所示;

图1.1 客户端管理和任务分发架构

1、客户端接入层
接入层主要用来维持与客户端的连接,它能够实时更新和通知上层模块客户端的在线情况;平台基础架构中还木有lvs,因此暂时使用域名做接入层的分流,与后端的broker集群保持长连接;具体实现上broker与监测客户端使用mqtt协议进行通信,开源的基于mqtt的broker很多,找一个做二次开发,添加一些与数据持久层交互的代码即可;这里用到的是mosqutto;
1.1、mosquitto(http://mosquitto.org)
mosquitto是一款实现了消息推送协议MQTT v3.1的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式。订阅发布模式适用于多对多的通信业务,通过增加一个中间层的方式,让中间层来维护这种多对多的关系,每个客户端都只需要维护自己同服务器之间的连接即可,而客户端之间的关系则交由中间层来维护,这种设计模式将复杂的通信关系维护工作从客户端剥离出来。
1.2、实现快速扩容的需求
每一mosquitto模块都维持了一定数量的在线客户端(任何在线客户端都被包含于某一mosquitto客户端集群中),在任务调度模块发布任务到指定id客户端时需知道该客户端所在的mosquitto模块的服务器ip/域名,因此在这里我们需要实现这个小功能,将客户端的id和所在mosquitto客户端集群信息,传递给上层调度模块;

给mosquitto.conf增加上游redis集群配置:

修改mosquitto_broker.h增加配置项变量:

修改conf.c增加配置项变量读取:

使用hiredis.so库操作redis;初始化pc/android redis的redisContext,修改mosquitto.c:

1.3、实现客户端在线状态持久化
我们只需要在客户端与mosquitto建立连接/断开连接时更新客户端状态即可;
修改read_handle_server.c的mqtt3_handle_connect方法,增加客户端首次建立连接时状态更新:

修改read_handle.c的mqtt3_handle_publish方法,客户端状态发生变化或publish数据时状态更新:

修改context.c的mqtt3_context_disconnect方法,客户端断开连接时状态更新:

最后在自定义的连接/更新/断开方法中实现自定义的持久化逻辑即可;redis_mosq.h中函数声明:

2、客户端数据持久层
使用redis集群做客户端统计数据的持久化工作(主要通过实现1.3中的三个自定义方法来达到数据的持久化),用来监测实时接入和断开的客户端信息,同时作为监控和报表的数据源;同时也作为索引节点集群的统一接入入口,将broker集群层与索引节点集群层解耦;

索引节点与broker使用redis的订阅发布功能,分别为订阅和发布端;

mosquitto发布端,在mosquitto的read_handle_server.c自定义的redis_new_client_connected方法中除了实现客户端统计信息的持久化工作外,还需要向所有订阅redis主题的索引节点模块publish当前新连接的客户端信息(id,标识,mosquitto的ip等);同样也需要在mosquitto的redis_client_state_update方法和redis_client_disconnected方法中向所有订阅redis主题的索引节点模块publish当前客户端的更新和断开信息;

索引节点订阅端(java实现),继承jedis库中的JedisPubSub类,重写onMessage方法在其中实现订阅主题消息到达后的业务处理逻辑,片段如下;其中,索引节点在接收到客户端的状态信息后并不会立即更新客户端的索引,在这里使用队列来保存当前的需要更新的客户端状态任务,由另一个间隔时间为10s的定时器线程来异步的处理所有客户端索引的更新;

3、客户端索引节点层
索引节点与broker使用redis的订阅发布功能进行关联,分别为订阅和发布端,当有监测客户端建立连接和断开连接时broker都会通知下游的索引节点更新相关索引内容,索引节点层使用多模块同构的方式实现容灾冗余,一般情况下下游调度层模块会在配置中配置多个上游索引节点模块地址,通过重试机制保证任务调度的高可用性;

3.1、节点中的客户端索引
首先我们来看任务配置时,业务端的需求如图3.1,需根据不同的用户需求配置不同的在线客户端节点组,因此单一的客户端我们在这里使用一组标识来表示(设备码+国家码+省码+市码+运营商码);


图3.1 业务端的任务配置需求

索引节点将根据客户端标识(设备码+国家码+省码+市码+运营商码)建立每个属性的倒排索引;另外,使用属性组合后的字符串搜索已标识客户端;在这里我们使用lucene库来实现这些需求;创建doc片段;因此,任务调度层在下发任务时即可通过location/type/key标识,挑选特定节点组、特定渠道的客户端集合;

3.2、保证客户端高覆盖率(使用惩罚值)
索引节点之间通过共享惩罚值、客户端权重值来控制和保证客户端的任务覆盖面(保证同标识的客户端都能接收到任务,保证任务被平均下发到大部分的客户端);
惩罚值(punishment_value)根据环境来控制单次任务的权重衰减值,单次任务执行后客户端的权重按照公式:weight = weight – constant * punishment计算,其中weight初始权重值1,constant值0.001;

惩罚值的目的是控制客户端的任务覆盖面,此值出自公式:online_clients_count / avg_pickup_count_pre_task = (avg_task_count * constant * punishment_value)/ constant,意思是保证跑过并发任务的客户端权重恢复到满血需要持续(online_clients_count / avg_pickup_count_pre_task)个周期以上的单位时间,其中并发任务执行后会消耗(avg_task_count * constant * punishment_value)个单位的权重值,单位周期内恢复constant个单位的权重值;
因此,punishment_value = online_clients_count / (avg_pickup_count_pre_task * avg_task_count)
avg_pickup_count_pre_task = ∑ pickup_count_pre_taski / n (0 < i < n)
avg_task_count = ∑ task_count_pre_minutei / 60 (0 <= i < 60)

4、任务调度层
任务调度模块与索引节点使用rpc通信(使用thrift框架),与前端broker使用mqtt协议通信;任务调度模块通过索引节点挑选到在线的节点组客户端集合后使用mqtt协议发布任务信息到集合中的所有客户端;

调度层的实现主要能够满足下面两个需求;

1)保证所有满足地域需求的客户端都能接收到下发任务,地域需求指地区+省+城市+运营商,用地域属性来组成标识,比如:CN_520000_522200_1;
满足地域需求的客户端任务的覆盖面主要由上面提到的惩罚值来进行控制,调度层需要定时将更新后的任务表信息(avg_pickup_count_pre_task和avg_task_count)发送给索引节点集群,索引节点集群好即时调整单次任务的权重衰减值;

2)支持两种任务下发模式;分别为按频率下发模式和按任务总个数打散下发模式;
下发模式由客户需求方指定,目前分成按频率下发模式(比如:用户在web端选择建立5分钟一次100个样本点的监测任务)和按任务总数打散下发的模式(比如:用户在web端选择建立1小时下发300个样本点的监测任务):
在具体实现上使用轮盘的数据结构来储存任务队列,即使用数组+链表的方式,数组的长度为60(指60分钟),每单位存储该分钟需要运行的任务队列指针,单位时间内(每分钟)的任务队列按照任务的权重等级(大客户>企业客户>普通用户)次序排队;按频率下发模式只需要将任务平均分配到时间轮盘中并保持该任务执行周期为60/freg_value即可;按任务总数打散下发的模式只需要保证时间轮盘中的单位时间内运行tasks_count/60个任务即可;

5、对web业务端暴露api
对上层模块暴露两种功能api,即时监测api和刷新任务api;即时监测提供rpc和restful api(使用grizzly框架),持续监测则通过mysql与web端进行交互,web端对持续监测任务表更新后调用刷新任务api通知任务调度层更新任务队列;

改进:

后期还需要在任务覆盖策略和索引更新效率上进行优化,并且在服务端逐渐docker化后,实现自动化的高峰扩容和灾难恢复的功能需求,降低运维的工作量;