Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析

| 2022-09-27 admin

?隨著數(shù)據(jù)體積的越來越大,實(shí)時(shí)處理成為了許多機(jī)構(gòu)需要面對(duì)的首要挑戰(zhàn)。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上結(jié)合了汽車超速監(jiān)視,為我們演示了使用Storm進(jìn)行實(shí)時(shí)大數(shù)據(jù)分析。

簡單和明了,Storm讓大數(shù)據(jù)分析變得輕松加愉快。

當(dāng)今世界,公司的日常運(yùn)營經(jīng)常會(huì)生成TB級(jí)別的數(shù)據(jù)。數(shù)據(jù)來源囊括了互聯(lián)網(wǎng)裝置可以捕獲的任何類型數(shù)據(jù),網(wǎng)站、社交媒體、交易型商業(yè)數(shù)據(jù)以及其它商業(yè)環(huán)境中創(chuàng)建的數(shù)據(jù)??紤]到數(shù)據(jù)的生成量,實(shí)時(shí)處理成為了許多機(jī)構(gòu)需要面對(duì)的首要挑戰(zhàn)。我們經(jīng)常用的一個(gè)非常有效的開源實(shí)時(shí)計(jì)算工具就是Storm —— Twitter開發(fā),通常被比作“實(shí)時(shí)的Hadoop”。然而Storm遠(yuǎn)比Hadoop來的簡單,因?yàn)橛盟幚泶髷?shù)據(jù)不會(huì)帶來新老技術(shù)的交替。

Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分別從事技術(shù)分析和研發(fā)工作。本文詳述了Storm的使用方法,例子中的項(xiàng)目名稱為“超速報(bào)警系統(tǒng)(Speeding Alert System)”。我們想實(shí)現(xiàn)的功能是:實(shí)時(shí)分析過往車輛的數(shù)據(jù),一旦車輛數(shù)據(jù)超過預(yù)設(shè)的臨界值 —— 便觸發(fā)一個(gè)trigger并把相關(guān)的數(shù)據(jù)存入數(shù)據(jù)庫。

Storm

對(duì)比Hadoop的批處理,Storm是個(gè)實(shí)時(shí)的、分布式以及具備高容錯(cuò)的計(jì)算系統(tǒng)。同Hadoop一樣Storm也可以處理大批量的數(shù)據(jù),然而Storm在保證高可靠性的前提下還可以讓處理進(jìn)行的更加實(shí)時(shí);也就是說,所有的信息都會(huì)被處理。Storm同樣還具備容錯(cuò)和分布計(jì)算這些特性,這就讓Storm可以擴(kuò)展到不同的機(jī)器上進(jìn)行大批量的數(shù)據(jù)處理。他同樣還有以下的這些特性:

  • 易于擴(kuò)展。對(duì)于擴(kuò)展,你只需要添加機(jī)器和改變對(duì)應(yīng)的topology(拓?fù)洌┰O(shè)置。Storm使用Hadoop Zookeeper進(jìn)行集群協(xié)調(diào),這樣可以充分的保證大型集群的良好運(yùn)行。
  • 每條信息的處理都可以得到保證。
  • Storm集群管理簡易。
  • Storm的容錯(cuò)機(jī)能:一旦topology遞交,Storm會(huì)一直運(yùn)行它直到topology被廢除或者被關(guān)閉。而在執(zhí)行中出現(xiàn)錯(cuò)誤時(shí),也會(huì)由Storm重新分配任務(wù)。
  • 盡管通常使用Java,Storm中的topology可以用任何語言設(shè)計(jì)。

當(dāng)然為了更好的理解文章,你首先需要安裝和設(shè)置Storm。需要通過以下幾個(gè)簡單的步驟:

從Storm官方下載Storm安裝文件將bin/directory解壓到你的PATH上,并保證bin/storm腳本是可執(zhí)行的。

Storm組件

Storm集群主要由一個(gè)主節(jié)點(diǎn)和一群工作節(jié)點(diǎn)(worker node)組成,通過 Zookeeper進(jìn)行協(xié)調(diào)。

