本文共 4342 字,大约阅读时间需要 14 分钟。
我们首先进官方看一下Storm的介绍:
Apache Storm is a free and open source distributed realtime computation system
Storm是一个分布式的实时计算系统。
分布式:
它的存储或者计算交由多台服务器上完成,最后汇总起来达到最终的效果。实时:
处理速度是毫秒级或者秒级的计算:
可以简单理解为对数据进行处理,比如清洗数据(对数据进行规整,取出有用的数据)。比如消息处理
消息管理平台是可以推送各类的消息的(IM/PUSH/短信/微信消息等等),消息下发后,肯定要知道这条消息的下发情况的(是否发送成功,如果用户没收到是由于什么原因导致用户没收到,消息是否被点击了等等)。基于上面问题,我们用了Storm做了一套自己的埋点方案,帮助我们快速确认消息是否成功下发到用户上以及统计消息下发的效果。
听起来好像很牛逼,下面我来讲讲背景,看完你就会发现一点儿都不难。
消息管理平台虽然看起来只是发消息的,但是系统设计还是有点东西的。我们以「微服务」的思想去看这个系统,会将不同的功能模块抽取到不同的系统的。
其中PUSH(推送)的链路是最长的,一条消息下发经过的后端系统就有7个,如图下:
这7个系统都有可能「干掉」了这条消息,导致用户没收到。如果我们每去查一个问题,都要逐一排查每个系统,那实在是太慢了。
很多时候客服反馈过来的问题都是当天的,甚至是前几分钟的,我们需要有一个及时的反馈给客服来帮助用户找到为什么收不到消息的原因。
于是我们要做两个功能:
如果是单纯查问题,我们将各个系统的日志收集到Kafka,然后写到Elasticsearch这个是完全没问题的(现在我们也是这么干的)
前面提到了「埋点」,实际上就是打日志。其实就是在关键的地方上打上日志做记录,方便排查问题。
比如,现在我们有7个系统,每个系统在执行消息的时候都会可能导致这条消息发不出去(可能是消息去重了,可能是用户的手机号不正确,可能是用户太久没有登录了等等都有可能)。我们在这些『关键位置』都打上日志,方便我们去排查。
这些「关键位置」我们都给它用简单的数字来命个名。比如说:我们用「11」来代表这个用户没有绑定手机号,用「12」来代表这个用户10分钟前收到了一条一模一样的消息,用「13」来代表这个用户屏蔽了消息.....
「11」「12」「13」「14」「15」「16」这些就叫做「点位」,把这些点位在关键的位置中打上日志,这个就叫做「埋点」
有了埋点,我们要做的就是将这些点位收集起来,然后统一处理成我们的格式,输出到数据源中。
OK,就是分三步:
收集日志我们有logAgent帮我们收集到Kafka,实时清洗日志我们用的就是Storm,清洗完我们输出到Redis(实时)/Hive(离线)。
Storm一般是在处理(清洗)那层,Storm的上下游也很明确了(上游是消息队列,下游写到各种数据源,这种是最常见的):
Storm统一清洗出来放到Redis,我们就可以通过接口来很方便去查一条消息的整体下发情况:
到这里,主要想说明可以通过Storm来实时清洗数据,下来来讲讲Storm的基本使用~我们从一段最简单的Storm代码入门,先看看下面的代码:
如果完全没看过Storm代码的同学,看到上面的代码会怎么分析?我是这样的:
我们简单搜一下,就可以发现它的流程大致是这样的:
Spout是数据的源头,一般我们用它去接收数据,Spout接收到数据后往Bolt上发送,Bolt处理数据(清洗)。
Bolt清洗完数据可以写到一个数据源或者传递给下一个Bolt继续清洗。Topology关联了我们在程序中定义好的Spout和Bolt。
各种 Spout 和 Bolt 连接在一起之后,就成了一个 Topology,一个 Topology 就是一个 Storm 应用。Spout往Bolt传递数据,Bolt往Bolt传递数据,这个传递的过程叫做Stream,Stream传递的是一个一个Tuple。
现在问题来了,我们的Spout和Bolt之间是怎么关联起来的呢?Bolt和Bolt之间是怎么关联起来的呢?
在上面的图我们知道一个Topology会有多个Spout和多个Bolt,那我怎么知道这个Spout传递的数据是给这个Bolt,这个Bolt传递的数据是给另外一个Bolt?(说白了,就是上面图上的箭头是怎么关联的呢?)
在Storm中,有Grouping的机制,就是决定Spout的数据流向哪个Bolt,Bolt的数据流向下一个Bolt。
为了提高并发度,我们在setBolt的时候,可以指定Bolt的线程数,也就是所谓的Executor(Spout也同样可以指定线程数的,只是这次我拿Bolt来举例)。我们的结构可能会是这样的:
分组的策略有以下:
1)shuffleGrouping(随机分组)2)fieldsGrouping(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)3)allGrouping(广播发送,即每一个Tuple,每一个Bolt都会收到)4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)5)noneGrouping(随机分派)6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)7)Local or shuffle Grouping8)partialKeyGrouping(关键字分组,与按字段分组很相似,但他分配更加均衡)9)customGrouping (自定义的Grouping)
shuffleGrouping策略我们是用得最多的,比如上面的图上有两个Spout,我们会将这两个Spout的Tuple均匀分发到各个Bolt中执行。
说到这里,我们再回头看看最开始的代码,我给补充一下注释,你们应该就能看得懂了:
入门的过程复杂吗?不复杂。说白了就是Spout接收到数据,通过grouping机制将Spout的数据传到给Bolt处理,Bolt处理完看还需不需要继续往下处理,如果需要就传递给下一个Bolt,不需要就写到数据源、调接口等等。
当我们提交任务之后,会发生什么呢?我们来看看。
流程大致如下:
Nimbus和Supervisor都是节点(服务器),Storm用Zookeeper去管理Supervisor节点的信息。
Supervisor节点下会创建Worker进程,创建多少个Worker进程由Conf配置文件决定。线程Executor,由进程产生,用于执行任务,Executor线程数有多少个是在setBolt、setSpout的时候决定。Task是真正的任务执行者,Task其实就是包装了Bolt/Spout实例。
关于Worker、Executor、Task之间的关系,在官网有一个例子专门说明了,我们可以看看。先放出代码:
内部的图:
解释一下:
从上面我们可以知道threads ≤ tasks线程数是肯定小于等于Task数的。有没有好奇宝宝会问:「Storm用了线程,那么会有线程不安全的情况吗?」(其实这是三歪刚学的疑问)
一般来说不会,因为很多情况下,一个线程是对应一个Task的(Task你可以理解为Bolt/Spout的实例),既然每个线程是处理自己的实例了,那当然不会有线程安全的问题啦。(当然了,你如果在Bolt/Spout中设置了静态成员变量,那还是会有线程安全问题)
这篇文章简单地介绍了一下Storm,Storm的东西其实还有很多,包括ack机制什么的。现在进官方找文档,都在主推Trident了。
转载地址:http://ehcuz.baihongyu.com/