Skip to content

Single node code example

David edited this page Jul 29, 2013 · 12 revisions

Running an example

In this example a prime number generator is used. A topology with one spout that generates the natural numbers in order and one bolt that finds which of them are prime is created and launched in a single-node Storm installation.

Topology code

/**
 * Entry point that wires a {@code PrimeNumberSpout} to a {@code PrimeNumberBolt}
 * and submits the resulting topology through {@code StormSubmitter}.
 *
 * <p>Usage: the first command-line argument is the name under which the
 * topology is submitted.
 */
public class PrimeNumberTopology {
    /**
     * Builds the topology and submits it.
     *
     * @param args {@code args[0]} is the topology name; it is required
     * @throws Exception if submission fails for a reason other than the
     *                   topology name already being in use
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 1) {
            System.err.println("Usage: PrimeNumberTopology <topology-name>");
            System.exit(1);
        }
        String topologyName = args[0];

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout( "spout", new PrimeNumberSpout() );
        // The bolt subscribes to the spout's output with a shuffle grouping.
        builder.setBolt( "prime", new PrimeNumberBolt() )
            .shuffleGrouping("spout");

        //This is the way a configuration is created to be run in a cluster
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);
        conf.setMaxSpoutPending(5000);

        try {
            StormSubmitter.submitTopology( topologyName, conf, builder.createTopology() );
        } catch (AlreadyAliveException e) {
            // Previously swallowed silently, which made a failed submission
            // look like a success. Report it and exit with a failure status.
            System.err.println("Topology '" + topologyName
                    + "' was not submitted: a topology with that name is already alive ("
                    + e + ")");
            System.exit(1);
        }
    }
}

Spout Code

/**
 * Spout that emits consecutive integers, starting at 1, as single-field
 * tuples whose field is named {@code "number"}.
 */
public class PrimeNumberSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    // Per-instance counter. It was previously static, which made every spout
    // instance in the same JVM share (and race on) one unsynchronized counter.
    // Note: as an int it wraps to negative values after Integer.MAX_VALUE.
    private int currentNumber = 1;

    /**
     * Stores the collector used later by {@link #nextTuple()}.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector through which tuples are emitted
     */
    @Override
    public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) {
        this.collector = collector;
    }

    /** Emits the current number as a one-value tuple and advances the counter. */
    @Override
    public void nextTuple() {
        // Integer.valueOf replaces the deprecated new Integer(int) constructor.
        collector.emit( new Values( Integer.valueOf( currentNumber ) ) );
        currentNumber++;
    }

    /** No-op: this spout keeps no record of emitted tuples. */
    @Override
    public void ack(Object id) {
    }

    /** No-op: this spout keeps no record of emitted tuples, so nothing is replayed. */
    @Override
    public void fail(Object id){
    }

    /** Declares the single output field, {@code "number"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare( new Fields( "number" ) );
    }
}

Bolt Code

public class PrimeNumberBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare( Map conf, TopologyContext context, OutputCollector collector ){
        this.collector = collector;
    }

    public void execute( Tuple tuple ) {
        int number = tuple.getInteger( 0 );
        if( isPrime( number) ){
            System.out.println( number );
        }
        collector.ack( tuple );
    }

    public void declareOutputFields( OutputFieldsDeclarer declarer ){
        declarer.declare( new Fields( "number" ) );
    }   

    private boolean isPrime( int n ) {
        if( n == 1 || n == 2 || n == 3 ){
            return true;
        }
    
        // Is n an even number?
        if( n % 2 == 0 ){
            return false;
        }
    
        //if not, then just check the odds
        for( int i=3; i*i<=n; i+=2 ) {
            if( n % i == 0){
                 return false;
            }
        }
        return true;
    }
}

Clone this wiki locally