Storm应用系列之 | 您所在的位置:网站首页 › storm实例 › Storm应用系列之 |
前言: 本文会从如何写一个Storm的topology开始,来对Storm实现的细节进行阐述。避免干巴巴的讲理论。 1. 建立Maven项目 我们用Maven来管理项目,方便lib依赖的引用和版本控制。 建立最基本的pom.xml如下:
1. 3. 4.0.0 4. com.edi.storm 5. storm-samples 6. 0.0.1-SNAPSHOT 7. jar 8. 9. 10. 11. UTF-8 12. 13. 14. 15. 16. 17. clojars.org 18. http://clojars.org/repo 19. 20. 21. 22. 23. 24. storm-samples 25. 26. 27. org.apache.maven.plugins 28. maven-compiler-plugin 29. 3.1 30. 31. 1.7 32. 1.7 33. ${project.build.sourceEncoding} 34. 35. 36. 37. 38. 39. maven-assembly-plugin 40. 41. 42. jar-with-dependencies 43. 44. 45. 46. 47. make-assembly 48. package 49. 50. single 51. 52. 53. 54. 55. 56. 57. 58. 59. 60. 61. 62. 63. storm 64. storm 65. 0.9.0-rc2 66. provided 67. 68. 69.
这里我额外添加了两个build 插件:
maven-compiler-plugin : 为了方便指定编译时jdk。Storm的依赖包里面某些是jdk1.5的. 和 maven-assembly-plugin: 为了把所有依赖包最后打到一个jar包去,方便测试和部署。后面会提到如果不想打到一个jar该怎么做。
2. 建立Spout 前文提到过,Storm中的spout负责发射数据。
我们来实现这样一个spout: 它会随机发射一系列的句子,句子的格式是 谁:说的话 代码如下:
1. public class RandomSpout extends BaseRichSpout { 2. 3. private SpoutOutputCollector collector; 4. 5. private Random rand; 6. 7. private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"}; 8. 9. @Override 10. public void open(Map conf, TopologyContext context, 11. SpoutOutputCollector collector) { 12. this.collector = collector; 13. this.rand = new Random(); 14. } 15. 16. @Override 17. public void nextTuple() { 18. String toSay = sentences[rand.nextInt(sentences.length)]; 19. this.collector.emit(new Values(toSay)); 20. } 21. 22. @Override 23. public void declareOutputFields(OutputFieldsDeclarer declarer) { 24. new Fields("sentence")); 25. } 26. 27. }
这里要先理解Tuple的概念。 Storm中,基本元数据是靠Tuple才承载的。或者说,Tuple是数据的一个大抽象。它要求实现类必须能序列化。
该Spout代码里面最核心的部分有两个: a. 用collector.emit()方法发射tuple。我们不用自己实现tuple,我们只需要定义tuple的value,Storm会帮我们生成tuple。Values对象接受变长参数。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,...)的参数的index,例如我们emit(new Values("v1", "v2")), 那么Tuple的属性即为:{ [ "v1" ], [ "V2" ] } b. declarer.declare方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。
3. 建立Bolt 既然有了源,那么我们就来建立节点处理源流出来的数据。怎么处理呢?为了演示,我们来做些无聊的事情:末尾添加"!",然后打印。 两个功能,两个Bolt。 先看添加"!"的Bolt
1. public class ExclaimBasicBolt extends BaseBasicBolt { 2. 3. @Override 4. public void execute(Tuple tuple, BasicOutputCollector collector) { 5. //String sentence = tuple.getString(0); 6. 0); 7. "!"; 8. new Values(out)); 9. } 10. 11. @Override 12. public void declareOutputFields(OutputFieldsDeclarer declarer) { 13. new Fields("excl_sentence")); 14. } 15. 16. }
在RandomSpout中,我们发射的Tuple具有这样的属性 { [ "edi:I'm Happy" ] }, 所以tuple的value list中第0个值,肯定是个String。我们用tuple.getvalue(0)取到。 Storm为tuple封装了一些方法方便我们取一些基本类型,例如String,我们可以直接用getString(int N) 。 取到以后,我们在末尾添加"!"后,仍然发射一个Tuple,定义其唯一的value的field 名字为"excl_sentence"
打印Bolt
1. public class PrintBolt extends BaseBasicBolt { 2. 3. @Override 4. public void execute(Tuple tuple, BasicOutputCollector collector) { 5. 0); 6. "String recieved: " + rec); 7. } 8. 9. @Override 10. public void declareOutputFields(OutputFieldsDeclarer declarer) { 11. // do nothing 12. } 13. 14. }
仍然是取第一个,因为我们并没有定义过第二个value
4. 建立Topology 现在我们建立拓扑结构的主要组件都有了,可以创建topology了。
1. public class ExclaimBasicTopo { 2. 3. public static void main(String[] args) throws Exception { 4. new TopologyBuilder(); 5. 6. "spout", new RandomSpout()); 7. "exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout"); 8. "print", new PrintBolt()).shuffleGrouping("exclaim"); 9. 10. new Config(); 11. false); 12. 13. if (args != null && args.length > 0) { 14. 3); 15. 16. 0], conf, builder.createTopology()); 17. else { 18. 19. new LocalCluster(); 20. "test", conf, builder.createTopology()); 21. 100000); 22. "test"); 23. cluster.shutdown(); 24. } 25. } 26. }
很简单,对吧。 其中,
1. builder.setSpout("spout", new RandomSpout());
定义一个spout,id为"spout"
1. builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");
定义了一个id为"exclaim"的bolt,并且按照随机分组获得"spout"发射的tuple
1. builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");
定义了一个id为"print"的bolt,并且按照随机分组获得"exclaim”发射出来的tuple
1. .shuffleGrouping
是指明Storm按照何种策略将tuple分配到后续的bolt去。
可以看到,如果我们运行时不带参数,是把topology提交到了LocalCluster的,即所有的task都在一个本地JVM去执行。可以用LocalCluster来调试。如果后面带一个参数,即为该topology的名字,那么就把该topology提交到集群上去了。 把项目用M2E插件导入Eclipse直接运行试试
1. String recieved: marry:I'm angry! 2. String recieved: edi:I'm happy! 3. String recieved: john:I'm sad! 4. String recieved: edi:I'm happy! 5. String recieved: ted:I'm excited! 6. String recieved: laden:I'm dangerous! 7. String recieved: edi:I'm happy! 8. String recieved: edi:I'm happy!
这里我们并没有指定并行,那么其实是每个spout、bolt仅有一个线程对应去执行。
我们修改下代码,指定并行数
1. builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout"); 2. builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");
由于我们并没有多指定task数目,所以默认,会有两个exectuor去执行两个exclaimBasicBolt的task,3个executor去执行3个PrintBolt的task。
为了方便体现确实是并行,我们修改PrintBolt代码如下:
1. public class PrintBolt extends BaseBasicBolt { 2. 3. private int indexId; 4. 5. @Override 6. public void prepare(Map stormConf, TopologyContext context) { 7. this.indexId = context.getThisTaskIndex(); 8. } 9. 10. @Override 11. public void execute(Tuple tuple, BasicOutputCollector collector) { 12. 0); 13. "Bolt[%d] String recieved: %s",this.indexId, rec)); 14. } 15. 16. @Override 17. public void declareOutputFields(OutputFieldsDeclarer declarer) { 18. // do nothing 19. } 20. 21. }
这里从上下文中拿到该Bolt的TaskIndex,我们指定了3的并发度,所以理论上有3个task,那么该值应该为[1,2,3]。
运行下看看:
1. Bolt[0] String recieved: marry:I'm angry! 2. Bolt[2] String recieved: john:I'm sad! 3. Bolt[2] String recieved: ted:I'm excited! 4. Bolt[2] String recieved: john:I'm sad! 5. Bolt[2] String recieved: john:I'm sad!
证实确实是并发了。
本地测试通过了,我们用 mvn clean install 命令编译,然后把target目录下生成的 storm-samples-jar-with-dependencies.jar 拷到nimbus机器上,执行
1. ./storm jar storm-samples-jar-with-dependencies.jar com.edi.storm.topos.ExclaimBasicTopo test
在StormUI里面,点进 test
看到spout 已然已经emit了 11347280个tuple了…… 而id为exclaim的bolt也已经接受了2906920个tuple了。print没有输出,所以emit为0。
截止到这里,一个简单的Storm的topology已经完成了。 但是,这里依然有些问题: 1. 什么是acker? 2. Bolt为什么有两个继承类和接口? 3. Topology的提交方式到底有几种? 4. 除了随机分组,还有哪些分组策略? 5. Storm是如何保证tuple不被丢失的? 6. 我看到spout发送数据比bolt处理的速度快太多了,我能不能在spout里面sleep? 7. 并发数要如何指定呢?
|
今日新闻 |
推荐新闻 |
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 |