1. 异步日志记录
- 优化思路:将同步的日志记录操作改为异步,避免主线程等待日志写入磁盘,从而提升整体性能。在高并发场景下,主线程执行关键业务逻辑,日志记录操作放到后台线程或协程中执行,减少对主线程的阻塞。
- 涉及模块:
concurrent.futures
模块中的ThreadPoolExecutor
或ProcessPoolExecutor
可以用来创建线程池或进程池,实现异步日志记录。例如:
import concurrent.futures
import logging
executor = concurrent.futures.ThreadPoolExecutor(max_workers = 10)
logger = logging.getLogger(__name__)
def log_async(message):
def wrapper():
logger.info(message)
executor.submit(wrapper)
- 另外,
asyncio
库适用于基于协程的异步编程,若应用是基于asyncio
的异步框架(如aiohttp
),可以使用asyncio
来异步记录日志:
import asyncio
import logging
logger = logging.getLogger(__name__)
async def log_async(message):
await asyncio.get_running_loop().run_in_executor(None, lambda: logger.info(message))
2. 批量日志写入
- 优化思路:减少频繁的磁盘I/O操作,将多条日志记录先缓存起来,达到一定数量或时间间隔后,一次性写入磁盘。这样可以减少磁盘I/O的次数,提高日志记录的效率。
- 涉及模块:可以自定义一个缓存类来实现批量写入。例如:
import logging
import time
class BatchLogger:
def __init__(self, batch_size = 100, flush_interval = 5):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.logs = []
self.last_flush_time = time.time()
def log(self, message):
self.logs.append(message)
if len(self.logs) >= self.batch_size or time.time() - self.last_flush_time >= self.flush_interval:
self.flush()
def flush(self):
if self.logs:
for log in self.logs:
logging.info(log)
self.logs = []
self.last_flush_time = time.time()
3. 优化日志级别和输出内容
- 优化思路:合理设置日志级别,在生产环境中避免记录过多的调试信息,只记录关键的、必要的日志。同时,精简日志输出的内容,减少不必要的字段和冗长的信息,从而减少日志记录的开销。
- 涉及模块:在Python的
logging
模块中,通过设置logger.setLevel(logging.INFO)
(或logging.ERROR
、logging.WARNING
等)来控制日志级别。例如:
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
4. 使用队列进行日志处理
- 优化思路:创建一个日志队列,主线程将日志消息放入队列,由专门的日志处理线程从队列中取出消息并写入日志文件。这样可以解耦日志记录和主线程的业务逻辑,提高整体的并发性能。
- 涉及模块:
queue
模块中的Queue
类可用于实现日志队列。示例代码如下:
import logging
import queue
import threading
log_queue = queue.Queue()
def log_worker():
while True:
try:
message = log_queue.get()
logging.info(message)
log_queue.task_done()
except Exception as e:
logging.error(f"Error in log worker: {e}")
logging_thread = threading.Thread(target = log_worker)
logging_thread.daemon = True
logging_thread.start()
def log_to_queue(message):
log_queue.put(message)
5. 选择高效的日志格式
- 优化思路:选择简单、高效的日志格式,避免复杂的格式化操作。例如,纯文本格式通常比XML或JSON格式更高效,因为解析和生成XML或JSON格式需要更多的计算资源。
- 涉及模块:在
logging
模块中,通过Formatter
类来设置日志格式。简单的纯文本格式示例:
import logging
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(handler)