WebSocket协议安全风险
- 跨站WebSocket劫持(CSWSH):攻击者利用用户已登录的会话,在用户不知情的情况下发起WebSocket连接,向服务器发送恶意指令。例如,攻击者在用户登录金融交易系统后,通过在恶意网页中嵌入JavaScript代码,利用用户浏览器的已认证会话发起WebSocket连接到金融交易系统服务器,执行交易操作。
- 中间人攻击:攻击者拦截客户端与服务器之间的通信,篡改数据或获取敏感信息。比如在WebSocket通信过程中,攻击者在网络中间截取WebSocket消息,修改交易金额等关键数据,或者窃取交易账号、密码等敏感信息。
安全实现方案及代码示例
- 加密
- TLS加密:在Python中,可以使用
websockets
库结合ssl
模块来实现TLS加密。
import asyncio
import websockets
import ssl
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(certfile='server.crt', keyfile='server.key')
async def echo(websocket, path):
while True:
try:
message = await websocket.recv()
await websocket.send(f"Received: {message}")
except websockets.exceptions.ConnectionClosedOK:
break
start_server = websockets.serve(echo, "localhost", 8765, ssl=ssl_context)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
- 身份验证
- 基于Token的身份验证:在连接WebSocket前,客户端先通过常规HTTP认证获取Token,然后在WebSocket连接时带上Token进行验证。
import asyncio
import websockets
from jwt import decode, exceptions
SECRET_KEY = "your_secret_key"
async def authenticate(websocket):
token = await websocket.recv()
try:
payload = decode(token, SECRET_KEY, algorithms=['HS256'])
return True
except exceptions.InvalidTokenError:
await websocket.close(code=1008, reason="Invalid token")
return False
async def echo(websocket, path):
if not await authenticate(websocket):
return
while True:
try:
message = await websocket.recv()
await websocket.send(f"Received: {message}")
except websockets.exceptions.ConnectionClosedOK:
break
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
- 访问控制
- 角色权限控制:根据用户角色,限制对不同功能的访问。假设金融交易系统有普通用户和管理员两种角色,普通用户只能进行查询操作,管理员可以进行所有操作。
import asyncio
import websockets
from jwt import decode, exceptions
SECRET_KEY = "your_secret_key"
async def authenticate(websocket):
token = await websocket.recv()
try:
payload = decode(token, SECRET_KEY, algorithms=['HS256'])
return payload
except exceptions.InvalidTokenError:
await websocket.close(code=1008, reason="Invalid token")
return None
async def echo(websocket, path):
payload = await authenticate(websocket)
if not payload:
return
role = payload.get('role')
while True:
try:
message = await websocket.recv()
if role == "admin":
await websocket.send(f"Admin received: {message}")
elif role == "user":
if "query" in message.lower():
await websocket.send(f"User query result: {message}")
else:
await websocket.send("You don't have permission for this operation.")
except websockets.exceptions.ConnectionClosedOK:
break
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()