Distributed Locks in C#

For anybody that has had to work on a project where work could be handled by any number of processes (perhaps Web Servers, perhaps Workers), taking an exclusive lock on a resource can be challenging. There are a lot of variables to consider, resulting in a lot of edge cases that must be considered. After considering all options for my project (and there are a few), I opted to write my own. Not because any of the others weren’t good enough, but rather because they didn’t quite fit with how I wanted to do things. So I wrote SharpLock to do exactly what I wanted.

The first step is determining a way to share a resource among all of the processes. The most common way to do this is with a database. After all, most distributed projects have a database, and most (if not all processes) have access to this database. This makes it an abvious candidate for our data store. In my project, I was using MongoDB, and I didn’t want to deploy another data layer, just to support locking. So I decided to implement locking on top of MongoDB. The key for me was, instead of having a collection of locks, I wanted to directly lock the objects I was working with. So given an Events collection, containing Event objects to be processed, I would directly lock an Event, allowing multiple consumers of my Events collection without relying a separate collection.

Given that, and dased on my knowledge of MongoDB, I knew it is fairly well suited for something like this as it has 2 very useful features:

  • FindAndModify
  • Change Streams

With FindAndModify, we are able to issue a query to the database, more or less blindly, and trust the database to tell us if we have acquired the lock. We could, for example, do something like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var lock = db.locks.findAndModify({
    "query":{
        "_id": "some-task-to-run",
        "$or":[
            {
                "lockId": null,
                "lockTs": null
            },
            {
                "lockId": {"$ne": null},
                "lockTs": {"$lte": "<some-time-in-the-past>"}
            }
        ]
    },
    "update":{
        "$set": {
            "lockId": new ObjectId(),
            "lockTs": new Date()
        }
    },
    "new": true
});

What this does is, find a particular lock document that matches the lock we want to take out, and either returns it if there is no lock (lockId and lockTs are null) or the lock is “old”. Choosing the proper “old” is important here. We don’t want to squash a lock for a process that is still actively working on it. So you want choose a time that is several “iterations” in the past. An “iteration” being the amount of time it takes to refresh the lock. Lets imagine we have a long running process, it takes 10 or 15 minutes. We wouldn’t want to set out timeout to 30 minutes, that could unnecessarily delay processing if a node fails. We want to update our lockTs every 30 seconds or so, that way when we run this acquisition query, we can choose a time 2 or 3 minutes in the past. Allowing the processing node 4-6 failed updates before stealing the lock.

When this query is run, lock will either be null (we didn’t get the lock) or be the lock document. This makes it fairly easy for us to know when we have or have not gotten the lock. Since the findAndModify query is atmoic on a single document, we can trust that the lock met those conditions and only this one process got the lock.

When it comes to refreshing our lock, there are 2 options:

  • Run a task in parallel, updating the lockTs
  • Update the lockTs every n iterations of our code.

While both are fairly easy, I tend to like the first option better for 2 reaons

  • Less traffic to the database. If, during processing, I loop 100,000 times, updating every n iterations could result in a lot of database traffic, potentially degrading performance for little benefit
  • A better guarantee of updates happening in a timely manner. If processing of a single iteration takes a long time, a single update statement within the loop will be insufficient. This would require peppering the method with calls to refresh the lock.

While running a Task more or less blindly alongside the processing method isn’t a guarantee of update times (the ThreadPool can become overloaded and tasks could get queued for a long time), I have not run into an issue with this yet. If it really becomes an issue, and explicity Thread could be used to reduct the potential for starving of the refresh code.

The update to refresh the lock looks something like:

1
2
3
4
var updateResult = db.locks.updateOne(
    {"_id": "some-task-to-run", "lockId": lock.lockId},
    {"$set": {"lockTs": new Date()}});

Note that we don’t need to do a findAndModify any more, we can just issue an update and examine update result, if the nModified is 0, we lost the lock and should stop processing. Depending on how your application works, this may mean aborting immediately, going through some error handling to stop in a safe state, or rolling back some changes. I tend to favor the “abort immediatly” paradigm and have code at the start of my processors handle the cleanup of bad state. This is minimized by only commiting the results at the very end of an iteration.

Finally, when we’re done processing we have to release the lock:

1
2
3
4
var updateResult = db.locks.updateOne(
    {"_id": "some-task-to-run", "lockId": lock.lockId},
    {"$set": {"lockId": null, "lockTs": null}});

Putting this into abstracted C# code actually turned out to be a bit harder than I expected, but I learned a considerable amount about LINQ and modifying expression trees. Ultimately, acquiring a lock looks something like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
ISharpLockLogger sharpLockLogger = new LoggingShim();
var col = new List<LockBase>();
var dataStore = new SharpLockMongoDataStore<LockBase>(col, _sharpLockLogger, TimeSpan.FromSeconds(30));
using(var lck = new DistributedLock<LockBase, string>(dataStore))
{
    var lockedObject = await lck.AcquireLockAsync(lockBase);
    var processingComplete = false;
    var watchdogTask = Task.Run(async () =>
    {
        while (await lck.RefreshLockAsync(true, cancellationToken) && !processingComplete)
        {
            await Task.Delay(5000, cancellationToken);
        }
    }, cancellationToken);
    // do some work
    processingComplete = true;
    await watchdogTask;
}

The LockBase class implements ISharpLockable<T> where T is the type of the _id of the document, generally in my case this is ObjectId. This forces us to add the 2 required fields, LastUpdated and LockId. The LockBase class here is just an example, I extend many classes with ISharpLockable<T>. Add the SharpLockMongoDataStore to a DI provider, so it can be taken as a constructor parameter. The TimeSpan parameter tells the underlying library how often you expect to refresh the lock. This is used in conjunction with the staleLockMultiplier (optional parameter to AcquireLockAsync) to determine if a lock is “stale” and can be overwritten. The using statement allows us to implicitly dispose of the lock and release it.

I like this simple implementation of the lock and it and can be fairly easily extended with other data stores (including an in memory version for testing).

I’m not 100% satisfied with the way the watchdog works, I am considering moving it into the library to abstract that away, although I won’t require it to be used.

You may have noticed I mentioned Change Streams early on as a second item that makes MongoDB good for this type of locking. I am working on implementing this in the code, but essentially it gives us the ability to know immediately if the lock has be overwritten. This allows us to start a change stream watcher on our lock, if something overwrites it, an exception can be thrown immediately, notifying the processing code that this lock has been lost. There is no need to wait for the RefreshLock code to attempt a lock update before notifying that the lock is lost. This is a work in progress and I have decided where in code this would live.

comments powered by Disqus