pair of pared pears

Apr 17, 2024

Memory management in mpmetrics

This article is an extended, semi-literate overview of the memory management in mpmetrics. I think there are a lot of neat little details in the design of this library that lend themselves better to a more guided exposition than API documentation. I’ve made a few simplifications for didactic reasons, but the code otherwise closely mirrors the actual implementation.

If you’re not familiar with mpmetrics, check out my introduction to mpmetrics. The problem focused on in this post is dynamic allocation of variables backed by shared memory…​ in Python. It’s a tough challenge since, in many ways, Python is the wrong language for this kind of task. However its dynamic and flexible nature allow unusual and satisfying solutions to many challenges.

In a pickle

In order to store data in a structured fashion, we are going to need some types backed by shared memory. Let’s start by wrapping c_int64:

import ctypes

class Int64:
    size = ctypes.sizeof(ctypes.c_int64)

    def __init__(self, mem):
        self._mem = mem
        self._value = ctypes.c_int64.from_buffer(mem)
        self._value.value = 0

and proxy all of our attributes (except _value) onto c_int64:

    def __getattr__(self, name):
        return getattr(self.__dict__['_value'], name)

    def __setattr__(self, name, value):
        if '_value' in self.__dict__:
            setattr(self.__dict__['_value'], name, value)
        else:
            self.__dict__[name] = value

    def __delattr__(self, name):
        delattr(self.__dict__['_value'], name)

Lets try it out:

>>> mem = bytearray(Int64.size)
>>> x = Int64(mem)
>>> x.value
0
>>> x.value += 1
>>> x.value
1
>>> mem
bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00')

As I’m on a little-endian system, the least-significant byte comes first. We can even pickle and unpickle it as long as we define some helpers:

    def __getstate__(self):
        return self._mem

    def __setstate__(self, mem):
        self._mem = mem
        self._value = ctypes.c_int64.from_buffer(mem)

Continuing our example from above,

>>> import pickle
>>> pickle.loads(pickle.dumps(x))
>>> # No errors :)

This is an important feature, since with the forkserver and spawn start methods, all objects are pickled when passing them to the subprocess.

That went well, so lets try and tackle an array next. The size of the array will depend on the element class, and the number of elements. In order to keep the size as a class attribute (so we know how much memory we need to allocate), we create a new class for each type of array:

def Array(cls, n):
    class Array:
        size = cls.size * n

        def __init__(self, mem):
            self._mem = mem
            self._vals = []
            for i in range(n):
                off = i * cls.size
                self._vals.append(cls(mem[off:off + cls.size]))

We can also define some extra methods to make our class behave more like an array:

        def __len__(self):
            return n

        def __getitem__(self, key):
            return self._vals[key]

        def __iter__(self):
            return iter(self._vals)

as well as some helpers for pickling:

        def __getstate__(self):
            return self._mem

        def __setstate__(self, mem):
            self._mem = mem
            self._vals = []
            for i in range(n):
                off = i * cls.size
                val = cls.__new__(cls)
                val.__setstate__(self._mem[off:off + cls.size])
                self._vals.append(val)

    return Array

Let’s try it out:

>>> IntArray5 = Array(Int64, 5)
>>> a = IntArray5(bytearray(IntArray5.size))
>>> a[0].value = 5
>>> a[0].value += 10
>>> a[0].value
15

But there’s a problem when pickling:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: Can't pickle local object 'Array.<locals>.Array'

The problem is that pickle uses the type’s __qualname__ to identify the class to use when unpickling. We can see this if we disassemble our pickle from earlier:

>>> import pickletools
>>> pickletools.dis(pickletools.optimize(pickle.dumps(x)))
    0: \x80 PROTO      4
    2: \x95 FRAME      56
   11: \x8c SHORT_BINUNICODE '__main__'
   21: \x8c SHORT_BINUNICODE 'Int64'
   28: \x93 STACK_GLOBAL
   29: )    EMPTY_TUPLE
   30: \x81 NEWOBJ
   31: \x8c SHORT_BINUNICODE 'builtins'
   41: \x8c SHORT_BINUNICODE 'bytearray'
   52: \x93 STACK_GLOBAL
   53: C    SHORT_BINBYTES b'\x00\x00\x00\x00\x00\x00\x00\x00'
   63: \x85 TUPLE1
   64: R    REDUCE
   65: b    BUILD
   66: .    STOP

But since we create a new class every time we call Array, we can’t identify the class we created this way, since pickle has no way to tell what the arguments were to Array. We could rewrite Array to take cls and n as arguments to __init__, but then we wouldn’t know how much memory to allocate.

What we need is a way to record the arguments to Array so that we can create the correct class when unpickling. But the only thing we have to work with is the __qualname__

What if we store the arguments to Array in the class name itself?

The trick

Imagine for a moment that we just want to create Int64 Arrays, and we only need to store the length. We could create an object like

class IntType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        self.name = name
        self.cls = cls

    def __getattr__(self, attr):
        return self.cls(self.name + '.' + attr, int(attr))

The usage is perhaps best-demonstrated by example:

>>> test = IntType('test', lambda *args: args)
>>> getattr(test, '5')
('test.5', 5)

The first argument to the function is the path we used to access that attribute, and the second is the value of the attribute. Now we can use this to create a new IntArray:

