专业 靠谱 的软件外包伙伴

您的位置:首页 > 新闻动态 > 携程电商平台异步消息系统软件开发实践

携程电商平台异步消息系统软件开发实践

2016-02-27 10:00:41

消息队列的优势

我们先回顾一下消息系统有哪些优势,为什么在企业里面会用这样的消息系统。MQ大家都会用到,像KAFKA等。企业里有很多的应用场景,比如实时的索引更新,或者是异步化的事情,使用MQ会发现这系统之间的耦合度降低了。MQ也是,它有什么特点?一个可能是可以做到异步的处理,这个处理时间可以很长,比如收完这个消息以后一小时再处理,非常灵活。还有就是可以抵御流量波峰。就算量很大,可以把这些量缓存在MQ里面,从而抵挡住这样的一些波峰,导致业务不会宕掉,按照我的吞吐量慢慢的做处理。另外它支持比较大的fan-out,我可以同时call很多人。目前我们面临着越来越多的个性化的实时的需求,一对多的场景会越来越多,这是MQ得以应用的地方。

MQ基本的模型如图1所示。最简单的它是一个Queue的模式,生产者的消息可以发给一个消费者,一个消息只能发送一次。Topic的话消息可以给多个人,每个人都可以拿到一个。这个名字可能不是标准的,很多MQ都会支持这样一种模式,就是Consumer可以组成一个组,你可以有好多个组,组之间是topic的语义,就是一个消息可以到达每一个组,但是组内只有一个人会消费,这种场景其实是最多的。比如说Group1是第一个业务,Group2是第二个业务,Group1之内这些机器其实是互备,或者说是并行处理,之间有一个吞吐量,它们之间不会互相影响,但是一个消息在一个组内只处理一次,最多的是这样的场景。

图片描述

 

图1

 

携程MQ系统架构演进

我们也曾做过很久的MQ的话,也犯过很多错踩过很多坑。1.0版本是在三四年前,那时需要的消息队列很简单,只要有一个地方可以存入和读取就可以,架构如图2所示。客户端直接写消息到mongo里面去,消费者直接从里面拖出来。它没有broker,就没有server,它支持queue和topic,不支持consumer gorup。1.0版本的特点在于,开发成本极低,但是一旦要升级,就会涉及大量的客户端,这个在企业里面基本上推不下去。还有它没有broker,但是客户端之间,主要是消费者之间,要做一定的协调。因为没有broker,只能通过DB做协调,可能就会涉及到DB里面有find and modify。大量的用DB去做协调,这时DB被用得很重,性能会很差。还有很多feature没有办法做,因为你没有broker,或者server帮你的协调,feature就没有办法去做。此外,没有弹性,或者说没有架构,基本上没法做任何调整。

图片描述

 

图2

 

基于这样一些东西,我们做了第2版,有broker,有中间的server,依然写到mongo DB里,如图3所示。差别在于,这个broke也不是一个纯粹意义上的集群,它其实是一个Master-Slave,通过mongo DB去做心跳。但是它是支持consum broker,因为它有broker,它可以做协调,broker来决定这个消息是要给谁。

图片描述

 

图3

 

还有一个比较大的变化在于,所有的客户端是不直接连向DB而是连向broker的,这在架构上也是一个很大的改善。此外,客户端很瘦,只是跟broker通讯,我告诉你我要发,consumer告诉broker我要收,然后broker把消息存进去或者推给consumer就可以了。这时候很多时候不需要做升级客户端这个操作,只要升级broker就可以了,而broker升级也很方便。

但是它的缺点在于,当初没有考虑到它的绑定关系。一个topic绑定到一个mongo DB的表上去,那么就是说,这个存储的粒度是一个topic。还有就是,broker之间不做协调不做管理,大家是不认识的,只是做心跳,所以还是有很多人工管理的成本在。还有mongo这边有一些改进,我刚才说这个消息怎么样去清。这可能跟Metric一样,要么去做分表,你写了这张表以后写下一张表,上一张表就可以清到了,drop掉就可以了,是很轻量的。如果放在一张表里面,你又要做insert,又要做delete,delete也是非常费的。所以当初我们选的时候,mongo有一个capped collection,这个是它是固定长度固定大小,比如就是2个G,超过2个G的存储它自己会滚回来。这样你不需要管数据清理这样的事情,只要设置好初始的大小就可以了,比如说10个G,满了自己会滚掉。一开始想还蛮好的,10个G,滚来滚去还不用做数据清理。后面你会发现,应用那边会说,一开始估计说可能2个G够了,后来你会发现其实长到20个G、200个G,然后mongo又不支持动态把capped-collection扩大,也蛮坑的。这个对架构上相对比较灵活,broker这个东西在,但是整个东西还是需要很多人工的管理,它是一个很笨的东西。

