面试题答案
一键面试1. 自定义读写锁和互斥锁实现
自定义读写锁实现(以Python为例,基于threading
模块)
import threading
class ReadWriteLock:
def __init__(self):
self.mutex = threading.Lock()
self.read_semaphore = threading.Semaphore(0)
self.write_semaphore = threading.Semaphore(1)
self.readers_count = 0
def acquire_read(self):
self.mutex.acquire()
self.readers_count += 1
if self.readers_count == 1:
self.write_semaphore.acquire()
self.mutex.release()
self.read_semaphore.acquire()
def release_read(self):
self.mutex.acquire()
self.readers_count -= 1
if self.readers_count == 0:
self.write_semaphore.release()
self.mutex.release()
self.read_semaphore.release()
def acquire_write(self):
self.write_semaphore.acquire()
def release_write(self):
self.write_semaphore.release()
自定义互斥锁实现(Python的threading.Lock
本身就是互斥锁,这里简单自定义一个类似结构)
import threading
class MyMutex:
def __init__(self):
self.lock = threading.Lock()
def acquire(self):
self.lock.acquire()
def release(self):
self.lock.release()
对同一个数据结构的读写操作实现
data = []
def read_with_rwlock(rw_lock):
rw_lock.acquire_read()
try:
local_data = data.copy()
print(f"Read data: {local_data}")
finally:
rw_lock.release_read()
def write_with_rwlock(rw_lock, value):
rw_lock.acquire_write()
try:
data.append(value)
print(f"Write data: {value}")
finally:
rw_lock.release_write()
def read_with_mutex(mutex):
mutex.acquire()
try:
local_data = data.copy()
print(f"Read data: {local_data}")
finally:
mutex.release()
def write_with_mutex(mutex, value):
mutex.acquire()
try:
data.append(value)
print(f"Write data: {value}")
finally:
mutex.release()
2. 性能测试工具进行性能对比分析
使用timeit
模块进行性能测试(简单示例,测试多次读写操作)
import timeit
def test_rwlock():
rw_lock = ReadWriteLock()
threads = []
for _ in range(5):
t1 = threading.Thread(target=read_with_rwlock, args=(rw_lock,))
t2 = threading.Thread(target=write_with_rwlock, args=(rw_lock, 1))
threads.extend([t1, t2])
for t in threads:
t.start()
for t in threads:
t.join()
def test_mutex():
mutex = MyMutex()
threads = []
for _ in range(5):
t1 = threading.Thread(target=read_with_mutex, args=(mutex,))
t2 = threading.Thread(target=write_with_mutex, args=(mutex, 1))
threads.extend([t1, t2])
for t in threads:
t.start()
for t in threads:
t.join()
rwlock_time = timeit.timeit(test_rwlock, number = 100)
mutex_time = timeit.timeit(test_mutex, number = 100)
print(f"Time taken by Read - Write Lock for 100 runs: {rwlock_time}")
print(f"Time taken by Mutex for 100 runs: {mutex_time}")
分析结果:
- 一般情况下,读写锁在读操作频繁的场景下性能优于互斥锁。因为读写锁允许多个读操作同时进行,而互斥锁在任何时候都只允许一个线程访问数据,无论是读还是写。
- 在写操作频繁的场景下,两者性能差异可能不明显,因为写操作都需要独占访问权。但读写锁在写操作完成后,可能会因为唤醒读操作线程的调度开销,在极端写频繁场景下略逊于互斥锁。
3. 优化自定义锁性能的思路和措施
读写锁优化
- 减少锁粒度:如果数据结构可以细分,对不同部分使用不同的读写锁,这样可以提高并发度。例如,将一个大的列表按块划分,每个块使用一个读写锁。
- 读写优先级调整:通过引入策略,例如读写公平策略或者写优先策略,避免读操作长时间占用资源导致写操作饥饿。可以在
acquire_read
和acquire_write
方法中增加逻辑来实现优先级控制。 - 使用信号量优化:对于读写锁中的信号量,可以考虑使用更高效的信号量实现,如基于操作系统原语的信号量,减少线程调度开销。
互斥锁优化
- 使用更轻量级锁:在一些场景下,自旋锁(Spinlock)可能更适合。自旋锁在短时间内需要获取锁的场景下,通过在用户态自旋等待而不是陷入内核态进行线程调度,减少上下文切换开销。但自旋锁不适合长时间持有锁的场景,否则会浪费CPU资源。
- 优化锁的竞争策略:如果互斥锁竞争激烈,可以考虑使用队列式的互斥锁实现,按照线程请求锁的顺序来分配锁,避免线程饥饿。