普通事務ITransactionalSpout例項

2021-09-02 20:27:05 字數 4952 閱讀 4880

[size=large]1、普通事務spout[/size]

/*** 普通事務spout

*/public class mytxspout implements itransactionalspout;

string session_id = ;

string time = ;

for (long i = 0; i < 100; i++)

}public void declareoutputfields(outputfieldsdeclarer declarer)

public mapgetcomponentconfiguration()

public org.apache.storm.transactional.itransactionalspout.coordinatorgetcoordinator(map conf,

topologycontext context)

public org.apache.storm.transactional.itransactionalspout.emittergetemitter(map conf,

topologycontext context)

}

[size=large]2、事務spout建立乙個新的事務(元資料)metadata[/size]

[b]2、1元資料定義[/b]

public class mymata implements serializable

public long getbeginpoint()

public void setbeginpoint(long beginpoint)

public int getnum()

public void setnum(int num)

}

[b]2、2 獲得(元資料)metadata,逐個發射實際batch的tuple[/b]

public class myemitter implements itransactionalspout.emitter

//逐個發射實際batch的tuple

public void emitbatch(transactionattempt tx, mymata coordinatormeta, batchoutputcollector collector)

collector.emit(new values(tx, dbmap.get(i)));}}

public void cleanupbefore(biginteger txid)

public void close()

}

[size=x-large]3、事務bolt,會從emitter接收資料處理,處理完成,提交給finishbatch方法處理。[/size]

/*** 事務bolt

*/public class mytransactionbolt extends basetransactionalbolt

integer count = 0;

batchoutputcollector collector;

transactionattempt tx;

// 會從emitter接收資料處理,處理完成,提交給finishbatch方法處理。

public void execute(tuple tuple)

}// 批處理提交

public void finishbatch()

public void declareoutputfields(outputfieldsdeclarer declarer)

}

[size=large]4、icommitter,batch之間強制按照順序進行提交[/size]

public class mycommitter extends basetransactionalbolt implements icommitter

public void finishbatch() else

dbmap.put(global_key, newvalue);

} else

system.err.println("total*************************=:" + dbmap.get(global_key).count);

// collector.emit(tuple)

}public void prepare(map conf, topologycontext context, batchoutputcollector collector, transactionattempt id)

public void declareoutputfields(outputfieldsdeclarer declarer)

public static class dbvalue

}

[size=large]5、topo類[/size]

public class mytopo catch (alreadyaliveexception e) catch (invalidtopologyexception e) catch (authorizationexception e)

} else }}

[size=large]6、測試結果[/size]

[quote]

啟動乙個事務:0----10

mytransactionbolt prepare 1 attemptid3005464965348344518

mytransactionbolt prepare 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt prepare 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

finishbatch 3

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

mytransactionbolt transactionattempt 1 attemptid3005464965348344518

finishbatch 4

finishbatch 3

total*************************=:10

啟動乙個事務:10----10

mytransactionbolt prepare 2 attemptid4420908201582652570

mytransactionbolt prepare 2 attemptid4420908201582652570

mytransactionbolt prepare 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

finishbatch 3

mytransactionbolt transactionattempt 2 attemptid4420908201582652570

finishbatch 4

finishbatch 3

total*************************=:20[/quote]

日誌傳送是否可以與普通事務日誌備份並存

2016 03 18 15 18 整理,未發布 日誌傳送實際是備份日誌 拷貝日誌 還原日誌。我們可以將備份好的日誌檔案拷貝乙份到其他儲存,再結合必要的完整備份,就可以用來還原資料。如果做了日誌傳送,還需對其做普通的日誌備份。那就要考慮清楚,普通的日誌備份不能截斷事務日誌,不然日誌傳送中的日誌鏈就會中...

SQL server從入門精通 事務

事務 我的理解 執行幾條語句時,只要有一條語句執行不成功,其他的語句都不夠被執行 事務 將多個操作當做乙個獨立的邏輯單元的執行方式為事務 特點 多個操作只有在都執行成功時才算成功,只要有乙個執行失敗那應該整體就屬於失敗,成功了可以提交,失敗了可以回滾 語法begin transaction tr i...

關於SQLSERVER 事物的運用 1 普通事物

關於sqlserver 事物的運用 概述 以往在sql2000下處理異常通常的方式比較繁瑣,sql2005版本以上加入了begin try end try begin cath end catch 是異常的捕獲稍微顯得簡單一些,根據我的測試,將我對sql事物的處理做一下整理 本文分為三個部分來描述,...