def _IntArray(__name__, n):
    cls = Int64
    size = cls.size * n

    ...

    return type(__name__, (), locals())

IntArray = IntType('IntArray', _IntArray)

We need to call the three-argument type instead of using the class keyword, since the name of the class we create will change based on n. Let’s try using this class again

>>> IntArray5 = getattr(IntArray, '5')
>>> a = IntArray5(bytearray(IntArray5.size))
>>> a[0].value = 5
>>> a[0].value += 10
>>> a[0].value
15

Looking good so far. Let’s try pickling it

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
_pickle.PicklingError: Can't pickle <class '__main__.IntArray.5'>: it's not the same object as __main__.IntArray.5

Whoops. The problem is that every time we call _IntArray we create a new class. This is pretty easy to solve by wrapping __getattr__ in a decorator which saves its return value:

def saveattr(get):
    def wrapped(self, name):
        attr = get(self, name)
        setattr(self, name, attr)
        return attr
    return wrapped

Python won’t bother calling __getattr__ if the relevant attribute is already present in __dict__. Lets try pickling again:

>>> pickle.loads(pickle.dumps(a))

Success!

Further objectives

That was a nice warm up. Let’s try something more challenging. What if instead of a known element type and an unknown length, we tried making a class with an unknown object type and a fixed length:

def _Array5(__name__, cls):
    n = 5
    size = cls.size * n

    # ...

    return type(__name__, (), locals())

Array5 = ObjectType('Array5', _Array5)

So what should the name of IntArray5 be? Well, perhaps the most obvious thing would be

Array5.__main__.Int64

but with this kind of name we wouldn’t know when the name of the object started and when it ended. This would prevent us from nesting multiple ObjectTypes. So let’s use this name instead:

Array5.<.__main__.Int64.>

The base ObjectType just needs to have a < attribute:

class ObjectType:
    def __init__(self, name, cls):
        self.__qualname__ = name
        setattr(self, '<', self.Module(name + '.<', cls))

This attribute’s job is to parse the module portion of the class’s name. We do this by repeatedly trying to import the next attribute as a module:

    class Module:
        def __init__(self, name, cls, parent=None):
            self.name = name
            self.cls = cls
            self.parent = parent

        @saveattr
        def __getattr__(self, name):
            try:
                if self.parent:
                    module = self.parent.__name__ + '.' + name
                else:
                    module = name
                return type(self)(self.name, self.cls, importlib.import_module(module))
            except ModuleNotFoundError:
                if self.parent:
                    prefix = self.name + '.' + self.parent.__name__
                else:
                    prefix = self.name
                return ObjectType.Attr(prefix, self.cls, getattr(self.parent, name))

For example, say that we have a file a/b.py and inside that file we have a class C. When we access Array5.<.a.b.C, we will have

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    a: ObjectType.Module('Array5.<.a', _Array5, a)
      b: ObjectType.Module('Array5.<.a.b', _Array5, a.b)
        C: ObjectType.Attr('Array5.<.a.b.C', _Array5, a.b.C)

At this point, we’ve gotten through the modules and finally made it to an object. Now we need to walk its attributes:

    class Attr:
        def __init__(self, name, cls, obj, nesting=1):
            self.name = name
            self.cls = cls
            self.obj = obj
            self.nesting = nesting

        @saveattr
        def __getattr__(self, name):
            nesting = self.nesting + (name == '<') - (name == '>')
            if name == '>' and not nesting:
                return self.cls(self.name + '.' + self.obj.__qualname__ + '.>', self.obj)
            else:
                return type(self)(self.name, self.cls, getattr(self.obj, name), nesting)

To continuing the above example, say that class C has a nested class D. When we access Array5.<.a.b.C.D.> we will have

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    a: ObjectType.Module('Array5.<.a', _Array5, a)
      b: ObjectType.Module('Array5.<.a.b', _Array5, a.b)
        C: ObjectType.Attr('Array5.<.a.b.C', _Array5, a.b.C, 1)
          D: ObjectType.Attr('Array5.<.a.b.C.D', _Array5, a.b.C.D, 1)
            >: _Array5('Array5.<.a.b.C.D.>', a.b.C.D)

The nesting attribute helps us keep track of nested objects. For example say we wanted to create an Array5 of an Array5 of Int64s:

Array5: ObjectType('Array5', _Array5)
  <: ObjectType.Module('Array5.<', _Array5, None)
    __main__: ObjectType.Module('Array5.<.__main__', _Array5, __main__)
      Array5: ObjectType.Attr('Array5.<.__main__.Array5', _Array5, __main__.Array5, 1)
       <: ObjectType.Attr('Array5.<.__main__.Array5.<', _Array5, __main__.Array5.<, 2)
         __main__: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__', _Array5, __main__.Array5.<.__main__, 2)
           Int64: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__.Int64', _Array5, __main__.Array5.<.__main__.Int64, 2)
             >: ObjectType.Attr('Array5.<.__main__.Array5.<.__main__.Int64.>', _Array5, __main__.Array5.<.__main__.Int64.>, 1)
               >: _Array5('Array5.<.__main__.Array5.<.__main__.Int64.>.>', __main__.Array5.<.__main__.Int64.>.>)

