博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Strom的trident小例子
阅读量:5051 次
发布时间:2019-06-12

本文共 3956 字,大约阅读时间需要 13 分钟。

 

上代码:

1 public class TridentFunc { 2      3     /** 4      * 类似于普通的bolt 5      */ 6     public static class MyFunction extends BaseFunction{ 7         @Override 8         public void execute(TridentTuple tuple, TridentCollector collector) { 9             Integer value = tuple.getIntegerByField("sentence");10             System.out.println(value);11         }12     }13     14     public static void main(String[] args) {15         @SuppressWarnings("unchecked")16         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));17         spout.setCycle(true);//让spout循环发送数据18         19         TridentTopology tridentTopology = new TridentTopology();20         tridentTopology.newStream("spoutid",spout)21             .each(new Fields("sentence"), new MyFunction(), new Fields(""));22         23         LocalCluster localCluster = new LocalCluster();24         String simpleName = TridentFunc.class.getSimpleName();25         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());26         //运行结果就是  一直循环打印 1 2 1 2  27     }28 }

 多数据源

1 public class TridentMeger { 2      3     /** 4      * 类似于普通的bolt 5      */ 6     public static class MyFunction extends BaseFunction{ 7         @Override 8         public void execute(TridentTuple tuple, TridentCollector collector) { 9             Integer value = tuple.getIntegerByField("sentence");10             System.out.println(value);11         }12     }13     14     public static void main(String[] args) {15         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));16         //spout.setCycle(true);//让spout循环发送数据17         18         TridentTopology tridentTopology = new TridentTopology();19         //指定多个数据源,流连接20         Stream newStream1 = tridentTopology.newStream("spoutid1",spout);21         Stream newStream2 = tridentTopology.newStream("spoutid2",spout);22         23         //tridentTopology.newStream("spoutid",spout) 之前是这种  但是只能有 一个数据源  24         tridentTopology.merge(newStream1,newStream2)//使用这种就可以有多个数据源.25             .each(new Fields("sentence"), new MyFunction(), new Fields(""));26         27         LocalCluster localCluster = new LocalCluster();28         String simpleName = TridentMeger.class.getSimpleName();29         localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());30     }31 }

 增加过滤器

1 public class TridentFilter { 2      3     /** 4      * 类似于普通的bolt 5      */ 6     public static class MyFunction extends BaseFunction{ 7         @Override 8         public void execute(TridentTuple tuple, TridentCollector collector) { 9             Integer value = tuple.getIntegerByField("sentence");10             System.out.println(value);11         }12     }13     14     public static class MyFilter extends BaseFilter{
//专门封装了一个Filter功能.15 //对数据进行过滤 如果过滤出的数据不要了就false 保留就ture16 @Override17 public boolean isKeep(TridentTuple tuple) {18 Integer value = tuple.getIntegerByField("sentence");19 return value%2==0?true:false; //只要偶数不要奇数20 }21 }22 23 public static void main(String[] args) {24 FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));25 spout.setCycle(true);//让spout循环发送数据26 27 TridentTopology tridentTopology = new TridentTopology();28 tridentTopology.newStream("spoutid",spout) //这个地方只能指定一个数据源,如果想指定多个数据源Spout 看TridentMeger.java29 .each(new Fields("sentence"), new MyFilter())30 .each(new Fields("sentence"), new MyFunction(), new Fields(""));31 32 LocalCluster localCluster = new LocalCluster();33 String simpleName = TridentFilter.class.getSimpleName();34 localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());35 }36 }

 

转载于:https://www.cnblogs.com/DreamDrive/p/6675991.html

你可能感兴趣的文章
JDK JRE Java虚拟机的关系
查看>>
2018.11.20
查看>>
word20161215
查看>>
dijkstra (模板)
查看>>
编译Linux驱动程序 遇到的问题
查看>>
大型分布式网站架构技术总结
查看>>
HDU 1017[A Mathematical Curiosity]暴力,格式
查看>>
[算法之美] KMP算法的直观理解
查看>>
EntityFramework 性能优化
查看>>
【ASP.NET开发】菜鸟时期的ADO.NET使用笔记
查看>>
android圆角View实现及不同版本号这间的兼容
查看>>
OA项目设计的能力③
查看>>
Cocos2d-x3.0 文件处理
查看>>
全面整理的C++面试题
查看>>
Activity和Fragment生命周期对比
查看>>
查找 EXC_BAD_ACCESS 问题根源的方法
查看>>
日常报错
查看>>
list-style-type -- 定义列表样式
查看>>
Ubuntu 编译出现 ISO C++ 2011 不支持的解决办法
查看>>
Linux 常用命令——cat, tac, nl, more, less, head, tail, od
查看>>