写操作保证数据一致性
- 复制集写关注(Write Concern)
- 原理:在复制集环境下,MongoDB允许设置写关注级别。写关注决定了在返回写操作成功响应之前,有多少个副本集成员需要确认写操作。例如,
w: "majority"
表示只有当大多数副本集成员确认写操作后,客户端才会收到成功响应。这确保了写操作在大多数节点上持久化,从而保证了数据一致性。
- Python实现:
from pymongo import MongoClient
client = MongoClient()
db = client.test_database
collection = db.test_collection
# 使用写关注w: "majority"插入文档
result = collection.insert_one({"name": "test"}, write_concern={"w": "majority"})
- 分片集群写操作
- 原理:在分片集群中,写操作会被路由到对应的分片上。为了保证一致性,MongoDB同样支持写关注。此外,分片集群的配置服务器(Config Server)会维护元数据,确保写操作能准确地路由到正确的分片。
- Python实现:连接分片集群的方式与连接复制集类似,只是连接字符串可能不同。写操作同样可以设置写关注:
from pymongo import MongoClient
client = MongoClient("mongodb://mongos1.example.com:27017,mongos2.example.com:27017")
db = client.test_database
collection = db.test_collection
# 使用写关注w: "majority"插入文档
result = collection.insert_one({"name": "test"}, write_concern={"w": "majority"})
读操作保证数据一致性
- 复制集读偏好(Read Preference)
- 原理:读偏好决定了从哪个副本集成员读取数据。默认情况下,读操作会从主节点读取,以保证读到最新的数据。但是,在某些情况下,可以选择从Secondary节点读取,以分担读压力。如果选择从Secondary节点读取,可能会读到稍微滞后的数据。可以通过设置读偏好来平衡一致性和性能。例如,
ReadPreference.PRIMARY
总是从主节点读取,保证数据一致性;ReadPreference.SECONDARY_PREFERRED
优先从Secondary节点读取,如果Secondary节点不可用则从主节点读取。
- Python实现:
from pymongo import MongoClient, ReadPreference
client = MongoClient()
db = client.test_database
collection = db.test_collection.with_options(read_preference = ReadPreference.PRIMARY)
# 从主节点读取数据
documents = list(collection.find())
- 分片集群读操作
- 原理:在分片集群中,读操作同样由mongos路由到相应的分片。读偏好的设置与复制集类似,可以根据业务需求选择从主节点或Secondary节点读取。
- Python实现:与复制集读偏好设置类似,在连接分片集群后,对集合设置读偏好:
from pymongo import MongoClient, ReadPreference
client = MongoClient("mongodb://mongos1.example.com:27017,mongos2.example.com:27017")
db = client.test_database
collection = db.test_collection.with_options(read_preference = ReadPreference.PRIMARY)
# 从主节点读取数据
documents = list(collection.find())
事务支持
- 原理:MongoDB从4.0版本开始支持多文档事务。事务可以确保一组操作要么全部成功,要么全部失败,从而保证数据一致性。在分布式系统中,这对于跨文档、跨集合甚至跨数据库的操作非常重要。
- Python实现:
from pymongo import MongoClient
client = MongoClient()
session = client.start_session()
session.start_transaction()
try:
db = client.test_database
collection1 = db.test_collection1
collection2 = db.test_collection2
collection1.insert_one({"name": "doc1"}, session = session)
collection2.insert_one({"name": "doc2"}, session = session)
session.commit_transaction()
except Exception as e:
session.abort_transaction()
raise e
finally:
session.end_session()