Of course, this also means that < and > are special, and you can’t include unmatched brackets in your class hierarchy (like a certain ticklish language). A more robust system could prefix the type with the number attributes in the type:

Array5.6.__main__.Array5.2.__main__.Int64

but I like the aesthetics of angle brackets more. Speaking of which, to actually access the above class name, we’d have to type out something like

>>> getattr(getattr(getattr(getattr(Array5, '<').__main__.Array5, '<').__main__.Int64, '>'), '>')
<class '__main__.Array5.<.__main__.Array5.<.__main__.Int64.>.>'>

This is a real pain. Let’s add a helper to ObjectType:

    def __getitem__(self, cls):
        parent = getattr(self, '<')
        for subpath in itertools.chain(cls.__module__.split('.'), cls.__qualname__.split('.')):
            parent = getattr(parent, subpath)
        return getattr(parent, '>')

Now we can do

>>> Array5[Array5[Int64]]
<class '__main__.Array5.<.__main__.Array5.<.__main__.Int64.>.>'>

Much better.

A product of necessity

IntArrays and Array5s are all well and good, but what we really want is an Array where we can specify both the element type and the length. Since we already have an IntType and ObjectType, we can combine them together with a ProductType

class ProductType:
    def __init__(self, name, cls, argtypes, args=()):
        self.__qualname__ = name
        self.name = name
        self.cls = cls
        self.argtype = argtypes[0](self.name, self._chain)
        self.argtypes = argtypes[1:]
        self.args = args

    def _chain(self, name, arg):
        if self.argtypes:
            return type(self)(name, self.cls, self.argtypes, (*self.args, arg))
        return self.cls(name, *self.args, arg)

    @saveattr
    def __getattr__(self, name):
        return getattr(self.argtype, name)

    # __getitem__ omitted for brevity

Instead of constructing the class immediately, as before, we instead _chain into the next argtype. With this, we can now redefine Array:

def _Array(__name__, cls, n):
    size = cls.size * n

    # ...

    return type(__name__, (), locals())

Array = ProductType('Array', _Array, (ObjectType, IntType))

When we access something like Array[Int64, 5], the attributes will look like:

Array: ProductType('Array', _Array, (ObjectType, IntType))
  <: ObjectType.Module('Array.<', Array._chain, None)
    __main__: ObjectType.Module('Array5.<.__main__', Array._chain, __main__)
      Int64: ObjectType.Attr('Array5.<.__main__.Int64', Array._chain, __main__.Int64)
        >: IntType('Array5.<.__main__.Int64.>', Array[Int64]._chain)
          5: _Array('Array5.<.__main__.Int64.>.5', Int64, 5)

And we can finally pickle and unpickle:

>>> IntArray5 = Array[Int64, 5]
>>> a = IntArray5(bytearray(IntArray5.size))
>>> pickle.loads(pickle.dumps(a))

There’s a problem though:

>>> a[0].value = 15
>>> pickle.loads(pickle.dumps(a))[0].value
0

This is because when we slice a bytearray, we get a new bytearray with a copy of the original bytearray's contents. We can get around this by using a memoryview:

>>> a = IntArray5(memoryview(bytearray(IntArray5.size)))
>>> a[0].value = 15
>>> bytearray(a[0]._mem)
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')
>>> bytearray(a._mem)[0:8]
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')

But we can’t pickle it:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle 'memoryview' object

It’s time to actually start working with shared memory.

Malloc madness

The first thing we need need is a bulk source of shared memory. Unfortunately, we cannot use multiprocessing.shared_memory because we can’t expand its memory later. Metrics can be created at any point in the application’s lifetime, and we don’t necessarily know how many we will need when we have to create the first metric. For example, adding a label creates a new copy of a metric for that label, and it’s common to generate labels dynamically based on endpoints or status codes.

Instead, we open a TemporaryFile and truncate it as necessary to extend it.

import os
import mmap
from tempfile import TemporaryFile

class Heap:
    def __init__(self):
        # File backing our shared memory
        self._file = TemporaryFile()
        self._fd = self._file.fileno()
        # Allocate a page to start with
        os.truncate(self._fd, 4096)
        # Keep track of the memory we've mapped
        self._maps = [mmap.mmap(self._fd, map_size)]
        # Initialize a base pointer with the memory we just mapped
        self._base = Int64(memoryview(self._maps[0])[:self.size])
        # And make sure we don't reuse that memory later
        self._base.value = self._base.size

We’re going to be making a basic "bump" style allocator. The algorithm is really simple; in pseudocode it’s just:

def malloc(size):
    start = base
    base += size
    return start

Although it’s a little more complex than that: we need to ensure we have enough space in the file and take care when crossing page boundaries.

mmap doesn’t have to return contiguous memory when extending an existing mapping. For example, if we made two allocations of size 2048 and 4096, and we tried to allocate the first one at offset 0 and the second one at offset 2048, the second allocation would span two pages (ending at byte 6143). If the first page was mapped at address 16384, the second page would have to be mapped at address 20480 to ensure a contiguous mapping. But we can’t guarantee that with the mmap API. So instead, we round up to the next page boundary if we would otherwise cross it.

