Tornado實現乙個訊息牆。

2022-03-27 08:53:50 字數 4237 閱讀 2542

tornado對asynchronous http有很好的支援。 所以跟著demo,總結下乙個訊息牆要怎麼做。

思路: 首先查了下有兩種思路,一種是client pull 一種是server push。 

這裡使用的是server pull,技術就是挺流行的comet技術。

comet大概就是說:我客戶端傳送乙個請求到伺服器端,然後伺服器端啟動乙個無線迴圈,將clinet需要的資料放到response中,並繼續重新整理,直到整個clinet與server的連線斷開。 所以他是乙個基於長連線的技術。

1.第一步 就是要做傳送新訊息的處理,即乙個client傳送乙個訊息後,要廣播通知到所有的client

1

class

newmessage(tornado.web.requesthandler):2#

overwrite post method

3def

post(self):4#

define messages to send

5 message =

9 message["

html

"] =tornado.escape.to_basestring(

10 self.render_string("

message.html

",message=message))

11if self.get_argument("

next

",none):

12 self.redirect(self.get_arugment("

next"))

13else:14

self.write(message)15#

global_message_buffer is a global var

1617 global_message_buffer.new_message([message])

解釋: message[html] 這部分 是將message傳入模板,然後返回html**。

最關鍵的是,將這個mesage廣播出去。 使用的global_message_buffer. 

global_message_buffer = messagebuffer()

class

messagebuffe():

self.

__init__

(self):

self.waiters =set()

self.cache =

self.cache_size = 10 #

define max message cache

self.new_message(self,message):

#send message to waiters

for future in

self.waiters:

future.set_result(message)

#update waiters to empty

self.waiters =set()

#update cache

self.cache.extend(message)

#check cache size

if len(self.cache)>self.cache_size:

self.cache = self.cache[-self.cache_size:]

回來再寫。。

繼續。messagbuffer做的事情,waiters儲存了等待訊息的future.  what is future? placeholder for an asynchronous result.

cache儲存了已經傳送了訊息,當新的使用者連線進來後,會把cache裡面的訊息推送給他。 後面寫的update

new_message做的事情也是很清楚,將新傳送的訊息給每乙個在waiters裡的future。然後更新waiters為空。 (這個地方還沒有明天**的原因)

然後更新cache,如果新加訊息後超出了size,那麼就取最新的。

2. 新進來的使用者怎麼拿到cache裡的東西呢?

這個還是簡單,講所有的cache裡的東西,render到主頁就好了

1

class

mainhandler(object):

2def

get():

3 self.render("

index.html

", messages=global_message_buffer.cache)

3. 難點是,如何持續接受新發布的訊息呢?也就是說,我在第一步中廣播出去後,所有的client怎麼接受?

在前端的,有個showmessage的方法。這個方法應該是這樣被呼叫:每從伺服器接受到乙個response(也就是伺服器有訊息了),被呼叫一次,在這個area中新增乙個message.html中render後的東西,也就是在第一步中render_string中做的結果。然後呼叫下滑動的效果-_- 前端這個不會。。

誰呼叫這個showmessage呢。應該是這樣的流程。 client 進入頁面後,拿到cache中的message後,呼叫function poll(). 

poll() 使用ajax向後端請求update我當前的訊息,當返回success後,呼叫showmessage。

挑戰下js

1

function

poll())

4 }

前端部分暫時這樣。

那麼這並沒有解決剛才的問題,需要的是,我要傳乙個東西,讓伺服器知道,有了訊息要給這個client回覆response。

class

updatemessage(tornado.web.requesthanlder):

@gen.coroutine

defpost(self):

client_id = self.get_arguments("

client_id")

#將這個clinet加入到message_wait中去,等有了訊息就返回給這個future。

self.future =global_message_buffer.wait_for_message(client_id)

messages = yield

self.future

self.write(dict(messages=message))

這個wait_for_message

1 wait_for_message(self,cursor=none):2#

考慮如果要給新來的client返回訊息的話,需要知道從cache哪個地方開始返回3#

因為是非同步的,所以要知道你請求的時候的位置。也就是當前message的id

4 future =future()56

ifcursor:

78 count =0910

for m in

reversed(self.cache):11#

如果到了當前訊息了,就不再超後面拿。資料。

12if m["

id"] ==cursor:

13break

1415 count+=1

16if

count:

17 future.set_result(self.cache[-count:])

18return

future19#

將這個future加入到需要push到的集合中去。

20self.waiter.add(future)

21return future

到這步,我們總結下整個流程與呼叫。 首先是使用者載入,載入後呼叫updatemessage。 這一步引數為 client_id,初始為none 直接將其加入waiters。到 self.messages = yield self.future這部等待future被更新,然後呼叫self.write(). 何時被更新呢? 是在messagebuffer中  future.set_result(message) 這一步,此時被更新了,然後立即呼叫self.write().

當一次ajax請求結束後,js中設定timeout來進行下一次連線。 直到有response,此次ajax請求才結束。

需要注意的是,只有updatemessage的post方法需要@gen.coroutine

基本流程就是這樣了。

用 Promise 實現乙個訊息佇列

在此篇部落格中,我們的需求如下 有乙個訊息排程器去操作傳送來訊息 但處理訊息花費的事件是不確定的,有多有少 訊息是不斷傳送過來的 這個時候就會出現一種情況 前一條訊息還未執行結束,後一條訊息就被傳送過來了 如果這個時候要求後一條訊息必須在前一條執行完才開始執行,該如何實現?sync function...

通過訊息佇列實現乙個程序寫資料,乙個程序讀資料

include include include include include include typedef struct msg msg intmain int argc,char ar msg id msgget key,ipc creat 0666 建立訊息佇列 if msg id 1 pr...

win10的乙個好訊息和乙個壞訊息

更新到最新的win10以後,乙個重大的好訊息就是 win10終於自帶curl工具了,慶祝 撒花 同時自帶的還有tar工具。開啟cmd輸入curl help和tar help可以看到命令的幫助資訊。對於自帶curl工具正是乙個令人振奮的訊息。但是,但是 千萬不要用power shell來使用curl,...