pair of pared pears

Apr 17, 2024

Memory management in mpmetrics

This article is an extended, semi-literate overview of the memory management in mpmetrics. I think there are a lot of neat little details in the design of this library that lend themselves better to a more guided exposition than API documentation. I’ve made a few simplifications for didactic reasons, but the code otherwise closely mirrors the actual implementation.

If you’re not familiar with mpmetrics, check out my introduction to mpmetrics. This post focuses on dynamically allocating variables backed by shared memory… in Python. It’s a tough challenge since, in many ways, Python is the wrong language for this kind of task. However, its dynamic and flexible nature allows unusual and satisfying solutions to many challenges.

In a pickle

In order to store data in a structured fashion, we are going to need some types backed by shared memory. Let’s start by wrapping c_int64:

import ctypes

class Int64:
    size = ctypes.sizeof(ctypes.c_int64)

    def __init__(self, mem):
        self._mem = mem
        self._value = ctypes.c_int64.from_buffer(mem)
        self._value.value = 0

and proxy all of our attributes (except _value) onto c_int64:

    def __getattr__(self, name):
        return getattr(self.__dict__['_value'], name)

    def __setattr__(self, name, value):
        if '_value' in self.__dict__:
            setattr(self.__dict__['_value'], name, value)
        else:
            self.__dict__[name] = value

    def __delattr__(self, name):
        delattr(self.__dict__['_value'], name)

Let’s try it out:

>>> mem = bytearray(Int64.size)
>>> x = Int64(mem)
>>> x.value
0
>>> x.value += 1
>>> x.value
1
>>> mem
bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00')

As I’m on a little-endian system, the least-significant byte comes first. We can even pickle and unpickle it as long as we define some helpers:

    def __getstate__(self):
        return self._mem

    def __setstate__(self, mem):
        self._mem = mem
        self._value = ctypes.c_int64.from_buffer(mem)

Continuing our example from above,

>>> import pickle
>>> pickle.loads(pickle.dumps(x))
>>> # No errors :)

This is an important feature, since with the forkserver and spawn start methods, all objects are pickled when passing them to the subprocess.
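One consequence worth spelling out: the state we hand to pickle is the bytearray itself, so the unpickled object gets its own copy of the bytes rather than sharing memory with the original. A quick standalone check, using a trimmed-down Int64 (no attribute proxying) so the snippet runs on its own:

```python
import ctypes
import pickle

class Int64:
    # Trimmed-down Int64: no attribute proxying, same pickle helpers
    size = ctypes.sizeof(ctypes.c_int64)

    def __init__(self, mem):
        self.__setstate__(mem)
        self._value.value = 0

    def __getstate__(self):
        return self._mem

    def __setstate__(self, mem):
        self._mem = mem
        self._value = ctypes.c_int64.from_buffer(mem)

x = Int64(bytearray(Int64.size))
x._value.value = 7
y = pickle.loads(pickle.dumps(x))   # state travels as a copy of the bytes
x._value.value = 8                  # only x's bytearray changes
print(x._value.value, y._value.value)   # 8 7
```

That is fine for a one-off copy, but it means pickling alone never gives two processes a shared counter.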

That went well, so let’s tackle an array next. The size of the array depends on the element class and the number of elements. To keep the size as a class attribute (so we know how much memory to allocate before creating an instance), we create a new class for each type of array:

def Array(cls, n):
    class Array:
        size = cls.size * n

        def __init__(self, mem):
            self._mem = mem
            self._vals = []
            for i in range(n):
                off = i * cls.size
                self._vals.append(cls(mem[off:off + cls.size]))

We can also define some extra methods to make our class behave more like an array:

        def __len__(self):
            return n

        def __getitem__(self, key):
            return self._vals[key]

        def __iter__(self):
            return iter(self._vals)

as well as some helpers for pickling:

        def __getstate__(self):
            return self._mem

        def __setstate__(self, mem):
            self._mem = mem
            self._vals = []
            for i in range(n):
                off = i * cls.size
                val = cls.__new__(cls)
                val.__setstate__(self._mem[off:off + cls.size])
                self._vals.append(val)

    return Array

Let’s try it out:

>>> IntArray5 = Array(Int64, 5)
>>> a = IntArray5(bytearray(IntArray5.size))
>>> a[0].value = 5
>>> a[0].value += 10
>>> a[0].value
15

But there’s a problem when pickling:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: Can't pickle local object 'Array.<locals>.Array'

The problem is that pickle identifies a class by its __module__ and __qualname__, and looks the class up by those names when unpickling. We can see this if we disassemble our pickle from earlier:

>>> import pickletools
>>> pickletools.dis(pickletools.optimize(pickle.dumps(x)))
    0: \x80 PROTO      4
    2: \x95 FRAME      56
   11: \x8c SHORT_BINUNICODE '__main__'
   21: \x8c SHORT_BINUNICODE 'Int64'
   28: \x93 STACK_GLOBAL
   29: )    EMPTY_TUPLE
   30: \x81 NEWOBJ
   31: \x8c SHORT_BINUNICODE 'builtins'
   41: \x8c SHORT_BINUNICODE 'bytearray'
   52: \x93 STACK_GLOBAL
   53: C    SHORT_BINBYTES b'\x00\x00\x00\x00\x00\x00\x00\x00'
   63: \x85 TUPLE1
   64: R    REDUCE
   65: b    BUILD
   66: .    STOP

But because we create a new class every time we call Array, pickle can’t find the class we created: nothing in its name records the arguments we passed to Array. We could rewrite Array to take cls and n as arguments to __init__, but then we wouldn’t know how much memory to allocate before creating an instance.
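Both halves of the problem show up in a few lines. This sketch uses a stripped-down stand-in for the Array factory: each call builds a distinct class, and its __qualname__ contains <locals>, which pickle refuses to resolve. Depending on the Python version, the failure is reported as AttributeError or pickle.PicklingError, so the sketch catches both:

```python
import pickle

def Array(n):
    # Stand-in for the Array factory above: a fresh class on every call
    class Array:
        size = 8 * n
    return Array

IntArray5 = Array(5)
print(IntArray5.__qualname__)   # Array.<locals>.Array
print(Array(5) is IntArray5)    # False: same arguments, different class

error = None
try:
    pickle.dumps(IntArray5())
except (AttributeError, pickle.PicklingError) as exc:
    error = exc
print(type(error).__name__)     # exception type varies by Python version
```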

What we need is a way to record the arguments to Array so that we can create the correct class when unpickling. But the only thing we have to work with is the __qualname__.

What if we store the arguments to Array in the class name itself?

The trick

Imagine for a moment that we only want to create Int64 Arrays, so we only need to store the length. We could create an object like this:

class IntType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        self.name = name
        self.cls = cls

    def __getattr__(self, attr):
        return self.cls(self.name + '.' + attr, int(attr))

The usage is perhaps best-demonstrated by example:

>>> test = IntType('test', lambda *args: args)
>>> getattr(test, '5')
('test.5', 5)

The first argument to the function is the path we used to access that attribute, and the second is the value of the attribute. Now we can use this to create a new IntArray:

def _IntArray(__name__, n):
    cls = Int64
    size = cls.size * n

    ...

    return type(__name__, (), locals())

IntArray = IntType('IntArray', _IntArray)

We need to call the three-argument form of type instead of using the class keyword, since the name of the class we create depends on n. Let’s try using this class again:

>>> IntArray5 = getattr(IntArray, '5')
>>> a = IntArray5(bytearray(IntArray5.size))
>>> a[0].value = 5
>>> a[0].value += 10
>>> a[0].value
15

Looking good so far. Let’s try pickling it:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
_pickle.PicklingError: Can't pickle <class '__main__.IntArray.5'>: it's not the same object as __main__.IntArray.5

Whoops. The problem is that every time we call _IntArray we create a new class. This is pretty easy to solve by wrapping __getattr__ in a decorator which saves its return value:

def saveattr(get):
    def wrapped(self, name):
        attr = get(self, name)
        setattr(self, name, attr)
        return attr
    return wrapped

Python only calls __getattr__ when normal attribute lookup fails, so once the result is stored in the instance’s __dict__, later accesses return the same class. Let’s try pickling again:

>>> pickle.loads(pickle.dumps(a))

Success!
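For reference, here is the whole trick as one self-contained sketch. Some details are assumptions made to keep it standalone rather than part of mpmetrics: the classes live in a throwaway module registered in sys.modules under the made-up name trick_demo (so pickle can import it however the snippet is run), the caching that saveattr provides is inlined behind a cache flag, and non-numeric attribute names raise AttributeError. The cached namespace round-trips, while the uncached one reproduces the "not the same object" error:

```python
import pickle
import sys
import types

# Throwaway module under a made-up name, so pickle can import our classes
demo = types.ModuleType('trick_demo')
sys.modules['trick_demo'] = demo

def make_array(name, n):
    # Three-argument type(): the dotted name also becomes __qualname__
    return type(name, (), {'n': n, '__module__': 'trick_demo'})

class IntType:
    def __init__(self, name, cls, cache=True):
        self.__qualname__ = name
        self.name = name
        self.cls = cls
        self.cache = cache

    def __getattr__(self, attr):
        # Only reached when normal lookup fails, i.e. attr isn't cached yet
        if not attr.isdigit():
            raise AttributeError(attr)
        made = self.cls(self.name + '.' + attr, int(attr))
        if self.cache:
            setattr(self, attr, made)  # what the saveattr decorator does
        return made

demo.IntArray = IntType('IntArray', make_array)
demo.Uncached = IntType('Uncached', make_array, cache=False)

IntArray5 = getattr(demo.IntArray, '5')
restored = pickle.loads(pickle.dumps(IntArray5()))
print(type(restored) is IntArray5, restored.n)   # True 5

uncached_error = None
try:
    pickle.dumps(getattr(demo.Uncached, '5')())
except pickle.PicklingError as exc:
    uncached_error = exc   # the lookup builds a second, different class
print(uncached_error)
```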

Further objectives

That was a nice warm-up. Let’s try something more challenging. What if, instead of a known element type and an unknown length, we made a class with an unknown element type and a fixed length?

def _Array5(__name__, cls):
    n = 5
    size = cls.size * n

    # ...

    return type(__name__, (), locals())

Array5 = ObjectType('Array5', _Array5)

So what should the name of IntArray5 be? Well, perhaps the most obvious thing would be

Array5.__main__.Int64

but with this kind of name we wouldn’t know where the name of the object started and where it ended, which would prevent us from nesting multiple ObjectTypes. So let’s use this name instead:

Array5.<.__main__.Int64.>

The base ObjectType just needs to have a < attribute:

class ObjectType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        setattr(self, '<', self.Module(name + '.<', cls))

This attribute’s job is to parse the module portion of the class’s name. We do this by repeatedly trying to import the next attribute as a module:

    class Module:
        def __init__(self, name, cls, parent=None):
            self.name = name
            self.cls = cls
            self.parent = parent

        @saveattr
        def __getattr__(self, name):
            try:
                if self.parent:
                    module = self.parent.__name__ + '.' + name
                else:
                    module = name
                return type(self)(self.name, self.cls, importlib.import_module(module))
            except ModuleNotFoundError:
                if self.parent:
                    prefix = self.name + '.' + self.parent.__name__
                else:
                    prefix = self.name
                return ObjectType.Attr(prefix, self.cls, getattr(self.parent, name))

For example, say that we have a file a/b.py and inside that file we have a class C. When we access Array5.<.a.b.C, we will have

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    a: ObjectType.Module('Array5.<.a', _Array5, a)
      b: ObjectType.Module('Array5.<.a.b', _Array5, a.b)
        C: ObjectType.Attr('Array5.<.a.b.C', _Array5, a.b.C)

At this point, we’ve gotten through the modules and finally made it to an object. Now we need to walk its attributes:

    class Attr:
        def __init__(self, name, cls, obj, nesting=1):
            self.name = name
            self.cls = cls
            self.obj = obj
            self.nesting = nesting

        @saveattr
        def __getattr__(self, name):
            nesting = self.nesting + (name == '<') - (name == '>')
            if name == '>' and not nesting:
                return self.cls(self.name + '.' + self.obj.__qualname__ + '.>', self.obj)
            else:
                return type(self)(self.name, self.cls, getattr(self.obj, name), nesting)

To continue the above example, say that class C has a nested class D. When we access Array5.<.a.b.C.D.>, we will have

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    a: ObjectType.Module('Array5.<.a', _Array5, a)
      b: ObjectType.Module('Array5.<.a.b', _Array5, a.b)
        C: ObjectType.Attr('Array5.<.a.b.C', _Array5, a.b.C, 1)
          D: ObjectType.Attr('Array5.<.a.b.C.D', _Array5, a.b.C.D, 1)
            >: _Array5('Array5.<.a.b.C.D.>', a.b.C.D)

The nesting attribute keeps track of nested objects. For example, say we want to create an Array5 of an Array5 of Int64s:

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    __main__: ObjectType.Module('Array5.<.__main__', _Array5, __main__)
      Array5: ObjectType.Attr('Array5.<.__main__.Array5', _Array5, __main__.Array5, 1)
        <: ObjectType.Attr('Array5.<.__main__.Array5.<', _Array5, __main__.Array5.<, 2)
          __main__: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__', _Array5, __main__.Array5.<.__main__, 2)
            Int64: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__.Int64', _Array5, __main__.Array5.<.__main__.Int64, 2)
              >: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__.Int64.>', _Array5, __main__.Array5.<.__main__.Int64.>, 1)
                >: _Array5('Array5.<.__main__.Array5.<.__main__.Int64.>.>', __main__.Array5.<.__main__.Int64.>)
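The nesting counter is ordinary bracket matching over the dot-separated parts of the name. As an illustration, split_type_arg below (a hypothetical helper, not part of mpmetrics) applies the same update rule as Attr.__getattr__ to a plain string and pulls out the outermost bracketed argument:

```python
def split_type_arg(qualname):
    """Split 'Outer.<.path.to.Arg.>.rest' into ('Outer', 'path.to.Arg', ['rest'])."""
    parts = qualname.split('.')
    if len(parts) < 3 or parts[1] != '<':
        raise ValueError(f'no bracketed argument in {qualname!r}')
    nesting = 1
    for i in range(2, len(parts)):
        # Same update rule as Attr.__getattr__
        nesting += (parts[i] == '<') - (parts[i] == '>')
        if nesting == 0:
            return parts[0], '.'.join(parts[2:i]), parts[i + 1:]
    raise ValueError(f'unbalanced brackets in {qualname!r}')

print(split_type_arg('Array5.<.__main__.Array5.<.__main__.Int64.>.>'))
# ('Array5', '__main__.Array5.<.__main__.Int64.>', [])
print(split_type_arg('Array.<.__main__.Int64.>.5'))
# ('Array', '__main__.Int64', ['5'])
```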

Of course, this also means that < and > are special, and you can’t include unmatched brackets in your class hierarchy (like a certain ticklish language). A more robust system could prefix each type with the number of attributes it contains:

Array5.5.__main__.Array5.2.__main__.Int64

but I like the aesthetics of angle brackets more. Speaking of which, to actually access the above class name, we’d have to type out something like

>>> getattr(getattr(getattr(getattr(Array5, '<').__main__.Array5, '<').__main__.Int64, '>'), '>')
<class '__main__.Array5.<.__main__.Array5.<.__main__.Int64.>.>'>

This is a real pain. Let’s add a helper to ObjectType:

    def __getitem__(self, cls):
        parent = getattr(self, '<')
        for subpath in itertools.chain(cls.__module__.split('.'), cls.__qualname__.split('.')):
            parent = getattr(parent, subpath)
        return getattr(parent, '>')

Now we can do

>>> Array5[Array5[Int64]]
<class '__main__.Array5.<.__main__.Array5.<.__main__.Int64.>.>'>

Much better.

A product of necessity

IntArrays and Array5s are all well and good, but what we really want is an Array where we can specify both the element type and the length. Since we already have an IntType and an ObjectType, we can combine them with a ProductType:

class ProductType:
    def __init__(self, name, cls, argtypes, args=()):
        self.__qualname__ = name
        self.name = name
        self.cls = cls
        self.argtype = argtypes[0](self.name, self._chain)
        self.argtypes = argtypes[1:]
        self.args = args

    def _chain(self, name, arg):
        if self.argtypes:
            return type(self)(name, self.cls, self.argtypes, (*self.args, arg))
        return self.cls(name, *self.args, arg)

    @saveattr
    def __getattr__(self, name):
        return getattr(self.argtype, name)

    # __getitem__ omitted for brevity

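Rather than constructing the class immediately as before, each argtype hands its parsed argument to _chain, which either moves on to the next argtype or, once every argument has been collected, calls cls. With this, we can now redefine Array: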

def _Array(__name__, cls, n):
    size = cls.size * n

    # ...

    return type(__name__, (), locals())

Array = ProductType('Array', _Array, (ObjectType, IntType))

When we access something like Array[Int64, 5], the attributes will look like:

Array: ProductType('Array', _Array, (ObjectType, IntType))
  <: ObjectType.Module('Array.<', Array._chain, None)
    __main__: ObjectType.Module('Array.<.__main__', Array._chain, __main__)
      Int64: ObjectType.Attr('Array.<.__main__.Int64', Array._chain, __main__.Int64)
        >: IntType('Array.<.__main__.Int64.>', Array[Int64]._chain)
          5: _Array('Array.<.__main__.Int64.>.5', Int64, 5)

And we can finally pickle and unpickle:

>>> IntArray5 = Array[Int64, 5]
>>> a = IntArray5(bytearray(IntArray5.size))
>>> pickle.loads(pickle.dumps(a))

There’s a problem though:

>>> a[0].value = 15
>>> pickle.loads(pickle.dumps(a))[0].value
0

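This is because slicing a bytearray returns a new bytearray containing a copy of the original’s contents, so writes through a[0] never reach a._mem, which is what gets pickled. We can get around this by using a memoryview, whose slices share the underlying buffer: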

>>> a = IntArray5(memoryview(bytearray(IntArray5.size)))
>>> a[0].value = 15
>>> bytearray(a[0]._mem)
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')
>>> bytearray(a._mem)[0:8]
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')

But we can’t pickle it:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle 'memoryview' object
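The copy-versus-view behaviour, and pickle’s refusal to handle memoryviews, can be checked in isolation with nothing but a bytearray:

```python
import pickle

buf = bytearray(16)
chunk = buf[0:8]              # slicing a bytearray copies the bytes
view = memoryview(buf)[0:8]   # slicing a memoryview shares buf's storage

chunk[0] = 1
view[1] = 2
print(bytes(buf[0:2]))        # b'\x00\x02': only the write through the view landed

pickle_error = None
try:
    pickle.dumps(view)
except TypeError as exc:
    pickle_error = exc        # the same TypeError as above
print(pickle_error)
```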

It’s time to actually start working with shared memory.

Malloc madness

The first thing we need is a bulk source of shared memory. Unfortunately, we cannot use multiprocessing.shared_memory because we can’t expand its memory later. Metrics can be created at any point in the application’s lifetime, and we don’t necessarily know how many we will need when we have to create the first metric. For example, adding a label creates a new copy of a metric for that label, and it’s common to generate labels dynamically based on endpoints or status codes.

Instead, we open a TemporaryFile and truncate it as necessary to extend it.

import os
import mmap
from tempfile import TemporaryFile

class Heap:
    def __init__(self):
        # File backing our shared memory
        self._file = TemporaryFile()
        self._fd = self._file.fileno()
        # Allocate a page to start with
        os.ftruncate(self._fd, 4096)
        # Keep track of the memory we've mapped
        self._maps = [mmap.mmap(self._fd, 4096)]
        # Initialize a base pointer with the memory we just mapped
        self._base = Int64(memoryview(self._maps[0])[:Int64.size])
        # And make sure we don't reuse that memory later
        self._base.value = Int64.size
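The Heap relies on two operating-system behaviours: ftruncate grows a file with zero-filled bytes, and a single page of the file can be mapped on its own by passing an offset to mmap. A minimal standalone check follows. It uses mmap.ALLOCATIONGRANULARITY rather than a hard-coded 4096, since mmap offsets must be a multiple of it (on typical Linux systems it equals the page size):

```python
import mmap
import os
from tempfile import TemporaryFile

PAGE = mmap.ALLOCATIONGRANULARITY  # mmap offsets must be a multiple of this

with TemporaryFile() as f:
    fd = f.fileno()
    os.ftruncate(fd, PAGE)                     # one zero-filled page
    first = mmap.mmap(fd, PAGE)                # map page 0
    first[0:5] = b'hello'

    os.ftruncate(fd, 2 * PAGE)                 # grow the file by a page
    second = mmap.mmap(fd, PAGE, offset=PAGE)  # map just the new page
    second[0:5] = b'world'

    first.flush()
    second.flush()
    f.seek(0)
    head = f.read(5)
    f.seek(PAGE)
    tail = f.read(5)
    first.close()
    second.close()

print(head, tail)   # b'hello' b'world'
```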

We’re going to be making a basic "bump" style allocator. The algorithm is really simple; in pseudocode it’s just:

def malloc(size):
    start = base
    base += size
    return start

In practice it’s a little more complex: we need to ensure we have enough space in the file and take care when crossing page boundaries.

Separate calls to mmap aren’t guaranteed to return adjacent addresses, even for adjacent pages of the same file. For example, say we made two allocations of size 2048 and 4096, placing the first at offset 0 and the second at offset 2048. The second allocation would span two pages (ending at byte 6143). If the first page was mapped at address 16384, the second page would have to be mapped at address 20480 to keep the allocation contiguous, but we can’t guarantee that with the mmap API. So instead, if an allocation would cross a page boundary, we round its start up to the next page boundary.

Allocations larger than a single page always cross page boundaries no matter how we align things. To solve this issue, we map all the pages for these allocations in one call to mmap, ensuring that we get a contiguous mapping. Then, we bump the base address to the next page boundary, ensuring that no other allocations will need those pages.

Concretely, if an allocation is larger than a page, we round its size up to a whole number of pages:

def _align_mask(x, mask):
    return (x + mask) & ~mask

def align(x, a):
    # Round x up to a multiple of a (a must be a power of two)
    return _align_mask(x, a - 1)

class Heap:
    ...

    def malloc(self, size):
        if size > 4096:
            size = align(size, 4096)

If we need to allocate a new page, enlarge the file and update the base:

        # Current size of the backing file
        total = os.fstat(self._fd).st_size
        if self._base.value + size >= total:
            os.ftruncate(self._fd, align(total + size, 4096))
            self._base.value = align(self._base.value, 4096)

And finally, we can bump the base pointer and return a new Block:

        start = self._base.value
        self._base.value += size
        return Block(self, start, size)
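Since malloc only manipulates two integers, the file size and the base pointer, its arithmetic can be checked without any files. BumpModel below is a hypothetical pure-integer model following the same rules, assuming 4096-byte pages and an 8-byte base pointer at offset 0 (so the first allocation starts at 8 rather than 0). It replays the 2048-then-4096 example from above and adds a multi-page allocation:

```python
PAGE = 4096

def align(x, a):
    # Round x up to a multiple of a (a must be a power of two)
    return (x + a - 1) & ~(a - 1)

class BumpModel:
    def __init__(self):
        self.total = PAGE  # size of the backing file
        self.base = 8      # next free byte; the base pointer occupies 0..7

    def malloc(self, size):
        if size > PAGE:
            size = align(size, PAGE)        # multi-page: whole pages only
        if self.base + size >= self.total:  # same test as malloc above
            self.total = align(self.total + size, PAGE)
            self.base = align(self.base, PAGE)
        start = self.base
        self.base += size
        return start

heap = BumpModel()
print(heap.malloc(2048))    # 8
print(heap.malloc(4096))    # 4096: moved to the next page instead of crossing it
print(heap.malloc(10000))   # 8192: rounded up to 12288 bytes, page-aligned
print(heap.total)           # 20480
```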

Block is like a pointer, except it keeps track of how big it is and where it was allocated from.

import itertools

class Block:
    def __init__(self, heap, start, size):
        self.heap = heap
        self.start = start
        self.size = size

There’s only one major method, deref, which creates a memoryview. The first half of this function determines the page(s) we need to access, and what their offsets are:

    def deref(self):
        heap = self.heap
        first_page = self.start // 4096
        last_page = (self.start + self.size - 1) // 4096
        nr_pages = last_page - first_page + 1
        page_off = first_page * 4096
        off = self.start - page_off

We store our mapped pages in a list. Each element is the mmap object for the mapping that starts at that page (one page, or several for a multi-page allocation), or None if we haven’t mapped it yet. To start, we extend our list if it’s not long enough.

        if len(heap._maps) <= last_page:
            heap._maps.extend(itertools.repeat(None, last_page - len(heap._maps) + 1))

Then, if the first page isn’t mapped yet, we map it along with any following pages the Block spans. malloc ensures we never have Blocks which cross page boundaries unless they are larger than a single page. Since multi-page allocations are the only allocations in the pages they use, we will never try to access the Nones occupying the later indices in the list.

        if heap._maps[first_page] is None:
            heap._maps[first_page] = mmap.mmap(heap._fd, nr_pages * 4096,
                                               offset=page_off)

Finally, we can create a memoryview out of the mapped page(s):

        return memoryview(heap._maps[first_page])[off:off+self.size]
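The page bookkeeping at the top of deref is plain integer arithmetic. Pulled out into a hypothetical page_span helper (again assuming 4096-byte pages), it works out like this for a small block in page 0, a small block in page 1, and a three-page block:

```python
PAGE = 4096

def page_span(start, size):
    # Same arithmetic as the top of deref: which pages a block touches,
    # and where it begins inside the first one
    first_page = start // PAGE
    last_page = (start + size - 1) // PAGE
    nr_pages = last_page - first_page + 1
    off = start - first_page * PAGE
    return first_page, nr_pages, off

print(page_span(2056, 8))        # (0, 1, 2056)
print(page_span(4196, 64))       # (1, 1, 100)
print(page_span(8192, 12288))    # (2, 3, 0): mmap 3 pages at file offset 8192
```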

Let’s try it out:

>>> h = Heap()
>>> block = h.malloc(IntArray5.size)
>>> a = IntArray5(block.deref())
>>> a[0].value = 15
>>> bytearray(a[0]._mem)
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')
>>> bytearray(a._mem)[0:8]
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')

Good so far, but we still can’t pickle the memoryview:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle 'memoryview' object

What about pickling the Block, which can create the memoryview from the Heap?

>>> pickle.dumps(block)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle '_io.BufferedRandom' object

Now the problem is that we can’t pickle the open file backing the Heap. In general, there’s no way to pickle an open file, since it might not be around whenever another Python process gets around to unpickling it. But we only need pickling to work when spawning new processes. As it turns out, the multiprocessing authors had the same problem and came up with DupFd. We can use it to implement Heap's pickle helpers:

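from multiprocessing.reduction import DupFd

class Heap:
    ...

    def __getstate__(self):
        return DupFd(self._fd)

    def __setstate__(self, df):
        self._fd = df.detach()
        self._file = open(self._fd, 'a+b')
        self._maps = [mmap.mmap(self._fd, 4096)]
        # Reattach to the shared base pointer without resetting it
        # (Int64.__init__ would zero it)
        self._base = Int64.__new__(Int64)
        self._base.__setstate__(memoryview(self._maps[0])[:Int64.size])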

Under the hood, DupFd duplicates the file descriptor and registers the copy with a UNIX domain socket server, which sends it to the client when requested. The pickle data is just the address of the server plus a key identifying the descriptor:

>>> pickletools.dis(pickletools.optimize(pickle.dumps(h)))
    0: \x80 PROTO      4
    2: \x95 FRAME      113
   11: \x8c SHORT_BINUNICODE '__main__'
   21: \x8c SHORT_BINUNICODE 'Heap'
   27: \x93 STACK_GLOBAL
   28: )    EMPTY_TUPLE
   29: \x81 NEWOBJ
   30: \x8c SHORT_BINUNICODE 'multiprocessing.resource_sharer'
   63: \x8c SHORT_BINUNICODE 'DupFd'
   70: \x93 STACK_GLOBAL
   71: )    EMPTY_TUPLE
   72: \x81 NEWOBJ
   73: }    EMPTY_DICT
   74: \x8c SHORT_BINUNICODE '_id'
   79: \x8c SHORT_BINUNICODE '/tmp/pymp-i7ih27es/listener-a61oo9mt'
  117: K    BININT1    2
  119: \x86 TUPLE2
  120: s    SETITEM
  121: b    BUILD
  122: b    BUILD
  123: .    STOP
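The descriptor itself travels as SCM_RIGHTS ancillary data over a UNIX domain socket, and the kernel installs a new descriptor for the same open file in the receiving process. Python exposes that mechanism directly as socket.send_fds and socket.recv_fds (Unix only, Python 3.9+). This sketch passes a descriptor over a socketpair within one process; it shows the underlying mechanism, not resource_sharer’s actual code:

```python
import os
import socket
from tempfile import TemporaryFile

parent, child = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
with TemporaryFile() as f:
    # Send the descriptor as SCM_RIGHTS ancillary data alongside a tiny message
    socket.send_fds(parent, [b'fd'], [f.fileno()])
    msg, fds, flags, addr = socket.recv_fds(child, 16, 1)

    received = fds[0]               # new descriptor, same open file
    os.write(received, b'shared')   # write through the received descriptor
    os.close(received)

    f.seek(0)
    data = f.read()
parent.close()
child.close()
print(msg, data)   # b'fd' b'shared'
```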

The duplicated descriptor is only handed out once, so each pickle of the heap can only be unpickled once. Let’s try it out:

>>> block = h.malloc(IntArray5.size)
>>> a = IntArray5(block.deref())
>>> b = IntArray5(pickle.loads(pickle.dumps(block)).deref())
>>> a[0].value = 85
>>> b[0].value
85

Success! But wouldn’t it be nice if we could just pickle the array directly?

Shipping and Receiving

Going back to Int64, we could rewrite it to take a Heap instead of raw memory:

class Int64:
    def __init__(self, heap):
        self._block = heap.malloc(self.size)
        self._value = ctypes.c_int64.from_buffer(self._block.deref())
        self._value.value = 0

    def __getstate__(self):
        return self._block

    def __setstate__(self, block):
        self._block = block
        self._value = ctypes.c_int64.from_buffer(block.deref())

    ...

But this breaks Array and Heap, since we no longer have a way to create an Int64 from raw memory. What we really want is a second class, BoxedInt64, which takes a Heap, while the regular Int64 keeps using memory directly.

class BoxedInt64(Int64):
    def __init__(self, heap):
        self._block = heap.malloc(self.size)
        super().__init__(self._block.deref())

    def __getstate__(self):
        return self._block

    def __setstate__(self, block):
        self._block = block
        super()._setstate(block.deref())

Here, _setstate is a small helper we add to Int64:

    def _setstate(self, mem):
        self._value = ctypes.c_int64.from_buffer(mem)

Examining BoxedInt64, you may notice that aside from inheriting from Int64, it is otherwise completely generic. In fact, we can create boxed types on the fly by creating new subclasses with ObjectType:

class _Box:
    # Same as BoxedInt64

Box = ObjectType('Box', lambda name, cls: type(name, (_Box, cls), {}))

Which we can now use like

>>> a = Box[Array[Int64, 5]](h)
>>> b = pickle.loads(pickle.dumps(a))
>>> a[0].value = 33
>>> b[0].value
33

Epilogue

Hopefully this has been an interesting journey through the heart of mpmetrics. For expository purposes, I left out or glossed over many details, such as the many other types and locking, and of course this doesn’t even cover the metrics themselves. If you are interested in more details of how this library works, check out the mpmetrics internals documentation.