Allocations larger than a single page always cross page boundaries no matter how we align things. To solve this issue, we map all the pages for these allocations in one call to mmap, ensuring that we get a contiguous mapping. Then, we bump the base address to the next page boundary, ensuring that no other allocations will need those pages.

In detail, if the allocation spans multiple pages, we page-align the size.

    def _align_mask(x, mask):
        return (x + mask) & ~mask

    def align(x, a):
        return _align_mask(x, a - 1)

    def malloc(self, size):
        if size > 4096:
            size = align(size, 4096)

If we need to allocate a new page, enlarge the file and update the base:

        if self._base.value + size >= total:
            os.ftruncate(self._fd, align(total + size, 4096))
            self._base.value = align(self._base.value, 4096)

And finally, we can bump the base pointer and return a new Block:

        start = self._base.value
        self._base.value += size
        return Block(self, start, size)

Block is like a pointer, except it keeps track of how big it is and where it was allocated from.

import itertools

class Block:
    def __init__(self, heap, start, size):
        self.heap = heap
        self.start = start
        self.size = size

There’s only one major method, deref, which creates a memoryview. The first half of this function determines the page(s) we need to access, and what their offsets are:

    def deref(self):
        heap = self.heap
        first_page = int(self.start / 4096)
        last_page = int((self.start + self.size - 1) / 4096)
        nr_pages = last_page - first_page + 1
        page_off = first_page * 4096
        off = self.start - page_off

We store our mapped pages in list. Each element is a memoryview of the page, or None if we haven’t mapped it yet. To start, we extend the length of our list if it’s not big enough.

       if len(heap._maps) <= last_page:
           heap._maps.extend(itertools.repeat(None, last_page - len(heap._maps) + 1))

Then, we create a map at the location of the first page. malloc ensures we never have Blocks which cross page boundaries unless they are larger than a single page. Since multi-page allocations are the only allocations in the pages they use, we will never try to access the Nones occupying the later indices in the list.

       if not self.heap._maps[first_page]:
           heap._maps[first_page] = mmap.mmap(heap._fd, nr_pages * 4096,
                                              offset=page_off)

Finally, we can create a memory view out of the mapped page:

        return memoryview(heap._maps[first_page])[off:off+self.size]

Let’s try it out:

>>> h = Heap()
>>> block = h.malloc(InteArray5.size)
>>> a = IntArray5(block.deref())
>>> a[0].value = 15
>>> bytearray(a[0]._mem)
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')
>>> bytearray(a._mem)[0:8]
bytearray(b'\x0f\x00\x00\x00\x00\x00\x00\x00')

Good so far, but we still can’t pickle the memoryview:

>>> pickle.dumps(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle 'memoryview' object

What about pickling the Block, which can create the memoryview from the Heap?

>>> pickle.dumps(block)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle '_io.BufferedRandom' object

Now the problem is that we can’t pickle the open file backing the Heap. And in general, there’s no way to pickle an open file since it might not be around whenever another python process gets around to unpickling it. But we just need to make pickling work when spawning new processes. As it turns out, the multiprocessing authors had the same problem, and came up with DupFd. We can use it to implement Heap's pickle helpers:

    from multiprocessing.reduction import DupFd

    def __getstate__(self):
        return DupFd(self._fd)

    def __setstate__(self, df):
        self._fd = df.detach()
        self._file = open(self._fd, 'a+b')
        self._maps = [mmap.mmap(self._fd, 4096)]
        self._base = Int64(memoryview(self._maps[0])[:Int64.size])

Under the hood, DupFd sets up a UNIX domain server which duplicates the file descriptor, and then sends it to the client when requested. The pickle data is just the address of the server:

>>> pickletools.dis(pickletools.optimize(pickle.dumps(h)))
    0: \x80 PROTO      4
    2: \x95 FRAME      113
   11: \x8c SHORT_BINUNICODE '__main__'
   21: \x8c SHORT_BINUNICODE 'Heap'
   27: \x93 STACK_GLOBAL
   28: )    EMPTY_TUPLE
   29: \x81 NEWOBJ
   30: \x8c SHORT_BINUNICODE 'multiprocessing.resource_sharer'
   63: \x8c SHORT_BINUNICODE 'DupFd'
   70: \x93 STACK_GLOBAL
   71: )    EMPTY_TUPLE
   72: \x81 NEWOBJ
   73: }    EMPTY_DICT
   74: \x8c SHORT_BINUNICODE '_id'
   79: \x8c SHORT_BINUNICODE '/tmp/pymp-i7ih27es/listener-a61oo9mt'
  117: K    BININT1    2
  119: \x86 TUPLE2
  120: s    SETITEM
  121: b    BUILD
  122: b    BUILD
  123: .    STOP

The server shuts down after sending the file descriptor, so we can only unpickle the heap once. Lets try it out:

>>> block = h.malloc(IntArray5.size)
>>> a = IntArray5(block.deref())
>>> b = IntArray5(pickle.loads(pickle.dumps(block)).deref())
>>> a[0].value = 85
>>> b[0].value
85

Success! But wouldn’t it be nice if we could just pickle the array directly?

Shipping and Receiving

Going back to Int64, we could rewrite it to take a Heap instead of raw memory:

