<p>为了支持 <a href="https://www.oschina.net/action/GoToLink?url=https%3A%2F%2Fwww.grapecity.com.cn%2Fdeveloper%2Fspreadjs" target="_blank">SpreadJS</a> 协同编辑场景,协同服务器需要持久化存储文档、操作、快照及里程碑数据。本文介绍了 <strong>MongoDB 数据库适配器</strong>的实现方法,包括集合初始化、适配器接口实现以及里程碑存储支持。</p>
一、MongoDB 集合初始化
协同编辑服务需要以下集合(Collections)来存储数据:
- documents:存储文档基本信息(类型、版本号、快照版本号)。
- operations:存储操作记录,用于实现基于 OT 的操作重放。
- snapshot_fragments:存储快照的分片数据,支持大文档分段管理。
- milestone_snapshot:存储里程碑快照,用于回溯和恢复。
初始化脚本示例如下:
export async function InitCollections(client) {const collectionsToCreate = [
{ name: 'documents', indexes: [{ key: { id: 1 }, unique: true }] },
{ name: 'operations', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },
{ name: 'snapshot_fragments', indexes: [{ key: { doc_id: 1, fragment_id: 1 }, unique: true }] },
{ name: 'milestone_snapshot', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },
];await client.connect();const db = client.db(dbName);
const existingCollections = (await db.listCollections().toArray()).map(c => c.name);
for (const col of collectionsToCreate) {if (!existingCollections.includes(col.name)) {await db.createCollection(col.name);console.log(`Collection '${col.name}' created.`);
} else {console.log(`Collection '${col.name}' already exists.`);
}for (const idx of col.indexes) {await db.collection(col.name).createIndex(idx.key, { unique: idx.unique });
}
}
await client.close();
}
二、MongoDB 适配器实现
- 适配器说明
适配器 MongoDb
继承了协同服务的数据库接口,负责:
- 文档信息存取
- 操作记录管理
- 快照存储与更新
- 分片快照管理
可根据业务需要,增加 事务(session) 或 并发冲突检测。
- 适配器核心实现
export class MongoDb extends Db {
constructor(client) {
super();
this.client = client;
this.client.connect()
this.db = client.db(dbName);
}
async getDocument(docId) {
const documents = this.db.collection('documents');
let row = await documents.findOne({ id: docId });
if (row) {
return {
id: row.id,
type: row.type,
version: row.version,
snapshotVersion: row.snapshot_version
};
}
}
async getSnapshot(docId) {
const documents = this.db.collection('documents');
let row = await documents.findOne({ id: docId });
if (!row) {
return null;
}
const fragments = await this.getFragments(docId);
return {
id: row.id,
v: row.snapshot_version,
type: row.type,
fragments: fragments
};
}
async getFragments(docId) {
const fragments = this.db.collection('snapshot_fragments');
const rows = await fragments.find({ doc_id: docId }).toArray();
if (rows.length === 0) {
return {};
}
const results = {};
for (const row of rows) {
results[row.fragment_id] = JSON.parse(row.data);
}
return results;
}
async getFragment(docId, fragmentId) {
const fragments = this.db.collection('snapshot_fragments');
const row = await fragments.findOne({ doc_id: docId, fragment_id: fragmentId });
if (row) {
return JSON.parse(row.data);
}
return null;
}
async getOps(docId, from, to) {
const operations = this.db.collection('operations');
const query = { doc_id: docId, version: { $gte: from } };
if (to !== undefined) {
query.version.$lte = to;
}
const rows = await operations.find(query).toArray();
if (rows.length === 0) {
return [];
}
return rows.map(row => JSON.parse(row.operation));
}
async commitOp(docId, op, document) {
try {
const documents = this.db.collection('documents');
const operations = this.db.collection('operations');
const row = await documents.findOne({ id: docId });
if (op.create) {
if (row) {
throw new Error(`Document with id ${docId} already exists.`);
}
await documents.insertOne(
{
id: docId,
type: document.type,
version: document.version,
snapshot_version: document.snapshotVersion
},
);
await operations.insertOne(
{
doc_id: docId,
version: op.v,
operation: JSON.stringify(op)
},
);
return true;
}
else if (op.del) {
if (!row) {
throw new Error(`Document with id ${docId} does not exist.`);
}
await documents.deleteOne(
{ id: docId },
);
return true;
}
else {
if (!row || row.version !== op.v) {
throw new Error(`Document with id ${docId} does not exist or version mismatch.`);
}
await operations.insertOne(
{
doc_id: docId,
version: op.v,
operation: JSON.stringify(op)
},
);
await documents.updateOne(
{ id: docId },
{ $set: { version: document.version } },
);
return true;
}
}
catch (error) {
console.error('Error committing operation:', error);
return false;
}
finally {
}
}
async commitSnapshot(docId, snapshot) {
try {
const documents = this.db.collection('documents');
const fragments = this.db.collection('snapshot_fragments');
const row = await documents.findOne({ id: docId },);
if (!row) {
throw new Error(`Document with id ${docId} does not exist.`);
}
const currentSnapshotVersion = row.snapshot_version;
if (snapshot.fromVersion !== currentSnapshotVersion || snapshot.v <= currentSnapshotVersion) {
throw new Error(`Snapshot version mismatch: expected ${currentSnapshotVersion}, got ${snapshot.v}`);
}
await documents.updateOne(
{ id: docId },
{ $set: { snapshot_version: snapshot.v } },
);
if (snapshot.fragmentsChanges.deleteSnapshot) {
fragments.deleteMany(
{ doc_id: docId },
);
}
else {
const { createFragments, updateFragments, deleteFragments } = snapshot.fragmentsChanges;
if (createFragments) {
const createOps = Object.entries(createFragments).map(([id, data]) => ({
doc_id: docId,
fragment_id: id,
data: JSON.stringify(data)
}));
if (createOps.length > 0) {
await fragments.insertMany(
createOps,
);
}
}
if (updateFragments) {
const updateOps = Object.entries(updateFragments).map(([id, data]) => ({
updateOne: {
filter: { doc_id: docId, fragment_id: id },
update: { $set: { data: JSON.stringify(data) } }
}
}));
if (updateOps.length > 0) {
await fragments.bulkWrite(
updateOps,
// { session }
);
}
}
if (deleteFragments) {
const deleteOps = deleteFragments.map(id => ({
deleteOne: {
filter: { doc_id: docId, fragment_id: id }
}
}));
if (deleteOps.length > 0) {
await fragments.bulkWrite(deleteOps,
);
}
}
}
return true;
}
catch (error) {
console.error('Error committing snapshot:', error);
return false;
}
finally {
}
}
async close() {
await this.client.close();
}
}
三、里程碑数据存储
里程碑快照用于优化快照恢复性能(避免从头重放所有操作)。 实现类 MongoMilestoneDb
提供 保存 与 读取 接口:
export class MongoMilestoneDb {constructor(client, interval) {this.client = client;this.interval = interval ? interval : 1000;this.db = client.db(dbName);
}
async saveMilestoneSnapshot(snapshot) {const milestones = this.db.collection('milestone_snapshot');await milestones.insertOne({doc_id: snapshot.id,version: snapshot.v,snapshot: JSON.stringify(snapshot)
});return true;
}
async getMilestoneSnapshot(id, version) {const milestones = this.db.collection('milestone_snapshot');const row = await milestones.findOne(
{ doc_id: id, version: { $lte: version } },
{ sort: { version: -1 } }
);if (row) {return JSON.parse(row.snapshot);
}return null;
}
}
四、在 DocumentServices 中配置
完成适配器与里程碑数据库后,需要在 DocumentServices
中进行配置:
const documentServices = new OT.DocumentServices(
{ db: new MongoDb(mongoClient),milestoneDb: new MongoMilestoneDb(mongoClient, 500)
});
这样,SpreadJS 协同服务器即可通过 MongoDB 实现文档存储、操作日志管理、快照与里程碑维护,保证协同编辑过程的高效与可扩展。
五、总结
本文展示了 SpreadJS 协同服务器对 MongoDB 数据库的适配实现,主要包括:
- 集合初始化:定义所需集合与索引。
- 数据库****适配器:支持文档、操作、快照的存储与管理。
- 里程碑存储:提供快照的高效回溯能力。
- 服务集成:在
DocumentServices
中配置 MongoDB 适配器。
借助 MongoDB 的高性能与灵活数据结构,SpreadJS 协同服务可实现稳定、可扩展的文档协作平台。
</div>
未经允许不得转载:紫竹林-程序员中文网 » SpreadJS 协同服务器 MongoDB 数据库适配支持