图4这个是现在的架构,其实还是三块,一块是producer,这边是消费者,中间是服务端,下面有存储。可以看到不一样的地方,一个是引入了metaserver它主要是做一些集群的协调。broker还在,底层的存储有两个,一个是用MySQL,一个是KAFKA。

图片描述

 

图4

 

两种消息类型

我们的消息存储有两种,一种是基于KAFKA,优点是吞吐和性能非常高,因为它写内存,OS刷磁盘,复制,很多事情让OS去做,非常高效。但是很多的特性它不支持,比如消息重发、优先级的消息、消息的过滤等。KAFKA的设计理念就是追求简单,所以吞吐和性能会负责好,但是很多feature你要自己去解决。还有就是,它的broker大量采用ZeroCopy,broker就像透明的东西一样透出来,它其实什么都不管的。在broker里面其实没有办法做很多的监控,很多事情没法做监控。这个在企业里面其实是非常关键的。KAFKA在外围生态方面比较欠缺,它有一些比较初级的监控系统,我们在用KAFKA的时候也遇到蛮多的坑。所以我们现在的策略就是说,核心的关键的,去走MySQL存储,整个系统都是我们自己搭建的,一旦出什么问题都可以找到,有什么问题也可以fix它。但是像一些日志的,或者用户行为监控的等等,这样的量非常大,但是其实不是关键的应用的数据,你可以去做KAFKA,一旦有什么问题影响也不是太大。

其实这种也是正好是比较,比如你这个消息是非常重要的,那么可能它的量就不是那么大,比如每天有好几个T,交易数据,可能淘宝有,携程其没有那么大,一天交易数据有上T,那公司的钱会非常多,我们可以投入更多的资源在这个上面。所以就现在来说这个模式可以做得非常好,非常大量的数据的,都不是特别重要的,可以KAFKA,很重要的都是MySQL这边,整套系统都是自己的。MySQL其实性能足以支撑绝大多数业务,我们可以做很多丰富的消息队列的特性,也可以做一些定制。整个的监控治理是非常深入和全面的。

如何构建高效的MQ

怎么构建MQ,这个事情比较大。所以我按这样一种思路来讲,先想一下这个系统如果是单机的系统,你怎么样做优化。然后你怎么把这样一个系统扩展到集群上去,因为单机肯定会有上限,你怎么扩展到集群。第三个扩展到集群以后你会发现涉及到集群的管理功能,怎么样做集群的管理?

单机优化

单机优化,对一个消息系统来说怎么评价这个消息系统是好的?从性能上来说,可能最重要的是两个指标,一个是我这个消息写入和读取速度很快,然后它的整个消息的通道非常宽,即使吞吐量非常大,也都可以发到这个消息系统里面去。还有一个是消息的延迟,消息写进去以后,消费者需要等待多少时间才能收到,这是消息的延迟。以MySQL为例,首先是讲消息的存储。一个是这些表的设计,只有ID的索引,即只有主键是有索引的,其他都没有索引。然后它的updata的操作也是根据主键去做,然后(sliver)的操作也是基本上一种,等于ID,大于ID,limit多少,就是这样一些非常轻量的一些操作。

原来MySQL有一个很大的问题是说,数据清理很烦。目前的版本可以支持一个表里面可以按ID或者按date等等去做一些partition,然后这个数据不要了,就drop掉一个partition就可以了。相对来说drop掉一个partition是比较轻量的操作,会比较快,数据的清理会非常方便。

那么它只做insert,当然会select。所以只是在ID上,所以它的insert是非常快的。还有消息队列一般都会有一个消息重发,那么就需要有一个地方来记录我这个消息什么时候需要重发出去。那么,重发时间对用户来说很重要吗?他自己也不知道。我们索性不让用户指定,直接隔十秒钟告知。这个schedule date就是当前时间加十秒,永远是递增的。那么索引就不需要了,这张表也可以变成只有ID的索引。这样你做一些优化,一种是从技术上,一种是从业务上考虑,这是消息系统需要解决的核心问题吗?如果不是但是又影响我的性能,那么你可以从业务的需求上想一些办法。

