如何使用RabbitMQ實現事件匯流排

2022-06-10 02:30:09 字數 2600 閱讀 7593

首先,事件源與事件處理的對映字典。

private static dictionary> eventhandlers = new dictionary>();
然後,初始化rabbitmq,建立到伺服器的連線,建立乙個通道等

public rabbitmqeventbus(iconnectionfactory connectionfactory,

string exchangename,

string exchangetype = exchangetype.fanout,

string queuename = null,

bool autoack = false)

接著,實現訂閱,往字典表中新增事件處理例項,並繫結佇列

public void subscribe(ieventhandlereventhandler) where tevent : ievent

else);}

this.channel.queuebind(this.queuename, this.exchangename, typeof(tevent).fullname);

}

接著,實現取消訂閱,從字典表中刪除事件處理例項,並取消繫結佇列

public void unsubscribe(ieventhandlereventhandler) where tevent : ievent

}}

接著,實現發布,往佇列發布事件

public void publish(tevent @event) where tevent : ievent

); var eventbody = encoding.utf8.getbytes(json);

channel.basicpublish(this.exchangename,

@event.gettype().fullname,

null,

eventbody);

}

接著,在eventingbasicconsumer.received事件處理中,通過事件源找到對應的事件處理類,並執行它

private string initializeeventconsumer(string queue)

else

var consumer = new eventingbasicconsumer(this.channel);

consumer.received += (model, eventargument) =>

);var eventtypename = eventargument.routingkey;

if (eventhandlers.containskey(eventtypename)));}

}catch (exception ex)

}if (!autoack)

};this.channel.basicconsume(localqueuename, autoack: this.autoack, consumer: consumer);

return localqueuename;

}

最後,建立客戶端類,具體事件源類,具體事件處理類。

using example.eventbus;

using rabbitmq.client;

using system;

public sendedevent(string name)

}public class customerasendedeventhandler : ieventhandler通知!");}}

public class customerbsendedeventhandler : ieventhandler通知!");}}

class program

;var sendedevent = new sendedevent("優惠");

var customerasendedeventhandler = new customerasendedeventhandler();

eventbus.subscribe(customerasendedeventhandler);

var customerbsendedeventhandler = new customerbsendedeventhandler();

eventbus.subscribe(customerbsendedeventhandler);

console.writeline($"商店發了通知!");

eventbus.publish(sendedevent);

console.readkey();}}

}

讓我們來看看輸出結果:

商店發布優惠通知!

顧客a收到優惠通知。

顧客b收到優惠通知。

RabbitMQ 一二事 簡單佇列使用

訊息佇列目前流行的有三種 1.rabbitmq 2.activemq 3.kafka 這三種都非常強大,rabbitmq目前用的比較多,也比較流行,阿里也在用 activemq是阿帕奇出品,但是效能上和rmq相比相對差一些 卡夫卡呢,使用場景不同,不多介紹,主要是用於日誌收集方面,結合hadoop非...

RabbitMQ如何實現延遲佇列?

延遲佇列儲存的物件肯定是對應的延遲訊息,所謂 延遲訊息 是指當訊息被傳送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。場景一 在訂單系統中,乙個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那麼這個訂單將進行一場處理。這是就可以使用延...

RabbitMq 如何使用Fanout方式進行廣播

在專案中使用rabbitmq 傳送訊息和接收訊息,如果專案在一台機器上部署則使用direct方式即可,但是如果把專案部署到n臺機器上,傳送一條訊息,則n臺機器都能同時接收訊息,則需要使用fanout方式來實現,而且需要建立n個佇列進行接收訊息.這樣就實現了一條訊息被多個消費者同時消費。首先,配置fa...