activemq接收訊息pull與push模式

2021-08-17 13:21:53 字數 4449 閱讀 9884

1。同步和非同步的方式

public

static

void

main(string args) else

}// 非同步的方式接收訊息

consumer.setmessagelistener(new messagelistener()

}catch(exception e)

}}); //(非同步接收)

} catch (exception e) finally catch (throwable ignore)

}}

現在講講consumer.receive和consumer.setmessagelistener方式的不同;

1。 consumer.receive方式

public message receive() throws jm***ception 

beforemessageisconsumed(md);

aftermessageisconsumed(md, false);

return createactivemqmessage(md);

}

可以看到sendpullcommand,其會先傳送乙個pull命令,然後然後從接收的訊息列表中dequeue一條訊息,dequeue的引數-1表示,如果沒有訊息返回,會阻塞在這兒,如果有訊息便會進行消費。

下面講解非同步消費訊息過程。

@override

public

void

setmessagelistener(messagelistener listener) throws jm***ception

if (listener != null)

this.messagelistener.set(listener);

session.redispatch(this, unconsumedmessages);

if (wasrunning)

} else

}

有一句session.start();然後進入activemqsession類,看下這個start方法

/**

* start this session.

**@throws jm***ception

*/protected

void

start() throws jm***ception

executor.start();

}

在看這句c.start();

public

void

start() throws jm***ception

started.set(true);

unconsumedmessages.start();

session.executor.wakeup();

}

在看activemqsessionexecutor中的wakeup方法

public

void

wakeup()

this.taskrunner = session.connection.getsessiontaskrunner().createtaskrunner(this,

"activemq session: " + session.getsessionid());

}taskrunner = this.taskrunner;}}

taskrunner.wakeup();

} catch (interruptedexception e)

} else }}

}

其建立了乙個執行緒執行的類;

public taskrunner createtaskrunner(task task, string name)  else 

}

看其構造方法

public

dedicatedtaskrunner(final task task, string name, int priority, boolean daemon) finally ", task);}}

};thread.setdaemon(daemon);

thread.setname(name);

thread.setpriority(priority);

thread.start();

}

在看runtask方法:

final

void runtask()

}log.trace("running task {}", task);

if (!task.iterate())

while (!pending) }}

}} catch (interruptedexception e) finally

}}

其中有這句話task.iterate()。。就是執行到activemqsessionexecutor的iterate方法

public

boolean

iterate()

}// no messages left queued on the listeners.. so now dispatch messages

// queued on the session

messagedispatch message = messagequeue.dequeuenowait();

if (message == null) else

}

檢視dispatch方法

void dispatch(messagedispatch message) 

}}

然後看看consumer.dispatch(message);這句話。最終是執行到了activemqmessageconsumer的dispatch方法。。

@override

public

void

dispatch(messagedispatch md)

activemqmessage message = createactivemqmessage(md);

beforemessageisconsumed(md);

try

aftermessageisconsumed(md, expired);

} catch (runtimeexception e) exception while processing message: {}", getconsumerid(), md.getmessage().getmessageid(), e);

md.setrollbackcause(e);

if (isautoacknowledgebatch() || isautoacknowledgeeach() || session.isindividualacknowledge()) else

}} else

if (md.getmessage() == null) else

} else }}

}} else tracking transacted redelivery {}", getconsumerid(), md.getmessage());

if (transactedindividualack) else

} else

if ((consumerwithpendingtransaction = redeliverypendingincompetingtransaction(md)) != null) delivering duplicate {}, pending transaction completion on {} will rollback", getconsumerid(), md.getmessage(), consumerwithpendingtransaction);

session.getconnection().rollbackduplicate(this, md.getmessage());

dispatch(md);

} else suppressing duplicate delivery on connection, poison acking: {}", getconsumerid(), md);

posionack(md, "suppressing duplicate delivery on connection, consumer " + getconsumerid());}}

}}

if (++dispatchedcount % 1000 == 0)

} catch (exception e)

}

然後檢視上述方法。你會發現最終執行到了。consumer上的listener。

ActiveMq點對點模式傳送 接收訊息

訊息傳送流程 1 客戶機傳送訊息到jms訊息中介軟體 2 服務端負責監聽jms訊息目的地。3 發現jms裡面有訊息產生,服務就可以接受訊息。點對點訊息傳送服務 1 訊息只能被乙個服務接受 2 多個服務同時監聽訊息伺服器,遵循先來後到原則。3 訊息一旦被接受,訊息自動消失。4 如果訊息一直沒有被接受,...

ActiveMQ五種訊息的傳送 接收

1.生產者 連線工廠 connectionfactory connfactory new activemqconnectionfactory activemqconnection.default user,activemqconnection.default password,tcp localho...

突然不發訊息給ActiveMQ但能接收

今天專案在聯調過程中,activemq突然不好使了。在此之前一月內,專案組的人都沒有去修改 重啟過mq服務。雖然現在知道是由於許可權的問題導致只能收不能發 站在activemq角度是收不到,但可以發 只是到現在還不知道原來沒變過的 怎麼以前可以用,現在卻不行了?通過查詢示例 發現有connectio...