Passing Variable Length C String Through Mmap-ed Shared Memory

by HCSF   Last Updated July 05, 2018 22:26 PM

Let's say I have a process A and a process B, and process A would like to pass a C string to process B through a shm_open() + mmap() shared memory.

What's the most latency efficient way?

The answer of this post suggested that after C++11, std::atomic is the right way to share data over shared memory.

However, I fail to see how I can write something to write a C string with something like this:

struct Buffer {
std::atomic<uint32_t> length;
std::atomic<char*> str;
} __attribute__((packed));

Given I have a shared memory created this way:

class SHM {
    char* _ptr;
public:
    SHM() {
        const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
        const auto size =  4 * 1024 * 1024;
        if (-1 == ftruncate(handle, size)) {
            throw;
        }
        _ptr = (char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);

        if(_ptr == MAP_FAILED){
            throw;
        }

        int rc = fchmod(handle, 0666);
        if (rc == -1) {
            throw;
        }
    }

    // assume to caller will do buffer.size.store(someLength, std::memory_order_release); after filling up Buffer::str
    Buffer& getBuffer() noexcept {
        return *reinrepret_cast<Buffer*>(_ptr);
    }

    Buffer& read() {
        auto& buffer = *reinrepret_cast<Buffer*>(_ptr);
        while (buffer.size.load(std::memory_order_acquire) > 0) {
            buffer.str.load(std::memory_order_relaxed);
            return buffer;
        }
    }
};

How can the caller to SHM::getBuffer() properly write to Buffer::str char by char so that process B can call SHM::read() to retrieve?

Does buffer.str.load(std::memory_order_relaxed) actually load atomically and correctly? I doubt that as it doesn't even know the length.

This is for Linux, X86-64, GCC 7.

Thanks in advance.



Answers 1


Here is a working sketch for single-producer-single-consumer case (it doesn't matter if the producer/consumer threads from the same process or not), wait-free:

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <utility>
#include <cstring>
#include <string>
#include <atomic>

class SingleProducerSingleConsumerIndexes {
    std::atomic<uint64_t> produced_ = {};
    std::atomic<uint64_t> consumed_ = {};

public: // Producer interface.
    uint64_t produced() {
        auto consumed = consumed_.load(std::memory_order_acquire); // Syncronizes with store 2.
        auto produced = produced_.load(std::memory_order_relaxed);
        if(produced != consumed)
            return produced;
        // Entire buffer was consumed. Rewind.
        consumed_.store(0, std::memory_order_relaxed);
        produced_.store(0, std::memory_order_release); // Store 1.
        return 0;
    }

    void produce(uint64_t end) {
        produced_.store(end, std::memory_order_release); // Store 1.
    }

public: // Consumer interface.
    std::pair<uint64_t, uint64_t> available() const {
        auto produced = produced_.load(std::memory_order_acquire); // Syncronizes with store 1.
        auto consumed = consumed_.load(std::memory_order_relaxed);
        return {consumed, produced};
    }

    void consume(uint64_t end) {
        consumed_.store(end, std::memory_order_release); // Store 2.
    }
};

class SharedMemoryStrings {
    void* p_;
    static constexpr int size = 4 * 1024 * 1024;
    static constexpr int buffer_size = size - sizeof(SingleProducerSingleConsumerIndexes);
public:
    SharedMemoryStrings() {
        auto handle = ::shm_open("/another-test", O_RDWR|O_CREAT, 0666);
        if(-1 == ::ftruncate(handle, size))
            throw;
        p_ = ::mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
        ::close(handle);
        if(p_ == MAP_FAILED)
            throw;
    }

    ~SharedMemoryStrings() {
        ::munmap(p_, size);
    }

    void produce(std::string const& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto produced = indexes->produced();
        uint64_t new_end = produced + sizeof(uint64_t) + s.size();
        if(new_end > buffer_size)
            throw; // Out of buffer space.

        auto* buffer = reinterpret_cast<unsigned char*>(indexes + 1) + produced;
        uint64_t size = s.size();
        memcpy(buffer, &size, sizeof size);
        buffer += sizeof size;
        memcpy(buffer, s.data(), s.size());

        indexes->produce(new_end);
    }

    bool try_consume(std::string& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto available = indexes->available();
        auto consumed = available.first;
        auto produced = available.second;
        if(consumed == produced)
            return false; // No data available.

        auto* buffer = reinterpret_cast<unsigned char*>(indexes + 1) + consumed;
        uint64_t size;
        memcpy(&size, buffer, sizeof size);
        buffer += sizeof size;
        s.resize(size);
        memcpy(&s[0], buffer, size);

        indexes->consume(consumed + sizeof(uint64_t) + size);
        return true;
    }
};

int main(int ac, char** av) {
    if(ac > 1) {
        // Producer.
        SharedMemoryStrings a;
        for(int i = 1; i < ac; ++i)
            a.produce(av[i]);
    }
    else {
        // Consumer.
        SharedMemoryStrings a;
        for(std::string s;;) { // Busy-wait loop.
            if(a.try_consume(s)) 
                printf("%s\n", s.c_str());
            // else // Potential optimization.
            //     _mm_pause();
        }
    }
}

Start the consumer with ./test. Then the producer like ./test hello world. The order does not matter.

Notes:

  • Is is a single-producer-single-consumer solution. It is wait-free (producer and consumer calls complete in a fixed number of instructions, no loop). Cannot go faster that that.

  • It never calls SingleProducerSingleConsumerIndexes constructor. It relies on the fact that a new file is zero-initialized and that is what the constructor would do. In more complex scenarios it needs to invoke the constructor of shared data if the file has just been created. That can be done by creating a temporary file with a unique name first (if the file does not exist yet), mapping the file into memory and invoking the constructor. Then renaming that temporary file to the final name (rename is atomic). If renaming fails because the file already exists, delete the temporary file and start again.

  • The consumer does busy-waiting for lowest possible latency. If you would like the consumer to block while waiting it is possible to add a process shared mutex and condition variable to make that happen. It takes a few microseconds to wake up a thread waiting on a condition variable (futex in Linux) in the kernel, though. That would require calling SingleProducerSingleConsumerIndexes constructor to do all required initialization (e.g. initialize a robust adaptive process-shared mutex and a process-shared condition variable).

Maxim Egorushkin
Maxim Egorushkin
July 05, 2018 17:21 PM

Related Questions


Updated November 21, 2018 22:26 PM

Updated January 11, 2019 13:26 PM

Updated March 08, 2017 23:26 PM

Updated December 06, 2018 14:26 PM

Updated May 08, 2017 14:26 PM