批量写入比较简单,大家都会想到,从broker端都会做一些批量化,很多producer写过来,我们在broker这边做一些批量。批量的效果其实蛮明显的,有的场景可以达到5倍以上的性能提升。当然对于JDBC来说要打开这样的开关:rewriteBatchedStatements=true。

刚讲的是我能够比较快的把数据写进去,还有一个是写进去以后,broker需要把消息读出来发给消费者。正常做法就是broker会轮询这张消息表,这边就要考虑一个折中,这个消息的延迟和你对这个DB的开销。这里我们会把这个事件捕获到,我什么时候有一个新的消息写进来,这样我不需要去轮询这个数据库就可以知道。但是这个前提是你所有的消息都会经过同一个broker,所以这个待会儿会讲到是怎么做到。

还有当消费者非常多的时候,比如说一百个消费者,写一次就要select一百次出来,这时候消息的缓存效果是非常好,一般的消费者都是追着这个producer,就是它消费的位置其实差不多的。你做一个消息是比较临近的,做缓存效果会非常好。DB可以开销降低很多。另外消息延迟也可以降低,原来从DB去取出来,现在从本地的memory里面取出来就可以。这样有一个问题,最快的消费者和最慢的消费者差很多,内存里放了太多的东西,所以会涉及到这个缓存要分裂跟合并,当中间这一块太多了要裂开,中间这一块就可以释放掉。当你后面这一块追上来以后又可以合并成这样一条。这样的话,缓存清理是非常快,就是一个列表,找到它后面扔掉就可以了,填充也是非常快的,往前填就可以了。

还有一个是消费者这边,broker可以很快的取到这个,它怎么把这个消息推给消费者?两种做法,一种是可以做push,broker收到以后push给消费者,另外一种是消费者来pull这个消息。大家可能会觉得push会比较好,pull可能就是我盲目的轮询,会比较慢。我们一开始确实是选择push,我broker有个消息以后,push给消费者那边去做处理。push有一个问题,所有的消费者状态都需要在broker那边做存储做管理,因为是push,要知道这个消费者消费到哪里了,有什么消息需要做重发的,很多状态hold在broker这边,因为要做push。这样一旦broker做重启,很多都丢掉了。丢掉当然不影响整个消息的接收,但是会有很多消息的重发,比如这条消息我收到过的,但是因为broker还没有刷到DB里面去,然后它重启了,就会重发。然后对push的话,整条链在broker里面的代码很复杂,整个push过来。一旦消费者那边断连了,broker那边要做很多调整,代码上写起来也比较恶心。所以后来换成pull的方式,比较简单,就像一个web一样,发一个请求,pull一下过来,对server是很简单,你是被动的,你有一个pull请求过来,处理好这个请求,把东西写给它就ok了。这边的状态保存非常少,而且保存时间非常短,代码也很简单。但是有一个问题,跟刚才轮询DB是一样的,客户端这边要pull得很快的话,就是实时性好,但是broker这边压力很大。所以后来用到LongPolling,就是我发一个pull请求,客户端。如果没有消息,你不要马上回给客户端说没有消息你再来pull。你就hold这个请求,然后一旦broker端发现有新的消息进来,把这个消息准备好再发给他。这样客户端这边也不需要频繁的轮询,所以是一个long的pulling,pull以后就hold在那里,broker有消息了再推给它。这样这个代码比较简单架构比较好,又可以兼顾实时性。

单机到集群

单机的优化,主要是focus在两点,可以很快的写进去、很快的传递到客户端。但单机肯定有极限,怎么扩展到集群上?集群关注扩展性,怎么样通过加一个broker加一个DB,整个的容量就上去了?

因为我们的重点是broker,最简单的方式,就是像web一样加一个负载均衡,堆上三个broker,随便发producer。这样有一个问题,消息顺序没有办法做保证。比如我发了一二三,三个消息,我希望收到的时候也是一二三,如果随便放到某一个broker的话,可能最终存储到DB的时候变成二一三,顺序会不一样,会面临这样的问题。还有一个问题,你会发现单机的优化就不再有效。包括消息轮询和消息缓存。

因此,一个topic,我希望它由一台broker做处理,否则很多优化没有办法做。那么能不能我把一个topic连到某一台broker上比如我有两个topic,有两个broker,分配一下,这样前面的一些消息顺序、单机优化是继续有效的,这种有一个问题,它的topic的吞吐会始终小于单个broker的吞吐。所以这边会引入一个topic partition。如果你希望这几个消息是有序,你就发到同一个partition,不同的partition之间是没有关系。如果你希望一二三是有序的,就把一二三放到同一个partition,四五六可以发到另外一个partition,这样可以兼顾你的吞吐又高顺序又可以保证。它还会跟消费者牵涉到,消费的时候一个partition也只能被一个group里面的一个消费消费到,这样才能保证partition里面是顺序处理的。