class Int64:
    def __init__(self, heap):
        self._block = heap.malloc(self.size)
        self._value = ctypes.c_int64.from_buffer(self._block.deref())
        self._value.value = 0

    def __getstate__(self):
        return self._block

    def __setstate__(self, block):
        self._block = block
        self._value = ctypes.c_int64.from_buffer(block.deref())

    ...

But this breaks Array and Heap, since now we no longer have a way to create an Int64 from memory. What we really want is a second BoxedInt64 which takes a Heap while the regular Int64 still uses memory directly.

class BoxedInt64(Int64):
    def __init__(self, heap):
        self._block = heap.malloc(self.size)
	super().__init__(self._block.deref())

    def __getstate__(self):
        return self._block

    def __setstate__(self, block):
        self._block = block
	super()._setstate(block.deref())

Where we implement _setstate in Int64 like

    def _setstate(self, mem):
	self._value = ctypes.c_int64.from_buffer(mem)

Examining BoxedInt64, you may notice that aside from inheriting from Int64, it is otherwise completely generic. In fact, we can create boxed types on the fly by creating new subclasses with ObjectType:

class _Box:
    # Same as BoxedInt64

Box = ObjectType('Box', lambda name, cls: type(name, (_Box, cls), {}))

Which we can now use like

>>> a = Box[Array[Int64, 5]](h)
>>> b = pickle.loads(pickle.dumps(a))
>>> a[0].value = 33
>>> b[0].value
33

Epilogue

Hopefully this has been an interesting journey through the heart of mpmetrics. For expository purposes, I left out or skipped over many details, such as the many other types, locking, and of course this doesn’t even cover the metrics themselves. If you are interested in more details of how this library works, check out the mpmetrics internals documentation.

Dec 09, 2023

Introducing mpmetrics

A brief introduction to metrics

Metrics are measurements we make on a program. For example, say we wanted to know how long a function takes to call. We can wrap that function in a metric:

from prometheus_client import Summary, generate_latest

# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

# Decorate function with metric.
@REQUEST_TIME.time()
def process_request(t):
    """A dummy function that takes some time."""
    time.sleep(t)

After calling process_requests a few times, we might see something like the following when rendering the metrics:

>>> for i in range(25)
...  process_request(i/100)
...
>>> print(generate_latest().decode())
<snip>
# HELP request_processing_seconds Time spent processing request
# TYPE request_processing_seconds summary
request_processing_seconds_count 25.0
request_processing_seconds_sum 3.0034301709383726

From this output, it is possible to calculate the mean request time (120 ms). We can also add labels to our metrics:

REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request',
                       labelnames=['function'])

@REQUEST_TIME.labels(function='process_request').time()
def process_request(t):
    time.sleep(t)

@REQUEST_TIME.labels(function='execute_query').time()
def execute_query(t):
    time.sleep(t)

which are enclosed in brackets when exposed:

# HELP request_processing_seconds Time spent processing request
# TYPE request_processing_seconds summary
request_processing_seconds_count{function="process_requests"} 31.0
request_processing_seconds_sum{function="process_requests"} 15.573771364055574
request_processing_seconds_count{function="execute_query"} 30.0
request_processing_seconds_sum{function="execute_query"} 14.991517985239625

There are a variety of standard names and formats for metrics, as standardized by OpenMetrics, but they are all based on giving names to floating point numbers.

Prometheus Python Client

Python has a library for working with metrics called prometheus_client, which is shown in all of the above examples. This library works great for single-process applications. Python’s GIL makes it difficult to achieve good concurrency with just threads, so it’s common to run applications as multiple processes. To ensure we have coherent metrics (which don’t jump around based on which process served the request) we need to synchronize metrics across different processes.

prometheus_client does this by using the environmental variable PROMETHEUS_MULTIPROC_DIR to store several metrics files, one per process. Each file contains a series of length-prefixed key-value pairs. Each key is a string, and each value is a pair of doubles (the actual value and a timestamp). The files themselves are memory-mapped, and updating or reading them just involves a memcpy.

Unfortunately, this approach has several drawbacks:

  • Not all metrics are supported, and some features (such as exemplars) are not supported either. In threaded mode, prometheus_client also supports grouping different metrics in different registries, allowing them to be collected, filtered, and reported independently. In multiprocessing mode there is just one, global registry.

  • The environmental variable enabling multiprocess mode must be set outside of python. That is, it cannot be set programatically. This is inconvenient because the directory should change each run to avoid inadvertently using stale data from a previous run.

  • There is no synchronization between processes, nor is there any atomicity. To use the above example, it would be possible to read an old value of request_processing_seconds_count and a new value of request_processing_seconds_sum. This is especially problematic for infrequent events. On some architectures (although I don’t believe x86 is affected), torn reads/writes may result in completely bogus values being read.

mpmetrics

I found these restrictions to be limiting and unweildy, so I wrote a multiprocess-safe metrics library. It uses atomic integers (and doubles) in shared memory to ensure that we always get a consistent view of the metrics. All of the above restrictions are lifted. Although there’s a lot going on under the hood, this is all hidden behind the same API as prometheus_client. Let’s revisit the above example, but this time using mpmetrics:

from mpmetrics import Summary
from prometheus_client import start_http_server
import multiprocessing
import random
import time

# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

