在nodejs中如何锁定由多个异步方法共享的对象



我在nodejs中有一个具有不同属性的对象,有不同的异步函数可以访问和修改一些复杂的执行对象。单个异步函数可能有内部回调(或异步函数),可能需要一些时间来执行,然后该函数将修改该对象。我想锁定这个对象,直到我完成所有的修改,只有在此之后,任何其他异步函数才能访问它。

的例子:

var machineList = {};
function operation1() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation2() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation3() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation4() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}

假设machineList是一个复杂对象,通过不同的异步方法(operation1(), operation2(),…)对该对象进行不同的操作来修改它。这些操作可以根据来自客户端的请求以任意顺序和任意时间调用。每个请求将执行单个操作。

在每个操作函数中都有一些内部闭包函数和回调函数(或async函数),这可能需要一些时间。但是我想锁定machineList对象,直到任何一个操作完成。

在任何操作开始时,我想锁定像waitForMachineList()这样的对象,并在leaveFromMachineList()之后释放锁。

最后我想在nodejs中实现锁机制。就像c++中的Critical Session和c#中的lock。

所以请一些将有助于实现它在nodejs?或建议我任何节点模块,我可以使用这个

我已经使用异步锁节点模块完成了锁定。现在我可以实现问题中提到的目标了。

的例子:

var AsyncLock = require('async-lock');
var lock = new AsyncLock();
function operation1() {
    console.log("Execute operation1");
    lock.acquire("key1", function(done) {
        console.log("lock1 enter")
        setTimeout(function() {
            console.log("lock1 Done")
            done();
        }, 3000)
    }, function(err, ret) {
        console.log("lock1 release")
    }, {});
}
function operation2() {
    console.log("Execute operation2");
    lock.acquire("key1", function(done) {
        console.log("lock2 enter")
        setTimeout(function() {
            console.log("lock2 Done")
            done();
        }, 1000)
    }, function(err, ret) {
        console.log("lock2 release")
    }, {});
}
function operation3() {
    console.log("Execute operation3");
    lock.acquire("key1", function(done) {
        console.log("lock3 enter")
        setTimeout(function() {
            console.log("lock3 Done")
            done();
        }, 1)
    }, function(err, ret) {
        console.log("lock3 release")
    }, {});
}operation1(); operation2(); operation3();
输出:

执行operation1

lock1输入

执行operation2

执行operation3

lock1做

lock1释放

lock2输入

lock2做

lock2释放

lock3输入

lock3做

lock3发布

我创建了一个async-lock的简化版本,我想在这里分享给那些想要了解这样的东西是如何在技术上实现的或者不想添加新包的人。

const createLock = () => {
  const queue = [];
  let active = false;
  return (fn) => {
    let deferredResolve;
    let deferredReject;
    const deferred = new Promise((resolve, reject) => {
      deferredResolve = resolve;
      deferredReject = reject;
    });
    const exec = async () => {
      await fn().then(deferredResolve, deferredReject);
      if (queue.length > 0) {
        queue.shift()();
      } else {
        active = false;
      }
    };
    if (active) {
      queue.push(exec);
    } else {
      active = true;
      exec();
    }
    return deferred;
  };
};

你可以这样使用这个函数:

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
const lock = createLock();
function test(id, ms) {
  lock(async () => {
    console.log(id, "start");
    await sleep(ms);
    console.log(id, "end");
  });
}
test(1, 400);
test(2, 300);
test(3, 200);
test(4, 100);
// Output:
// 1 start
// 1 end
// 2 start
// 2 end
// 3 start
// 3 end
// 4 start
// 4 end

但是要注意传递的函数必须是Promise

我写这段代码是为了解决类似的问题。

我认为这是一种经典的DB事务模型。它只实现锁机制,不实现回滚。它的工作方式如下:

  1. 你添加了一堆同步方法,按照顺序,他们应该被调用
  2. 这些方法将在N毫秒内被调用
  3. 将只有一个get请求来获取DB数据(或任何其他需要修改的数据)和一个请求来保存DB数据
  4. 每个方法将接收对"最新数据"的引用,即处理程序№2将接收被处理程序№1更改的数据对象。订单取决于第1号订单

