面试题答案
一键面试性能优化
- 资源分配优化
- 线程/进程池:SocketServer默认每个连接创建一个新的线程或进程处理。可以使用线程池或进程池技术,预先创建一定数量的线程或进程,避免频繁创建和销毁带来的开销。例如,使用
concurrent.futures
模块的ThreadPoolExecutor
或ProcessPoolExecutor
来管理线程或进程。
import concurrent.futures import socket from socketserver import BaseRequestHandler, TCPServer executor = concurrent.futures.ThreadPoolExecutor(max_workers = 100) class MyRequestHandler(BaseRequestHandler): def handle(self): data = self.request.recv(1024) response = self.process_data(data) self.request.sendall(response) def process_data(self, data): # 处理数据逻辑 return data.upper() def serve(): with TCPServer(('localhost', 9999), MyRequestHandler) as server: while True: request, client_address = server.socket.accept() executor.submit(server.process_request, request, client_address) if __name__ == "__main__": serve()
- 内存管理:合理使用缓存,避免重复计算。对于频繁访问的数据,可以使用
functools.lru_cache
(用于函数结果缓存)等技术,减少不必要的计算开销。
- 线程/进程池:SocketServer默认每个连接创建一个新的线程或进程处理。可以使用线程池或进程池技术,预先创建一定数量的线程或进程,避免频繁创建和销毁带来的开销。例如,使用
- 网络I/O优化
- 非阻塞I/O:将Socket设置为非阻塞模式,使用
select
、poll
或epoll
(在Linux系统上)等多路复用技术。例如,使用selectors
模块(Python 3.4+)实现多路复用:
import socket import selectors sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1024) if data: response = data.upper() conn.sendall(response) else: sel.unregister(conn) conn.close() server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(('localhost', 9999)) server_socket.listen(100) server_socket.setblocking(False) sel.register(server_socket, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)
- 优化缓冲区:适当调整Socket的发送和接收缓冲区大小。可以使用
setsockopt
方法来设置缓冲区大小,例如:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
- 非阻塞I/O:将Socket设置为非阻塞模式,使用
安全性提升
- 防止DDoS攻击
- 流量限制:设置连接速率限制,限制单个IP地址在一定时间内的连接数。可以使用
collections.deque
来记录连接时间,并在新连接到来时检查连接速率。
import socket from collections import deque import time ip_connection_timestamps = {} MAX_CONNECTIONS_PER_SECOND = 10 def is_allowed_connection(ip): if ip not in ip_connection_timestamps: ip_connection_timestamps[ip] = deque() timestamps = ip_connection_timestamps[ip] current_time = time.time() while timestamps and timestamps[0] < current_time - 1: timestamps.popleft() if len(timestamps) < MAX_CONNECTIONS_PER_SECOND: timestamps.append(current_time) return True return False server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(('localhost', 9999)) server_socket.listen(100) while True: conn, addr = server_socket.accept() if is_allowed_connection(addr[0]): # 处理连接 pass else: conn.close()
- 使用CDN:将静态资源分发到CDN网络,减轻服务器直接承受的流量压力。CDN可以缓存和分发内容,使客户端从距离更近的节点获取数据,同时CDN提供商通常有一定的DDoS防护能力。
- 流量限制:设置连接速率限制,限制单个IP地址在一定时间内的连接数。可以使用
- 数据加密传输
- SSL/TLS:使用
ssl
模块为Socket连接添加SSL/TLS加密。
其中,import socket import ssl context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.load_cert_chain(certfile='server.crt', keyfile='server.key') server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(('localhost', 9999)) server_socket.listen(100) ssl_socket = context.wrap_socket(server_socket, server_side = True) while True: conn, addr = ssl_socket.accept() data = conn.recv(1024) response = data.upper() conn.sendall(response) conn.close()
server.crt
是服务器证书文件,server.key
是服务器私钥文件。通过SSL/TLS加密,数据在传输过程中被加密,防止中间人攻击和数据泄露。 - SSL/TLS:使用