主節(jié)點(diǎn):

主節(jié)點(diǎn)通常運(yùn)行一個(gè)后臺(tái)程序 —— Nimbus,用于響應(yīng)分布在集群中的節(jié)點(diǎn),分配任務(wù)和監(jiān)測(cè)故障。這個(gè)很類似于Hadoop中的Job Tracker。

工作節(jié)點(diǎn):

工作節(jié)點(diǎn)同樣會(huì)運(yùn)行一個(gè)后臺(tái)程序 —— Supervisor,用于收聽工作指派并基于要求運(yùn)行工作進(jìn)程。每個(gè)工作節(jié)點(diǎn)都是topology中一個(gè)子集的實(shí)現(xiàn)。而Nimbus和Supervisor之間的協(xié)調(diào)則通過Zookeeper系統(tǒng)或者集群。

Zookeeper

Zookeeper是完成Supervisor和Nimbus之間協(xié)調(diào)的服務(wù)。而應(yīng)用程序?qū)崿F(xiàn)實(shí)時(shí)的邏輯則被封裝進(jìn)Storm中的“topology”。topology則是一組由Spouts(數(shù)據(jù)源)和Bolts(數(shù)據(jù)操作)通過Stream Groupings進(jìn)行連接的圖。下面對(duì)出現(xiàn)的術(shù)語進(jìn)行更深刻的解析。

Spout:

簡而言之,Spout從來源處讀取數(shù)據(jù)并放入topology。Spout分成可靠和不可靠兩種;當(dāng)Storm接收失敗時(shí),可靠的Spout會(huì)對(duì)tuple(元組,數(shù)據(jù)項(xiàng)組成的列表)進(jìn)行重發(fā);而不可靠的Spout不會(huì)考慮接收成功與否只發(fā)射一次。而Spout中最主要的方法就是nextTuple(),該方法會(huì)發(fā)射一個(gè)新的tuple到topology,如果沒有新tuple發(fā)射則會(huì)簡單的返回。

Bolt:

Topology中所有的處理都由Bolt完成。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數(shù)據(jù)庫、等等。Bolt從Spout中接收數(shù)據(jù)并進(jìn)行處理,如果遇到復(fù)雜流的處理也可能將tuple發(fā)送給另一個(gè)Bolt進(jìn)行處理。而Bolt中最重要的方法是execute(),以新的tuple作為參數(shù)接收。不管是Spout還是Bolt,如果將tuple發(fā)射成多個(gè)流,這些流都可以通過declareStream()來聲明。

Stream Groupings:

Stream Grouping定義了一個(gè)流在Bolt任務(wù)間該如何被切分。這里有Storm提供的6個(gè)Stream Grouping類型:

1. 隨機(jī)分組(Shuffle grouping):隨機(jī)分發(fā)tuple到Bolt的任務(wù),保證每個(gè)任務(wù)獲得相等數(shù)量的tuple。

2. 字段分組(Fields grouping):根據(jù)指定字段分割數(shù)據(jù)流,并分組。例如,根據(jù)“user-id”字段,相同“user-id”的元組總是分發(fā)到同一個(gè)任務(wù),不同“user-id”的元組可能分發(fā)到不同的任務(wù)。

3. 全部分組(All grouping):tuple被復(fù)制到bolt的所有任務(wù)。這種類型需要謹(jǐn)慎使用。

4. 全局分組(Global grouping):全部流都分配到bolt的同一個(gè)任務(wù)。明確地說,是分配給ID最小的那個(gè)task。

5. 無分組(None grouping):你不需要關(guān)心流是如何分組。目前,無分組等效于隨機(jī)分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執(zhí)行(如果可能)。

6. 直接分組(Direct grouping):這是一個(gè)特別的分組類型。元組生產(chǎn)者決定tuple由哪個(gè)元組處理者任務(wù)接收。

當(dāng)然還可以實(shí)現(xiàn)CustomStreamGroupimg接口來定制自己需要的分組。