它可以在浏览器中工作。它应该在Node.js中工作,但我没有测试它。

注意,因为每个方法都会接收到object的引用,你应该修改实际的引用,如果你想读取它并返回一些值,那么复制它,不要返回那个引用,因为那个引用的值将来可能会被将来的处理程序更改

TRANSACTION在执行前将等待N毫秒(默认为250)。它定义了哪些方法将被分组到单个事务中。您还可以拨打即时电话。

代码如下:

let TIMER_ID = 0;
let LOCK = Promise.resolve();
let RESOLVE_LOCK = undefined;
let FUNC_BUFFER = [];
let SUCCESS_BUFFER = [];
let FAIL_BUFFER = [];

/**
 * Gets DB data. 
 */
async function get() {
    return {
        key1: "value1",
        key2: {
            key3: "value2"
        }
    };
}
/**
 * Sets new data in DB.
 */
async function set(value) {
    return;
}

/**
 * Classic database transaction implementation.
 *
 * You adding bunch of methods, every method will be
 * executed with valid instance of data at call time,
 * after all functions end the transaction will end.
 * If any method failed, then entire transaction will fail
 * and no changes will be written. If current transaction is
 * active, new one will be not started until end of previous one.
 *
 * In short, this transaction have ACID properties.
 *
 * Operations will be auto grouped into separate transactions
 * based on start timeout, which is recreated every time on
 * operation call.
 *
 * @example
 * ```
 * // Without transaction:
 * create("1", 123)
 * update("1", 321)
 * read("1") => 123 // because `update` is async
 *
 * // With transaction:
 * create("1", 123)
 * update("1", 321)
 * read("1") => 321
 *
 * // Without transaction:
 * create("v", true)
 * update("v", false) // throws internal error,
 *                    // "v" will be created
 * read("v") => true // because `update` is async
 * update("v", false) // will update because `update` is async
 *
 * // With transaction:
 * create("v", true)
 * update("v", false) // throws internal error,
 *                    // entire transaction will throw error,
 *                   // "v" will be not created
 * read("v") => true // entire transaction will throw error
 * update("v", false) // entire transaction will throw error
 * ```
 *
 * @example
 * ```
 * // Transaction start
 * create()
 * update()
 * update()
 * remove()
 * // Transaction end
 *
 * // Transaction start
 * create()
 * update()
 * sleep(1000)
 * // Transaction end
 *
 * // Transaction start
 * update()
 * remove()
 * // Transaction end
 * ```
 */
const TRANSACTION = {
    /**
     * Adds function in transaction.
     *
     * NOTE:
     * you shouldn't await this function, because
     * it only represents transcation lock, not
     * entire transaction end.
     *
     * @param f
     * Every function should only modify passed state, don't
     * reassign passed state and not save changes manually!
     * @param onSuccess
     * Will be called on entire transaction success.
     * @param onFail
     * Will be called on entire transaction fail.
     * @param startTimeout
     * Passed `f` will be added in current transaction,
     * and current transaction will be called after
     * `startTimeout` ms if there will be no more `f` passed.
     * Default value is recommended.
     */
    add: async function(
        f,
        onSuccess,
        onFail,
        startTimeout
    ) {
        await LOCK;
        window.clearTimeout(TIMER_ID);
        FUNC_BUFFER.push(f);
        if (onSuccess) {
            SUCCESS_BUFFER.push(onSuccess);
        }
        if (onFail) {
            FAIL_BUFFER.push(onFail);
        }
        if (startTimeout == null) {
            startTimeout = 250;
        }
        TIMER_ID = window.setTimeout(() => {
            TRANSACTION.start();
        }, startTimeout);
        console.debug("Added in transaction");
    },
    start: async function() {
        LOCK = new Promise((resolve) => {
            RESOLVE_LOCK = resolve;
        });
        console.debug("Transaction locked");
        let success = true;
        try {
            await TRANSACTION.run();
        } catch (error) {
            success = false;
            console.error(error);
            console.warn("Transaction failed");
        }
        if (success) {
            for (const onSuccess of SUCCESS_BUFFER) {
                try {
                    onSuccess();
                } catch (error) {
                    console.error(error);
                }
            }
        } else {
            for (const onFail of FAIL_BUFFER) {
                try {
                    onFail();
                } catch (error) {
                    console.error(error);
                }
            }
        }
        FUNC_BUFFER = [];
        SUCCESS_BUFFER = [];
        FAIL_BUFFER = [];
        RESOLVE_LOCK();
        console.debug("Transaction unlocked");
    },
    run: async function() {
        const data = await get();
        const state = {
            value1: data.key1,
            value2: data.key2
        };
        for (const f of FUNC_BUFFER) {
            console.debug("Transaction function started");
            f(state);
            console.debug("Transaction function ended");
        }
        await set({
            key1: state.value1,
            key2: state.value2
        });
    }
}