刚才我们讲消息表的存储,那个表是针对一个partition,不是针对一个topic,一个topic可能有十个partition,就会有十张消息表,每张有一个partition。所以我们最终可以看到,我希望不是把topic粘到broker上,我不需要这么强的约束,我只需要把topic点partition,某一个partition连到broker就可以了,这样我前面的优化都可以做到,消息顺序这些语义也可以保证到。拆了以后还有一个好处是粒度更细了,更容易做负载均衡。比如这个broker撑不住了,把这个partition迁走更方便。

基于Lease的集群管理

图5是我们希望达到的效果,怎么样做到?我可以把这个partition粘到某一个broker上去,这是集群的管理要做一些事情,就是我怎么样通过metaServer,把broker、topic管理好,这些partition是你管理的,你宕了之后可以balance到另外的broker上去。consumer也是,这是metaserver需要做的事情。

图片描述

 

图5

 

最基本的要做这样一些事情,broker加入和退出时要做什么,原来partition是你管理的,你宕了我要做什么事情。另外consumer加入、退出,你要做什么事情,因为消费者也会争抢消息,需要一定的管理。另外是partition怎么样做动态的分配,基本的一些事情。

这一块我们是基于lease去做,lease简单说就是一个有时限的lock,你lock了以后不用unlock,比如说给你一个10秒的lease,10秒之后自动的unlock,如果你说我需要这个lease,你可以去续,由metaserver决定要不要让你续。用lease的目的是说生成一个消息的路由表,就是说我这个消息从Producer出来以后先发给哪个broker,这个broker写到哪个DB,然后这个消息发给哪个consumer,因为这个是根据状态不断的做调整的。有的时候你是写到这个broker,不够了写到另外一个broker。这个时候是这个消费者在消费,如果这时候又起来一个消费者,可能一半的partition要分给另外一个消费者。通过这个lease来生成这个消息的路由表,就是producer到broker到consumer。

比如说看一个简单的例子,这个broker加入或者退出时候,metaserver要做什么事情。我们的broker跟metaserver之间是通过ZK做协调。然后比如左边是只有一台broker的时候,这个时候我有两个topic,T1、T2,各有两个partition。这个时候metaserver会感知到有一个broker在,会分配好这样的路由表。如果这个时候broker起来了,通过ZK,让metaserver感知到你之后,metaServer会把这个topic的partition重新做一个balance。原来一个broker管四个partition,现在只让你管两个,另外两个给另外一个。

因为broker,比如说这个broker是管理这个partition的,所有的消息都会写向你,你会拿到metaserver给你的lease,就是告诉你这个partition是你管理的,但是你要不断的续。metaserver如果这个时候想说这个partition不给你了,只要是broker来续的时候告诉你不续给你了就OK了。整个集群里面的交互是比较简单的,根据这个lease根据这个时间到达这个时间点之后,大家认为的集群状态肯定是一致的。比如说metaserver告诉broker说,这个lock只给你10秒,10秒之后不管这个broker能不能跟metaserver做通讯,跟ZK做通讯,它都会释放掉这个lease,也就是它不再接受写给它的这个partition的消息。这样整个集群协调很简单。

这样的话,你消息发送的时候做什么?producer需要不断的定时去刷这个路由,就是它会从metaserver这边刷一个路由过来,它就知道这个partition怎么去找broker,然后broker拿到这个partition的lease之后,要定时去续这个lease,只要说broker能够续上这个lease,那么producer发给它的消息它都可以处理。一旦metaserver要做一些调整,这个partition不让你管理了,只要这个lease不给就可以了。然后会发生两个事情,producer会拿到一个新的路由表,它知道这个消息要写到另外一个broker。broker那边会发现lease过了,这个时候你写给我我会告诉你现在不是我,你要写给另外一个broker。然后另外一个broker会拿到这个lease,通过这样一种机制,broker加入以后就可以提升整个消息的吞吐量。