項(xiàng)目實(shí)施

當(dāng)下情況我們需要給Spout和Bolt設(shè)計(jì)一種能夠處理大量數(shù)據(jù)(日志文件)的topology,當(dāng)一個(gè)特定數(shù)據(jù)值超過預(yù)設(shè)的臨界值時(shí)促發(fā)警報(bào)。使用Storm的topology,逐行讀入日志文件并且監(jiān)視輸入數(shù)據(jù)。在Storm組件方面,Spout負(fù)責(zé)讀入輸入數(shù)據(jù)。它不僅從現(xiàn)有的文件中讀入數(shù)據(jù),同時(shí)還監(jiān)視著新文件。文件一旦被修改Spout會(huì)讀入新的版本并且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發(fā)射給Bolt進(jìn)行臨界分析,這樣就可以發(fā)現(xiàn)所有可能超臨界的記錄。

下一節(jié)將對(duì)用例進(jìn)行詳細(xì)介紹。

臨界分析

這一節(jié),將主要聚焦于臨界值的兩種分析類型:瞬間臨界(instant thershold)和時(shí)間序列臨界(time series threshold)。

  • 瞬間臨界值監(jiān)測(cè):一個(gè)字段的值在那個(gè)瞬間超過了預(yù)設(shè)的臨界值,如果條件符合的話則觸發(fā)一個(gè)trigger。舉個(gè)例子當(dāng)車輛超越80公里每小時(shí),則觸發(fā)trigger。
  • 時(shí)間序列臨界監(jiān)測(cè):字段的值在一個(gè)給定的時(shí)間段內(nèi)超過了預(yù)設(shè)的臨界值,如果條件符合則觸發(fā)一個(gè)觸發(fā)器。比如:在5分鐘類,時(shí)速超過80KM兩次及以上的車輛。

Listing One顯示了我們將使用的一個(gè)類型日志,其中包含的車輛數(shù)據(jù)信息有:車牌號(hào)、車輛行駛的速度以及數(shù)據(jù)獲取的位置。

AB 123

60

North city

BC 123

70

South city

CD 234

40

South city

DE 123

40

East  city

EF 123

90

South city

GH 123

50

West  city

這里將創(chuàng)建一個(gè)對(duì)應(yīng)的XML文件,這將包含引入數(shù)據(jù)的模式。這個(gè)XML將用于日志文件的解析。XML的設(shè)計(jì)模式和對(duì)應(yīng)的說明請(qǐng)見下表。

XML文件和日志文件都存放在Spout可以隨時(shí)監(jiān)測(cè)的目錄下,用以關(guān)注文件的實(shí)時(shí)更新。

 

Figure 1:Storm中建立的topology,用以實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)處理

如圖所示:FilelistenerSpout接收輸入日志并進(jìn)行逐行的讀入,接著將數(shù)據(jù)發(fā)射給ThresoldCalculatorBolt進(jìn)行更深一步的臨界值處理。一旦處理完成,被計(jì)算行的數(shù)據(jù)將發(fā)送給DBWriterBolt,然后由DBWriterBolt存入給數(shù)據(jù)庫。下面將對(duì)這個(gè)過程的實(shí)現(xiàn)進(jìn)行詳細(xì)的解析。

Spout的實(shí)現(xiàn)

Spout以日志文件和XML描述文件作為接收對(duì)象。XML文件包含了與日志一致的設(shè)計(jì)模式。不妨設(shè)想一下一個(gè)示例日志文件,包含了車輛的車牌號(hào)、行駛速度、以及數(shù)據(jù)的捕獲位置。

 

Figure2:數(shù)據(jù)從日志文件到Spout的流程圖

Listing Two顯示了tuple對(duì)應(yīng)的XML,其中指定了字段、將日志文件切割成字段的定界符以及字段的類型。XML文件以及數(shù)據(jù)都被保存到Spout指定的路徑。