# Decorate function with metric.
@REQUEST_TIME.time()
def process_request(t):
    """A dummy function that takes some time."""
    time.sleep(t)

This time we’ll generate requests from multiple processes:

# Function to generate requests
def generate_requests():
    while True:
        process_request(random.random())

if __name__ == '__main__':
    # Start up the server to expose the metrics.
    start_http_server(8000)
    # Generate some requests from two processes
    multiprocessing.Process(target=generate_requests).start()
    generate_requests()

You can navigate to http://localhost:8000/metrics to view the metrics. If you’re interested, check out some other examples, or head over to the documentation.

Jun 30, 2023

Three pitfalls in I2C

I recently implemented an I2C slave, and came across a few interesting corner cases in the specification.

I2C basics

I2C is a multi-master, low-speed, bidirectional bus specified in NXP UM10204. There are only two signals: SCL (the clock) and SDA (the data). Each of the signals is open-drain, with resistors pulling the signals high. This property is used throughout the protocol. For example, by defining an acknowledgement (ack) as holding SDA low, there is an implicit negative acknowledgement (nack) when no device responds to a transaction.

The general format of transactions is

  • a start condition

  • a 7-bit address

  • a read/write bit

  • an acknowledgement

  • any number of number of data bytes, each followed by an acknowledgement

  • a stop condition

For example, in the following diagram shows a single-byte read:

SDA is valid on the rising edge of SCL, and SDA changes on the falling edge of SCL. To signal the start and end of the transaction, SDA transitions with SCL high. This framing violation makes it easy to re-synchronize the master/slave state machines.

An important aspect of I2C that is not visible in the above diagram is who is sending data. Because the signals are open-drain, both the master and slave can drive the bus at the same time. The following diagram shows what the internal drivers of SDA in the above transaction might look like:

At the beginning of the transaction the master sends data on the bus, while the slave leaves its SDA high. Then the slave acknowledges the request and sends a byte of data. Since this is the last byte the master wants to read, the master doesn’t acknowledge the data and sends a stop condition.

Quick reads

One of the shortest types of I2C transactions is the quick read/write (so named by SMBus). These transfer one bit of data in the read/write bit following the address. Once the master receives an ack, it sends the stop condition to end the transaction. In addition to transfering a bit of data, these transactions can also be used as a heuristic way of detecting available slaves (such as with i2cdetect). The following diagram shows a successful quick read:

From the slave’s point of view, a quick read looks just like a regular read transaction. This can prevent the master from sending the stop condition if the first bit of the byte is a 0, since the slave will hold SDA low. If the read byte is all 0s, the slave won’t release SDA until the ack bit:

When designing a slave, this can be avoided by ensuring that the first bit of any read transaction is 1. If the slave has a “command” or sub-address register which needs to be written as the first byte of a transaction, the default data before the command register is written can be all 1s for the same effect.

From the master’s perspective, all that is needed is to continue reading out the byte until there is a high bit. This is guaranteed to happen when the slave waits for an ack.

SDA hold time ambiguity

While using coding violations for framing is a common technique, it creates a conflict on the falling edge of SCL. If a slave sees SDA fall before SCL, it can detect a spurious start/stop condition.

i2c timing upper

SMBus versions before 3.0 specified a 300 ns minimum hold time (tHD;DAT). This ensures that other devices on the bus see SCL transition before SDA.

I2C, on the other hand, has a minimum hold time of 0 seconds. Versions 6 and earlier of UM10204 suggested the following solution:

A device must internally provide a hold time of at least 300 ns for the SDA signal (with respect to the VIH(min) of the SCL signal) to bridge the undefined region of the falling edge of SCL.

That is, if a device detects a start/stop condition it must wait 300 ns before doing anything. If SCL is still high, it was a real start/stop. Otherwise it was just a data transition. The 300 ns value in both I2C and SMBus is tf, or the maximum fall time. Waiting this long ensures that SCL has transitioned before we sample SDA.

To allow for greater compatibility between SMBus and I2C devices, SMBus versions 3.0 and later reduce tHD;DAT to 0 seconds. In a lengthy appendix, they suggest using the same strategy as I2C.

Despite this, version 7 of UM10204 seems to suggest that neither a 300 ns hold time nor an internal delay are necessary to resolve this issue. Looking closely at the timing diagram, tHD;DAT is defined as the time between when SCL falls to 30% VDD (logical 0), and when SDA rises above 30% VDD or falls below 70% VDD. Therefore, it suggests that devices

Ensure SCL drops below 0.3 VDD on falling edge before SDA crosses into the indeterminate range of 0.3 VDD to 0.7 VDD.

Regarding masters which don’t support clock stretching and don’t have inputs on SCL, UM10204 continues:

For controllers that cannot observe the SCL falling edge then independent measurement of the time for the SCL transition from static high (VDD) to 0.3 VDD should be used to insert a delay of the SDA transition with respect to SCL

effectively mandating a 300 ns hold time…​ which is what SMBus switched away from.

However, even masters supporting clock stretching should still use a delay for two reasons: First, it is difficult to detect when SCL falls below 30% VDD, since in typical implementations the entire region from 30–70% VDD is indeterminate. And second, devices with series protection resistors might not see the same value on SDA as the transmitter, since there will be a voltage difference across the resistor.

