JWT工作原理
- 组成结构:
- JWT 由三部分组成,分别是头部(Header)、负载(Payload)和签名(Signature)。这三部分通过点(.)连接,形成一个完整的 JWT 字符串。
- 头部:通常包含两部分信息,一是令牌的类型,如
JWT
;二是使用的签名算法,如 HMAC SHA256
或 RSA
等。例如:
{
"alg": "HS256",
"typ": "JWT"
}
- 负载:是存放实际需要传递的数据的地方。它可以包含一些标准字段,如
iss
(签发者)、exp
(过期时间)、sub
(主题)等,也可以包含自定义字段。例如:
{
"sub": "1234567890",
"name": "John Doe",
"iat": 1516239022
}
- 签名:用于验证消息在传递过程中有没有被更改,并且,对于使用私钥签名的令牌,它还可以验证 JWT 的发送者的身份。要创建签名部分,需要使用编码后的头部、编码后的负载、一个密钥和头部中指定的签名算法。例如,如果使用 HMAC SHA256 算法,签名将按如下方式创建:
HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
secret)
- 工作流程:
- 用户登录:用户在客户端输入用户名和密码,将其发送到后端服务器进行认证。
- 服务器认证:后端服务器收到认证请求后,验证用户名和密码。如果认证成功,服务器会生成一个 JWT,其中负载部分包含用户的相关信息(如用户 ID、用户名等)。
- 返回 JWT:服务器将生成的 JWT 返回给客户端。
- 客户端存储和使用:客户端通常将 JWT 存储在本地(如
localStorage
、sessionStorage
或 cookie
中)。之后,客户端每次向服务器发送请求时,都会在请求头的 Authorization
字段中带上这个 JWT,格式通常为 Bearer <token>
。
- 服务器验证:服务器接收到请求后,从请求头中提取 JWT,验证 JWT 的签名是否有效,以及 JWT 是否过期等。如果验证通过,则认为请求是合法的,允许访问受保护的资源;否则,返回未授权错误。
基于JWT实现简单用户身份验证中间件
使用 Node.js 和 Express 实现
- 安装依赖:
首先确保安装了
jsonwebtoken
和 express
库。可以通过以下命令安装:
npm install jsonwebtoken express
- 生成 JWT:
const jwt = require('jsonwebtoken');
const secretKey = 'your - secret - key';
// 模拟用户登录成功后生成JWT
function generateToken(user) {
const payload = {
userId: user.id,
username: user.username
};
return jwt.sign(payload, secretKey, { expiresIn: '1h' });
}
- 验证 JWT:
function verifyToken(req, res, next) {
const token = req.headers['authorization'];
if (!token) {
return res.status(401).send('Token is missing');
}
const bearerToken = token.split(' ')[1];
jwt.verify(bearerToken, secretKey, (err, decoded) => {
if (err) {
return res.status(403).send('Invalid token');
}
req.decoded = decoded;
next();
});
}
- 完整示例:
const express = require('express');
const jwt = require('jsonwebtoken');
const app = express();
const secretKey = 'your - secret - key';
// 模拟用户数据
const users = [
{ id: 1, username: 'user1', password: 'password1' }
];
// 登录路由,生成JWT
app.post('/login', (req, res) => {
const { username, password } = req.body;
const user = users.find(u => u.username === username && u.password === password);
if (!user) {
return res.status(401).send('Invalid credentials');
}
const token = generateToken(user);
res.send({ token });
});
// 受保护的路由,验证JWT
app.get('/protected', verifyToken, (req, res) => {
res.send('This is a protected route. User ID:'+ req.decoded.userId);
});
function generateToken(user) {
const payload = {
userId: user.id,
username: user.username
};
return jwt.sign(payload, secretKey, { expiresIn: '1h' });
}
function verifyToken(req, res, next) {
const token = req.headers['authorization'];
if (!token) {
return res.status(401).send('Token is missing');
}
const bearerToken = token.split(' ')[1];
jwt.verify(bearerToken, secretKey, (err, decoded) => {
if (err) {
return res.status(403).send('Invalid token');
}
req.decoded = decoded;
next();
});
}
const port = 3000;
app.listen(port, () => {
console.log(`Server running on port ${port}`);
});
使用 Python 和 Flask 实现
- 安装依赖:
安装
PyJWT
和 Flask
库。可以通过以下命令安装:
pip install PyJWT Flask
- 生成 JWT:
import jwt
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your - secret - key'
# 模拟用户登录成功后生成JWT
def generate_token(user):
payload = {
'user_id': user['id'],
'username': user['username'],
'exp': datetime.utcnow() + timedelta(hours = 1)
}
return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
- 验证 JWT:
def verify_token():
token = None
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(' ')[1]
if not token:
return jsonify({'message': 'Token is missing!'}), 401
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
return data
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token has expired!'}), 401
except jwt.InvalidTokenError:
return jsonify({'message': 'Invalid token!'}), 401
- 完整示例:
import jwt
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your - secret - key'
# 模拟用户数据
users = [
{'id': 1, 'username': 'user1', 'password': 'password1'}
]
# 登录路由,生成JWT
@app.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
user = next((user for user in users if user['username'] == username and user['password'] == password), None)
if not user:
return jsonify({'message': 'Invalid credentials'}), 401
token = generate_token(user)
return jsonify({'token': token})
# 受保护的路由,验证JWT
@app.route('/protected', methods=['GET'])
def protected():
data = verify_token()
if isinstance(data, tuple):
return data
return jsonify({'message': 'This is a protected route. User ID: {}'.format(data['user_id'])})
def generate_token(user):
payload = {
'user_id': user['id'],
'username': user['username'],
'exp': datetime.utcnow() + timedelta(hours = 1)
}
return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
def verify_token():
token = None
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(' ')[1]
if not token:
return jsonify({'message': 'Token is missing!'}), 401
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
return data
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token has expired!'}), 401
except jwt.InvalidTokenError:
return jsonify({'message': 'Invalid token!'}), 401
if __name__ == '__main__':
app.run(debug = True)