consumer的加入不太一样的地方是说,它不是通过ZK去发现这个consumer起来了,那是通过什么?因为consumer,正好是lease,你要续lease也是通过metaserver去续的,那么只要把续lease这个操作做一个类似心跳一样,你就可以发现这个consumer起来了,consumer一起来它就问metaserver说,我想消费这个partition可不可以?这样通过获取lease或者续lease这样一个操作,metaserver就可以感知到这样一个consumer,就不需要借助ZK。这个时候lease的操作,metaserver发现以后,原来是有一个消费者消费两个partition,这个时候又起来一个消费者,就把一个partition分给另外一个消费者去做处理就可以了。

所以说,消息接收的时候,消费者也要刷这个消息的路由,知道我这个partition应该问哪个broker去要。然后consumer会连上broker说我需要消费这个partition,然后broker那边也会去通过这个metaserver续这个lease,这个是跟发送消息是一样的。然后如果说新的consumer起来以后,consumer这边也要续lease,它会续不到,这时候它就不会消费这个partition,会把这个partition让出来,让给其他的消费者去处理。

所以集群管理的话,跟KAFKA不太一样的地方是,consumer是不连接ZK的,可能也是在朝这个方向去走。然后通过metaserver去竞争lease,metaserver在这里面其实就是承担了一个大脑的角色,broker是很笨的,producer、consumer也是很笨的,这也是我们在做架构设计里面的一个经验,你在这个分布式系统里面只有一个人是聪明的做决策的,其他的角色都是服从于这个控制者,这样整个系统不会乱。只要控制好这个metaserver,整个系统都不会出乱子。broker是很笨的,你告诉它要处理就处理,不处理就不处理。如果每个人都很聪明就很容易出现一些不一致的情况,整个会变得不可控。这个也是我们希望metaserver对这个集群有非常灵活的控制能力,它是整个Hermes的核心。

图6中,把ZooKeeper,metaserver和(story)这边加在里面。还有一个没讲的点就是说,刚才讲broker是通过ZK去跟metaserver做通讯,实现HA。那么metaserver本身是怎么做HA的?metaserver的话就是说,它首先因为客户端要不断要刷路由,不断要访问这个metaserver,那肯定它本身是HA的,它怎么做?比较简单,跟metaserver的很多的都是走HTTP协议的,跟broker都是TCP,自定义的协议,跟metaserver是走HP,因为操作很简单,比如刷一个路由,要续一些Lease。所以它是放在一个域名下,通过这个域名来做HA,就比较简单。比如我这个环境里面的metaserver就是有一个固定域名,客户端起来以后就是通过这个固定域名扫出一堆IP来,然后任意找到一台就可以了。metaserver本身也是会分配,有一些topic是这个metaserver管理,有些是另外的管理。但是最终,只有一个人能够生成最终的消息,就是master,但是它会通过ZK去竞争说哪个metaserver是master,取路由的时候可以找到任意一台metaserver,如果它不是master,它会把你转到master的metaserver上去。

图片描述

 

图6

 

总结

所以说,消息队列的话,当然其实有很多的其他的一些事情,从我们想优化来说的话,我们需要关注几个事情,一个是消息的写入,我希望消息写入是很快,通道是很大,所以我们做了很多批量。然后在表的设计上做很多insertOnly这样一些事情。另外我希望消息的投递是非常及时的,延迟很短,你一直发出消息都可以收到。比如说我们做了一些partition Stick,stick到broker上去,很多优化都可以去做。写入事件,可以截获你,你不需要做轮询,可以做Long Pulling,很多事情是可以缩短整个因为写到DB再拿出来的时间的延迟。另外一个是在集群管理方面,我们是借助这个lease来做。整个lease实现起来会比较简单,当然lease会有一些它的问题,比如我拿到这个lease是二十秒,如果你不做额外的处理,你这台机器宕机了,20秒之内是没有办法给其他人的,因为你不知道是宕机了还是网络断了。所以我们会做一些主动释放lease的事情,把lease带来的时间延迟降到最低。归出来另外一种partition的顺序。我想可能还是只能做双选或者三选的方案。

 

关于:中科研拓

深圳市中科研拓科技有限公司专注提供软件外包、app开发、智能硬件开发、O2O电商平台、手机应用程序、大数据系统、物联网项目等开发外包服务,十年研发经验,上百成功案例,中科院软件外包合作企业。通过IT技术实现创造客户和社会的价值,致力于为用户提供很好的软件解决方案。联系电话400-0316-532,邮箱sales@zhongkerd.com,网址www.zhongkerd.com


  上一篇   [返回首页] [打印] [返回上页]   下一篇