python搭建ssserver限制埠連線數

2022-09-13 20:03:16 字數 4482 閱讀 8513

新建檔案,編寫下面內容,儲存為socket.py檔案。放到ssserver.exe所在資料夾

from __future__ import absolute_import, division, print_function, \

with_statement, nested_scopes

import sys

del sys.modules['socket']

import sys

import time

import logging

import types

path = sys.path[0]

sys.path.pop(0)

import socket    # 匯入真正的socket包

sys.path.insert(0, path)

clean_time = 60        # 設定清理ip的時間間隔,在此時間內無連線的ip會被清理

ip_numbers = 1         # 設定每個埠的允許通過的ip數量,即設定客戶端ip數量

only_port = true       # 設定是否只根據埠判斷。如果為 true ,則只根據埠判斷。如果為 false ,則會嚴格的根據 ip+埠進行判斷。

# 動態path類方法

def re_class_method(_class, method_name, re_method):

method = getattr(_class, method_name)

info = sys.version_info

if info[0] >= 3:

setattr(_class, method_name,

types.methodtype(lambda *args, **kwds: re_method(method, *args, **kwds), _class))

else:

setattr(_class, method_name,

types.methodtype(lambda *args, **kwds: re_method(method, *args, **kwds), none, _class))

# 動態path例項方法

def re_self_method(self, method_name, re_method):

method = getattr(self, method_name)

setattr(self, method_name, types.methodtype(lambda *args, **kwds: re_method(method, *args, **kwds), self, self))

# 處理tcp連線

def re_accept(old_method, self, *args, **kwds):

while true:

return_value = old_method(self, *args, **kwds)

self_socket = return_value[0]

if only_port:

server_ip_port = '%s' % self.getsockname()[1]

else:

server_ip_port = '%s_%s' % (self.getsockname()[0], self.getsockname()[1])

client_ip = return_value[1][0]

for x in [x for x in self._list_client_ip[server_ip_port]]:

is_closed = true

if time.time() - float(x[0].split('#')[1]) > clean_time:

for y in x[1:]:

try:

y.getpeername()     # 判斷連線是否關閉

is_closed = false

break

except:                # 如果丟擲異常,則說明連線已經關閉,這時可以關閉套接字

logging.debug("[re_socket] close and remove the time out socket 1/%s" % (len(x[1:])))

x.remove(y)

if not is_closed:

logging.debug('[re_socket] the %s still exists and update last_time' % str(x[1].getpeername()[0]))

_ip_index = client_ip_list.index(x[0].split('#')[0])

self._list_client_ip[server_ip_port][_ip_index][0] = '%s#%s' % (x[0].split('#')[0], time.time())

if int(time.time()) % 5 == 0:

logging.debug("[re_socket] the port %s client more then the %s" % (server_ip_port, ip_numbers))

# 處理udp連線

def re_recvfrom(old_method, self, *args, **kwds):

while true:

return_value = old_method(*args, **kwds)

self_socket = ''

if only_port:

server_ip_port = '%s' % self.getsockname()[1]

else:

server_ip_port = '%s_%s' % (self.getsockname()[0], self.getsockname()[1])

client_ip = return_value[1][0]

client_ip_list = [x[0].split('#')[0] for x in self._list_client_ip[server_ip_port]]

for x in [x for x in self._list_client_ip[server_ip_port]]:

is_closed = true

if time.time() - float(x[0].split('#')[1]) > clean_time:

for y in x[1:]:

try:

y.getpeername()     # 判斷連線是否關閉

is_closed = false

break

except:                # 如果丟擲異常,則說明連線已經關閉,這時可以關閉套接字

logging.debug("[re_socket] close and remove the time out socket 1/%s" % (len(x[1:])))

x.remove(y)

if not is_closed:

logging.debug('[re_socket] the %s still exists and update last_time' % str(x[1].getpeername()[0]))

_ip_index = client_ip_list.index(x[0].split('#')[0])

self._list_client_ip[server_ip_port][_ip_index][0] = '%s#%s' % (x[0].split('#')[0], time.time())

if int(time.time()) % 5 == 0:

logging.debug("[re_socket] the port %s client more then the %s" % (server_ip_port, ip_numbers))

new_tuple = [b'', return_value[1]]

return_value = tuple(new_tuple)

return return_value

def re_bind(old_method, self, *args, **kwds):

if only_port:

port = '%s' % args[0][1]

else:

port = '%s_%s' % (args[0][0], args[0][1])

self._list_client_ip[port] =

re_self_method(self, 'recvfrom', re_recvfrom)

old_method(self, *args, **kwds)

setattr(socket.socket, '_list_client_ip', {})

re_class_method(socket.socket, 'bind', re_bind)

re_class_method(socket.socket, 'accept', re_accept)

Python 環境搭建

本文將介紹如何在本地搭建python開發環境,python可應用於多平台包括 linux 和 mac os x。你可以通過終端視窗輸入 python 命令來檢視本地是否已經安裝python以及python的安裝版本。python最新原始碼,二進位制文件,新聞資訊等可以在python的官網檢視到 py...

Python 環境搭建

參考 動態載入庫 方法一 import sys your path 方法二 在python安裝目錄下的 lib site packages資料夾中建立乙個.pth檔案,內容為自己寫的庫路徑。示例如下 正規表示式 參考 python學習 第三方庫 tools pip的安裝 第三方庫 有whl檔案 比如...

Python環境搭建

python環境搭建 1.python直譯器 當然還有其他的一些直譯器,如pypy,jython等。2.安裝第三方庫 注意 1 安裝第三方庫之前需要計算機上已經安裝有microsoft visual c 2008 express edition,否則會提示錯誤 unable to findvcvar...