Atomicless Concurrency

Let’s say we’re building an allocator. Good allocators need to serve many threads simultaneously, and as such any lock they take is going to be highly contended. One way to work around this, pioneered by TCMalloc, is to have thread-local caches of blocks (hence, the “TC” - thread cached).

Unfortunately threads can be ephemeral, so book-keeping needs to grow dynamically, and large, complex programs (like the Google Search ranking server) can have tens of thousands of threads, so per-thread cost can add up. Also, any time a thread context-switches and resumes, its CPU cache will contain different cache lines – likely the wrong ones. This is because either another thread doing something completely different executed on that CPU, or the switched thread migrated to execute on a different core.

These days, instead of caching per-thread, TCMalloc uses per-CPU data. This means that book-keeping is fixed, and this is incredibly friendly to the CPU’s cache: in the steady-state, each piece of the data will only ever be read or written to by a single CPU. It also has the amazing property that there are no atomic operations involved in the fast path, because operations on per-CPU data, by definition, do not need to be synchronized with other cores.

This post gives an overview of how to build a CPU-local data structure on modern Linux. The exposition will be for x86, but other than the small bits of assembly you need to write, the technique is architecture-independent.

The Kernel Primitive

Concurrency primitives require cooperating with the kernel, which is responsible for global scheduling decisions on the system. However, making syscalls is quite expensive; to alleviate this, there has been a trend in Linux to use shared memory as a kernelspace/userspace communication channel.

Futexes are the classic “cas-with-the-kernel” syscall (I’m assuming basic knowledge of atomic operations like cas in this article). In the happy path, we just need to cas on some memory to lock a futex, and only make a syscall if we need to go to sleep because of contention. The kernel will perform its own cas on this variable if necessary.

Restartable sequences are another such proto-primitive, used for per-CPU programming. The relevant syscall for us, rseq(2), was added in Linux 4.18. Its manpage reads:

A restartable sequence is a sequence of instructions guaranteed to be executed atomically with respect to other threads and signal handlers on the current CPU. If its execution does not complete atomically, the kernel changes the execution flow by jumping to an abort handler defined by userspace for that restartable sequence.

A restartable sequence, or “rseq”, is a special kind of critical section that the kernel guarantees executes from start to finish without any kind of preemption. If preemption does happen (because of a signal or whatever), userspace observes this as a jump to a special handler for that critical section. Conceptually it’s like handling an exception:

try {
  // Per-CPU code here.
} catch (PreemptionException) {
  // Handle having been preempted, which usually just means
  // "try again".
}

These critical sections are usually of the following form:

  1. Read the current CPU index (the rseq mechanism provides a way to do this).
  2. Index into some data structure and do something to it.
  3. Complete the operation with a single memory write. This is the “commit”.

All the kernel tells us is that we couldn’t finish successfully. We can always try again, but the critical section needs to be such that executing any prefix of it, up to the commit, has no effect on the data structure. We get no opportunity to perform “partial rollbacks”.

In other words, the critical section must be a transaction.
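To make the "any prefix has no effect" rule concrete, here is a minimal single-threaded sketch in plain Rust. It is only a model, not a real restartable sequence: the `Stack` type, the `Aborted` marker, and the `interrupt` flag that simulates preemption are all made up for illustration. The point is that the preparation step writes only to memory nobody considers live yet, and a single store to `len` is the commit.

```rust
/// A fixed-capacity stack whose only "published" state is `len`.
struct Stack {
    slots: [u64; 4],
    len: usize,
}

/// Marker for an attempt that was interrupted before its commit.
#[derive(Debug, PartialEq)]
struct Aborted;

/// One attempt at a push. `interrupt` simulates preemption arriving
/// after the preparation step but before the commit.
fn try_push(stack: &mut Stack, value: u64, interrupt: bool) -> Result<(), Aborted> {
    let idx = stack.len;
    assert!(idx < stack.slots.len(), "stack is full");
    // Preparation: write into a slot that no reader treats as live yet.
    stack.slots[idx] = value;
    if interrupt {
        // Nothing to roll back: `len` was never touched.
        return Err(Aborted);
    }
    // Commit: a single store publishes the new element.
    stack.len = idx + 1;
    Ok(())
}

/// Retry until an attempt commits; returns how many attempts aborted.
fn push_with_retry(stack: &mut Stack, value: u64, mut interruptions: u32) -> u32 {
    let mut aborts = 0;
    loop {
        let interrupt = interruptions > 0;
        interruptions = interruptions.saturating_sub(1);
        match try_push(stack, value, interrupt) {
            Ok(()) => return aborts,
            Err(Aborted) => aborts += 1,
        }
    }
}
```

An aborted attempt leaves `len` exactly where it was, so "handling" the abort really is just running the attempt again.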

Enabling rseq

Using rseqs requires turning on support for them on a particular thread; this is what calling rseq(2) (the syscall) accomplishes.

The signature for this syscall looks like this:

// This type is part of Linux's ABI.
#[repr(C, align(32))]
struct Rseq {
  cpu_id_start: u32,
  cpu_id: u32,
  crit_sec: u64,
  flags: u32,
}

// Note: this is a syscall, not an actual Rust function.
fn rseq(rseq: *mut Rseq, len: u32, flags: i32, signature: u32) -> i32;

The syscall registers “the” Rseq struct for the current thread; there can be at most one, per thread.

rseq is a pointer to this struct. len should be size_of::<Rseq>(), and signature can be any 32-bit integer (more on this later). For our purposes, we can ignore flags on the struct.

flags on the syscall, on the other hand, is used to indicate whether we’re unregistering the struct; this is explained below.

In the interest of exposition, we’ll call the syscall directly. If you’ve never seen how a Linux syscall is done (on x86), you load the syscall number into rax, then up to six arguments in rdi, rsi, rdx, r10, r8, r9 (see footnote 1). We only need the first four.

The return value comes out in rax: 0 on success, and the negated errno code otherwise. In particular, we need to check for EINTR to deal with syscall interruption (every Linux syscall can be interrupted).

unsafe fn raw_rseq(rseq: *mut Rseq, unregister: bool, signature: u32) {
  // Perform an open-coded Linux syscall.
  loop {
    let mut rax: i64 = 334;  // rseq(2) syscall number; x86-64-specific.
    asm! {
      "syscall",
      inout("rax") rax,
      /* rseq:      */ in("rdi") rseq,
      /* len:       */ in("rsi") mem::size_of::<Rseq>(),
      /* flags:     */ in("rdx") unregister as u64,
      /* signature: */ in("r10") signature,
      // The `syscall` instruction itself clobbers rcx and r11.
      lateout("rcx") _,
      lateout("r11") _,
    }
    match rax {
      0 => break,      // Success, we're done.
      -4 => continue,  // EINTR, try again.
      errno => panic!("error calling rseq(2): {}", -errno),
    }
  }
}
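The return-value convention used in that match can be pulled out into a tiny helper. This is a sketch under the usual Linux convention that raw syscall results in the range -4095..=-1 are negated errno values; the function names are mine, not part of any library.

```rust
/// Linux's errno value for an interrupted syscall.
const EINTR: i32 = 4;

/// Splits a raw syscall return value into success or a positive errno.
/// Raw results in -4095..=-1 are negated errno codes on Linux.
fn decode_syscall_ret(rax: i64) -> Result<i64, i32> {
    if (-4095..0).contains(&rax) {
        Err((-rax) as i32)
    } else {
        Ok(rax)
    }
}

/// True if the call was merely interrupted and should be reissued.
fn is_retryable(result: &Result<i64, i32>) -> bool {
    matches!(result, Err(e) if *e == EINTR)
}
```

With this, the loop body reads as "retry on `is_retryable`, panic on any other `Err`", which is the same logic as the open-coded `-4 => continue` arm.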

Note the unregister parameter: this is used to tear down rseq support on the way out of a thread. Generally, rseq will be a thread-local, and registration happens at thread startup. Glibc will do this and has a mechanism for acquiring the rseq pointer. Unfortunately, the glibc I have isn’t new enough to know to do this, so I hacked up something to register my own thread local.

I had the bright idea of putting my Rseq struct in a box, which triggered an interesting bug: when a thread exits, it destroys all of the thread local variables, including the box to hold our Rseq. But if the thread then syscalls to deallocate its stack, when the kernel goes to resume, it will attempt to write the current CPU index to the rseq.cpu_id field.

This presents a problem, because the kernel is probably going to write to a garbage location. This is all but guaranteed to result in a segfault. Debuggers observe this as a segfault on the instruction right after the syscall instruction; I spent half an hour trying to figure out what was causing a call to madvise(2) to segfault.

Hence, we need to wrap our thread local in something that will call rseq(2) to unregister the struct. Putting everything together we get something like this.

fn current_thread_rseq() -> *mut Rseq {
  // This has to be its own struct so we can run a thread-exit destructor.
  pub struct RseqBox(Box<UnsafeCell<Rseq>>);
  impl Drop for RseqBox {
    fn drop(&mut self) {
      unsafe { raw_rseq(self.0.get(), true, RSEQ_SIG); }
    }
  }

  thread_local! {
    static RSEQ: RseqBox = {
      // Has to be in a box, since we need pointer stability.
      let rseq = RseqBox(Box::new(UnsafeCell::new(Rseq {
        cpu_id_start: 0,
        cpu_id: !0,
        crit_sec: 0,
        flags: 0,
      })));

      // Register it!!!
      unsafe { raw_rseq(rseq.0.get(), false, RSEQ_SIG); }
      rseq
    };
  }

  RSEQ.with(|ra| ra.0.get())
}

Per Rust’s semantics, this will execute the first time we access this thread local, instead of at thread startup. Not ideal, since now we pay for a lazy-initialization check every time we touch RSEQ, but it will do.
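If you want to convince yourself of the lifecycle this relies on (initializer on first access, destructor at thread exit), here is a small sketch that swaps the real registration for counters. `Registration`, `INITS`, and `DROPS` are stand-ins invented for this example; nothing here calls rseq(2). It assumes a platform such as Linux where thread-local destructors have finished by the time `join()` returns.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

static INITS: AtomicUsize = AtomicUsize::new(0);
static DROPS: AtomicUsize = AtomicUsize::new(0);

/// Stand-in for `RseqBox`: counts instead of calling rseq(2).
struct Registration;

impl Registration {
    fn new() -> Self {
        // Where the real code would register with the kernel.
        INITS.fetch_add(1, Ordering::SeqCst);
        Registration
    }
}

impl Drop for Registration {
    fn drop(&mut self) {
        // Where the real code would unregister.
        DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

thread_local! {
    static REG: Registration = Registration::new();
}

/// Spawns a thread that touches `REG` `touches` times and returns how
/// many initializations and drops that thread caused.
fn run_worker(touches: usize) -> (usize, usize) {
    let before = (INITS.load(Ordering::SeqCst), DROPS.load(Ordering::SeqCst));
    thread::spawn(move || {
        for _ in 0..touches {
            REG.with(|_| ());
        }
    })
    .join()
    .unwrap();
    (
        INITS.load(Ordering::SeqCst) - before.0,
        DROPS.load(Ordering::SeqCst) - before.1,
    )
}
```

A thread that never touches `REG` never registers at all, and a thread that touches it many times registers and unregisters exactly once.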

Creating a Critical Section

To set up and execute a restartable sequence, we need to assemble a struct that describes it. The following struct is also defined by Linux’s syscall ABI:

#[repr(C, align(32))]
struct CritSec {
  version: u32,
  flags: u32,
  start: u64,
  len: u64,
  abort_handler: u64,
}

start is the address of the first instruction in the sequence, and len is the length of the sequence in bytes. abort_handler is the address of the abort handler. version must be 0 and we can ignore flags.
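The hand-written assembly later in this post addresses these fields by raw byte offset, so it is worth checking the offsets rather than trusting arithmetic done in one's head. The sketch below redeclares both structs exactly as written in this post and computes the offsets with `addr_of!`; the helper function names are my own.

```rust
use std::mem::{align_of, size_of, MaybeUninit};
use std::ptr::addr_of;

// Same field layout as the structs in this post.
#[allow(dead_code)]
#[repr(C, align(32))]
struct Rseq {
    cpu_id_start: u32,
    cpu_id: u32,
    crit_sec: u64,
    flags: u32,
}

#[allow(dead_code)]
#[repr(C, align(32))]
struct CritSec {
    version: u32,
    flags: u32,
    start: u64,
    len: u64,
    abort_handler: u64,
}

/// Byte offsets of `cpu_id` and `crit_sec` within `Rseq`.
fn rseq_offsets() -> [usize; 2] {
    let r = MaybeUninit::<Rseq>::uninit();
    let base = r.as_ptr();
    // `addr_of!` computes field addresses without reading the
    // uninitialized memory.
    unsafe {
        [
            addr_of!((*base).cpu_id) as usize - base as usize,
            addr_of!((*base).crit_sec) as usize - base as usize,
        ]
    }
}

/// Byte offsets of `version`, `start`, `len`, and `abort_handler`
/// within `CritSec`.
fn crit_sec_offsets() -> [usize; 4] {
    let cs = MaybeUninit::<CritSec>::uninit();
    let base = cs.as_ptr();
    unsafe {
        [
            addr_of!((*base).version) as usize - base as usize,
            addr_of!((*base).start) as usize - base as usize,
            addr_of!((*base).len) as usize - base as usize,
            addr_of!((*base).abort_handler) as usize - base as usize,
        ]
    }
}

/// (size, alignment) of `Rseq` and `CritSec`, in that order.
fn layouts() -> [(usize, usize); 2] {
    [
        (size_of::<Rseq>(), align_of::<Rseq>()),
        (size_of::<CritSec>(), align_of::<CritSec>()),
    ]
}
```

These are the `+ 4`, `+ 8`, `+ 16`, and `+ 24` displacements that show up in the assembly, and the 32-byte size is what gets passed as `len` to the syscall.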

Once we have a value of this struct (on the stack or as a constant), we grab RSEQ and atomically store the address of our CritSec to RSEQ.crit_sec. This needs to be atomic because the kernel may decide to look at this pointer from a different CPU core, but it likely will not be contended.

Note that RSEQ.crit_sec should be null before we do this; restartable sequences can’t nest.

Next time the kernel preempts our thread (and later gets ready to resume it), it will look at RSEQ.crit_sec to decide if it preempted a restartable sequence and, if so, jump to the abort handler.

Once we finish our critical section, we must reset RSEQ.crit_sec to 0.

Labels and Constants, Oh My

There is a wrinkle: we would like for our CritSec value to be a constant, but Rust doesn’t provide us with a way to initialize the start and abort_handler fields directly, since it doesn’t have a way to refer to the labels (jump targets) inside the inline assembly (see footnote 2). The simplest way to get around this is to assemble (lol) the CritSec on the stack, with inline assembly. The overhead is quite minimal.

On x86, this is what our boilerplate will look like:

let mut cs = MaybeUninit::<CritSec>::uninit();
let mut ok = 1;

asm! {r"
  // We need to do `rip`-relative loads so that this code is PIC;
  // otherwise we'll get linker errors. Thus, we can't `mov`
  // directly; we need to compute the address with a `lea`
  // first.

  // Initialize the first two fields to zero.
  mov qword ptr [{_cs}], 0

  // Load `90f` into `cs.start`. Note that this is 'forward
  // reference' to the jump target `90:` below.
  lea {_pc}, [90f + rip]
  mov qword ptr [{_cs} + 8], {_pc}

  // We need to get the difference `91f - 90f` into `cs.len`.
  // To do that, we write `-90f` to it, and then add `91f`.
  neg {_pc}
  mov qword ptr [{_cs} + 16], {_pc}
  lea {_pc}, [91f + rip]
  add qword ptr [{_cs} + 16], {_pc}

  // Same as the first line, but loading `cs.abort_handler`.
  lea {_pc}, [92f + rip]
  mov qword ptr [{_cs} + 24], {_pc}

  // Write `&cs` to `RSEQ.crit_sec`. This turns on
  // restartable sequence handling.
  mov qword ptr [{rseq} + 8], {_cs}

90:
  // Do something cool here (coming soon).

91:
  // Jump over the abort handler.
  jmp 93f

  .int 0x53053053  // The signature!
92:
  // Nothing special, just zero `ok` to indicate this was a failure.
  // This is written this way simply because we can't early-return
  // out of inline assembly.
  xor {_ok:e}, {_ok:e}

93:
  // Clear `RSEQ.crit_sec`, regardless of which exit path
  // we took.
  mov qword ptr [{rseq} + 8], 0
  ",
  _pc = out(reg) _,
  _ok = inout(reg) ok,
  _cs = in(reg) cs.as_mut_ptr(),
  rseq = in(reg) current_thread_rseq(),
}

A few things to note:

  1. Because this is inline assembly, we need to use numeric labels. I’ve chosen labels in the 90s for no particular reason. 90: declares a jump target, and 90f is a forward reference to that instruction address.
  2. Most of this assembly is just initializing a struct (see footnote 3). It’s not until the mov right before 90: (the critical section start) that anything interesting happens.
  3. Immediately before 92: (the abort handler) is an .int directive that emits the same four-byte signature we passed to rseq(2) into the instruction stream. This must be here, otherwise the kernel will issue a segfault to the thread. This is a very basic control-flow integrity feature.
  4. We clear RSEQ.crit_sec at the very end.

This is a lot of boilerplate. In an ideal world, we could have something like the following:

fn run_rseq(cs: unsafe extern "C" fn(u32));

Unfortunately, this is very hard to do, because the constraints on restartable sequences are draconian:

This means that you can’t have the compiler generating code for you; it might outline things or move things around in ways you don’t want. In something like ASAN mode, it might inject function calls that will completely break the primitive.

This means we must write our critical section in assembly. That assembly also almost unavoidably needs to be part of the boilerplate given above, and it means it can’t participate in ASAN or TSAN instrumentation.

In the interest of exposition, we can build a wrapper over this inline assembly boilerplate that looks something like this:

let result: Result<(), RseqAbort> = rseq! {r"
    // Assembly for our critical section...
  ",
  // Inline asm constraints.
};

When I wrote the snippet above, I chose numeric labels in the 90s to avoid potential conflicts with whatever assembly gets pasted here. This is also why I used a leading _ on the names of some of the assembly constraints; these are private to the macro. rseq isn’t, though, since callers will want to access the CPU id in it.

The intent is for the assembly string to be pasted over the // Do something cool here comment, and for the constraints to be tacked on after the boilerplate’s constraints.

But with that we now have access to the full rseq primitive, in slightly sketchy macro form. Let’s use it to build a CPU-local data structure.

A Checkout Desk

Let’s say we have a pool of objects that are needed to perform an allocation, our putative page caches. Let’s say we have the following interface:

impl FreeList {
  unsafe fn get_cache(&self) -> *mut PageCache;
  unsafe fn return_cache(&self, cache: *mut PageCache);
}

get_cache() grabs a cache of pages off the global free list. This requires taking a lock or traversing a lockless linked list, so it’s pretty expensive. return_cache() returns a cache back to the global free list for re-use; it is a similarly expensive operation. Both of these operations are going to be contended like crazy, so we want to memoize them.

To achieve this, we want one slot for every CPU to hold the cache it (or rather, a thread running on it) most recently acquired, so that it can be reused. These slots will have “checkout desk” semantics: if you take a thing, you must put something in its place, even if it’s just a sign that says you took the thing.

Matthew Kulukundis came up with this idea, and he’d totally put this gif in a slide deck about this data structure.

As a function signature, this is what it looks like:

impl<T> PerCpu<T> {
  fn checkout(&self, replacement: *mut T) -> *mut T;
}

We can then use it like this.

let free_list: &FreeList = ...;
let per_cpu: &PerCpu<PageCache> = ...;
let iou = 0 as *mut PageCache;

// Check out this CPU's cache pointer, and replace it with
// an IOU note (a null pointer).
let mut cache = per_cpu.checkout(iou);
if cache == iou {
  // If we got an IOU ourselves, this means another thread that
  // was executing on this CPU took the cache and left *us* with
  // a null, so we need to perform the super-expensive operation
  // to acquire a new one.
  cache = free_list.get_cache();
}

// Do stuff with `cache` here. We have unique access to it.
cache.alloc_page(...);

// Return the pointer to the checkout desk.
cache = per_cpu.checkout(cache);
if cache != iou {
  // Usually, we expect to get back the IOU we put into the cache.
  // If we don't, that probably means another thread (or
  // hundreds) are hammering this slot and fighting for page caches.
  // If this happens, we need to throw away the cache.
  free_list.return_cache(cache);
}

The semantics of PerCpu<T> is that it is an array of nprocs (the number of logical cores on the system) pointers, all initialized to null. checkout() swaps the pointer stored in the current CPU’s slot in the PerCpu<T> with the replacement argument.
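Before worrying about concurrency at all, it helps to see these semantics in a plain single-threaded model. The `Desk` type below is hypothetical and takes the CPU index as an explicit argument, which the real `PerCpu<T>` obviously cannot do; it exists only to show the swap-with-IOU protocol.

```rust
use std::mem;
use std::ptr;

/// Single-threaded model of the checkout desk: one slot per CPU,
/// with the CPU index passed in explicitly.
struct Desk<T> {
    slots: Vec<*mut T>,
}

impl<T> Desk<T> {
    fn new(ncpus: usize) -> Self {
        Desk { slots: vec![ptr::null_mut(); ncpus] }
    }

    /// Swap `replacement` into this CPU's slot and hand back whatever
    /// was there before.
    fn checkout(&mut self, cpu: usize, replacement: *mut T) -> *mut T {
        mem::replace(&mut self.slots[cpu], replacement)
    }
}

/// Walks through one take/return/take cycle on CPU 0 and reports what
/// each checkout observed.
fn demo() -> (bool, bool, bool, bool) {
    let mut desk = Desk::<u32>::new(2);
    let cache = Box::into_raw(Box::new(5u32));

    // Nothing has been parked yet, so we get a null back.
    let first = desk.checkout(0, ptr::null_mut());
    // Park our cache; we get our own IOU (null) back.
    let iou = desk.checkout(0, cache);
    // The next visitor on CPU 0 finds the cache and leaves an IOU.
    let again = desk.checkout(0, ptr::null_mut());
    // CPU 1's slot is independent and still empty.
    let other = desk.checkout(1, ptr::null_mut());

    let got_cache_back = again == cache;
    // Free the allocation made above.
    unsafe { drop(Box::from_raw(again)) };
    (first.is_null(), iou.is_null(), got_cache_back, other.is_null())
}
```

Everything that follows is about making that one `mem::replace` safe when the "current CPU" can change underneath you.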

Building the Checkout Desk

The implementation of this type is relatively simple, but the devil is in the details. Naively, you’d think you literally want an array of pointers:

pub struct PerCpu<T> {
  ptrs: Box<[*mut T]>,
}

unsafe impl<T> Send for PerCpu<T> {}
unsafe impl<T> Sync for PerCpu<T> {}

Unfortunately, this is cache-hostile. Depending on how ptrs is aligned in memory, we expect up to eight CPUs’ checkout pointers to sit on the same cache line. This means eight separate cores are going to be writing to the same cache line, which is going to result in a lot of cache thrash. This memory wants to be in L1 cache, but will probably wind up mostly in shared L3 cache.

This effect is called “false sharing”, and is a fundamental part of the design of modern processors. We have to adjust for this.

Instead, we want to give each core a full cache line (64 bytes aligned to a 64-byte boundary) for it to store its pointer in. This sounds super wasteful (56 of those bytes will go unused), but this is the right call for a perf-sensitive primitive.

This amount of memory can add up pretty fast (two whole pages of memory for a 128-core server!), so we’ll want to lazily initialize them. Our cache-friendly struct will look more like this:

pub struct PerCpu<T> {
  ptrs: Box<[AtomicPtr<CacheLine<*mut T>>]>,
}

// This struct wraps a T and forces it to take up an entire cache line.
#[repr(C, align(64))]
struct CacheLine<T>(T);

unsafe impl<T> Send for PerCpu<T> {}
unsafe impl<T> Sync for PerCpu<T> {}
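The numbers quoted above (eight pointers per line, 56 wasted bytes, two pages for 128 cores) can be checked mechanically. This sketch assumes a 64-bit target with 64-byte cache lines and 4 KiB pages; the helper functions are mine.

```rust
use std::mem::{align_of, size_of};

/// Same wrapper as in the post: pads `T` out to one 64-byte cache line.
#[allow(dead_code)]
#[repr(C, align(64))]
struct CacheLine<T>(T);

/// How many bare pointers share one 64-byte cache line.
fn pointers_per_line() -> usize {
    64 / size_of::<*mut u8>()
}

/// Bytes of padding in a cache line that holds a single pointer.
fn wasted_bytes_per_line() -> usize {
    size_of::<CacheLine<*mut u8>>() - size_of::<*mut u8>()
}

/// Total bytes needed to give each of `ncpus` CPUs its own line.
fn padded_footprint(ncpus: usize) -> usize {
    ncpus * size_of::<CacheLine<*mut u8>>()
}

/// (size, alignment) of a padded pointer.
fn line_layout() -> (usize, usize) {
    (
        size_of::<CacheLine<*mut u8>>(),
        align_of::<CacheLine<*mut u8>>(),
    )
}
```

On such a target `padded_footprint(128)` comes out to 8192 bytes, which is where the "two whole pages" figure comes from.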

Initializing it requires finding out how many cores there are on the machine. This is a… fairly platform-specific affair. Rust does offer a “maximum parallelism” query in its standard library, but it is intended as a hint for how many worker threads to spawn, as opposed to a hard upper bound on the number of CPU indices.

Instead, we call get_nprocs_conf(), which is fine since we’re already extremely non-portable. This is a GNU libc extension.

In code…

impl<T> PerCpu<T> {
  pub fn new() -> Self {
    extern "C" {
      // #include <sys/sysinfo.h>
      //
      // This function returns the maximum number of cores the
      // kernel knows of for the current machine. This function
      // is very expensive to call, so we need to cache it.
      fn get_nprocs_conf() -> i32;
    }

    static mut NPROCS: usize = 0;
    static INIT: Once = Once::new();
    INIT.call_once(|| unsafe {
      NPROCS = get_nprocs_conf() as usize;
    });

    let len = unsafe { NPROCS };
    let mut ptrs = Vec::with_capacity(len);
    for _ in 0..len {
      ptrs.push(AtomicPtr::new(ptr::null_mut()));
    }

    Self { ptrs: ptrs.into_boxed_slice() }
  }
}

(I’m not going to implement Drop for this type. That’s an exercise for the reader.)
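As an aside, the `static mut` plus `Once` pair used for caching can be collapsed into a single `OnceLock`, which avoids the `unsafe` reads. The sketch below shows only that caching pattern. To stay portable it substitutes `std::thread::available_parallelism()` for `get_nprocs_conf()`, which is exactly the hint-not-upper-bound query the post warns against, so treat `query_cpu_count` as a placeholder rather than a recommendation.

```rust
use std::sync::OnceLock;
use std::thread;

/// Placeholder for the expensive query. Real per-CPU code wants an
/// upper bound on CPU indices (the post uses `get_nprocs_conf()`);
/// `available_parallelism` is only a hint and may be smaller.
fn query_cpu_count() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

/// Runs the query at most once per process and caches the answer.
fn cached_cpu_count() -> usize {
    static NPROCS: OnceLock<usize> = OnceLock::new();
    *NPROCS.get_or_init(query_cpu_count)
}
```

Every call after the first is a cheap load of the cached value.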

Implementing checkout()

Now’s the moment we’ve all been waiting for: writing our restartable sequence. As critical sections go, this one’s pretty simple:

  1. Index into the ptrs array to get this CPU’s pointer-to-cache-line.
  2. If that pointer is null, bail out of the rseq and initialize a fresh cache line (and then try again).
  3. If it’s not null, swap replacement with the value in the cache line.

impl<T> PerCpu<T> {
  fn checkout(&self, mut replacement: *mut T) -> *mut T {
    // We need to try this operation in a loop, to deal with
    // rseq aborts.
    loop {
      let ptrs = self.ptrs.as_ptr();
      let mut vcpu: i32 = -1;
      let mut need_alloc: i32 = 1;
      let result: Result<(), RseqAbort> = rseq! {r"
        // Load the current CPU number.
        mov {vcpu:e}, dword ptr [{rseq} + 4]

        // Load the `vcpu`th pointer from `ptrs`.
        // On x86, `mov` is atomic. The only threads we might
        // be contending with are those that are trying to
        // initialize this pointer if it's null.
        mov {scratch}, qword ptr [{ptrs} + 8 * {vcpu:r}]

        // If null, exit early and trigger an allocation
        // for this vcpu.
        test {scratch}, {scratch}
        jz 2f

        // Make sure the outer code knows not to allocate
        // a new cache line.
        xor {need_alloc:e}, {need_alloc:e}

        // Commit the checkout by exchanging `replacement`.
        xchg {ptr}, qword ptr [{scratch}]
      2:  // Not `1:`; rustc rejects x86 labels made only of 0s and 1s.
        ",
        ptrs = in(reg) ptrs,
        scratch = out(reg) _,
        ptr = inout(reg) replacement,
        vcpu = out(reg) vcpu,
        need_alloc = inout(reg) need_alloc,
      };

      // We got preempted, so it's time to try again.
      if result.is_err() { continue }

      // If we don't need to allocate, we're done.
      if need_alloc == 0 { return replacement }

      // Otherwise, allocate a new cache line and cas it into
      // place. This is Atomics 101, nothing fancy.
      let mut cache_line = Box::new(CacheLine(ptr::null_mut()));
      loop {
        let cas = self.ptrs[vcpu as usize].compare_exchange_weak(
          ptr::null_mut(), cache_line.as_mut(),
          Ordering::AcqRel, Ordering::Relaxed,
        );

        match cas {
          Ok(p) => {
            // Successful allocation.
            debug_assert!(p.is_null());
            // Make sure to stop `cache_line`'s memory
            // from being freed by `Box`'s dtor.
            mem::forget(cache_line);
            break;
          }
          // Try again: this is a spurious failure.
          Err(p) if p.is_null() => continue,
          // Someone got here first; we can just discard
          // `Box`.
          Err(_) => break,
        }
      }
    }
  }
}

This code listing is a lot to take in. It can be broken into two parts: the restartable sequence itself, and the allocation fallback if the pointer-to-cache-line happens to be null.

The restartable sequence is super short. It looks at the pointer-to-cache-line, bails if it’s null (this triggers the later part of the function) and then does an xchg between the actual *mut T in the per-CPU cache line, and the replacement.

If the rseq aborts, we just try again. This is short enough that preemption in the middle of the rseq is quite rare. Then, if need_alloc was zeroed, that means we successfully committed, so we’re done.

Otherwise we need to allocate a cache line for this CPU. We’re now outside of the rseq, so we’re back to needing atomics. Many threads might be racing to be the thread that initializes the pointer-to-cache-line; we use a basic cas loop to make sure that we only initialize from null, and if someone beats us to it, we don’t leak the memory we had just allocated. This is an RMW operation, so we want both acquire and release ordering. Atomics 101!
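Here is that initialize-from-null pattern on its own, as a self-contained sketch. It differs from the listing above in two small ways that I chose for brevity: it uses the strong `compare_exchange` (so there is no spurious-failure arm and no loop), and it stores a plain `u64` rather than a cache line. `get_or_init` and `race` are names made up for this example.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;

/// Returns the pointer stored in `slot`, installing a fresh allocation
/// first if the slot is still null. Losers of the race free their own
/// allocation and adopt the winner's.
fn get_or_init(slot: &AtomicPtr<u64>, value: u64) -> *mut u64 {
    let existing = slot.load(Ordering::Acquire);
    if !existing.is_null() {
        return existing;
    }
    let fresh = Box::into_raw(Box::new(value));
    match slot.compare_exchange(
        ptr::null_mut(),
        fresh,
        Ordering::AcqRel,
        Ordering::Acquire,
    ) {
        // We installed our allocation; the slot now owns it.
        Ok(_) => fresh,
        Err(winner) => {
            // Someone got there first: free ours so it does not leak.
            unsafe { drop(Box::from_raw(fresh)) };
            winner
        }
    }
}

/// Races `threads` threads on one slot and returns how many distinct
/// pointers they ended up with.
fn race(threads: usize) -> usize {
    let slot: Arc<AtomicPtr<u64>> = Arc::new(AtomicPtr::new(ptr::null_mut()));
    let handles: Vec<_> = (0..threads)
        .map(|i| {
            let slot = Arc::clone(&slot);
            // Return the address as an integer; raw pointers are not `Send`.
            thread::spawn(move || get_or_init(&slot, i as u64) as usize)
        })
        .collect();
    let mut seen: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    seen.sort();
    seen.dedup();
    // Free the single surviving allocation.
    unsafe { drop(Box::from_raw(slot.load(Ordering::Acquire))) };
    seen.len()
}
```

However many threads race, they all come away holding the same pointer, and every losing allocation is freed on the spot.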

Then, we try again. Odds are good we won’t have migrated CPUs when we execute again, so we won’t need to allocate again. Eventually all of the pointers in the ptrs array will be non-null, so in the steady state this need_alloc case doesn’t need to happen.

Conclusion

This is just a glimpse of what per-CPU concurrent programming looks like. I’m pretty new to it myself, and this post was motivated by building an end-to-end example in Rust. You can read more about how TCMalloc makes use of restartable sequences here.

  1. This is annoyingly different from the function calling convention, which passes arguments in rdi, rsi, rdx, rcx, r8, r9, with the mnemonic “Diana’s silk dress cost $89.” I don’t know a cute mnemonic for the syscall registers. 

  2. It’s actually worse than that. You’d think you could do

      jmp foo
    pointers:
      .int foo
    
    foo:
      mov qword ptr [rax], pointers
    

    but this makes the resulting code non-position-independent on x86. What this means is that the code must know at link time what address it will be loaded at, which breaks the position-independent requirement of many modern platforms.

    Indeed, this code will produce a linker error like the following:

      = note: /usr/bin/ld: /home/mcyoung/projects/cpulocal/target/debug/deps/cpulocal-a7eeabaf0b1f2c43.2l48u2rfiak1q1ik.rcgu.o:
          relocation R_X86_64_32 against `.text._ZN8cpulocal15PerCpu$LT$T$GT$8checkout17h42fde3ce3bd0180aE'
          can not be used when making a PIE object; recompile with -fPIE
          collect2: error: ld returned 1 exit status
    

    Not only is .int foo a problem, but so is referring to pointers. Instead we must write

    lea rcx, qword ptr [pointers + rip]
    mov qword ptr [rax], rcx
    

    to be able to load the address of pointers at all. This can be worked around if you’re smart; after all, it is possible to put the addresses of functions into static variables and not have the linker freak out. It’s too hard to do in inline assembly tho. 

  3. Basically this code, which can’t be properly-expressed in Rust.

    let cs = CritSec {
      version: 0,
      flags: 0,
      start: /* &90f */,
      len: /* &91f - &90f */,
      abort_handler: /* &92f */,
    };
    

Related Posts

Atomicless Concurrency

Let’s say we’re building an allocator. Good allocators need to serve many threads simultaneously, and as such any lock they take is going to be highly contended. One way to work around this, pioneered by TCMalloc, is to have thread-local caches of blocks (hence, the “TC” - thread cached).

Unfortunately threads can be ephemeral, so book-keeping needs to grow dynamically, and large, complex programs (like the Google Search ranking server) can have tens of thousands of threads, so per-thread cost can add up. Also, any time a thread context-switches and resumes, its CPU cache will contain different cache lines – likely the wrong ones. This is because either another thread doing something compeltely different executed on that CPU, or the switched thread migrated to execute on a different core.

These days, instead of caching per-thread, TCMalloc uses per-CPU data. This means that book-keeping is fixed, and this is incredibly friendly to the CPU’s cache: in the steady-state, each piece of the data will only ever be read or written to by a single CPU. It also has the amazing property that there are no atomic operations involved in the fast path, because operations on per-CPU data, by definition, do not need to be synchronized with other cores.

This post gives an overview of how to build a CPU-local data structure on modern Linux. The exposition will be for x86, but other than the small bits of assembly you need to write, the technique is architecture-independent.

The Kernel Primitive

Concurrency primitives require cooperating with the kernel, which is responsible for global scheduling decisions on the system. However, making syscalls is quite expensive; to alieviate this, there has been a trend in Linux to use shared memory as a kernelspace/userspace communication channel.

Futexes are the classic “cas-with-the-kernel” syscall (I’m assuming basic knowledge of atomic operations like cas in this article). In the happy path, we just need to cas on some memory to lock a futex, and only make a syscall if we need to go to sleep because of contention. The kernel will perform its own cas on this variable if necessary.

Restartable sequences are another such proto-primitive, which are used for per-CPUuprogramming. The relevant syscall for us, rseq(2), was added in Linux 4.18. Its manpage reads

A restartable sequence is a sequence of instructions guaranteed to be executed atomically with respect to other threads and signal handlers on the current CPU. If its execution does not complete atomically, the kernel changes the execution flow by jumping to an abort handler defined by userspace for that restartable sequence.

A restartable sequence, or “rseq” is a special kind of critical section that the kernel guarantees executes from start to finish without any kind of preemption. If preemption does happen (because of a signal or whatever), userspace observes this as a jump to a special handler for that critical section. Conceptually it’s like handling an exception:

try {
  // Per-CPU code here.
} catch (PremptionException) {
  // Handle having been preempted, which usally just means
  // "try again".
}

These critical sections are usually of the following form:

  1. Read the current CPU index (the rseq mechanism provides a way to do this).
  2. Index into some data structure and do something to it.
  3. Complete the operation with a single memory write. This is the “commit”.

All the kernel tells us is that we couldn’t finish successfully. We can always try again, but the critical section needs to be such that executing any prefix of it, up to the commit, has no effect on the data structure. We get no opportunity to perform “partial rollbacks”.

In other words, the critical section must be a transaction.

Enabling rseq

Using rseqs requires turning on support for it for a particular thread; this is what calling rseq(2) (the syscall) accomplishes.

The signature for this syscall looks like this:

// This type is part of Linux's ABI.
#[repr(C, align(32))]
struct Rseq {
  cpu_id_start: u32,
  cpu_id: u32,
  crit_sec: u64,
  flags: u32,
}

// Note: this is a syscall, not an actual Rust function.
fn rseq(rseq: *mut Rseq, len: u32, flags: i32, signature: u32) -> i32;

The syscall registers “the” Rseq struct for the current thread; there can be at most one, per thread.

rseq is a pointer to this struct. len should be size_of::<Rseq>(), and signature can be any 32-bit integer (more on this later). For our purposes, we can ignore flags on the struct.

flags on the syscall, on the other hand, is used to indicate whether we’re unregistering the struct; this is explained below.

In the interest of exposition, we’ll call the syscall directly. If you’ve never seen how a Linux syscall is done (on x86), you load the syscall number into rax, then up to six arguments in rdi, rsi, rdx, r10, r8, r91. We only need the first four.

The return value comes out in rax, which is 0 on success, and a negative of an errno code otherwise. In particular, we need to check for EINTR to deal with syscall interruption. (every Linux syscall can be interrupted).

unsafe fn raw_rseq(rseq: *mut Rseq, unregister: bool, signature: u32) {
  // Perform an open-coded Linux syscall.
  loop {
    let mut rax = 334;  // rseq(2) syscall number; x86-specific.
    asm! {
      "syscall",
      inout("rax") rax,
      /* rseq:      */ in("rdi") rseq,
      /* len:       */ in("rsi") mem::size_of::<Rseq>(),
      /* flags:     */ in("rdx") unregister as u64,
      /* signature: */ in("r10") signature,
    }
    match rax {
      0 => break,      // Success, we're done.
      -4 => continue,  // EINTR, try again.
      errno => panic!("error calling rseq(2): {}", -errno),
    }
  }
}

Note the unregister parameter: this is used to tear down rseq support on the way out of a thread. Generally, rseq will be a thread-local, and registration happens at thread startup. Glibc will do this and has a mechanism for acquiring the rseq pointer. Unfortunately, the glibc I have isn’t new enough to know to do this, so I hacked up something to register my own thread local.

I had the bright idea of putting my Rseq struct in a box, which triggered an interesting bug: when a thread exits, it destroys all of the thread local variables, including the box to hold our Rseq. But if the thread then syscalls to deallocate its stack, when the kernel goes to resume, it will attempt to write the current CPU index to the rseq.cpu_id field.

This presents a problem, because the kernel is probably going to write to a garbage location. This is all but guaranteed to result in a segfault. Debuggers observe this as a segfault on the instruction right after the syscall instruction; I spent half an hour trying to figure out what was causing a call to madvise(2) to segfault.

Hence, we need to wrap our thread local in something that will call rseq(2) to unregister the struct. Putting everything together we get something like this.

fn current_thread_rseq() -> *mut Rseq {
  // This has to be its own struct so we can run a thread-exit destructor.
  pub struct RseqBox(Box<UnsafeCell<Rseq>>);
  impl Drop for RseqBox {
    fn drop(&mut self) {
      unsafe { raw_rseq(self.0.get(), true, RSEQ_SIG); }
    }
  }

  thread_local! {
    static RSEQ: RseqBox = {
      // Has to be in a box, since we need pointer stability.
      let rseq = RseqBox(Box::new(UnsafeCell::new(Rseq {
        cpu_id_start: 0,
        cpu_id: !0,
        crit_sec: 0,
        flags: 0,
      })));

      // Register it!!!
      unsafe { raw_rseq(rseq.0.get(), false, RSEQ_SIG); }
      rseq
    };
  }

  RSEQ.with(|ra| ra.0.get())
}

Per Rust’s semantics, this will execute the first time we access this thread local, instead of at thread startup. Not ideal, since now we pay for an (uncontended) atomic read every time we touch RSEQ, but it will do.
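To see both halves of that behavior without involving the kernel, here is a minimal sketch in which a hypothetical Registration type stands in for RseqBox: its constructor and destructor bump counters instead of calling rseq(2). The names Registration, REG, and run_thread are made up for illustration. It assumes a platform such as Linux, where a spawned thread's thread-local destructors have finished by the time join() returns.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counters standing in for "register with the kernel" and "unregister".
static REGISTERED: AtomicUsize = AtomicUsize::new(0);
static UNREGISTERED: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for `RseqBox`. The constructor plays the role of
// the registering rseq(2) call; `Drop` plays the unregistering one.
struct Registration;

impl Registration {
  fn new() -> Self {
    REGISTERED.fetch_add(1, Ordering::SeqCst);
    Registration
  }
}

impl Drop for Registration {
  fn drop(&mut self) {
    UNREGISTERED.fetch_add(1, Ordering::SeqCst);
  }
}

thread_local! {
  // Initialized lazily, on the first `REG.with(..)` in each thread.
  static REG: Registration = Registration::new();
}

/// Spawns a thread, optionally touches `REG` twice in it, joins it, and
/// returns how many (registrations, unregistrations) happened meanwhile.
fn run_thread(touch: bool) -> (usize, usize) {
  let before = (
    REGISTERED.load(Ordering::SeqCst),
    UNREGISTERED.load(Ordering::SeqCst),
  );
  thread::spawn(move || {
    if touch {
      REG.with(|_| ());
      REG.with(|_| ());
    }
  })
  .join()
  .unwrap();
  (
    REGISTERED.load(Ordering::SeqCst) - before.0,
    UNREGISTERED.load(Ordering::SeqCst) - before.1,
  )
}
```

A thread that never touches REG pays for nothing, and a thread that touches it any number of times registers once and unregisters once on the way out.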

Creating a Critical Section

To set up and execute a restartable sequence, we need to assemble a struct that describes it. The following struct is also defined by Linux’s syscall ABI:

#[repr(C, align(32))]
struct CritSec {
  version: u32,
  flags: u32,
  start: u64,
  len: u64,
  abort_handler: u64,
}

start is the address of the first instruction in the sequence, and len is the length of the sequence in bytes. abort_handler is the address of the abort handler. version must be 0 and we can ignore flags.
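Any hand-written assembly that fills in this struct has to hard-code field offsets, so it is worth double-checking them. The following is a small sanity check, not part of the kernel ABI itself; the helper names crit_sec_layout and crit_sec_offsets are hypothetical.

```rust
use std::mem::{align_of, size_of, MaybeUninit};
use std::ptr::addr_of;

#[allow(dead_code)]
#[repr(C, align(32))]
struct CritSec {
  version: u32,
  flags: u32,
  start: u64,
  len: u64,
  abort_handler: u64,
}

/// Returns (size, alignment) of `CritSec` in bytes.
fn crit_sec_layout() -> (usize, usize) {
  (size_of::<CritSec>(), align_of::<CritSec>())
}

/// Returns the byte offsets of `flags`, `start`, `len`, and
/// `abort_handler`, in that order. `version` is at offset 0.
fn crit_sec_offsets() -> [usize; 4] {
  let cs = MaybeUninit::<CritSec>::uninit();
  let base = cs.as_ptr();
  // `addr_of!` computes a field's address without reading the
  // (uninitialized) memory behind it.
  unsafe {
    [
      addr_of!((*base).flags) as usize - base as usize,
      addr_of!((*base).start) as usize - base as usize,
      addr_of!((*base).len) as usize - base as usize,
      addr_of!((*base).abort_handler) as usize - base as usize,
    ]
  }
}
```

With repr(C), version and flags pack into the first eight bytes, so a single 64-bit store of zero to offset 0 clears both, and start, len, and abort_handler follow at 8, 16, and 24.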

Once we have a value of this struct (on the stack or as a constant), we grab RSEQ and atomically store the address of our CritSec to RSEQ.crit_sec. This needs to be atomic because the kernel may decide to look at this pointer from a different CPU core, but it likely will not be contended.

Note that RSEQ.crit_sec should be null before we do this; restartable sequences can’t nest.

Next time the kernel preempts our thread (and later gets ready to resume it), it will look at RSEQ.crit_sec to decide if it preempted a restartable sequence and, if so, jump to the abort handler.

Once we finish our critical section, we must reset RSEQ.crit_sec to 0.

Labels and Constants, Oh My

There is a wrinkle: we would like for our CritSec value to be a constant, but Rust doesn’t provide us with a way to initialize the start and abort_handler fields directly, since it doesn’t have a way to refer[2] to the labels (jump targets) inside the inline assembly. The simplest way to get around this is to assemble (lol) the CritSec on the stack, with inline assembly. The overhead is quite minimal.

On x86, this is what our boilerplate will look like:

let mut cs = MaybeUninit::<CritSec>::uninit();
let mut ok = 1;

asm! {r"
  // We need to do `rip`-relative loads so that this code is PIC;
  // otherwise we'll get linker errors. Thus, we can't `mov`
  // directly; we need to compute the address with a `lea`
  // first.

  // Initialize the first two fields to zero.
  mov qword ptr [{_cs}], 0

  // Load `90f` into `cs.start`. Note that this is a 'forward
  // reference' to the jump target `90:` below.
  lea {_pc}, [90f + rip]
  mov qword ptr [{_cs} + 8], {_pc}

  // We need to get the difference `91f - 90f` into `cs.len`.
  // To do that, we write `-90f` to it, and then add `91f`.
  neg {_pc}
  mov qword ptr [{_cs} + 16], {_pc}
  lea {_pc}, [91f + rip]
  add qword ptr [{_cs} + 16], {_pc}

  // Same as the first line, but loading `cs.abort_handler`.
  lea {_pc}, [92f + rip]
  mov qword ptr [{_cs} + 24], {_pc}

  // Write `&cs` to `RSEQ.crit_sec`. This turns on
  // restartable sequence handling.
  mov qword ptr [{rseq} + 8], {_cs}

90:
  // Do something cool here (coming soon).

91:
  // Jump over the abort handler.
  jmp 93f

  .int 0x53053053  // The signature!
92:
  // Nothing special, just zero `ok` to indicate this was a failure.
  // This is written this way simply because we can't early-return
  // out of inline assembly.
  xor {_ok:e}, {_ok:e}

93:
  // Clear `RSEQ.crit_sec`, regardless of which exit path
  // we took.
  mov qword ptr [{rseq} + 8], 0
  ",
  _pc = out(reg) _,
  _ok = inout(reg) ok,
  _cs = in(reg) &mut cs as *mut CritSec,
  rseq = in(reg) current_thread_rseq(),
}

A few things to note:

  1. Because this is inline assembly, we need to use numeric labels. I’ve chosen labels in the 90s for no particular reason. 90: declares a jump target, and 90f is a forward reference to that instruction address.
  2. Most of this assembly is just initializing a struct[3]. It’s not until the mov right before 90: (the critical section start) that anything interesting happens.
  3. Immediately before 92: (the abort handler) is an .int directive that emits the same four-byte signature we passed to rseq(2) into the instruction stream. This must be here, otherwise the kernel will issue a segfault to the thread. This is a very basic control-flow integrity feature.
  4. We clear RSEQ.crit_sec at the very end.

This is a lot of boilerplate. In an ideal world, we could have something like the following:

fn run_rseq(cs: unsafe extern "C" fn(u32));

Unfortunately, this is very hard to do, because the constraints on restartable sequences are draconian:

  • Can’t jump out of the critical section until it completes or aborts. This means you can’t call functions or make syscalls!
  • Last instruction must be the commit, which is a memory store operation, not a return.

This means that you can’t have the compiler generating code for you; it might outline things or move things around in ways you don’t want. In something like ASAN mode, it might inject function calls that will completely break the primitive.

This means we must write our critical section in assembly. That assembly also almost unavoidably needs to be part of the boilerplate given above, and it means it can’t participate in ASAN or TSAN instrumentation.

In the interest of exposition, we can build a wrapper over this inline assembly boilerplate that looks something like this:

let result: Result<(), RseqAbort> = rseq! {r"
    // Assembly for our critical section...
  ",
  // Inline asm constraints.
};

When I wrote the snippet above, I chose numeric labels in the 90s to avoid potential conflicts with whatever assembly gets pasted here. This is also why I used a leading _ on the names of some of the assembly constraints; those are private to the macro. rseq isn’t, though, since callers will want to access the CPU id in it.

The intent is for the assembly string to be pasted over the // Do something cool here comment, and for the constraints to be tacked on after the boilerplate’s constraints.

But with that we now have access to the full rseq primitive, in slightly sketchy macro form. Let’s use it to build a CPU-local data structure.

A Checkout Desk

Let’s say we have a pool of objects that are needed to perform an allocation, our putative page caches, exposed through the following interface:

impl FreeList {
  unsafe fn get_cache(&self) -> *mut PageCache;
  unsafe fn return_cache(&self, cache: *mut PageCache);
}

get_cache() grabs a cache of pages off the global free list. This requires taking a lock or traversing a lockless linked list, so it’s pretty expensive. return_cache() returns a cache back to the global free list for re-use; it is a similarly expensive operation. Both of these operations are going to be contended like crazy, so we want to memoize them.

To achieve this, we want one slot for every CPU to hold the cache it (or rather, a thread running on it) most recently acquired, so that it can be reused. These slots will have “checkout desk” semantics: if you take a thing, you must put something in its place, even if it’s just a sign that says you took the thing.

Matthew Kulukundis came up with this idea, and he’d totally put this gif in a slide deck about this data structure.

As a function signature, this is what it looks like:

impl<T> PerCpu<T> {
  fn checkout(&self, replacement: *mut T) -> *mut T;
}

We can then use it like this.

let free_list: &FreeList = ...;
let per_cpu: &PerCpu<PageCache> = ...;
let iou = 0 as *mut PageCache;

// Check out this CPU's cache pointer, and replace it with
// an IOU note (a null pointer).
let mut cache = per_cpu.checkout(iou);
if cache == iou {
  // If we got an IOU ourselves, this means another thread that
  // was executing on this CPU took the cache and left *us* with
  // a null, so we need to perform the super-expensive operation
  // to acquire a new one.
  cache = free_list.get_cache();
}

// Do stuff with `cache` here. We have unique access to it.
cache.alloc_page(...);

// Return the pointer to the checkout desk.
cache = per_cpu.checkout(cache);
if cache != iou {
  // Usually, we expect to get back the IOU we put into the cache.
  // If we don't, that probably means another thread (or
  // hundreds) are hammering this slot and fighting for page caches.
  // If this happens, we need to throw away the cache.
  free_list.return_cache(cache);
}

The semantics of PerCpu<T> is that it is an array of nprocs (the number of logical cores on the system) pointers, all initialized to null. checkout() swaps the pointer stored in the current CPU’s slot in the PerCpu<T> with the replacement argument.
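Before getting to the real thing, it can help to pin down those semantics with a toy model. This is only a sketch, and not how the rseq version works: the hypothetical ModelPerCpu below takes the CPU index as an explicit argument and uses an ordinary atomic swap, which is exactly the kind of atomic operation the restartable sequence is meant to avoid.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Hypothetical model of `PerCpu<T>`: one pointer-sized slot per CPU,
/// all starting out null.
struct ModelPerCpu<T> {
  slots: Box<[AtomicPtr<T>]>,
}

impl<T> ModelPerCpu<T> {
  fn new(nprocs: usize) -> Self {
    let slots: Box<[AtomicPtr<T>]> =
      (0..nprocs).map(|_| AtomicPtr::new(ptr::null_mut())).collect();
    Self { slots }
  }

  /// Swaps `replacement` into the slot for `cpu` and returns whatever
  /// was there before. The real version discovers `cpu` on its own.
  fn checkout(&self, cpu: usize, replacement: *mut T) -> *mut T {
    self.slots[cpu].swap(replacement, Ordering::AcqRel)
  }
}

/// Walks through one take/return/take cycle on CPU 0's slot.
fn checkout_demo() -> (bool, bool, u32) {
  let desk = ModelPerCpu::<u32>::new(2);
  let iou: *mut u32 = ptr::null_mut();

  // The desk starts empty, so the first checkout hands us an IOU.
  let first = desk.checkout(0, iou);

  // "Return" a freshly made object; we get our own IOU back.
  let mine = Box::into_raw(Box::new(7u32));
  let returned = desk.checkout(0, mine);

  // The next checkout on the same CPU gets that object again.
  let again = desk.checkout(0, iou);
  let value = unsafe { *Box::from_raw(again) };

  (first.is_null(), returned.is_null(), value)
}
```

Every call both takes and leaves something, so the slot is never in an "in between" state that another thread could observe.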

Building the Checkout Desk

The implementation of this type is relatively simple, but the devil is in the details. Naively, you’d think you literally want an array of pointers:

pub struct PerCpu<T> {
  ptrs: Box<[*mut T]>,
}

unsafe impl<T> Send for PerCpu<T> {}
unsafe impl<T> Sync for PerCpu<T> {}

Unfortunately, this is cache-hostile. Depending on how ptrs is aligned in memory, we expect eight CPUs’ checkout pointers to share a single cache line. This means eight separate cores are going to be writing to the same cache line, which is going to result in a lot of cache thrash. This memory wants to be in L1 cache, but will probably wind up mostly in shared L3 cache.

This effect is called “false sharing”, and is a fundamental part of the design of modern processors. We have to adjust for this.

Instead, we want to give each core a full cache line (64 bytes aligned to a 64-byte boundary) for it to store its pointer in. This sounds super wasteful (56 of those bytes will go unused), but this is the right call for a perf-sensitive primitive.

This amount of memory can add up pretty fast (two whole pages of memory for a 128-core server!), so we’ll want to lazily initialize them. Our cache-friendly struct will look more like this:

pub struct PerCpu<T> {
  ptrs: Box<[AtomicPtr<CacheLine<*mut T>>]>,
}

// This struct wraps a T and forces it to take up an entire cache line.
#[repr(C, align(64))]
struct CacheLine<T>(T);

unsafe impl<T> Send for PerCpu<T> {}
unsafe impl<T> Sync for PerCpu<T> {}
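The padding arithmetic is easy to check. The sketch below repeats the CacheLine definition so that it stands alone; the helpers cache_line_layout and fully_populated_bytes are hypothetical names, and the 64-byte line size is the same assumption the struct itself makes.

```rust
use std::mem::{align_of, size_of};

// Same wrapper as above: forces `T` onto its own 64-byte cache line.
#[allow(dead_code)]
#[repr(C, align(64))]
struct CacheLine<T>(T);

/// Returns (size, alignment) of a cache line holding a single pointer.
fn cache_line_layout() -> (usize, usize) {
  (
    size_of::<CacheLine<*mut u8>>(),
    align_of::<CacheLine<*mut u8>>(),
  )
}

/// Bytes spent on cache lines once every CPU's line has been allocated.
fn fully_populated_bytes(nprocs: usize) -> usize {
  nprocs * size_of::<CacheLine<*mut u8>>()
}
```

The alignment attribute rounds the size up as well, so a Box<CacheLine<_>> always owns a whole line to itself, and 128 of them come to 8192 bytes, the two 4 KiB pages mentioned earlier.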

Initializing it requires finding out how many cores there are on the machine. This is a… fairly platform-specific affair. Rust does offer a “maximum parallelism” query in its standard library, but it is intended as a hint for how many worker threads to spawn, as opposed to a hard upper bound on the number of CPU indices.

Instead, we call get_nprocs_conf(), which is fine since we’re already extremely non-portable. This is a GNU libc extension.

In code…

impl<T> PerCpu<T> {
  pub fn new() -> Self {
    extern "C" {
      // #include <sys/sysinfo.h>
      //
      // This function returns the maximum number of cores the
      // kernel knows of for the current machine. This function
      // is very expensive to call, so we need to cache it.
      fn get_nprocs_conf() -> i32;
    }

    static mut NPROCS: usize = 0;
    static INIT: Once = Once::new();
    INIT.call_once(|| unsafe {
      NPROCS = get_nprocs_conf() as usize;
    });

    let len = unsafe { NPROCS };
    let mut ptrs = Vec::with_capacity(len);
    for _ in 0..len {
      ptrs.push(AtomicPtr::new(ptr::null_mut()));
    }

    Self { ptrs: ptrs.into_boxed_slice() }
  }
}
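If the static mut makes you nervous, the same run-once caching can probably be expressed with std's OnceLock (stable since Rust 1.70). This is a sketch of that alternative rather than what the listing above does; expensive_query is a hypothetical stand-in for get_nprocs_conf() that returns a made-up core count and records how often it is called.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

// Counts calls to the "expensive" query, purely for demonstration.
static QUERIES: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for `get_nprocs_conf()`. The value 8 is made up.
fn expensive_query() -> usize {
  QUERIES.fetch_add(1, Ordering::SeqCst);
  8
}

/// Returns the cached core count, running the query at most once.
fn nprocs() -> usize {
  static NPROCS: OnceLock<usize> = OnceLock::new();
  *NPROCS.get_or_init(expensive_query)
}

/// How many times the underlying query has actually run.
fn query_count() -> usize {
  QUERIES.load(Ordering::SeqCst)
}
```

This keeps the cached value behind a safe API, at the cost of the same initialized-yet check on every call that Once already implies.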

(I’m not going to implement Drop for this type. That’s an exercise for the reader.)

Implementing checkout()

Now’s the moment we’ve all been waiting for: writing our restartable sequence. As critical sections go, this one’s pretty simple:

  1. Index into the ptrs array to get this CPU’s pointer-to-cache-line.
  2. If that pointer is null, bail out of the rseq and initialize a fresh cache line (and then try again).
  3. If it’s not null, swap replacement with the value in the cache line.

impl<T> PerCpu<T> {
  fn checkout(&self, mut replacement: *mut T) -> *mut T {
    // We need to try this operation in a loop, to deal with
    // rseq aborts.
    loop {
      let ptrs = self.ptrs.as_ptr();
      let mut vcpu: i32 = -1;
      let mut need_alloc: i32 = 1;
      let result: Result<(), RseqAbort> = rseq! {r"
        // Load the current CPU number.
        mov {vcpu:e}, dword ptr [{rseq} + 4]

        // Load the `vcpu`th pointer from `ptrs`.
        // On x86, `mov` is atomic. The only threads we might
        // be contending with are those that are trying to
        // initialize this pointer if it's null.
        mov {scratch}, qword ptr [{ptrs} + 8 * {vcpu:r}]

        // If null, exit early and trigger an allocation
        // for this vcpu.
        test {scratch}, {scratch}
        jz 1f

        // Make sure the outer code knows not to allocate
        // a new cache line.
        xor {need_alloc:e}, {need_alloc:e}

        // Commit the checkout by exchanging `replacement`.
        xchg {ptr}, qword ptr [{scratch}]
      1:
        ",
        ptrs = in(reg) ptrs,
        scratch = out(reg) _,
        ptr = inout(reg) replacement,
        vcpu = out(reg) vcpu,
        need_alloc = inout(reg) need_alloc,
      };

      // We got preempted, so it's time to try again.
      if result.is_err() { continue }

      // If we don't need to allocate, we're done.
      if need_alloc == 0 { return replacement }

      // Otherwise, allocate a new cache line and cas it into
      // place. This is Atomics 101, nothing fancy.
      let mut cache_line = Box::new(CacheLine(ptr::null_mut()));
      loop {
        let cas = self.ptrs[vcpu as usize].compare_exchange_weak(
          ptr::null_mut(), cache_line.as_mut(),
          Ordering::AcqRel, Ordering::Relaxed,
        );

        match cas {
          Ok(p) => {
            // Successful allocation.
            debug_assert!(p.is_null());
            // Make sure to stop `cache_line`'s memory
            // from being freed by `Box`'s dtor.
            mem::forget(cache_line);
            break;
          }
          // Try again: this is a spurious failure.
          Err(p) if p.is_null() => continue,
          // Someone got here first; we can just discard
          // `Box`.
          Err(_) => break,
        }
      }
    }
  }
}

This code listing is a lot to take in. It can be broken into two parts: the restartable sequence itself, and the allocation fallback if the pointer-to-cache-line happens to be null.

The restartable sequence is super short. It looks at the pointer-to-cache-line, bails if it’s null (this triggers the later part of the function) and then does an xchg between the actual *mut T in the per-CPU cache line, and the replacement.

If the rseq aborts, we just try again. This is short enough that preemption in the middle of the rseq is quite rare. Then, if need_alloc was zeroed, that means we successfully committed, so we’re done.

Otherwise we need to allocate a cache line for this CPU. We’re now outside of the rseq, so we’re back to needing atomics. Many threads might be racing to be the thread that initializes the pointer-to-cache-line; we use a basic cas loop to make sure that we only initialize from null, and if someone beats us to it, we don’t leak the memory we had just allocated. This is an RMW operation, so we want both acquire and release ordering. Atomics 101!

Then, we try again. Odds are good we won’t have migrated CPUs when we execute again, so we won’t need to allocate again. Eventually all of the pointers in the ptrs array will be non-null, so in the steady state this need_alloc case doesn’t need to happen.
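The allocation fallback is the one part of checkout() that can be exercised in isolation, since it is plain atomics. Here is a minimal sketch of the same "initialize from null, discard if you lose the race" idea; the helpers init_slot and race are hypothetical, and unlike the listing above it uses the strong compare_exchange, so there is no spurious-failure arm to retry.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread;

/// Tries to install a fresh allocation into `slot` if it is still null.
/// Returns true if this call's allocation is the one that got installed.
fn init_slot(slot: &AtomicPtr<u64>, value: u64) -> bool {
  let fresh = Box::into_raw(Box::new(value));
  match slot.compare_exchange(
    ptr::null_mut(), fresh,
    Ordering::AcqRel, Ordering::Acquire,
  ) {
    // We won: the slot now owns `fresh`.
    Ok(_) => true,
    // Someone got here first, so free our allocation instead of leaking it.
    Err(_) => {
      unsafe { drop(Box::from_raw(fresh)) };
      false
    }
  }
}

/// Races `threads` threads to initialize one slot and returns how many
/// of them believe they won.
fn race(threads: usize) -> usize {
  let slot: AtomicPtr<u64> = AtomicPtr::new(ptr::null_mut());
  let winners = AtomicUsize::new(0);

  thread::scope(|s| {
    for i in 0..threads {
      let (slot, winners) = (&slot, &winners);
      s.spawn(move || {
        if init_slot(slot, i as u64) {
          winners.fetch_add(1, Ordering::Relaxed);
        }
      });
    }
  });

  // All threads are done; free the single surviving allocation.
  let installed = slot.load(Ordering::Acquire);
  assert!(!installed.is_null());
  unsafe { drop(Box::from_raw(installed)) };

  winners.load(Ordering::Relaxed)
}
```

However many threads pile in, exactly one allocation survives and the rest are freed by the losers, which is the property the fallback path needs.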

Conclusion

This is just a glimpse of what per-CPU concurrent programming looks like. I’m pretty new to it myself, and this post was motivated by building an end-to-end example in Rust. You can read more about how TCMalloc makes use of restartable sequences here.

  1. This is annoyingly different from the function calling convention, which passes arguments in rdi, rsi, rdx, rcx, r8, r9, with the mnemonic “Diana’s silk dress cost $89.” I don’t know a cute mnemonic for the syscall registers. 

  2. It’s actually worse than that. You’d think you could do

      jmp foo
    pointers:
      .int foo
    
    foo:
      mov qword ptr [rax], pointers
    

    but this makes the resulting code non-position-independent on x86. What this means is that the code must know at link time what address it will be loaded at, which breaks the position-independent requirement of many modern platforms.

    Indeed, this code will produce a linker error like the following:

      = note: /usr/bin/ld: /home/mcyoung/projects/cpulocal/target/debug/deps/cpulocal-a7eeabaf0b1f2c43.2l48u2rfiak1q1ik.rcgu.o:
          relocation R_X86_64_32 against `.text._ZN8cpulocal15PerCpu$LT$T$GT$8checkout17h42fde3ce3bd0180aE'
          can not be used when making a PIE object; recompile with -fPIE
          collect2: error: ld returned 1 exit status
    

    Not only is .int foo a problem, but so is referring to pointers. Instead we must write

    lea rcx, qword ptr [pointers + rip]
    mov qword ptr [rax], rcx
    

    to be able to load the address of pointers at all. This can be worked around if you’re smart; after all, it is possible to put the addresses of functions into static variables and not have the linker freak out. It’s too hard to do in inline assembly tho. 

  3. Basically this code, which can’t be properly-expressed in Rust.

    let cs = CritSec {
      version: 0,
      flags: 0,
      start: /* &90f */,
      len: /* &91f - &90f */,
      abort_handler: /* &92f */,
    };