Listing Two:用以描述日志文件的XML文件。

  1. <TUPLEINFO> 
  2. <FIELDLIST> 
  3. <FIELD> 
  4. <COLUMNNAME>vehicle_number</COLUMNNAME> 
  5. <COLUMNTYPE>string</COLUMNTYPE> 
  6. </FIELD> 
  7.  
  8. <FIELD>
  9. <COLUMNNAME>speed</COLUMNNAME> 
  10. <COLUMNTYPE>int</COLUMNTYPE> 
  11. </FIELD> 
  12.  
  13. <FIELD> 
  14. <COLUMNNAME>location</COLUMNNAME> 
  15. <COLUMNTYPE>string</COLUMNTYPE> 
  16. </FIELD> 
  17. </FIELDLIST> 
  18. <DELIMITER>,</DELIMITER> 
  19. </TUPLEINFO>   

通過構(gòu)造函數(shù)及它的參數(shù)Directory、PathSpout和TupleInfo對(duì)象創(chuàng)建Spout對(duì)象。TupleInfo儲(chǔ)存了日志文件的字段、定界符、字段的類型這些很必要的信息。這個(gè)對(duì)象通過XSTream序列化XML時(shí)建立。

Spout的實(shí)現(xiàn)步驟:

  • 對(duì)文件的改變進(jìn)行分開的監(jiān)聽,并監(jiān)視目錄下有無新日志文件添加。
  • 在數(shù)據(jù)得到了字段的說明后,將其轉(zhuǎn)換成tuple。
  • 聲明Spout和Bolt之間的分組,并決定tuple發(fā)送給Bolt的途徑。

Spout的具體編碼在Listing Three中顯示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。    

public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )   
{   
           _collector = collector;   
         try   
         {   
         fileReader  =  new BufferedReader(new FileReader(new File(file)));  
         }  
         catch (FileNotFoundException e)  
         {  
         System.exit(1);   
         }  
}                                                          

public void nextTuple()  
{  
         protected void ListenFile(File file)  
         {  
         Utils.sleep(2000);  
         RandomAccessFile access = null;  
         String line = null;   
            try   
            {  
                while ((line = access.readLine()) != null)  
                {  
                    if (line !=null)  
                    {   
                         String[] fields=null;  
                          if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\"+tupleInfo.getDelimiter());   
                          else   
                          fields = line.split  (tupleInfo.getDelimiter());   
                          if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));  
                    }  
               }  
            }  
            catch (IOException ex){ }  
            }  
}  

public void declareOutputFields(OutputFieldsDeclarer declarer)  
{  
      String[] fieldsArr = new String [tupleInfo.getFieldList().size()];  
      for(int i=0; i<tupleInfo.getFieldList().size(); i++)  
      {  
              fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();  
      }  
declarer.declare(new Fields(fieldsArr));  
}

 

 

declareOutputFileds()決定了tuple發(fā)射的格式,這樣的話Bolt就可以用類似的方法將tuple譯碼。Spout持續(xù)對(duì)日志文件的數(shù)據(jù)的變更進(jìn)行監(jiān)聽,一旦有添加Spout就會(huì)進(jìn)行讀入并且發(fā)送給Bolt進(jìn)行處理。

Bolt的實(shí)現(xiàn)

Spout的輸出結(jié)果將給予Bolt進(jìn)行更深一步的處理。經(jīng)過對(duì)用例的思考,我們的topology中需要如Figure 3中的兩個(gè)Bolt。

Figure 3:Spout到Bolt的數(shù)據(jù)流程。

ThresholdCalculatorBolt

Spout將tuple發(fā)出,由ThresholdCalculatorBolt接收并進(jìn)行臨界值處理。在這里,它將接收好幾項(xiàng)輸入進(jìn)行檢查;分別是:

臨界值檢查

  • 臨界值欄數(shù)檢查(拆分成字段的數(shù)目)
  • 臨界值數(shù)據(jù)類型(拆分后字段的類型)
  • 臨界值出現(xiàn)的頻數(shù)
  • 臨界值時(shí)間段檢查

