设计思路
- TCP 服务器基础:使用
socket
函数创建套接字,bind
函数绑定地址,listen
函数监听连接。
- 多线程处理:当有新的客户端连接时,创建新线程处理该客户端的通信,这样可以实现并发处理多个客户端连接。
- 线程池优化:为了避免频繁创建和销毁线程带来的开销,使用线程池。线程池初始化时创建一定数量的线程,将客户端连接请求放入任务队列,线程从任务队列中取出任务进行处理。
- 资源竞争处理:在访问共享资源(如任务队列)时,使用互斥锁(
pthread_mutex_t
)来保证同一时间只有一个线程能访问共享资源,避免资源竞争。同时,使用条件变量(pthread_cond_t
)来通知线程有新任务到来。
关键代码片段
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define PORT 8080
#define MAX_CLIENTS 100
#define THREAD_POOL_SIZE 10
// 任务结构体
typedef struct {
int client_socket;
struct sockaddr_in client_addr;
} Task;
// 线程池结构体
typedef struct {
Task tasks[MAX_CLIENTS];
int head;
int tail;
int count;
pthread_mutex_t mutex;
pthread_cond_t cond;
int stop;
} ThreadPool;
ThreadPool pool;
// 线程处理函数
void* handle_client(void* arg) {
while (1) {
Task task;
pthread_mutex_lock(&pool.mutex);
while (pool.count == 0 &&!pool.stop) {
pthread_cond_wait(&pool.cond, &pool.mutex);
}
if (pool.stop && pool.count == 0) {
pthread_mutex_unlock(&pool.mutex);
pthread_exit(NULL);
}
task = pool.tasks[pool.head];
pool.head = (pool.head + 1) % MAX_CLIENTS;
pool.count--;
pthread_mutex_unlock(&pool.mutex);
// 处理客户端通信
char buffer[1024] = {0};
int valread = read(task.client_socket, buffer, 1024);
printf("Message from client: %s\n", buffer);
char* hello = "Hello from server";
send(task.client_socket, hello, strlen(hello), 0);
close(task.client_socket);
}
return NULL;
}
int main(int argc, char const *argv[]) {
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
// 创建套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置套接字选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// 绑定地址
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听连接
if (listen(server_fd, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
// 初始化线程池
pool.head = 0;
pool.tail = 0;
pool.count = 0;
pool.stop = 0;
pthread_mutex_init(&pool.mutex, NULL);
pthread_cond_init(&pool.cond, NULL);
pthread_t threads[THREAD_POOL_SIZE];
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
pthread_create(&threads[i], NULL, handle_client, NULL);
}
while (1) {
// 接受新的客户端连接
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
perror("accept");
continue;
}
pthread_mutex_lock(&pool.mutex);
while (pool.count == MAX_CLIENTS) {
pthread_mutex_unlock(&pool.mutex);
close(new_socket);
new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
if (new_socket < 0) {
perror("accept");
continue;
}
pthread_mutex_lock(&pool.mutex);
}
pool.tasks[pool.tail].client_socket = new_socket;
pool.tasks[pool.tail].client_addr = address;
pool.tail = (pool.tail + 1) % MAX_CLIENTS;
pool.count++;
pthread_cond_signal(&pool.cond);
pthread_mutex_unlock(&pool.mutex);
}
// 清理资源
pool.stop = 1;
pthread_cond_broadcast(&pool.cond);
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
pthread_join(threads[i], NULL);
}
pthread_mutex_destroy(&pool.mutex);
pthread_cond_destroy(&pool.cond);
close(server_fd);
return 0;
}