Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务, 你可以提交由任何语言创建的topology。
一、Storm根本概念
在运转一个Storm使命之前,需求了解一些概念:
- Topologies
- Streams
- Spouts
- Bolts
- Stream groupings
- Reliability
- Tasks
- Workers
- Configuration
Storm集群和Hadoop集群表面上看很相似。可是Hadoop上运转的是MapReduce jobs,而在Storm上运转的是拓扑(topology),这两者之间是十分不相同的。一个要害的差异是: 一个MapReduce job最终会完毕, 而一个topology永久会运转(除非你手动kill掉)。
在Storm的集群里边有两种节点: 操控节点(master node)和作业节点(worker node)。操控节点上面运转一个叫Nimbus后台程序,它的作用相似Hadoop里边的JobTracker。Nimbus担任在集群里边分发代码,分配核算使命给机器, 而且监控状况。
每一个作业节点上面运转一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的作业,根据需求发动/封闭作业进程。每一个作业进程履行一个topology的一个子集;一个运转的topology由运转在许多机器上的许多作业进程组成。
Nimbus和Supervisor之间的一切和谐作业都是经过Zookeeper集群完结。其他,Nimbus进程和Supervisor进程都是快速失利(fail-fast)和无状况的。一切的状况要么在zookeeper里边, 要么在本地磁盘上。这也就意味着你能够用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,就好像什么都没有发生过。这个规划使得Storm反常的安稳。
1、Topologies
一个topology是spouts和bolts组成的图, 经过stream groupings将图中的spouts和bolts连接起来,如下图:
一个topology会一向运转直到你手动kill掉,Storm主动从头分配履行失利的使命, 而且Storm能够确保你不会有数据丢掉(假如敞开了高牢靠性的话)。假如一些机器意外停机它上面的一切使命会被转移到其他机器上。
运转一个topology很简略。首要,把你一切的代码以及所依靠的jar打进一个jar包。然后运转相似下面的这个指令:
Storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个指令会运转主类:backtype.strom.MyTopology, 参数是arg1,arg2。这个类的main函数界说这个topology而且把它提交给Nimbus。storm jar担任连接到Nimbus而且上传jar包。
Topology的界说是一个Thrift结构,而且Nimbus便是一个Thrift服务, 你能够提交由任何言语创立的topology。上面的方面是用JVM-based言语提交的最简略的办法。
2、Streams
音讯流stream是storm里的要害笼统。一个音讯流是一个没有鸿沟的tuple序列, 而这些tuple序列会以一种分布式的办法并行地创立和处理。经过对stream中tuple序列中每个字段命名来界说stream。在默许的情况下,tuple的字段类型能够是:integer,long,short, byte,string,double,float,boolean和byte array。你也能够自界说类型(只需完结相应的序列化器)。
每个音讯流在界说的时分会被分配给一个id,由于单向音讯流运用的适当遍及, OutputFieldsDeclarer界说了一些办法让你能够界说一个stream而不必指定这个id。在这种情况下这个stream会分配个值为‘default’默许的id 。
Storm供给的最根本的处理stream的原语是spout和bolt。你能够完结spout和bolt供给的接口来处理你的事务逻辑。
3、Spouts
音讯源spout是Storm里边一个topology里边的音讯生产者。一般来说音讯源会从一个外部源读取数据而且向topology里边宣告音讯:tuple。Spout能够是牢靠的也能够是不牢靠的。假如这个tuple没有被storm成功处理,牢靠的音讯源spouts能够从头发射一个tuple, 可是不牢靠的音讯源spouts一旦宣告一个tuple就不能重发了。
音讯源能够发射多条音讯流stream。运用OutputFieldsDeclarer.declareStream来界说多个stream,然后运用SpoutOutputCollector来发射指定的stream。
Spout类里边最重要的办法是nextTuple。要么发射一个新的tuple到topology里边或许简略的回来假如现已没有新的tuple。要注意的是nextTuple办法不能堵塞,由于storm在同一个线程上面调用一切音讯源spout的办法。
其他两个比较重要的spout办法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时分调用ack,不然调用fail。storm只对牢靠的spout调用ack和fail。
4、Bolts
一切的音讯处理逻辑被封装在bolts里边。Bolts能够做许多作业:过滤,聚合,查询数据库等等。
Bolts能够简略的做音讯流的传递。杂乱的音讯流处理往往需求许多进程,然后也就需求经过许多bolts。比方算出一堆图片里边被转发最多的图片就至少需求两步:***步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(假如要把这个进程做得更具有扩展性那么或许需求更多的进程)。
Bolts能够发射多条音讯流, 运用OutputFieldsDeclarer.declareStream界说stream,运用OutputCollector.emit来选择要发射的stream。
Bolts的首要办法是execute, 它以一个tuple作为输入,bolts运用OutputCollector来发射tuple,bolts有必要要为它处理的每一个tuple调用OutputCollector的ack办法,以告诉Storm这个tuple被处理完结了,然后告诉这个tuple的发射者spouts。 一般的流程是: bolts处理一个输入tuple, 发射0个或许多个tuple, 然后调用ack告诉storm自己现已处理过这个tuple了。storm供给了一个IBasicBolt会主动调用ack。
5、Stream groupings
界说一个topology的其间一步是界说每个bolt接纳什么样的流作为输入。stream grouping便是用来界说一个stream应该假如分配数据给bolts上面的多个tasks。
Storm里边有7种类型的stream grouping
- Shuffle Grouping: 随机分组, 随机派发stream里边的tuple,确保每个bolt接纳到的tuple数目大致相同。
- Fields Grouping:按字段分组, 比方按userid来分组, 具有相同userid的tuple会被分到相同的Bolts里的一个task, 而不同的userid则会被分配到不同的bolts里的task。
- All Grouping:播送发送,关于每一个tuple,一切的bolts都会收到。
- Global Grouping:大局分组, 这个tuple被分配到storm中的一个bolt的其间一个task。再详细一点便是分配给id值***的那个task。
- Non Grouping:不分组,这个分组的意思是说stream不关心究竟谁会收到它的tuple。现在这种分组和Shuffle grouping是相同的作用, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里边去履行。
- Direct Grouping: 直接分组,这是一种比较特其他分组办法,用这种分组意味着音讯的发送者指定由音讯接纳者的哪个task处理这个音讯。 只要被声明为Direct Stream的音讯流能够声明这种分组办法。而且这种音讯tuple有必要运用emitDirect办法来发射。音讯处理者能够经过TopologyContext来获取处理它的音讯的task的id(OutputCollector.emit办法也会回来task的id)。
- Local or shuffle grouping:假如方针bolt有一个或许多个task在同一个作业进程中,tuple将会被随机发生给这些tasks。不然,和一般的Shuffle Grouping行为共同。
6、Reliability
Storm确保每个tuple会被topology完好的履行。Storm会追寻由每个spout tuple所发生的tuple树(一个bolt处理一个tuple之后或许会发射其他tuple然后构成树状结构),而且盯梢这棵tuple树什么时分成功处理完。每个topology都有一个音讯超时的设置,假如storm在这个超时的时间内检测不到某个tuple树究竟有没有履行成功, 那么topology会把这个tuple标记为履行失利,而且过一瞬间从头发射这个tuple。
为了运用Storm的牢靠性特性,在你宣告一个新的tuple以及你完结处理一个tuple的时分你有必要要告诉storm。这一切是由OutputCollector来完结的。经过emit办法来告诉一个新的tuple发生了,经过ack办法告诉一个tuple处理完结了。
Storm的牢靠性咱们在第四章会深化介绍。
7、Tasks
每一个spout和bolt会被当作许多task在整个集群里履行。每一个executor对应到一个线程,在这个线程上运转多个task,而stream grouping则是界说怎样从一堆task发射tuple到其他一堆task。你能够调用TopologyBuilder类的setSpout和setBolt来设置并行度(也便是有多少个task)。
8、Workers
一个topology或许会在一个或许多个worker(作业进程)里边履行,每个worker是一个物理JVM而且履行整个topology的一部分。比方,关于并行度是300的topology来说,假如咱们运用50个作业进程来履行,那么每个作业进程会处理其间的6个tasks。Storm会尽量均匀的作业分配给一切的worker。
9、Configuration
Storm里边有一堆参数能够装备来调整Nimbus, Supervisor以及正在运转的topology的行为,一些装备是体系级其他,一些装备是topology级其他。default.yaml里边有一切的默许装备。你能够经过界说个storm.yaml在你的classpath里来掩盖这些默许装备。而且你也能够在代码里边设置一些topology相关的装备信息(运用StormSubmitter)。
#p#
二、构建Topology
1. 完结的方针:
咱们将规划一个topology,来完结对一个语句里边的单词呈现的频率进行核算。这是一个简略的比方,意图是让咱们关于topology快速上手,有一个开端的了解。
2. 规划Topology结构:
在开端开发Storm项意图***步,便是要规划topology。确定好你的数据处理逻辑,咱们今日将的这个简略的比方,topology也十分简略。整个topology如下:
整个topology分为三个部分:
- KestrelSpout:数据源,担任发送sentence
- Splitsentence:担任将sentence切分
- Wordcount:担任对单词的频率进行累加
3. 规划数据流
这个topology从kestrel queue读取语句,并把语句区分成单词,然后汇总每个单词呈现的次数,一个tuple担任读取语句,每一个tuple别离对应核算每一个单词呈现的次数,大约姿态如下所示:
4. 代码完结:
1) 构建maven环境:
为了开发storm topology, 你需求把storm相关的jar包添加到classpath里边去: 要么手动添加一切相关的jar包, 要么运用maven来管理一切的依靠。storm的jar包发布在Clojars(一个maven库), 假如你运用maven的话,把下面的装备添加在你项意图pom.xml里边。
- <repository>
- <id>clojars.org</id>
- <url>http://clojars.org/repo</url>
- </repository>
- <dependency>
- <groupId>storm</groupId>
- <artifactId>storm</artifactId>
- <version>0.5.3</version>
- <scope>test</scope>
- </dependency>
2) 界说topology:
- TopologyBuilderbuilder=newTopologyBuilder();
- builder.setSpout(1,newKestrelSpout(“kestrel.backtype.com”,22133,”sentence_queue”,newStringScheme()));
- builder.setBolt(2,newSplitSentence(),10)
- .shuffleGrouping(1);
- builder.setBolt(3,newWordCount(),20)
- .fieldsGrouping(2,newFields(“word”));
这种topology的spout从语句行列中读取语句,在kestrel.backtype.com坐落一个Kestrel的服务器端口22133。
Spout用setSpout办法刺进一个共同的id到topology。Topology中的每个节点有必要给予一个id,id是由其他bolts用于订阅该节点的输出流。KestrelSpout在topology中id为1。
setBolt是用于在Topology中刺进bolts。在topology中界说的***个bolts是切开语句的bolts。这个bolts将语句流转成成单词流。
让咱们看看SplitSentence施行:
- publicclassSplitSentenceimplementsIBasicBolt{
- publicvoidprepare(Mapconf,TopologyContextcontext){
- }
- publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){
- Stringsentence=tuple.getString(0);
- for(Stringword:sentence.split(“”)){
- collector.emit(newValues(word));
- }
- }
- publicvoidcleanup(){
- }
- publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
- declarer.declare(newFields(“word”));
- }
- }
要害的办法是execute办法。正如你能够看到,它将语句拆分成单词,并宣告每个单词作为一个新的元组。另一个重要的办法是declareOutputFields,其间宣告bolts输出元组的架构。在这里宣告,它宣告一个域为word的元组。
setBolt的***一个参数是你想为bolts的并行量。SplitSentencebolts是10个并发,这将导致在storm集群中有十个线程并行履行。你所要做的的是添加bolts的并行量在遇到topology的瓶颈时。
setBolt办法回来一个目标,用来界说bolts的输入。例如,SplitSentence螺栓订阅组件“1”运用随机分组的输出流。“1”是指现已界说KestrelSpout。我将解说在某一时间的随机分组的一部分。到现在为止,最要紧的是,SplitSentencebolts会耗费KestrelSpout宣告的每一个元组。
下面在让咱们看看wordcount的完结:
- publicclassWordCountimplementsIBasicBolt{
- privateMap<String,Integer>_counts=newHashMap<String,Integer>();
- publicvoidprepare(Mapconf,TopologyContextcontext){
- }
- publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){
- Stringword=tuple.getString(0);
- intcount;
- if(_counts.containsKey(word)){
- count=_counts.get(word);
- }else{
- count=0;
- }
- count++;
- _counts.put(word,count);
- collector.emit(newValues(word,count));
- }
- publicvoidcleanup(){
- }
- publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
- declarer.declare(newFields(“word”,“count”));
- }
- }
SplitSentence关于语句里边的每个单词发射一个新的tuple, WordCount在内存里边保护一个单词->次数的mapping, WordCount每收到一个单词, 它就更新内存里边的核算状况。
5. 运转Topology
storm的运转有两种形式: 本地形式和分布式形式.
1) 本地形式:
storm用一个进程里边的线程来模仿一切的spout和bolt. 本地形式对开发和测验来说比较有用。 你运转storm-starter里边的topology的时分它们便是以本地形式运转的, 你能够看到topology里边的每一个组件在发射什么音讯。
2) 分布式形式:
storm由一堆机器组成。当你提交topology给master的时分, 你一起也把topology的代码提交了。master担任分发你的代码而且担任给你的topolgoy分配作业进程。假如一个作业进程挂掉了, master节点会把以为从头分配到其它节点。
3) 下面是以本地形式运转的代码:
- Configconf=newConfig();
- conf.setDebug(true);
- conf.setNumWorkers(2);
- LocalClustercluster=newLocalCluster();
- cluster.submitTopology(“test”,conf,builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology(“test”);
- cluster.shutdown();
首要, 这个代码界说经过界说一个LocalCluster目标来界说一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是相同的。经过调用submitTopology办法来提交topology, 它承受三个参数:要运转的topology的姓名,一个装备目标以及要运转的topology自身。
topology的姓名是用来仅有差异一个topology的,这样你然后能够用这个姓名来杀死这个topology的。前面现已说过了, 你有必要显式的杀掉一个topology, 不然它会一向运转。
Conf目标能够装备许多东西, 下面两个是最常见的:
TOPOLOGY_WORKERS(setNumWorkers) 界说你期望集群分配多少个作业进程给你来履行这个topology. topology里边的每个组件会被需求线程来履行。每个组件究竟用多少个线程是经过setBolt和setSpout来指定的。这些线程都运转在作业进程里边. 每一个作业进程包括一些节点的一些作业线程。比方, 假如你指定300个线程,60个进程, 那么每个作业进程里边要履行6个线程, 而这6个线程或许归于不同的组件(Spout, Bolt)。你能够经过调整每个组件的并行度以及这些线程地点的进程数量来调整topology的功能。
TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条音讯。这在本地环境调试topology很有用, 可是在线上这么做的话会影响功能的。
定论:
本章从storm的根本目标的界说,到广泛的介绍了storm的开发环境,从一个简略的比方讲解了topology的构建和界说。期望咱们能够从本章的内容对storm有一个根本的了解和概念,而且现已能够构建一个简略的topology!!
知优网 » Storm入门教程:构建Topology(storm 教程)