Listing Four中的類,定義用來保存這些值。

Listing Four:ThresholdInfo類

public class ThresholdInfo implementsSerializable  

{    
        private String action;   
        private String rule;   
        private Object thresholdValue;  
        private int thresholdColNumber;   
        private Integer timeWindow;   
        private int frequencyOfOccurence;   
}

基于字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執(zhí)行。代碼大部分的功能是解析和接收值的檢測(cè)。

Listing Five:臨界值檢測(cè)代碼段

public void execute(Tuple tuple, BasicOutputCollector collector)   
    {  
        if(tuple!=null)   
        {  
            List<Object> inputTupleList = (List<Object>) tuple.getValues();  
            int thresholdColNum = thresholdInfo.getThresholdColNumber();   
            Object thresholdValue = thresholdInfo.getThresholdValue();   
            String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   
            Integer timeWindow = thresholdInfo.getTimeWindow();  
             int frequency = thresholdInfo.getFrequencyOfOccurence();  
             if(thresholdDataType.equalsIgnoreCase("string"))  
             {  
                 String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  
                 String frequencyChkOp = thresholdInfo.getAction();  
                 if(timeWindow!=null)  
                 {  
                     long curTime = System.currentTimeMillis();  
                     long diffInMinutes = (curTime-startTime)/(1000);  
                     if(diffInMinutes>=timeWindow)  
                     {  
                         if(frequencyChkOp.equals("=="))  
                         {  
                              if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                              {  
                                  count.incrementAndGet();  
                                  if(count.get() > frequency)  
                                      splitAndEmit(inputTupleList,collector);  
                              }  
                         }  
                         else if(frequencyChkOp.equals("!="))  
                         {  
                             if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                             {  
                                  count.incrementAndGet();  
                                  if(count.get() > frequency)  
                                      splitAndEmit(inputTupleList,collector);  
                              }  
                          }  
                          else                         System.out.println("Operator not supported");   
                      }  
                  }  
                  else 
                  {  
                      if(frequencyChkOp.equals("=="))  
                      {  
                          if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                          {  
                              count.incrementAndGet();  
                              if(count.get() > frequency)  
                                  splitAndEmit(inputTupleList,collector);  
                              }  
                      }  
                      else if(frequencyChkOp.equals("!="))  
                      {  
                           if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                           {  
                               count.incrementAndGet();  
                               if(count.get() > frequency)  
                                   splitAndEmit(inputTupleList,collector);  
                              }  
                       }  
                   }  
                }  
                else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))  
                {  
                    String frequencyChkOp = thresholdInfo.getAction();  
                    if(timeWindow!=null)  
                    {  
                         long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());  
                         long curTime = System.currentTimeMillis();  
                         long diffInMinutes = (curTime-startTime)/(1000);  
                         System.out.println("Difference in minutes="+diffInMinutes);  
                         if(diffInMinutes>=timeWindow)  
                         {  
                              if(frequencyChkOp.equals("<"))  
                              {  
                                  if(valueToCheck < Double.parseDouble(thresholdValue.toString()))  
                                  {  
                                       count.incrementAndGet();  
                                       if(count.get() > frequency)  
                                           splitAndEmit(inputTupleList,collector);  
                                  }  
                              }  
                              else if(frequencyChkOp.equals(">"))  
                              {  
                                   if(valueToCheck > Double.parseDouble(thresholdValue.toString()))  
                                    {  
                                       count.incrementAndGet();  
                                       if(count.get() > frequency)  
                                           splitAndEmit(inputTupleList,collector);  
                                   }  
                               }  
                               else if(frequencyChkOp.equals("=="))  
                               {  
                                  if(valueToCheck == Double.parseDouble(thresholdValue.toString()))  
                                  {  
                                      count.incrementAndGet();  
                                      if(count.get() > frequency)  
                                          splitAndEmit(inputTupleList,collector);  
                                   }  
                               }  
                               else if(frequencyChkOp.equals("!="))  
                               {  
        . . .  
                                }  
                           }  
                 }  
          else 
              splitAndEmit(null,collector);  
          }  
          else 
         {  
               System.err.println("Emitting null in bolt");  
               splitAndEmit(null,collector);  
        }  
    }

 

 

