SpreadJS 协同服务器 MongoDB 数据库适配支持


                                                                                                                                                <p>为了支持 <a href="https://www.oschina.net/action/GoToLink?url=https%3A%2F%2Fwww.grapecity.com.cn%2Fdeveloper%2Fspreadjs" target="_blank">SpreadJS</a> 协同编辑场景,协同服务器需要持久化存储文档、操作、快照及里程碑数据。本文介绍了 <strong>MongoDB 数据库适配器</strong>的实现方法,包括集合初始化、适配器接口实现以及里程碑存储支持。</p> 

一、MongoDB 集合初始化

协同编辑服务需要以下集合(Collections)来存储数据:

  • documents:存储文档基本信息(类型、版本号、快照版本号)。
  • operations:存储操作记录,用于实现基于 OT 的操作重放。
  • snapshot_fragments:存储快照的分片数据,支持大文档分段管理。
  • milestone_snapshot:存储里程碑快照,用于回溯和恢复。

初始化脚本示例如下:

export async function InitCollections(client) {const collectionsToCreate = [
        { name: 'documents', indexes: [{ key: { id: 1 }, unique: true }] },
        { name: 'operations', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },
        { name: 'snapshot_fragments', indexes: [{ key: { doc_id: 1, fragment_id: 1 }, unique: true }] },
        { name: 'milestone_snapshot', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },
    ];await client.connect();const db = client.db(dbName);
const existingCollections = (await db.listCollections().toArray()).map(c =&gt; c.name);
for (const col of collectionsToCreate) {if (!existingCollections.includes(col.name)) {await db.createCollection(col.name);console.log(`Collection '${col.name}' created.`);
        } else {console.log(`Collection '${col.name}' already exists.`);
        }for (const idx of col.indexes) {await db.collection(col.name).createIndex(idx.key, { unique: idx.unique });
        }
    }
await client.close();
}

二、MongoDB 适配器实现

  1. 适配器说明

适配器 MongoDb 继承了协同服务的数据库接口,负责:

  • 文档信息存取
  • 操作记录管理
  • 快照存储与更新
  • 分片快照管理

可根据业务需要,增加 事务(session)并发冲突检测

  1. 适配器核心实现
export class MongoDb extends Db {
    constructor(client) {
        super();
        this.client = client;
        this.client.connect()
        this.db = client.db(dbName);
    }

    async getDocument(docId) {
        const documents = this.db.collection('documents');
        let row = await documents.findOne({ id: docId });
        if (row) {
            return {
                id: row.id,
                type: row.type,
                version: row.version,
                snapshotVersion: row.snapshot_version
            };
        }
    }

    async getSnapshot(docId) {
        const documents = this.db.collection('documents');
        let row = await documents.findOne({ id: docId });
        if (!row) {
            return null;
        }
        const fragments = await this.getFragments(docId);
        return {
            id: row.id,
            v: row.snapshot_version,
            type: row.type,
            fragments: fragments
        };
    }

    async getFragments(docId) {

        const fragments = this.db.collection('snapshot_fragments');
        const rows = await fragments.find({ doc_id: docId }).toArray();

        if (rows.length === 0) {
            return {};
        }

        const results = {};
        for (const row of rows) {
            results[row.fragment_id] = JSON.parse(row.data);
        }
        return results;
    }

    async getFragment(docId, fragmentId) {
        const fragments = this.db.collection('snapshot_fragments');
        const row = await fragments.findOne({ doc_id: docId, fragment_id: fragmentId });
        if (row) {
            return JSON.parse(row.data);
        }
        return null;
    }

    async getOps(docId, from, to) {
        const operations = this.db.collection('operations');
        const query = { doc_id: docId, version: { $gte: from } };
        if (to !== undefined) {
            query.version.$lte = to;
        }
        const rows = await operations.find(query).toArray();
        if (rows.length === 0) {
            return [];
        }

        return rows.map(row =&gt; JSON.parse(row.operation));
    }

    async commitOp(docId, op, document) {
        try {
            const documents = this.db.collection('documents');
            const operations = this.db.collection('operations');
            const row = await documents.findOne({ id: docId });
            if (op.create) {
                if (row) {
                    throw new Error(`Document with id ${docId} already exists.`);
                }
                await documents.insertOne(
                    {
                        id: docId,
                        type: document.type,
                        version: document.version,
                        snapshot_version: document.snapshotVersion
                    },
                );
                await operations.insertOne(
                    {
                        doc_id: docId,
                        version: op.v,
                        operation: JSON.stringify(op)
                    },
                );
                return true;
            }
            else if (op.del) {
                if (!row) {
                    throw new Error(`Document with id ${docId} does not exist.`);
                }
                await documents.deleteOne(
                    { id: docId },
                );

                return true;
            }
            else {
                if (!row || row.version !== op.v) {
                    throw new Error(`Document with id ${docId} does not exist or version mismatch.`);
                }

                await operations.insertOne(
                    {
                        doc_id: docId,
                        version: op.v,
                        operation: JSON.stringify(op)
                    },
                );
                await documents.updateOne(
                    { id: docId },
                    { $set: { version: document.version } },
                );
                return true;
            }

        }
        catch (error) {
            console.error('Error committing operation:', error);
            return false;
        }
        finally {
        }
    }

