记录一下工作点滴;

监测平台的下游需要设计一套客户端管理和任务调度的系统来Hold住2w+的实时在线客户端,pc客户端与服务器使用长连接方式,另外需要考虑整套系统的扩展和容灾能力;因此将整个系统设计成以下5个模块层:客户端接入层,客户端数据持久层,客户端索引节点层,任务调度层,api层;

整体架构如下图所示;

图1.1 客户端管理和任务分发架构

1、客户端接入层
平台基础架构中还木有lvs,因此暂时使用域名做接入层的分流,与后端的broker集群保持长连接;具体实现上broker与监测客户端使用mqtt协议进行通信,开源的基于mqtt的broker很多,找一个做二次开发,添加一些与数据持久层交互的代码即可;

2、客户端数据持久层
使用redis集群做客户端统计数据的持久化工作,用来监测实时接入和断开的客户端信息,同时作为监控和报表的数据源;同时也作为索引节点集群的统一接入入口,将broker集群层与索引节点集群层解耦;

3、客户端索引节点层
索引节点与broker使用redis的订阅发布功能,分别为订阅和发布端,当有监测客户端建立连接和断开连接时broker都会通知下游的索引节点更新相关索引内容;索引节点将根据客户端标识(设备码+国家码+省码+市码+运营商码)建立每个属性的倒排索引;另外,使用属性组合后的字符串搜索已标识客户端;
索引节点之间通过共享惩罚值、客户端权重值来控制和保证客户端的任务覆盖面(保证同标识的客户端都能接收到任务,保证任务被平均下发到大部分的客户端);
惩罚值(punishment_value)根据环境来控制单次任务的权重衰减值,单次任务执行后客户端的权重按照公式:weight = weight – constant * punishment计算,其中weight初始权重值1,constant值0.001;
惩罚值的目的是控制客户端的任务覆盖面,此值出处公式:online_clients_count / avg_pickup_count_pre_task = (avg_task_count * constant * punishment_value)/ constant,意思是保证跑过并发任务的客户端权重恢复到满血需要持续(online_clients_count / avg_pickup_count_pre_task)个周期以上的单位时间,其中并发任务执行后会消耗(avg_task_count * constant * punishment_value)个单位的权重值,单位周期内恢复constant个单位的权重值;
因此,punishment_value = online_clients_count / (avg_pickup_count_pre_task * avg_task_count)
avg_pickup_count_pre_task = ∑ pickup_count_pre_taski / n (0 < i < n)
avg_task_count = ∑ task_count_pre_minutei / 60 (0 <= i < 60)

4、任务调度层
任务调度模块与索引节点使用rpc通信,与前端broker使用mqtt协议通信;
调度层的实现主要能够满足下面两个需求;

1)保证所有满足地域需求的客户端都能接收到下发任务,地域需求指地区+省+城市+运营商,用地域属性来组成标识,比如:CN_520000_522200_1;
满足地域需求的客户端任务的覆盖面主要由上面提到的惩罚值来进行控制,调度层需要定时将更新后的任务表信息(avg_pickup_count_pre_task和avg_task_count)发送给索引节点集群,索引节点集群好即时调整单次任务的权重衰减值;

2)支持两种任务下发模式;分别为按频率下发模式和按任务总个数打散下发模式;
下发模式由客户需求方指定,目前分成按频率下发模式(比如:用户在web端选择建立5分钟一次100个样本点的监测任务)和按任务总数打散下发的模式(比如:用户在web端选择建立1小时下发300个样本点的监测任务):
在具体实现上使用轮盘的数据结构来储存任务队列,即使用数组+链表的方式,数组的长度为60(指60分钟),每单位存储该分钟需要运行的任务队列指针,单位时间内(每分钟)的任务队列按照任务的权重等级(大客户>企业客户>普通用户)次序排队;按频率下发模式只需要将任务平均分配到时间轮盘中并保持该任务执行周期为60/freg_value即可;按任务总数打散下发的模式只需要保证时间轮盘中的单位时间内运行tasks_count/60个任务即可;

5、对web端暴露api
对上层模块暴露两种功能api,即时监测api和刷新任务api;即时监测提供rpc和restful api,持续监测则通过mysql与web端进行交互,web端对持续监测任务表更新后调用刷新任务api通知任务调度层更新任务队列;