For maximum compatibility, devices should implement both an output hold time and an internal hold time when detecting start/stop conditions.

Implementation support

Unfortunately, despite much vacillation in SMBus and I2C, this issue does not seem to be known to some implementors. A quick survey of open-source implementations reveals fairly patchy handling:

  • Wikipedia’s bitbang implementation, doesn’t wait between clear_SCL and set_SDA in i2c_write_bit. That said, it doesn’t seem to support multi-master busses, so it may be assuming slaves with an internal hold time.

  • Linux doesn’t wait between scllo and setsda in i2c_outb, but it doesn’t seem to support multi-master busses either. Some of the hardware-accelerated drivers seem to be aware of this issue, and support configurable hold times. This allows using the SMBus pre-3.0 solution, as long as all slaves also support it.

  • Neither the master nor slaves in Alex Forencich’s I2C project seem to delay for a hold time or use an internal hold time.

  • Freecores’ master doesn’t add an internal hold time or use an internal hold time.

It’s often unclear whether commercial implementations correctly handle this ambiguity. For example, this AT24 EEPROM datasheet specifies a 0 second hold time, but doesn’t mention any internal hold time. Many vendors support configurable hold times, which shows they are aware of the issue. Occasionally, there are errata regarding it.

I suspect that for most hardware this ambiguity becomes an issue when the input threshold voltage is on the low end. This could cause a rising SDA to be detected before a falling SCL. This is exacerbated by high bus capacitance, but many busses have low (a few dozen pF) capacitance. As with many timing violations, mean time between failure can be quite long, and incorrect implementations may not be noticed.

Fast-mode Plus compatibility

The original (Standard-mode) I2C runs at 100 KHz, but UM10204 also includes a backwards-compatible “Fast-mode” which runs at 400 KHz. There are also “High-speed mode” and “Ultra Fast-mode” varients which are not backwards compatible. In 2007, NXP introduced a “Fast-mode Plus” which runs at 1 Mhz and was designed to be backwards-compatible. SMBus also incorporated this mode into version 3.0.

To determine what a Fast-mode Plus slave needs to do to be backwards compatible, let’s first examine Fast-mode backwards-compatibility. For a Fast-mode slave to be backwards compatible with Standard-mode, its input and output timings must be compatible with both Standard-mode and Fast-mode. Generally, output timings are the same as Fast-mode. Standard-mode only requires a longer setup time, which will be met as long as the slave doesn’t stretch the clock. Similarly, input timings are mostly the same as Fast-mode. One issue could be the internal hold time necessary for the SDA ambiguity detailed above. However, both Standard- and Fast-mode specify a 300 ns fall time (tf), which is less than Fast-mode’s 600 ns start condition setup time (tSU;STA). Therefore, the same 300 ns hold time can be used for both modes.

i2c timing lower

Unfortunately, Fast-mode Plus reduced tSU;STA to 260 ns in order to achieve a higher clock rate. This means that every Fast-mode Plus start condition is within the SDA hold time ambiguity in Fast- and Standard-mode. A slave which implements the 300 ns internal delay required by Fast- and Standard-mode will not be able to detect Fast-mode Plus start conditions with minimum-specified delay.

There are some ways to mitigate this at the system level:

  • All bust masters could be configured to run at 960 kHz, which (if tSU;STA is scaled as well) will provide enough of a delay to ensure start times will be detected correctly.

  • Components with higher slew rates could be selected to ensure tf remains below 260 ns. Alternatively, bus line capacitance could be reduced below the maximum.

As well as some ways to mitigate this at the device level:

  • A configuration bit (such as a register or a pin) could configure the device to be either Fast-mode or Fast-mode Plus compatible. This could even be automatically detected, although this would need to be done carefully since masters can switch speed at any time. For example, a master might run at one speed when accessing a certain device, and another speed when accessing a different device.

  • The input drivers could be engineered to have a lower VIH and a higher VIL, reducing the time of ambiguity (assuming monotonic transitions).

But, as-written, the Fast-mode Plus timings are incompatible with Fast- and Standard-mode. Pre-3.0 SMBus and post-v7 I2C are not affected because they do not require an internal hold time.

Dec 24, 2022

The smallest inter-frame gap in Fast Ethernet

Fast Ethernet in the form of 100BASE-TX is a very mature technology, although there were some hiccups in getting there. So it was surprising to me to find a way for the PCS receiver to accept an inter-frame gap shorter than the end-of-stream delimeter itself.

The 100BASE-TX Ethernet physical layer is broken up into several sub-layers:

100basex layers

Working from the bottom up, the PMD converts analog signals from the medium to MLT-3 symbols (correcting for channel loss and baseline wander), decodes those symbols to bits, descrambles the bits, and then encodes the bits with NRZI. The PMA converts from NRZI back to bits[1] and performs some other optional tasks (such as far-end fault detection when using a medium without autonegotiation). The PCS performs 4B5B[2] alignment and decoding and generates data on the MII.

The PMA provides the PCS with a raw stream of bits. There is no indication of where one nibble begins and another ends. To recover this information, each nibble (4 bits) is encoded using 5 bits. Half of this encoding is used for data, but the other half is either invalid or used for control code groups. When no data is being transferred, the /I/ control code group (0b11111) is continuously transmitted. The control code groups /J/ and /K/ (0b11000 and 0b10001) form the Start-of-Stream Delimiter (SSD), indicating the start of a new frame:

