Memory management in mpmetrics
This article is an extended, semi-literate overview of the memory management in
mpmetrics. I think there are a lot of
neat little details in the design of this library that lend themselves better
to a more guided exposition than API documentation. I’ve made a few
simplifications for didactic reasons, but the code otherwise closely mirrors
the actual implementation.
If you’re not familiar with mpmetrics, check out my
introduction to mpmetrics. The problem
focused on in this post is dynamic allocation of variables backed by shared
memory… in Python. It’s a tough challenge since, in many ways, Python is the
wrong language for this kind of task. However, its dynamic and flexible nature
allows unusual and satisfying solutions to many challenges.
In a pickle
In order to store data in a structured fashion, we are going to need some types
backed by shared memory. Let’s start by wrapping c_int64:

import ctypes

class Int64:
    size = ctypes.sizeof(ctypes.c_int64)

    def __init__(self, mem):
        self._mem = mem
        self._value = ctypes.c_int64.from_buffer(mem)
        self._value.value = 0

and proxy all of our attributes (except _value) onto c_int64:

    def __getattr__(self, name):
        return getattr(self.__dict__['_value'], name)

    def __setattr__(self, name, value):
        if '_value' in self.__dict__:
            setattr(self.__dict__['_value'], name, value)
        else:
            self.__dict__[name] = value

    def __delattr__(self, name):
        delattr(self.__dict__['_value'], name)
Let’s try it out:

>>> mem = bytearray(Int64.size)
>>> x = Int64(mem)
>>> x.value
0
>>> x.value += 1
>>> x.value
1
>>> mem
bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00')

As I’m on a little-endian system, the least-significant byte comes first. We can even pickle and unpickle it as long as we define some helpers:
    def __getstate__(self):
        return self._mem

    def __setstate__(self, mem):
        self._mem = mem
        self._value = ctypes.c_int64.from_buffer(mem)
Continuing our example from above,

>>> import pickle
>>> pickle.loads(pickle.dumps(x))
>>> # No errors :)

This is an important feature, since with the forkserver and spawn start
methods, all objects are pickled when passing them to the subprocess.
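As a quick sanity check of the two behaviors relied on above, here is a minimal sketch (not taken from mpmetrics; the variable names are just for illustration). It shows that c_int64.from_buffer shares storage with the bytearray it wraps, while a pickle round trip of that bytearray yields an independent copy.

```python
import ctypes
import pickle
import sys

# The ctypes value is a view onto the bytearray, not a copy of it.
mem = bytearray(ctypes.sizeof(ctypes.c_int64))
value = ctypes.c_int64.from_buffer(mem)
value.value = 1

# Decode the raw bytes using the host byte order; this matches value.value.
decoded = int.from_bytes(mem, sys.byteorder)

# Pickling a bytearray serializes its contents, so the result is a new buffer.
copy = pickle.loads(pickle.dumps(mem))
copy[0] = 0xFF  # does not affect mem or value
```

So an Int64 that pickles its bytearray survives the trip, but the unpickled object no longer shares memory with the original.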
That went well, so let’s try to tackle an array next. The size of the array will depend on the element class and the number of elements. In order to keep the size as a class attribute (so we know how much memory we need to allocate), we create a new class for each type of array:
def Array(cls, n):
    class Array:
        size = cls.size * n

        def __init__(self, mem):
            self._mem = mem
            self._vals = []
            for i in range(n):
                off = i * cls.size
                self._vals.append(cls(mem[off:off + cls.size]))
We can also define some extra methods to make our class behave more like an array:
        def __len__(self):
            return n

        def __getitem__(self, key):
            return self._vals[key]

        def __iter__(self):
            return iter(self._vals)
as well as some helpers for pickling:
        def __getstate__(self):
            return self._mem

        def __setstate__(self, mem):
            self._mem = mem
            self._vals = []
            for i in range(n):
                off = i * cls.size
                val = cls.__new__(cls)
                val.__setstate__(self._mem[off:off + cls.size])
                self._vals.append(val)

    return Array
Let’s try it out:

>>> IntArray5 = Array(Int64, 5)
>>> a = IntArray5(bytearray(IntArray5.size))
>>> a[0].value = 5
>>> a[0].value += 10
>>> a[0].value
15

But there’s a problem when pickling:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: Can't pickle local object 'Array.<locals>.Array'
The problem is that pickle uses the type’s __qualname__ to identify the
class to use when unpickling. We can see this if we disassemble our pickle from
earlier:

>>> import pickletools
>>> pickletools.dis(pickletools.optimize(pickle.dumps(x)))
    0: \x80 PROTO      4
    2: \x95 FRAME      56
   11: \x8c SHORT_BINUNICODE '__main__'
   21: \x8c SHORT_BINUNICODE 'Int64'
   28: \x93 STACK_GLOBAL
   29: )    EMPTY_TUPLE
   30: \x81 NEWOBJ
   31: \x8c SHORT_BINUNICODE 'builtins'
   41: \x8c SHORT_BINUNICODE 'bytearray'
   52: \x93 STACK_GLOBAL
   53: C    SHORT_BINBYTES b'\x00\x00\x00\x00\x00\x00\x00\x00'
   63: \x85 TUPLE1
   64: R    REDUCE
   65: b    BUILD
   66: .    STOP
But since we create a new class every time we call Array, we can’t identify
the class we created this way, since pickle has no way to tell what the
arguments were to Array. We could rewrite Array to take cls and n as
arguments to __init__, but then we wouldn’t know how much memory to
allocate.
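To see why the lookup fails, here is a small sketch of roughly how pickle resolves a class from its dotted name. The resolve helper is my own stand-in for pickle’s internal lookup, not a public API, and the Array factory is cut down to the bare minimum.

```python
from types import SimpleNamespace

def Array(n):
    class Array:
        length = n
    return Array

def resolve(namespace, qualname):
    """Walk a dotted qualname with getattr, refusing '<locals>' components."""
    obj = namespace
    for part in qualname.split('.'):
        if part == '<locals>':
            raise AttributeError("can't look up local object " + repr(qualname))
        obj = getattr(obj, part)
    return obj

IntArray5 = Array(5)
qualname = IntArray5.__qualname__  # contains a '<locals>' component

# Stand-in for the module that pickle would start its lookup from.
module = SimpleNamespace(Array=Array)
try:
    resolve(module, qualname)
    resolved = True
except AttributeError:
    resolved = False
```

Even if the lookup got past '<locals>', every call to Array returns a brand new class, so there would be nothing stable to find.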
What we need is a way to record the arguments to Array so that we can create
the correct class when unpickling. But the only thing we have to work with is
the __qualname__. What if we store the arguments to Array in the class name itself?
The trick
Imagine for a moment that we just want to create Int64 Arrays, and we
only need to store the length. We could create an object like

class IntType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        self.name = name
        self.cls = cls

    def __getattr__(self, attr):
        return self.cls(self.name + '.' + attr, int(attr))

The usage is perhaps best-demonstrated by example:

>>> test = IntType('test', lambda *args: args)
>>> getattr(test, '5')
('test.5', 5)
The first argument to the function is the path we used to access that
attribute, and the second is the value of the attribute. Now we can use this to
create a new IntArray:

def _IntArray(__name__, n):
    cls = Int64
    size = cls.size * n
    ...
    return type(__name__, (), locals())

IntArray = IntType('IntArray', _IntArray)

We need to call the three-argument type instead of using the class keyword,
since the name of the class we create will change based on n. Let’s try using
this class again:

>>> IntArray5 = getattr(IntArray, '5')
>>> a = IntArray5(bytearray(IntArray5.size))
>>> a[0].value = 5
>>> a[0].value += 10
>>> a[0].value
15

Looking good so far. Let’s try pickling it:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
_pickle.PicklingError: Can't pickle <class '__main__.IntArray.5'>: it's not the same object as __main__.IntArray.5
Whoops. The problem is that every time we call _IntArray we create a new
class. This is pretty easy to solve by wrapping __getattr__ in a decorator
which saves its return value:

def saveattr(get):
    def wrapped(self, name):
        attr = get(self, name)
        setattr(self, name, attr)
        return attr
    return wrapped

Python won’t bother calling __getattr__ if the relevant attribute is
already present in __dict__. Let’s try pickling again:

>>> pickle.loads(pickle.dumps(a))

Success!
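Putting the pieces of this section together, here is a self-contained sketch of the caching behavior. To keep it short, _IntArray here only records n and a size (assuming 8-byte elements) instead of building the real array class.

```python
def saveattr(get):
    """Cache the result of __getattr__ on the instance."""
    def wrapped(self, name):
        attr = get(self, name)
        setattr(self, name, attr)
        return attr
    return wrapped

class IntType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        self.name = name
        self.cls = cls

    @saveattr
    def __getattr__(self, attr):
        return self.cls(self.name + '.' + attr, int(attr))

def _IntArray(__name__, n):
    # Simplified stand-in: the real class would also define the array methods.
    return type(__name__, (), {'n': n, 'size': 8 * n})

IntArray = IntType('IntArray', _IntArray)

first = getattr(IntArray, '5')   # calls __getattr__ and caches the class
second = getattr(IntArray, '5')  # served from IntArray.__dict__
```

Because the second lookup returns the very same class object, walking the name IntArray.5 now leads back to the class that created the instance, which is the identity check that failed before.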
Further objectives
That was a nice warm up. Let’s try something more challenging. What if instead of a known element type and an unknown length, we tried making a class with an unknown object type and a fixed length:

def _Array5(__name__, cls):
    n = 5
    size = cls.size * n
    # ...
    return type(__name__, (), locals())

Array5 = ObjectType('Array5', _Array5)

So what should the name of IntArray5 be? Well, perhaps the most obvious thing would be

Array5.__main__.Int64

but with this kind of name we wouldn’t know where the name of the object started
and where it ended. This would prevent us from nesting multiple ObjectTypes.
So let’s use this name instead:

Array5.<.__main__.Int64.>

The base ObjectType just needs to have a < attribute:
class ObjectType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        setattr(self, '<', self.Module(name + '.<', cls))
This attribute’s job is to parse the module portion of the class’s name. We do this by repeatedly trying to import the next attribute as a module:
import importlib

    class Module:
        def __init__(self, name, cls, parent=None):
            self.name = name
            self.cls = cls
            self.parent = parent

        @saveattr
        def __getattr__(self, name):
            try:
                # Try to treat the next path component as a (sub)module
                if self.parent:
                    module = self.parent.__name__ + '.' + name
                else:
                    module = name
                return type(self)(self.name, self.cls, importlib.import_module(module))
            except ModuleNotFoundError:
                # Not a module, so it must be an attribute of the last module
                if self.parent:
                    prefix = self.name + '.' + self.parent.__name__
                else:
                    prefix = self.name
                return ObjectType.Attr(prefix, self.cls, getattr(self.parent, name))
For example, say that we have a file a/b.py and inside that file we have a
class C. When we access Array5.<.a.b.C, we will have

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    a: ObjectType.Module('Array5.<.a', _Array5, a)
      b: ObjectType.Module('Array5.<.a.b', _Array5, a.b)
        C: ObjectType.Attr('Array5.<.a.b.C', _Array5, a.b.C)
At this point, we’ve gotten through the modules and finally made it to an object. Now we need to walk its attributes:
    class Attr:
        def __init__(self, name, cls, obj, nesting=1):
            self.name = name
            self.cls = cls
            self.obj = obj
            self.nesting = nesting

        @saveattr
        def __getattr__(self, name):
            nesting = self.nesting + (name == '<') - (name == '>')
            if name == '>' and not nesting:
                return self.cls(self.name + '.' + self.obj.__qualname__ + '.>', self.obj)
            else:
                return type(self)(self.name, self.cls, getattr(self.obj, name), nesting)
To continue the above example, say that class C has a nested class D. When
we access Array5.<.a.b.C.D.> we will have

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    a: ObjectType.Module('Array5.<.a', _Array5, a)
      b: ObjectType.Module('Array5.<.a.b', _Array5, a.b)
        C: ObjectType.Attr('Array5.<.a.b.C', _Array5, a.b.C, 1)
          D: ObjectType.Attr('Array5.<.a.b.C.D', _Array5, a.b.C.D, 1)
            >: _Array5('Array5.<.a.b.C.D.>', a.b.C.D)

The nesting attribute helps us keep track of nested objects. For example, say
we wanted to create an Array5 of an Array5 of Int64s:

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    __main__: ObjectType.Module('Array5.<.__main__', _Array5, __main__)
      Array5: ObjectType.Attr('Array5.<.__main__.Array5', _Array5, __main__.Array5, 1)
        <: ObjectType.Attr('Array5.<.__main__.Array5.<', _Array5, __main__.Array5.<, 2)
          __main__: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__', _Array5, __main__.Array5.<.__main__, 2)
            Int64: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__.Int64', _Array5, __main__.Array5.<.__main__.Int64, 2)
              >: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__.Int64.>', _Array5, __main__.Array5.<.__main__.Int64.>, 1)
                >: _Array5('Array5.<.__main__.Array5.<.__main__.Int64.>.>', __main__.Array5.<.__main__.Int64.>.>)
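The bracket bookkeeping can be checked in isolation. This sketch uses a hypothetical helper, closing_index, which is not part of mpmetrics. It applies the same counting rule as Attr.__getattr__ to a name that has already been split on dots, and starts counting from zero because it also consumes the outermost <.

```python
def closing_index(parts):
    """Return the index of the '>' that closes the first '<' in parts."""
    nesting = 0
    for i, part in enumerate(parts):
        # Same rule as Attr: '<' opens a level, '>' closes one.
        nesting += (part == '<') - (part == '>')
        if part == '>' and not nesting:
            return i
    raise ValueError('unmatched <')

name = 'Array5.<.__main__.Array5.<.__main__.Int64.>.>'
parts = name.split('.')
outer_close = closing_index(parts)

# The inner type name starts after the first '<' and closes earlier.
inner_close = closing_index(parts[3:]) + 3
```

The outer bracket closes at the last component, while the inner Array5 closes one step earlier, which is why the first > in the trace above only drops nesting back to 1.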
Of course, this also means that < and > are special, and you can’t include
unmatched brackets in your class hierarchy (like a certain ticklish language).
A more robust system could prefix the type with the number of attributes in the
type:

Array5.6.__main__.Array5.2.__main__.Int64

but I like the aesthetics of angle brackets more. Speaking of which, to actually access the above class name, we’d have to type out something like

>>> getattr(getattr(getattr(getattr(Array5, '<').__main__.Array5, '<').__main__.Int64, '>'), '>')
<class '__main__.Array5.<.__main__.Array5.<.__main__.Int64.>.>'>
This is a real pain. Let’s add a helper to ObjectType:

import itertools

    def __getitem__(self, cls):
        parent = getattr(self, '<')
        # Walk the module path, then the (possibly nested) class path
        for subpath in itertools.chain(cls.__module__.split('.'), cls.__qualname__.split('.')):
            parent = getattr(parent, subpath)
        return getattr(parent, '>')

Now we can do

>>> Array5[Array5[Int64]]
<class '__main__.Array5.<.__main__.Array5.<.__main__.Int64.>.>'>
Much better.
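For reference, here are the fragments of this section assembled into one runnable piece. To avoid depending on __main__, this sketch wraps a standard library class (fractions.Fraction), and the factory is a lambda that just returns its arguments. The name Tagged is made up for the example.

```python
import importlib
import itertools
from fractions import Fraction

def saveattr(get):
    def wrapped(self, name):
        attr = get(self, name)
        setattr(self, name, attr)
        return attr
    return wrapped

class ObjectType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        setattr(self, '<', self.Module(name + '.<', cls))

    def __getitem__(self, cls):
        parent = getattr(self, '<')
        for subpath in itertools.chain(cls.__module__.split('.'),
                                       cls.__qualname__.split('.')):
            parent = getattr(parent, subpath)
        return getattr(parent, '>')

    class Module:
        def __init__(self, name, cls, parent=None):
            self.name = name
            self.cls = cls
            self.parent = parent

        @saveattr
        def __getattr__(self, name):
            try:
                if self.parent:
                    module = self.parent.__name__ + '.' + name
                else:
                    module = name
                return type(self)(self.name, self.cls,
                                  importlib.import_module(module))
            except ModuleNotFoundError:
                if self.parent:
                    prefix = self.name + '.' + self.parent.__name__
                else:
                    prefix = self.name
                return ObjectType.Attr(prefix, self.cls,
                                       getattr(self.parent, name))

    class Attr:
        def __init__(self, name, cls, obj, nesting=1):
            self.name = name
            self.cls = cls
            self.obj = obj
            self.nesting = nesting

        @saveattr
        def __getattr__(self, name):
            nesting = self.nesting + (name == '<') - (name == '>')
            if name == '>' and not nesting:
                return self.cls(self.name + '.' + self.obj.__qualname__ + '.>',
                                self.obj)
            return type(self)(self.name, self.cls,
                              getattr(self.obj, name), nesting)

# Hypothetical factory: return the generated name and the wrapped class.
Tagged = ObjectType('Tagged', lambda name, cls: (name, cls))
result = Tagged[Fraction]
```

The factory receives the full bracketed path along with the class it names, and thanks to saveattr a second lookup returns the cached result rather than calling the factory again.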
A product of necessity
IntArrays and Array5s are all well and good, but what we really want is
an Array where we can specify both the element type and the length. Since we
already have an IntType and ObjectType, we can combine them together with a
ProductType:

class ProductType:
    def __init__(self, name, cls, argtypes, args=()):
        self.__qualname__ = name
        self.name = name
        self.cls = cls
        self.argtype = argtypes[0](self.name, self._chain)
        self.argtypes = argtypes[1:]
        self.args = args

    def _chain(self, name, arg):
        if self.argtypes:
            return type(self)(name, self.cls, self.argtypes, (*self.args, arg))
        return self.cls(name, *self.args, arg)

    @saveattr
    def __getattr__(self, name):
        return getattr(self.argtype, name)

    # __getitem__ omitted for brevity
Instead of constructing the class immediately, as before, we instead _chain
into the next argtype. With this, we can now redefine Array:

def _Array(__name__, cls, n):
    size = cls.size * n
    # ...
    return type(__name__, (), locals())

Array = ProductType('Array', _Array, (ObjectType, IntType))

When we access something like Array[Int64, 5], the attributes will look like:

Array: ProductType('Array', _Array, (ObjectType, IntType))
  <: ObjectType.Module('Array.<', Array._chain, None)
    __main__: ObjectType.Module('Array.<.__main__', Array._chain, __main__)
      Int64: ObjectType.Attr('Array.<.__main__.Int64', Array._chain, __main__.Int64)
        >: IntType('Array.<.__main__.Int64.>', Array[Int64]._chain)
          5: _Array('Array.<.__main__.Int64.>.5', Int64, 5)
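The chaining is easier to follow with two arguments of the same kind. Here is a sketch that combines two IntTypes into a hypothetical Grid type. Grid and _Grid are invented for this example, and the generated class only records its arguments.

```python
def saveattr(get):
    def wrapped(self, name):
        attr = get(self, name)
        setattr(self, name, attr)
        return attr
    return wrapped

class IntType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        self.name = name
        self.cls = cls

    @saveattr
    def __getattr__(self, attr):
        return self.cls(self.name + '.' + attr, int(attr))

class ProductType:
    def __init__(self, name, cls, argtypes, args=()):
        self.__qualname__ = name
        self.name = name
        self.cls = cls
        self.argtype = argtypes[0](self.name, self._chain)
        self.argtypes = argtypes[1:]
        self.args = args

    def _chain(self, name, arg):
        # More argument types left: return another ProductType to parse them.
        if self.argtypes:
            return type(self)(name, self.cls, self.argtypes, (*self.args, arg))
        # Last argument: finally construct the class.
        return self.cls(name, *self.args, arg)

    @saveattr
    def __getattr__(self, name):
        return getattr(self.argtype, name)

def _Grid(__name__, rows, cols):
    return type(__name__, (), {'rows': rows, 'cols': cols})

Grid = ProductType('Grid', _Grid, (IntType, IntType))

partial = getattr(Grid, '3')      # a ProductType holding args == (3,)
Grid3x4 = getattr(partial, '4')   # the finished class
```

Each attribute access consumes one argument, so Grid.3 is still a ProductType and only Grid.3.4 is an actual class.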
And we can finally pickle and unpickle:

>>> IntArray5 = Array[Int64, 5]
>>> a = IntArray5(bytearray(IntArray5.size))
>>> pickle.loads(pickle.dumps(a))

There’s a problem though:

>>> a[0].value = 15
>>> pickle.loads(pickle.dumps(a))[0].value
0

This is because when we slice a bytearray, we get a new bytearray with a
copy of the original bytearray's contents. We can get around this by using
a memoryview:

>>> a = IntArray5(memoryview(bytearray(IntArray5.size)))
>>> a[0].value = 15
>>> bytearray(a[0]._mem)
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')
>>> bytearray(a._mem)[0:8]
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')

But we can’t pickle it:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle 'memoryview' object
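The difference between the two kinds of slice is easy to reproduce without any of the classes above. A minimal sketch:

```python
import pickle

buf = bytearray(16)

# Slicing a bytearray copies: writes to the slice don't reach buf.
copied = buf[0:8]
copied[0] = 1
after_copy_write = buf[0]

# Slicing a memoryview shares: writes go straight through to buf.
view = memoryview(buf)[0:8]
view[0] = 2
after_view_write = buf[0]

# But a memoryview refuses to be pickled.
try:
    pickle.dumps(view)
    pickle_failed = False
except TypeError:
    pickle_failed = True
```

So the bytearray version pickles but silently disconnects the elements from the array, while the memoryview version stays connected but can’t be pickled at all.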
It’s time to actually start working with shared memory.
Malloc madness
The first thing we need is a bulk source of shared memory. Unfortunately,
we cannot use multiprocessing.shared_memory because we can’t expand its
memory later. Metrics can be created at any point in the application’s
lifetime, and we don’t necessarily know how many we will need when we have to
create the first metric. For example, adding a label creates a new copy of a
metric for that label, and it’s common to generate labels dynamically based on
endpoints or status codes.
Instead, we open a TemporaryFile and truncate it as necessary to extend it.
import os
import mmap
from tempfile import TemporaryFile

class Heap:
    def __init__(self):
        # File backing our shared memory
        self._file = TemporaryFile()
        self._fd = self._file.fileno()
        # Allocate a page to start with
        os.ftruncate(self._fd, 4096)
        # Keep track of the memory we've mapped
        self._maps = [mmap.mmap(self._fd, 4096)]
        # Initialize a base pointer with the memory we just mapped
        self._base = Int64(memoryview(self._maps[0])[:Int64.size])
        # And make sure we don't reuse that memory later
        self._base.value = self._base.size
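The reason a file works as shared memory is that every mapping of the same file sees the same bytes, and the file can be grown after the fact. Here is a small sketch of both properties. It uses mmap.ALLOCATIONGRANULARITY instead of the hard-coded 4096 so that the offset is valid on platforms with a different granularity, which is my substitution rather than something from the library.

```python
import mmap
import os
from tempfile import TemporaryFile

PAGE = mmap.ALLOCATIONGRANULARITY

with TemporaryFile() as f:
    fd = f.fileno()
    os.ftruncate(fd, PAGE)

    # Two independent mappings of the same page share their contents.
    first = mmap.mmap(fd, PAGE)
    second = mmap.mmap(fd, PAGE)
    first[0] = 42
    shared = second[0]

    # Growing the file lets us map a new page without touching the old maps.
    os.ftruncate(fd, 2 * PAGE)
    grown = mmap.mmap(fd, PAGE, offset=PAGE)
    grown[0] = 7
    file_size = os.fstat(fd).st_size

    for m in (first, second, grown):
        m.close()
```

Within a single process the second mapping stands in for what another process would see after receiving the file descriptor.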
We’re going to be making a basic "bump" style allocator. The algorithm is really simple; in pseudocode it’s just:
def malloc(size):
    start = base
    base += size
    return start

Although it’s a little more complex than that: we need to ensure we have enough space in the file and take care when crossing page boundaries.
mmap doesn’t have to return contiguous memory when extending an
existing mapping. For example, if we made two allocations of size 2048 and
4096, and we tried to allocate the first one at offset 0 and the second one at
offset 2048, the second allocation would span two pages (ending at byte 6143).
If the first page was mapped at address 16384, the second page would have to
be mapped at address 20480 to ensure a contiguous mapping. But we can’t
guarantee that with the mmap API. So instead, we round up to the next page
boundary if we would otherwise cross it.
Allocations larger than a single page always cross page boundaries no matter
how we align things. To solve this issue, we map all the pages for these
allocations in one call to mmap, ensuring that we get a contiguous mapping.
Then, we bump the base address to the next page boundary, ensuring that no
other allocations will need those pages.
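The placement rules from the last two paragraphs can be written down as pure arithmetic on offsets. This is a sketch of the rules as described, not the library’s malloc, and place and crosses_page are hypothetical helpers. It ignores the file entirely and only computes where an allocation would start.

```python
PAGE = 4096

def align(x, a):
    """Round x up to the next multiple of a (a must be a power of two)."""
    return (x + a - 1) & ~(a - 1)

def crosses_page(start, size):
    """True if [start, start + size) touches more than one page."""
    return start // PAGE != (start + size - 1) // PAGE

def place(base, size):
    """Return (start, new_base) for an allocation of size bytes at base."""
    if size > PAGE:
        # Multi-page: own whole pages, starting on a page boundary.
        size = align(size, PAGE)
        start = align(base, PAGE)
    elif crosses_page(base, size):
        # Would straddle a boundary: skip ahead to the next page.
        start = align(base, PAGE)
    else:
        start = base
    return start, start + size

small = place(0, 2048)
straddler = place(2048, 4096)
multi_page = place(8, 10000)
```

With the example from the text, the 4096-byte allocation that would have started at offset 2048 is pushed to offset 4096 instead, at the cost of leaving the rest of the first page unused.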
In detail, if the allocation spans multiple pages, we page-align the size.

def _align_mask(x, mask):
    return (x + mask) & ~mask

def align(x, a):
    return _align_mask(x, a - 1)

    def malloc(self, size):
        if size > 4096:
            size = align(size, 4096)

If we need to allocate a new page, enlarge the file and update the base:

        # total is assumed here to be the current size of the backing file
        total = os.fstat(self._fd).st_size
        if self._base.value + size >= total:
            os.ftruncate(self._fd, align(total + size, 4096))
            self._base.value = align(self._base.value, 4096)

And finally, we can bump the base pointer and return a new Block:

        start = self._base.value
        self._base.value += size
        return Block(self, start, size)
Block is like a pointer, except it keeps track of how big it is and where it
was allocated from.

import itertools

class Block:
    def __init__(self, heap, start, size):
        self.heap = heap
        self.start = start
        self.size = size

There’s only one major method, deref, which creates a memoryview. The first
half of this function determines the page(s) we need to access, and what
their offsets are:

    def deref(self):
        heap = self.heap
        first_page = self.start // 4096
        last_page = (self.start + self.size - 1) // 4096
        nr_pages = last_page - first_page + 1
        page_off = first_page * 4096
        off = self.start - page_off
We store our mapped pages in a list. Each element is the mmap for that page,
or None if we haven’t mapped it yet. To start, we extend the length of our
list if it’s not big enough.
        if len(heap._maps) <= last_page:
            heap._maps.extend(itertools.repeat(None, last_page - len(heap._maps) + 1))

Then, we create a map at the location of the first page. malloc ensures we
never have Blocks which cross page boundaries unless they are larger than a
single page. Since multi-page allocations are the only allocations in the pages
they use, we will never try to access the Nones occupying the later indices
in the list.

        if not heap._maps[first_page]:
            heap._maps[first_page] = mmap.mmap(heap._fd, nr_pages * 4096,
                                               offset=page_off)

Finally, we can create a memory view out of the mapped page:

        return memoryview(heap._maps[first_page])[off:off+self.size]
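The page arithmetic at the top of deref is worth checking with a few numbers. This sketch pulls it out into a hypothetical standalone function, locate, that returns the index of the first page, the number of pages to map, and the offset of the block within that mapping.

```python
def locate(start, size, page=4096):
    """Return (first_page, nr_pages, off) for a block at [start, start + size)."""
    first_page = start // page
    last_page = (start + size - 1) // page
    nr_pages = last_page - first_page + 1
    off = start - first_page * page
    return first_page, nr_pages, off

# A 40-byte block right after the 8-byte base pointer.
near_start = locate(8, 40)

# A two-page block that malloc placed on a page boundary.
two_pages = locate(4096, 8192)

# A small block a few bytes into the second page.
second_page = locate(4100, 8)
```

Note the minus one when computing last_page: a block that ends exactly on a page boundary should not pull in the following page.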
Let’s try it out:

>>> h = Heap()
>>> block = h.malloc(IntArray5.size)
>>> a = IntArray5(block.deref())
>>> a[0].value = 15
>>> bytearray(a[0]._mem)
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')
>>> bytearray(a._mem)[0:8]
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')

Good so far, but we still can’t pickle the memoryview:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle 'memoryview' object

What about pickling the Block, which can create the memoryview from the Heap?

>>> pickle.dumps(block)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle '_io.BufferedRandom' object
Now the problem is that we can’t pickle the open file backing the Heap. And
in general, there’s no way to pickle an open file since it might not be around
whenever another Python process gets around to unpickling it. But we just need
to make pickling work when spawning new processes. As it turns out, the
multiprocessing authors had the same problem, and came up with DupFd. We
can use it to implement Heap's pickle helpers:
from multiprocessing.reduction import DupFd

    def __getstate__(self):
        return DupFd(self._fd)

    def __setstate__(self, df):
        self._fd = df.detach()
        self._file = open(self._fd, 'a+b')
        self._maps = [mmap.mmap(self._fd, 4096)]
        self._base = Int64(memoryview(self._maps[0])[:Int64.size])
Under the hood, DupFd sets up a UNIX domain socket server which duplicates the file
descriptor, and then sends it to the client when requested. The pickle data is
just the address of the server:

>>> pickletools.dis(pickletools.optimize(pickle.dumps(h)))
    0: \x80 PROTO      4
    2: \x95 FRAME      113
   11: \x8c SHORT_BINUNICODE '__main__'
   21: \x8c SHORT_BINUNICODE 'Heap'
   27: \x93 STACK_GLOBAL
   28: )    EMPTY_TUPLE
   29: \x81 NEWOBJ
   30: \x8c SHORT_BINUNICODE 'multiprocessing.resource_sharer'
   63: \x8c SHORT_BINUNICODE 'DupFd'
   70: \x93 STACK_GLOBAL
   71: )    EMPTY_TUPLE
   72: \x81 NEWOBJ
   73: }    EMPTY_DICT
   74: \x8c SHORT_BINUNICODE '_id'
   79: \x8c SHORT_BINUNICODE '/tmp/pymp-i7ih27es/listener-a61oo9mt'
  117: K    BININT1    2
  119: \x86 TUPLE2
  120: s    SETITEM
  121: b    BUILD
  122: b    BUILD
  123: .    STOP

The server shuts down after sending the file descriptor, so we can only unpickle the heap once. Let’s try it out:

>>> block = h.malloc(IntArray5.size)
>>> a = IntArray5(block.deref())
>>> b = IntArray5(pickle.loads(pickle.dumps(block)).deref())
>>> a[0].value = 85
>>> b[0].value
85
Success! But wouldn’t it be nice if we could just pickle the array directly?
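The file descriptor hand-off can also be tried on its own, without Heap. The sketch below assumes a Unix system (where multiprocessing.reduction provides DupFd) and, like the session above, does the round trip inside a single process. It pickles the DupFd token, detaches a new descriptor from it, and checks that both descriptors refer to the same open file.

```python
import os
import pickle
from multiprocessing.reduction import DupFd
from tempfile import TemporaryFile

with TemporaryFile() as f:
    f.write(b'hello')
    f.flush()

    # The pickled token only carries the address of the sharing server.
    token = pickle.loads(pickle.dumps(DupFd(f.fileno())))

    # detach() connects to that server and receives a duplicate descriptor.
    new_fd = token.detach()
    same_file = os.path.sameopenfile(f.fileno(), new_fd)
    contents = os.pread(new_fd, 5, 0)
    distinct_fd = new_fd != f.fileno()
    os.close(new_fd)
```

The new descriptor has a different number but points at the same file, which is what lets Heap.__setstate__ map the same memory again.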
Shipping and Receiving
Going back to Int64, we could rewrite it to take a Heap instead of raw
memory:

class Int64:
    def __init__(self, heap):
        self._block = heap.malloc(self.size)
        self._value = ctypes.c_int64.from_buffer(self._block.deref())
        self._value.value = 0

    def __getstate__(self):
        return self._block

    def __setstate__(self, block):
        self._block = block
        self._value = ctypes.c_int64.from_buffer(block.deref())

    ...
But this breaks Array and Heap, since now we no longer have a way to create
an Int64 from memory. What we really want is a second BoxedInt64 which
takes a Heap while the regular Int64 still uses memory directly.

class BoxedInt64(Int64):
    def __init__(self, heap):
        self._block = heap.malloc(self.size)
        super().__init__(self._block.deref())

    def __getstate__(self):
        return self._block

    def __setstate__(self, block):
        self._block = block
        super()._setstate(block.deref())

Where we implement _setstate in Int64 like

    def _setstate(self, mem):
        self._value = ctypes.c_int64.from_buffer(mem)
Examining BoxedInt64, you may notice that aside from inheriting from Int64,
it is otherwise completely generic. In fact, we can create boxed types on the
fly by creating new subclasses with ObjectType:

class _Box:
    # Same as BoxedInt64
    ...

Box = ObjectType('Box', lambda name, cls: type(name, (_Box, cls), {}))

Which we can now use like

>>> a = Box[Array[Int64, 5]](h)
>>> b = pickle.loads(pickle.dumps(a))
>>> a[0].value = 33
>>> b[0].value
33
∎
Epilogue
Hopefully this has been an interesting journey through the heart of
mpmetrics. For expository purposes, I left out or skipped over many details,
such as the many other types, locking, and of course this doesn’t even cover
the metrics themselves. If you are interested in more details of how this
library works, check out the mpmetrics internals documentation.