    async commitSnapshot(docId, snapshot) {
        try {
            const documents = this.db.collection('documents');
            const fragments = this.db.collection('snapshot_fragments');
            const row = await documents.findOne({ id: docId },);
            if (!row) {
                throw new Error(`Document with id ${docId} does not exist.`);
            }
            const currentSnapshotVersion = row.snapshot_version;
            if (snapshot.fromVersion !== currentSnapshotVersion || snapshot.v &lt;= currentSnapshotVersion) {
                throw new Error(`Snapshot version mismatch: expected ${currentSnapshotVersion}, got ${snapshot.v}`);
            }

            await documents.updateOne(
                { id: docId },
                { $set: { snapshot_version: snapshot.v } },
            );

            if (snapshot.fragmentsChanges.deleteSnapshot) {
                fragments.deleteMany(
                    { doc_id: docId },
                );
            }
            else {
                const { createFragments, updateFragments, deleteFragments } = snapshot.fragmentsChanges;
                if (createFragments) {

                    const createOps = Object.entries(createFragments).map(([id, data]) =&gt; ({
                        doc_id: docId,
                        fragment_id: id,
                        data: JSON.stringify(data)
                    }));
                    if (createOps.length &gt; 0) {
                        await fragments.insertMany(
                            createOps,
                        );
                    }
                }
                if (updateFragments) {
                    const updateOps = Object.entries(updateFragments).map(([id, data]) =&gt; ({
                        updateOne: {
                            filter: { doc_id: docId, fragment_id: id },
                            update: { $set: { data: JSON.stringify(data) } }
                        }
                    }));
                    if (updateOps.length &gt; 0) {
                        await fragments.bulkWrite(
                            updateOps,
                            // { session }
                        );
                    }
                }
                if (deleteFragments) {
                    const deleteOps = deleteFragments.map(id =&gt; ({
                        deleteOne: {
                            filter: { doc_id: docId, fragment_id: id }
                        }
                    }));
                    if (deleteOps.length &gt; 0) {
                        await fragments.bulkWrite(deleteOps,
                        );
                    }
                }
            }
            return true;
        }
        catch (error) {
            console.error('Error committing snapshot:', error);
            return false;
        }
        finally {
        }
    }

    async close() {
        await this.client.close();
    }
}

三、里程碑数据存储

里程碑快照用于优化快照恢复性能(避免从头重放所有操作)。 实现类 MongoMilestoneDb 提供 保存读取 接口:

export class MongoMilestoneDb {constructor(client, interval) {this.client = client;this.interval = interval ? interval : 1000;this.db = client.db(dbName);
    }
async saveMilestoneSnapshot(snapshot) {const milestones = this.db.collection('milestone_snapshot');await milestones.insertOne({doc_id: snapshot.id,version: snapshot.v,snapshot: JSON.stringify(snapshot)
        });return true;
    }
async getMilestoneSnapshot(id, version) {const milestones = this.db.collection('milestone_snapshot');const row = await milestones.findOne(
            { doc_id: id, version: { $lte: version } },
            { sort: { version: -1 } }
        );if (row) {return JSON.parse(row.snapshot);
        }return null;
    }
}

四、在 DocumentServices 中配置

完成适配器与里程碑数据库后,需要在 DocumentServices 中进行配置:

const documentServices = new OT.DocumentServices(
{ db: new MongoDb(mongoClient),milestoneDb: new MongoMilestoneDb(mongoClient, 500)
});

这样,SpreadJS 协同服务器即可通过 MongoDB 实现文档存储、操作日志管理、快照与里程碑维护,保证协同编辑过程的高效与可扩展。

五、总结

本文展示了 SpreadJS 协同服务器对 MongoDB 数据库的适配实现,主要包括:

  1. 集合初始化:定义所需集合与索引。
  2. 数据库****适配器:支持文档、操作、快照的存储与管理。
  3. 里程碑存储:提供快照的高效回溯能力。
  4. 服务集成:在 DocumentServices 中配置 MongoDB 适配器。

借助 MongoDB 的高性能与灵活数据结构,SpreadJS 协同服务可实现稳定、可扩展的文档协作平台。

                                                                                </div>



Source link

未经允许不得转载:紫竹林-程序员中文网 » SpreadJS 协同服务器 MongoDB 数据库适配支持

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
关于我们 免责申明 意见反馈 隐私政策
程序员中文网:公益在线网站,帮助学习者快速成长!
关注微信 技术交流
推荐文章
每天精选资源文章推送
推荐文章
随时随地碎片化学习
推荐文章
发现有趣的