To ensure that this sequence can always be recognized, all other code groups which (in combination with any other code group) could contain the sequence 0b1100010001 are invalid. The last control code groups to cover are /T/ and /R/ (0b01101 and 0b00111), which form the End-of-Stream Delimiter (ESD), indicating the end of a frame. By convention, /V/ is used for invalid code groups.

IEEE 802.3 specifies the PCS receiver through a state diagram. Each state performs its actions continuously, and has several conditions leading to other states. Conditions are formed of several comparisons or events, with * expressing “and” (conjunction). All conditions are evaluated continuously (more on this later), and sometimes there are uncontrolled transitions (UCTs), immediately transitioning from one state to another.

100basex pcs rx

The variables used in the above diagram are:

link_status

Whether the link is up. For our purposes, we can assume that this is always true.

rx_bits

The last 10 bits we have received. The least significant bit (0) is the most recent bit. These are not aligned, except when gotCodeGroup.indicate occurs. In that case, rx_bits[4:0] is the most recent code group, and rx_bits[9:5] is the next most recent.

gotCodeGroup.indicate

This event occurs whenever there is a complete code group in rx_bits. It is automatically generated every five bits by the receive bits process (not shown). The initial alignment is determined by when RX_DV goes high.

RX_DV

An MII signal indicating whether RXD is valid or not. This also determines the alignment of gotCodeGroup.indicate.

RXD

An MII signal indicating the recieved data.

RX_ER

An MII signal indicating there was an error.

receiving

Whether we are currently receiving a packet.

rx_lpi

This is only used for Energy-Efficient Ethernet (EEE).

Let’s add some of these signals to the SSD diagram from before:

There are a few important things to note here. First, note that there is an instant transition from CARRIER DETECT to IDENTIFY JK. This is because one of the conditions for exiting CARRIER DETECT will always be true, and both will be evaluated immediately. Second, although state and gotCodeGroup.indicate are shown as having transition times, they are really instant from the state machine’s point of view. In particular, gotCodeGroup.indicate is only true for one instant. If a state and its successor both depend on gotCodeGroup.indicate to transition, they can’t both transition off of the same gotCodeGroup.indicate event. If implemented in hardware, states like CARRIER DETECT should be viewed more as “superstates” (a la superclass) rather than states proper. The last thing to note is that there’s a delay of two code groups between when the first bit of a code enters the PCS and when the data gets signalled on the MII.

Now, lets’s look at the end of a packet:

Note that like CARRIER DETECT, END OF STREAM instantly transitions to IDLE, but not before setting rx_bits to all 1s. This is as if we had already been sending /I/I/ instead of /T/R/. We can abuse this to create (through non-standard coding) the shortest possible inter-packet gap. The following diagram shows the contents of rx_bits after END OF STREAM as “virtual” bits. It also shows the actual received bits as “real” bits:

A fully-compliant 100BASE-X PCS will exactly follow the state transitions in the diagram above, allowing an inter-frame gap of just 0.8 octets.

It appears that the designers of the PCS state machine wished to allow back-to-back frames (e.g. /T/R/J/K/)[3]. If this is indeed the case, we need to be careful when fixing the state machine. We could omit the clearing of rx_bits, but this would cause us to erroneously transition to BAD SSD (as we would transition to CARRIER DETECT when rx_bits[9:0] = /R/J/). Adding a condition on gotCodeGroup.indicate would not work for similar reasons. Instead, we should add an intermediate PRE IDLE state after END OF STREAM. This state would set receiving, RX_ER, and RX_DV to FALSE. It would transition to IDLE on gotCodeGroup.indicate.

In practice, this “bug” has no consequences. This is for three reasons. First, conforming PCSs can never generate this bit pattern. Second, conforming MACs use an inter-frame gap of at least 12 octets. And, third, most PCSs do not implement the specification exactly. This is because an exact implementation imposes difficult-to-meet timing requirements due to the tight coupling between the receive bits process and the recieve process. A typical implementation may register the outputs of the receive bits process, which does not allow for the instant feedback necessary to re-align the receive bits process and trigger the bug.


1. Converting from binary to NRZI and back again is not terribly efficient (and I suspect that almost no integrated implementations of 100BASE-TX include it). So why is it specified in the standard? The 100BASE-TX PMD is largely based on the CDDI (Copper Distributed Data Interface) PMD. That PMD was designed to be a drop-in replacement for the FDDI (Fiber Distributed Data Interface) PMD, so that the PCS and PMA could be reused. The FDDI PMD just performed some filtering and conversion from optical to electrical signalling (much like modern optical SFP modules). The NRZI conversion itself was done in the PMA. To keep compatibility, the PMD decodes MLT-3 symbols and then re-encode them as NRZI. This is also why descrambling happens in the PMD, even though it probably should happen in the PMA.
2. Also inherited from FDDI
3. I don’t know whether back-to-back frame support was inherited from FDDI or was introduced with Fast Ethernet. It’s likely that someone produced PCSs which allowed this, and lobbied to ensure that such behavior remained standard

Dec 24, 2022