Using MySQL as the Storage Backend for the SpreadJS Collaboration Server


In multi-user real-time editing scenarios, the SpreadJS collaboration server needs to persist document information, operation logs, snapshot fragments, and milestone snapshots. If your system is built around a relational database, MySQL is a very suitable choice.

This article walks you through implementing a MySQL database adapter for the SpreadJS collaboration server.


🗂️ Database Table Design

We need four core tables:

  1. documents: stores basic document information (document ID, type, version, snapshot version)
  2. operations: stores user operation records, used for OT replay
  3. snapshot_fragments: stores snapshot fragment data
  4. milestone_snapshot: stores milestone snapshots to speed up document recovery

📌 Example table-creation SQL:

CREATE TABLE IF NOT EXISTS documents(
    id VARCHAR(255) PRIMARY KEY,
    type VARCHAR(255) NOT NULL,
    version INT NOT NULL,
    snapshot_version INT NOT NULL
);

CREATE TABLE IF NOT EXISTS operations(
    doc_id VARCHAR(255) NOT NULL,
    version INT NOT NULL,
    operation TEXT NOT NULL,
    PRIMARY KEY(doc_id, version),
    FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS snapshot_fragments(
    doc_id VARCHAR(255) NOT NULL,
    fragment_id VARCHAR(255) NOT NULL,
    data TEXT NOT NULL,
    PRIMARY KEY(doc_id, fragment_id),
    FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS milestone_snapshot(
    doc_id VARCHAR(255) NOT NULL,
    version INT NOT NULL,
    snapshot TEXT NOT NULL,
    PRIMARY KEY(doc_id, version),
    FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);

⚠️ The column names are not fixed; developers can adjust them as needed. To store additional business data, create new tables and integrate them with the collaboration service through middleware / hook functions.
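
If you prefer to apply this DDL from Node.js at startup instead of running it by hand, the following is a minimal sketch, assuming a mysql2/promise pool like the one created in the integration section below (the initSchema name is illustrative):

// Sketch: run the CREATE TABLE statements above at server startup.
const DDL_STATEMENTS = [
    `CREATE TABLE IF NOT EXISTS documents(
        id VARCHAR(255) PRIMARY KEY,
        type VARCHAR(255) NOT NULL,
        version INT NOT NULL,
        snapshot_version INT NOT NULL
    )`,
    // ...plus the operations, snapshot_fragments and milestone_snapshot statements above
];

export async function initSchema(pool) {
    const connection = await pool.getConnection();
    try {
        for (const ddl of DDL_STATEMENTS) {
            await connection.query(ddl); // DDL takes no parameters, so query() is sufficient
        }
    } finally {
        connection.release();
    }
}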


🛠️ Implementing the MySQL Adapter

The MySQLDb adapter is responsible for interacting with the database and implements the following:

  • Reading and writing document information
  • Managing the operation log
  • Storing and updating snapshot fragments
  • Using transactions to guarantee consistency

Core logic:

  • getDocument / getSnapshot → query document information, the snapshot, and its fragments
  • getOps → fetch the operation log for a version range
  • commitOp → write the operation and update the document version in one transaction
  • commitSnapshot → update the snapshot version and fragment data in one transaction

Example:

export class MySQLDb extends Db {

    constructor(pool) {
        super();
        this.pool = pool;
    }

    async getDocument(docId) {
        console.log("Fetching document:", docId);
        const connection = await this.pool.getConnection();
        const [rows] = await connection.execute(
            'SELECT * FROM documents WHERE id = ?',
            [docId]
        );
        console.log("Fetched document rows:", rows);
        connection.release();
        if (0 === rows.length) return;

        const row = rows[0];
        return {
            id: row.id,
            type: row.type,
            version: row.version,
            snapshotVersion: row.snapshot_version
        };
    }
    async getSnapshot(docId) {
        console.log("Fetching snapshot for document:", docId);
        const connection = await this.pool.getConnection();
        const [rows] = await connection.execute(
            'SELECT * FROM documents WHERE id = ?',
            [docId]
        );
        connection.release();
        console.log("Fetched snapshot rows:", rows);
        if (0 === rows.length) return;

        const row = rows[0];
        const fragments = await this.getFragments(docId);
        return {
            id: row.id,
            v: row.snapshot_version,
            type: row.type,
            fragments: fragments
        };
    }

    async getFragments(docId) {
        console.log("Fetching fragments for document:", docId);
        const connection = await this.pool.getConnection();
        const [rows] = await connection.execute(
            'SELECT fragment_id, data FROM snapshot_fragments WHERE doc_id = ?',
            [docId]
        );
        console.log("Fetched fragments rows:", rows);
        connection.release();
        const fragments = {};
        for (const row of rows) {
            fragments[row.fragment_id] = JSON.parse(row.data);
        }
        return fragments;
    }

    async getFragment(docId, fragmentId) {
        console.log("Fetching fragment:", docId, fragmentId);
        const connection = await this.pool.getConnection();
        const [rows] = await connection.execute(
            'SELECT data FROM snapshot_fragments WHERE doc_id = ? AND fragment_id = ?',
            [docId, fragmentId]
        );
        connection.release();
        console.log("Fetched fragment rows:", rows);
        if (0 === rows.length) return;
        return JSON.parse(rows[0].data);
    }

    async getOps(docId, from, to) {
        console.log("Fetching operations for document:", docId, "from version:", from, "to version:", to);
        const connection = await this.pool.getConnection();
        const [rows] = await connection.execute(
            'SELECT operation FROM operations WHERE doc_id = ? AND version >= ?' + (to ? ' AND version <= ?' : '') + " ORDER BY version",
            to ? [docId, from, to] : [docId, from]
        );
        connection.release();
        console.log("Fetched operations rows:", rows);
        if (0 === rows.length) return [];
        return rows.map(row => JSON.parse(row.operation));
    }

    async commitOp(docId, op, document) {
        console.log("Committing operation:", docId, op, document);
        const connection = await this.pool.getConnection();
        try {
            await connection.beginTransaction();

            // query version from document db
            const [rows] = await connection.execute(
                'SELECT version FROM documents WHERE id = ?',
                [docId]
            );
            if (op.create) {
                console.log("Creating new document:", docId, rows);
                if (rows.length > 0) {
                    await connection.rollback();
                    return false;
                }
                await connection.execute(
                    'INSERT INTO documents (id, type, version, snapshot_version) VALUES (?, ?, ?, ?)',
                    [docId, document.type, document.version, document.snapshotVersion]
                );
                await connection.execute(
                    'INSERT INTO operations (doc_id, version, operation) VALUES (?, ?, ?)',
                    [docId, op.v, JSON.stringify(op)]
                );

                await connection.commit();
                console.log("Operation create successfully.");
                return true;
            }
            else if (op.del) {
                console.log("Deleting document:", docId, rows);
                if (rows.length === 0) {
                    await connection.rollback();
                    return false;
                }
                await connection.execute(
                    'DELETE FROM documents WHERE id = ?',
                    [docId]
                );
                await connection.commit();
                console.log("Operation delete successfully.");
                return true;
            }
            else {
                console.log("Updating operation:", docId, op, rows);
                if (rows.length === 0 || rows[0].version !== op.v) {
                    await connection.rollback();
                    return false;
                }
                await connection.execute(
                    'INSERT INTO operations (doc_id, version, operation) VALUES (?, ?, ?)',
                    [docId, op.v, JSON.stringify(op)]
                );
                await connection.execute(
                    'UPDATE documents SET version = ? WHERE id = ?',
                    [document.version, docId]
                );
                await connection.commit();
                console.log("Operation update successfully.");
                return true;

            }

        } catch (error) {
            console.error('Error committing operation:', error);
            await connection.rollback();
            return false;
        }
        finally {
            connection.release();
        }

    }

    async commitSnapshot(docId, snapshot) {
        console.log("Committing snapshot for document:", docId, snapshot);
        const connection = await this.pool.getConnection();
        try {
            await connection.beginTransaction();

            // query snapshot_version from document db
            const [rows] = await connection.execute(
                'SELECT snapshot_version FROM documents WHERE id = ?',
                [docId]
            );
            if (0 === rows.length) {
                await connection.rollback();
                return false;
            }
            const currentSnapshotVersion = rows[0].snapshot_version;
            if (snapshot.fromVersion !== currentSnapshotVersion || snapshot.v <= currentSnapshotVersion) {
                await connection.rollback();
                return false;
            }

            await connection.execute(
                'UPDATE documents SET snapshot_version = ? WHERE id = ?',
                [snapshot.v, docId]
            );

            if (snapshot.fragmentsChanges.deleteSnapshot) {
                await connection.execute(
                    'DELETE FROM snapshot_fragments WHERE doc_id = ?',
                    [docId]
                );
            } else {
                const { createFragments, updateFragments, deleteFragments } = snapshot.fragmentsChanges;
                console.log("Committing snapshot fragments changes:", createFragments, updateFragments, deleteFragments);
                if (createFragments) {
                    for (const [fragmentId, data] of Object.entries(createFragments)) {
                        await connection.execute(
                            'INSERT INTO snapshot_fragments (doc_id, fragment_id, data) VALUES (?, ?, ?)',
                            [docId, fragmentId, JSON.stringify(data)]
                        );
                    }
                }
                if (updateFragments) {
                    for (const [fragmentId, data] of Object.entries(updateFragments)) {
                        await connection.execute(
                            'UPDATE snapshot_fragments SET data = ? WHERE doc_id = ? AND fragment_id = ?',
                            [JSON.stringify(data), docId, fragmentId]
                        );
                    }
                }
                if (deleteFragments) {
                    for (const fragmentId of deleteFragments) {
                        await connection.execute(
                            'DELETE FROM snapshot_fragments WHERE doc_id = ? AND fragment_id = ?',
                            [docId, fragmentId]
                        );
                    }
                }
            }

            await connection.commit();
            return true;

        } catch (error) {
            console.error('Error committing snapshot:', error);
            await connection.rollback();
            return false;
        } finally {
            connection.release();
        }
    }

    async close() {
        await this.pool.end();
    }

}

📌 Data flow overview:

User operation  --> commitOp       --> operations table
                                   --> documents table (version updated)
Snapshot commit --> commitSnapshot --> snapshot_fragments table
                                   --> documents table (snapshot_version updated)

🏷️ Milestone Snapshot Storage

Like the MongoDB version, the MySQL adapter also supports milestone snapshots, which are used for fast document recovery.

The MySQLMilestoneDb class provides two methods:

  • saveMilestoneSnapshot → periodically persists a full snapshot
  • getMilestoneSnapshot → finds the most recent snapshot at or below a given version

Example:

export class MySQLMilestoneDb {
    constructor(pool, interval) {
        this.pool = pool;
        this.interval = interval || 100; // default interval between milestone snapshots
    }

    async saveMilestoneSnapshot(snapshot) {
        const connection = await this.pool.getConnection();
        try {
            await connection.beginTransaction();
            await connection.execute(
                'INSERT INTO milestone_snapshot (doc_id, version, snapshot) VALUES (?, ?, ?)',
                [snapshot.id, snapshot.v, JSON.stringify(snapshot)]
            );
            await connection.commit();
            return true;
        } catch (error) {
            await connection.rollback();
            return false;
        } finally {
            connection.release();
        }
    }

    async getMilestoneSnapshot(id, version) {
        const connection = await this.pool.getConnection();
        const [rows] = await connection.execute(
            'SELECT * FROM milestone_snapshot WHERE doc_id = ? AND version <= ? ORDER BY version DESC LIMIT 1',
            [id, version]
        );
        connection.release();
        if (0 === rows.length) return;
        return JSON.parse(rows[0].snapshot);
    }
}

📌 Recovery flow:

Milestone snapshot (v = 1000) + incremental operations (1001-1020) = latest document
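
Conceptually, the restore path looks like the sketch below. Note that applyOperation is a hypothetical placeholder for whatever replay logic the OT layer provides; the adapters themselves only supply the data:

// Sketch only: reconstruct a document at targetVersion from a milestone plus incremental ops.
// `db` and `milestoneDb` are the adapters above; `applyOperation` is a hypothetical stand-in
// for the OT engine's replay step and is not part of the adapter code.
async function restoreDocument(db, milestoneDb, docId, targetVersion, applyOperation) {
    // Newest milestone snapshot at or below the target version (e.g. v = 1000).
    const milestone = await milestoneDb.getMilestoneSnapshot(docId, targetVersion);
    if (!milestone) return undefined;

    // Incremental operations after the milestone (e.g. versions 1001-1020).
    const ops = await db.getOps(docId, milestone.v + 1, targetVersion);

    // Replay them on top of the milestone snapshot to obtain the latest document.
    return ops.reduce((doc, op) => applyOperation(doc, op), milestone);
}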

⚙️ Integrating with DocumentServices
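
The integration code below assumes a mysql2/promise connection pool named mySqlPool. A minimal sketch of creating one (all connection options are placeholders for your own environment):

import mysql from 'mysql2/promise';

// Connection pool shared by both adapters; adjust the options to your environment.
const mySqlPool = mysql.createPool({
    host: 'localhost',
    user: 'root',
    password: 'your-password',
    database: 'spreadjs_collab',
    waitForConnections: true,
    connectionLimit: 10
});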

Finally, configure the MySQL adapters in the collaboration server:

const mySqlAdapter = new MySQLDb(mySqlPool);
const mySqlMilestoneAdapter = new MySQLMilestoneDb(mySqlPool, 100);

const documentServices = new OT.DocumentServices({ 
    db: mySqlAdapter, 
    milestoneDb: mySqlMilestoneAdapter
});

server.useFeature(OT.documentFeature(documentServices));

With this in place, the SpreadJS collaboration server can use MySQL as its backend storage.
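
On shutdown, you may also want to release the pool. MySQLDb.close() ends it; the signal handler below is only an illustrative example of where to call it:

// Sketch: release the shared pool when the process shuts down.
process.on('SIGTERM', async () => {
    await mySqlAdapter.close(); // calls pool.end() under the hood
    process.exit(0);
});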


✅ Summary

This article showed how to implement a MySQL database adapter for the SpreadJS collaboration server. The main points:

  1. Table design → documents / operations / snapshot_fragments / milestone_snapshot
  2. MySQL adapter implementation → transactional writes that keep operations and snapshots consistent
  3. Milestone snapshots → faster document recovery
  4. Service integration → configure the adapters in DocumentServices

With MySQL you gain stronger data consistency and transaction control, which makes it well suited to enterprise-grade collaborative editing scenarios.



