上代码:
1 public class TridentFunc { 2 3 /** 4 * 类似于普通的bolt 5 */ 6 public static class MyFunction extends BaseFunction{ 7 @Override 8 public void execute(TridentTuple tuple, TridentCollector collector) { 9 Integer value = tuple.getIntegerByField("sentence");10 System.out.println(value);11 }12 }13 14 public static void main(String[] args) {15 @SuppressWarnings("unchecked")16 FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));17 spout.setCycle(true);//让spout循环发送数据18 19 TridentTopology tridentTopology = new TridentTopology();20 tridentTopology.newStream("spoutid",spout)21 .each(new Fields("sentence"), new MyFunction(), new Fields(""));22 23 LocalCluster localCluster = new LocalCluster();24 String simpleName = TridentFunc.class.getSimpleName();25 localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());26 //运行结果就是 一直循环打印 1 2 1 2 27 }28 }
多数据源
1 public class TridentMeger { 2 3 /** 4 * 类似于普通的bolt 5 */ 6 public static class MyFunction extends BaseFunction{ 7 @Override 8 public void execute(TridentTuple tuple, TridentCollector collector) { 9 Integer value = tuple.getIntegerByField("sentence");10 System.out.println(value);11 }12 }13 14 public static void main(String[] args) {15 FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));16 //spout.setCycle(true);//让spout循环发送数据17 18 TridentTopology tridentTopology = new TridentTopology();19 //指定多个数据源,流连接20 Stream newStream1 = tridentTopology.newStream("spoutid1",spout);21 Stream newStream2 = tridentTopology.newStream("spoutid2",spout);22 23 //tridentTopology.newStream("spoutid",spout) 之前是这种 但是只能有 一个数据源 24 tridentTopology.merge(newStream1,newStream2)//使用这种就可以有多个数据源.25 .each(new Fields("sentence"), new MyFunction(), new Fields(""));26 27 LocalCluster localCluster = new LocalCluster();28 String simpleName = TridentMeger.class.getSimpleName();29 localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());30 }31 }
增加过滤器
1 public class TridentFilter { 2 3 /** 4 * 类似于普通的bolt 5 */ 6 public static class MyFunction extends BaseFunction{ 7 @Override 8 public void execute(TridentTuple tuple, TridentCollector collector) { 9 Integer value = tuple.getIntegerByField("sentence");10 System.out.println(value);11 }12 }13 14 public static class MyFilter extends BaseFilter{ //专门封装了一个Filter功能.15 //对数据进行过滤 如果过滤出的数据不要了就false 保留就ture16 @Override17 public boolean isKeep(TridentTuple tuple) {18 Integer value = tuple.getIntegerByField("sentence");19 return value%2==0?true:false; //只要偶数不要奇数20 }21 }22 23 public static void main(String[] args) {24 FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(1),new Values(2));25 spout.setCycle(true);//让spout循环发送数据26 27 TridentTopology tridentTopology = new TridentTopology();28 tridentTopology.newStream("spoutid",spout) //这个地方只能指定一个数据源,如果想指定多个数据源Spout 看TridentMeger.java29 .each(new Fields("sentence"), new MyFilter())30 .each(new Fields("sentence"), new MyFunction(), new Fields(""));31 32 LocalCluster localCluster = new LocalCluster();33 String simpleName = TridentFilter.class.getSimpleName();34 localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());35 }36 }