示例1:

/**
 * Gets DB data. 
 */
async function get() {
    return {
        key1: "value1",
        key2: {
            key3: "value2"
        }
    };
}
/**
 * Sets new data in DB.
 */
async function set(value) {
    console.debug("Will be set:", value);
    return;
}

new Promise(
    (resolve) => {
        TRANSACTION.add(
            (data) => {
                data.value2.key3 = "test1";
            },
            () => console.debug("success № 1")
        );
        TRANSACTION.add(
            (data) => {
                const copy = {
                    ...data.value2
                };
                
                resolve(copy);
            },
            () => console.debug("success № 2")
        );
        TRANSACTION.add(
            (data) => {
                data.value1 = "test10";
                data.value2.key3 = "test2";
            },
            () => console.debug("success № 3")
        );
    }
)
.then((value) => {
    console.debug("result:", value);
});
/* Output:
Added in transaction
Added in transaction
Added in transaction
Transaction locked
Transaction function started
Transaction function ended
Transaction function started
Transaction function ended
Transaction function started
Transaction function ended
Will be set: {key1: 'test10', key2: {key3: 'test2'}}
result: {key3: 'test1'}
success № 1
success № 2
success № 3
Transaction unlocked 
*/
例二:
TRANSACTION.add(
    () => {
        console.log(1);
    }
);
TRANSACTION.add(
    () => {
        console.log(2);
    },
    undefined,
    undefined,
    0 // instant call
);

/* Output:
16:15:34.715 Added in transaction
16:15:34.715 Added in transaction
16:15:34.717 Transaction locked
16:15:34.717 Transaction function started
16:15:34.718 1
16:15:34.718 Transaction function ended
16:15:34.718 Transaction function started
16:15:34.718 2
16:15:34.718 Transaction function ended
16:15:34.719 Transaction unlocked
*/

示例三:

TRANSACTION.add(
    () => {
        console.log(1);
    }
);
TRANSACTION.add(
    () => {
        console.log(2);
    },
    undefined,
    undefined,
    0 // instant call
);
await new Promise((resolve) => {
    window.setTimeout(() => {
        resolve();
    }, 1000);
});
TRANSACTION.add(
    () => {
        console.log(3);
    }
);

/* Output:
16:19:56.840 Added in transaction
16:19:56.840 Added in transaction
16:19:56.841 Transaction locked
16:19:56.841 Transaction function started
16:19:56.842 1
16:19:56.842 Transaction function ended
16:19:56.842 Transaction function started
16:19:56.842 2
16:19:56.842 Transaction function ended
16:19:56.842 Transaction unlocked
16:19:57.840 Added in transaction
16:19:58.090 Transaction locked
16:19:58.090 Transaction function started
16:19:58.090 3
16:19:58.091 Transaction function ended
16:19:58.091 Transaction unlocked
*/

最新更新