經(jīng)由Bolt發(fā)送的的tuple將會(huì)傳遞到下一個(gè)對(duì)應(yīng)的Bolt,在我們的用例中是DBWriterBolt。

DBWriterBolt

經(jīng)過處理的tuple必須被持久化以便于觸發(fā)tigger或者更深層次的使用。DBWiterBolt做了這個(gè)持久化的工作并把tuple存入了數(shù)據(jù)庫。表的建立由prepare()函數(shù)完成,這也將是topology調(diào)用的第一個(gè)方法。方法的編碼如Listing Six所示。

Listing Six:建表編碼。

public void prepare( Map StormConf, TopologyContext context )   
    {         
        try   
        {  
            Class.forName(dbClass);  
        }   
        catch (ClassNotFoundException e)   
        {  
            System.out.println("Driver not found");  
            e.printStackTrace();  
        }  

        try   
        {  
           connection driverManager.getConnection(   
               "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  
           connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  

           StringBuilder createQuery = new StringBuilder(  
               "CREATE TABLE IF NOT EXISTS "+tableName+"(");  
           for(Field fields : tupleInfo.getFieldList())  
           {  
               if(fields.getColumnType().equalsIgnoreCase("String"))  
                   createQuery.append(fields.getColumnName()+" VARCHAR(500),");  
               else 
                   createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");  
           }  
           createQuery.append("thresholdTimeStamp timestamp)");  
           connection.prepareStatement(createQuery.toString()).execute();  

           // Insert Query  
           StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");  
           String tempCreateQuery = new String();  
           for(Field fields : tupleInfo.getFieldList())  
           {  
                insertQuery.append(fields.getColumnName()+",");  
           }  
           insertQuery.append("thresholdTimeStamp").append(") values (");  
           for(Field fields : tupleInfo.getFieldList())  
           {  
               insertQuery.append("?,");  
           }  

           insertQuery.append("?)");  
           prepStatement = connection.prepareStatement(insertQuery.toString());  
        }  
        catch (SQLException e)   
        {         
            e.printStackTrace();  
        }         
    }

 

 

數(shù)據(jù)分批次的插入數(shù)據(jù)庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實(shí)現(xiàn)可能存在不同類型輸入的解析。

Listing Seven:數(shù)據(jù)插入的代碼部分。

public void execute(Tuple tuple, BasicOutputCollector collector)   
    {  
        batchExecuted=false;  
        if(tuple!=null)  
        {  
           List<Object> inputTupleList = (List<Object>) tuple.getValues();  
           int dbIndex=0;  
           for(int i=0;i<tupleInfo.getFieldList().size();i++)  
           {  
               Field field = tupleInfo.getFieldList().get(i);  
               try {  
                   dbIndex = i+1;  
                   if(field.getColumnType().equalsIgnoreCase("String"))               
                       prepStatement.setString(dbIndex, inputTupleList.get(i).toString());  
                   else if(field.getColumnType().equalsIgnoreCase("int"))  
                       prepStatement.setInt(dbIndex,  
                           Integer.parseInt(inputTupleList.get(i).toString()));  
                   else if(field.getColumnType().equalsIgnoreCase("long"))  
                       prepStatement.setLong(dbIndex,   
                           Long.parseLong(inputTupleList.get(i).toString()));  
                   else if(field.getColumnType().equalsIgnoreCase("float"))  
                       prepStatement.setFloat(dbIndex,   
                           Float.parseFloat(inputTupleList.get(i).toString()));  
                   else if(field.getColumnType().equalsIgnoreCase("double"))  
                       prepStatement.setDouble(dbIndex,   
                           Double.parseDouble(inputTupleList.get(i).toString()));  
                   else if(field.getColumnType().equalsIgnoreCase("short"))  
                       prepStatement.setShort(dbIndex,   
                           Short.parseShort(inputTupleList.get(i).toString()));  
                   else if(field.getColumnType().equalsIgnoreCase("boolean"))  
                       prepStatement.setBoolean(dbIndex,   
                           Boolean.parseBoolean(inputTupleList.get(i).toString()));  
                   else if(field.getColumnType().equalsIgnoreCase("byte"))  
                       prepStatement.setByte(dbIndex,   
                           Byte.parseByte(inputTupleList.get(i).toString()));  
                   else if(field.getColumnType().equalsIgnoreCase("Date"))  
                   {  
                      Date dateToAdd=null;  
                      if (!(inputTupleList.get(i) instanceof Date))    
                      {    
                           DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");  
                           try   
                           {  
                               dateToAdd = df.parse(inputTupleList.get(i).toString());  
                           }  
                           catch (ParseException e)   
                           {  
                               System.err.println("Data type not valid");  
                           }  
                       }    
                       else 
                       {  
                dateToAdd = (Date)inputTupleList.get(i);  
                java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());  
                prepStatement.setDate(dbIndex, sqlDate);  
                }     
                }   
            catch (SQLException e)   
            {  
                 e.printStackTrace();  
            }  
        }  
        Date now = new Date();            
        try 
        {  
            prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));  
            prepStatement.addBatch();  
            counter.incrementAndGet();  
            if (counter.get()== batchSize)   
            executeBatch();  
        }   
        catch (SQLException e1)   
        {  
            e1.printStackTrace();  
        }             
       }  
       else 
       {  
            long curTime = System.currentTimeMillis();  
           long diffInSeconds = (curTime-startTime)/(60*1000);  
           if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)  
           {  
                try {  
                    executeBatch();  
                    startTime = System.currentTimeMillis();  
                }  
                catch (SQLException e) {  
                     e.printStackTrace();  
                }  
           }  
       }  
    }  

    public void executeBatch() throws SQLException  
    {  
        batchExecuted=true;  
        prepStatement.executeBatch();  
        counter = new AtomicInteger(0);  
    }

 

 

一旦Spout和Bolt準(zhǔn)備就緒(等待被執(zhí)行),topology生成器將會(huì)建立topology并準(zhǔn)備執(zhí)行。下面就來看一下執(zhí)行步驟。

在本地集群上運(yùn)行和測(cè)試topology

  • 通過TopologyBuilder建立topology。
  • 使用Storm Submitter,將topology遞交給集群。以topology的名字、配置和topology的對(duì)象作為參數(shù)。
  • 提交topology。

Listing Eight:建立和執(zhí)行topology。

public class StormMain  
    {  
         public static void main(String[] args) throws AlreadyAliveException,   
                                                       InvalidTopologyException,   
                                                       InterruptedException   
         {  
              ParallelFileSpout parallelFileSpout = new ParallelFileSpout();  
              ThresholdBolt thresholdBolt = new ThresholdBolt();  
              DBWriterBolt dbWriterBolt = new DBWriterBolt();  
              TopologyBuilder builder = new TopologyBuilder();  
              builder.setSpout("spout", parallelFileSpout, 1);  
              builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");  
              builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");  
              if(this.argsMain!=null && this.argsMain.length > 0)   
              {  
                  conf.setNumWorkers(1);  
                  StormSubmitter.submitTopology(   
                       this.argsMain[0], conf, builder.createTopology());  
              }  
              else 
              {      
                  Config conf = new Config();  
                  conf.setDebug(true);  
                  conf.setMaxTaskParallelism(3);  
                  LocalCluster cluster = new LocalCluster();  
                  cluster.submitTopology(  
                  "Threshold_Test", conf, builder.createTopology());  
              }  
         }  
    }

topology被建立后將被提交到本地集群。一旦topology被提交,除非被取締或者集群關(guān)閉,它將一直保持運(yùn)行不需要做任何的修改。這也是Storm的另一大特色之一。