����λ�ã���ҳ > �����̳� > �̳� > iceoryxԴ���Ķ����ģ����������ڴ�ͨ�ţ�����

iceoryxԴ���Ķ����ģ����������ڴ�ͨ�ţ�����

��Դ������������|��ʱ�䣺2024-05-29 08:55:57 |���Ķ���143��|�� ��ǩ�� C �Ķ� ͨ�� CEO �� |����������

Ŀ¼0 ����1 �������ݽṹ2 �����ڴ��ȡ2.1 PublisherImpl::loan2.2 PublisherImpl::loanSample2.3 PublisherPortUser::tryAllocateChunk2.4 ChunkSender::tryAllocate3 ��Ϣ�����߼�3

0 ����

  • iceoryxԴ���Ķ���һ������ȫ�ָ���

  • iceoryxԴ���Ķ����������������ڴ����

  • iceoryxԴ���Ķ����������������ڴ������һ��

  • iceoryxԴ���Ķ����ģ����������ڴ�ͨ�ţ�����

  • iceoryxԴ���Ķ����壩���������ڴ�ͨ�ţ�����

  • iceoryxԴ���Ķ����������������ڴ洴��

  • iceoryxԴ���Ķ����ߣ����������ֻ���

  • iceoryxԴ���Ķ����ˣ�����IPCͨ�Ż���

�����Ķ��빲���ڴ�ͨ����ص��߼������������Ȼ�ȡһ�鹲���ڴ棬������д�����ݣ�Ȼ������Ϣ������������Ϣ�������ݣ������ߴ���Ϣ�����ж�ȡ��Ϣ�������ݡ����Ĵ��ķ�����н�����������ݽṹ�������ڴ��ȡ����Ϣ�����߼�����Ϣ�����߼���

1 �������ݽṹ

����ǰ��֪��������Ԫ��Ϊ ShmSafeUnmanagedChunk �����д�ŵ��� ChunkManagement ���ڹ����ڴ�ε�id����Ըù����ڴ��׵�ַ��ƫ�ƣ�����������ʾ��

iceoryxÔ´ÂëÔĶÁ£¨ËÄ£©¡ª¡ª¹²ÏíÄÚ´æÍ¨ÐÅ£¨¶þ£©

��Ϣ���������´��붨�壺

struct ChunkQueueData : public LockingPolicy
{
    // ...
    static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
    cxx::VariantQueue m_queue;
    // ...
};

struct ChunkDistributorData : public LockingPolicy
{
    // ...
    using QueueContainer_t =
    cxx::vector, ChunkDistributorDataProperties_t::MAX_QUEUES>;
    QueueContainer_t m_queues;
    // ...
};

struct ChunkReceiverData : public ChunkQueueDataType
{
    // ...
};
  • ChunkDistributorData �Ƿ����������еĶ������ݽṹ������һ�������߻�ַ���������Ķˣ����Գ��ж�����С�

  • ChunkReceiverData �Ƕ����ߵ���������̳��� ChunkQueueData ���ڲ�ֻ��һ�����У�����Ԫ������Ϊ ShmSafeUnmanagedChunk ��

���������У��������ݽṹ������Ϊ cxx::VariantQueue ��������������һ���䳤���飬��ʵ��������һ���������飬������������ݽṹ���壺

enum class VariantQueueTypes : uint64_t
{
    FiFo_SingleProducerSingleConsumer = 0,
    SoFi_SingleProducerSingleConsumer = 1,
    FiFo_MultiProducerSingleConsumer = 2,
    SoFi_MultiProducerSingleConsumer = 3
};

template 
class VariantQueue
{
public:
    using fifo_t = variant,
                           concurrent::SoFi,
                           concurrent::ResizeableLockFreeQueue,
                           concurrent::ResizeableLockFreeQueue>;
    // ...

private:
    VariantQueueTypes m_type;
    fifo_t m_fifo;
};

fifo_t �Ƕ��еײ�ṹ���ͣ������� concurrent::FiFo �� concurrent::SoFi �� concurrent::ResizeableLockFreeQueue ֮һ������ʹ����һ�֣���ö��ֵ m_type ȷ�����������ڲ��������������ݽṹ��

template 
struct NonZeroedBuffer
{
    struct alignas(ElementType) element_t
    {
        cxx::byte_t data[sizeof(ElementType)];
    };
    element_t value[Capacity];
};

������һ�ṹ���ʾ���һ�����飬��Ԫ����������ΪElement��

2 �����ڴ��ȡ

��������ǰ��Ӧ�ó���������Ҫ�Ȼ�ȡһ����ʴ�С��Chunk��������д�����ݣ�Ȼ�������Ϣ���ͽӿڽ��з��͡�

2.1 PublisherImpl::loan

ְ��

��ȡһ�鹲���ڴ棬�����ù��캯�����г�ʼ����

���

args��ģ���Σ����ڵ��ô������͵Ĺ��캯����Ҳ���Բ�����

���أ�

Sample����ʵ���������Ƕ��û��ɲ����Ĺ����ڴ�εķ�װ��

template 
template 
inline cxx::expected, AllocationError>
PublisherImpl::loan(Args&&... args) noexcept
{
    return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward(args)...); }));
}

������������

���ȵ���loanSample������ȡ�����ڴ棬Ȼ����ù��캯�����г�ʼ��������ʹ��Placement new�﷨����Ҫָ�����ǣ�loanSample���ص��ǽ����ڴ���û����ݵ��׵�ַ��������Chunk���׵�ַ��

2.2 PublisherImpl::loanSample

ְ��

���乲���ڴ棬������ת��ΪSample���ͣ������ء�

���أ�

Sample����ʵ����

template 
inline cxx::expected, AllocationError> PublisherImpl::loanSample() noexcept
{
    static constexpr uint32_t USER_HEADER_SIZE{std::is_same::value ? 0U : sizeof(H)};

    auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
    if (result.has_error())
    {
        return cxx::error(result.get_error());
    }
    else
    {
        return cxx::success>(convertChunkHeaderToSample(result.value()));
    }
}

������������

���ȵ��� tryAllocateChunk ���һ�鹲���ڴ棬������Sampleʵ����

2.3 PublisherPortUser::tryAllocateChunk

ְ��

���乲���ڴ棬������ת��ΪSample���ͣ������ء�

���

4�����ڼ������蹲���ڴ��С�IJ��������ﲻչ�������ˡ�

����ֵ��

�����ڴ��׵�ַ������Ϊ ChunkHeader * ���� 4.1 Chunk�����ṹ ��

cxx::expected
PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
                                    const uint32_t userPayloadAlignment,
                                    const uint32_t userHeaderSize,
                                    const uint32_t userHeaderAlignment) noexcept
{
    return m_chunkSender.tryAllocate(
        getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
}

������������

��������ֻ�Ǽ򵥵ص��� ChunkSender �� tryAllocate ������

2.4 ChunkSender::tryAllocate

ְ��

���� MemoryManager�ij�Ա����getChunk �õ������ڴ��������һ��ʹ�õĹ����ڴ�顣

���

ͬ�ϣ��ԣ�

����ֵ��

ָ�����ڴ���׵�ַ��ָ�룬����Ϊ ChunkHeader ��

template 
inline cxx::expected
ChunkSender::tryAllocate(const UniquePortId originId,
                                              const uint32_t userPayloadSize,
                                              const uint32_t userPayloadAlignment,
                                              const uint32_t userHeaderSize,
                                              const uint32_t userHeaderAlignment) noexcept
{
    const auto chunkSettingsResult =
        mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
    if (chunkSettingsResult.has_error())
    {
        return cxx::error(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
    }

    const auto& chunkSettings = chunkSettingsResult.value();
    const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();

    auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
    mepoo::ChunkHeader* lastChunkChunkHeader =
        lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;

    if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
    {
        /* * * * *  �������2-4-1���������һ�η���Ĺ����ڴ�  * * * * */
    }
    else
    {
        /* * * * *  �������2-4-2������һ���µ�δʹ�õĹ����ڴ� * * * * */
    }
}

���������

  • LINE 09 �� LINE 17�� �������蹲���ڴ��С��

  • LINE 19 �� LINE 30�� �ж����һ�η���Ĺ����ڴ���Ƿ����ж����߶��Ѷ�ȡ�����Ҵ�С���������С���������һ�η���Ĺ����ڴ�飬�����·��乲���ڴ�顣

�����2-4-1���������һ�η���Ĺ����ڴ�

auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
    auto chunkSize = lastChunkChunkHeader->chunkSize();
    lastChunkChunkHeader->~ChunkHeader();
    new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
    lastChunkChunkHeader->setOriginId(originId);
    return cxx::success(lastChunkChunkHeader);
}
else
{
    return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}

������������

�������ʹ�õĹ����ڴ��δ��������룬������֮ǰ�����ݣ�ͬʱ������ڴ��Ϲ����µ� ChunkHeader �����򷵻ش���

�����2-4-2������һ���µ�δʹ�õĹ����ڴ�

auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);

if (!getChunkResult.has_error())
{
    auto& chunk = getChunkResult.value();

    // if the application allocated too much chunks, return no more chunks
    if (getMembers()->m_chunksInUse.insert(chunk))
    {
        // END of critical section
        chunk.getChunkHeader()->setOriginId(originId);
        return cxx::success(chunk.getChunkHeader());
    }
    else
    {
        // release the allocated chunk
        chunk = nullptr;
        return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
    }
}
else
{
    /// @todo iox-#1012 use cxx::error::from(E1); once available
    return cxx::error(cxx::into(getChunkResult.get_error()));
}

������������

����MemoryManager�ij�Ա����getChunk��ȡ�����ڴ�飬�����ȡ�ɹ����������� m_chunksInUse �������ȡʧ�ܻ������������򷵻ػ�ȡʧ�ܣ���ʱ����RAIIԭ���� SharedChunk �������������Զ��������ڴ�鷵���� MemPool ��

m_chunksInUse �ڲ���װ������Ԫ�ص�����Ϊ������ ��һƪ���� �н��ܵ� ShmSafeUnmanagedChunk ��������Ͳ��������ü�����Ϊʲô�˳������򲻻ᱻ������

ΪʲôҪ�� m_chunksInUse ���飿ԭ�����£����ǿ��� tryAllocate ���ص�����Ϣ�ڴ���ָ�룬����Ϣ���͵�ʱ����Ҫʹ�� SharedChunk �������޷���ǰ��ת��Ϊ���ߡ����ԣ��˴��������飬��Ϣ���ͺ�����ͨ����Ϣ�ڴ���ָ����Ҷ�Ӧ����Ԫ�أ��ָ��� SharedChunk ʵ���� �����3.3 ��

3 ��Ϣ�����߼�

����������Ϣ����������Ϣ�����ṹ ShmSafeUnmanagedChunk ��

3.1 PublisherImpl::publish

ְ��

�ϲ�Ӧ�ó�����ô˷���������Ϣ��

���

sample ���û��������ݵķ�װʵ����

template 
inline void PublisherImpl::publish(Sample&& sample) noexcept
{
    auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
    auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
    port().sendChunk(chunkHeader);
}

������������

��������� sample ��ȡ���û���������ָ�룬�ݴ˼��� Chunk �׵�ַ��Ȼ����� sendChunk ���з��͡�

�����û���������ָ����� Chunk �׵�ַ��ʵ���Ǽ�ȥһ��ƫ������������㷽�����£�

ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
{
    if (userPayload == nullptr)
    {
        return nullptr;
    }
    uint64_t userPayloadAddress = reinterpret_cast(userPayload);
    auto backOffset = reinterpret_cast(userPayloadAddress - sizeof(UserPayloadOffset_t));
    return reinterpret_cast(userPayloadAddress - *backOffset);
}

����ƫ�Ʒ���payload֮ǰ������ *backOffset ��

3.2 PublisherPortUser::sendChunk

ְ��

�����û����ݡ�

���

chunkHeader �� ChunkHeader ���͵�ָ�룬 Chunk ���׵�ַ��

void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);

    if (offerRequested)
    {
        m_chunkSender.send(chunkHeader);
    }
    else
    {
        m_chunkSender.pushToHistory(chunkHeader);
    }
}

������������

3.3 ChunkSender::send

ְ��

�����û����ݡ�

���

chunkHeader �� ChunkHeader ָ�룬 Chunk ���׵�ַ��

template 
inline uint64_t ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    uint64_t numberOfReceiverTheChunkWasDelivered{0};
    mepoo::SharedChunk chunk(nullptr);
    // BEGIN of critical section, chunk will be lost if the process terminates in this section
    if (getChunkReadyForSend(chunkHeader, chunk))
    {
        numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);

        getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
        getMembers()->m_lastChunkUnmanaged = chunk;
    }
    // END of critical section

    return numberOfReceiverTheChunkWasDelivered;
}

���������

  • LINE 05 �� LINE 07�� ���� chunkHeader ָ��� m_chunksInUse ���飬�ָ� SharedChunk ʵ����

  • LINE 09 �� LINE 09�� ���û���ij�Ա���� deliverToAllStoredQueues ������з��ͣ����룩��Ϣ��

  • LINE 11 �� LINE 12�� ���� m_lastChunkUnmanaged ʵ�������������ܡ�

3.4 ChunkDistributor::deliverToAllStoredQueues

template 
inline uint64_t ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
    uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
    typename ChunkDistributorDataType::QueueContainer_t remainingQueues;

    /* * * * *  �������3-3-1������з�����Ϣ��ʧ����remainingQueues  * * * * */

    /* * * * *  �������3-3-2������ʧ�ܵIJ��ϳ������·���  * * * * */

    addToHistoryWithoutDelivery(chunk);

    return numberOfQueuesTheChunkWasDeliveredTo;
}

������������

�ⲿ��û��ʲô���ݣ���Ҫʵ���ڴ����3-3-1�ʹ����3-3-2��

�����3-3-1��

{
    {
    typename MemberType_t::LockGuard_t lock(*getMembers());

    bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
    // send to all the queues
    for (auto& queue : getMembers()->m_queues)
    {
        bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);

        if (pushToQueue(queue.get(), chunk))
        {
            ++numberOfQueuesTheChunkWasDeliveredTo;
        }
        else
        {
            if (isBlockingQueue)
            {
                remainingQueues.emplace_back(queue);
            }
            else
            {
                ++numberOfQueuesTheChunkWasDeliveredTo;
                ChunkQueuePusher_t(queue.get()).lostAChunk();
            }
        }
    }
}

������������

��δ����������DZ������ж����߶��У����� pushToQueue ����Ϣ����������Ϣ��ʵ����Ϣ���͡�������Ϣ���еij��������޵ģ�������ڶ����ߴ����ٶ�̫������������Ӧ����ô�������������ã�����ѡ������Ӧ�Բ��ԣ�

  • �������������LINE 17 �� LINE 20������������Щ���в��ϳ��Է��ͣ�ֱ�����ж������ͳɹ����������3-3-2��

  • �����б��Ϊ ����Ϣ��ʧ ��LINE 22 �� LINE 25����

template 
inline void ChunkQueuePusher::lostAChunk() noexcept
{
    getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
}

�����3-3-2�����ϳ��Է��ͣ�ֱ��������Ϣ���ͳɹ�

cxx::internal::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
{
    adaptiveWait.wait();
    {
        typename MemberType_t::LockGuard_t lock(*getMembers());

        /* * * * *  �������3-3-3�����Ծ������  * * * * */

        for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
        {
            if (pushToQueue(remainingQueues[i].get(), chunk))
            {
                remainingQueues.erase(remainingQueues.begin() + i);
                ++numberOfQueuesTheChunkWasDeliveredTo;
            }

            if (i == 0U)
            {
                break;
            }
        }
    }
}

������������

�ⲿ�ִ�����Ƕ�ʣ��δ���ͳɹ��Ķ��н������·��ͣ�ֱ�����ж��з��ͳɹ���ÿ�ֳ����м��ʹ��yield��sleep�����ȴ�һ��ʱ�䣬���ⲻ��Ҫ�������˷ѡ�ͬʱ�����͹����У������뵱ǰ��Ծ�����󽻣����£�

�����3-3-3�����Ծ������

typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](memory::RelativePointer& a,
                  memory::RelativePointer& b) -> bool {
return reinterpret_cast(a.get()) > reinterpret_cast(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);

auto iter = std::set_intersection(getMembers()->m_queues.begin(),
                              getMembers()->m_queues.end(),
                              remainingQueues.begin(),
                              remainingQueues.end(),
                              queueIntersection.begin(),
                              greaterThan);
queueIntersection.resize(static_cast(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;

������������

������δ��������� remainingQueues �͵�ǰ��Ծ���� m_queues ���������ⷢ������ѭ���� set_intersection ��C++��׼�⺯��������� https://en.cppreference.com/w/cpp/algorithm/set_intersection

���ˣ���Ϣ���͵����̷�����ϡ�

4 ��

���Ľ�������Ϣ�����߻�ȡ�����ڴ��ͷ����߼������Ľ�������Ϣ�����ߵĽ����߼���

С���Ƽ��Ķ�

�������������Ľ�Ϊ������Ϣ����������������ͬ���޹۵��֤ʵ��������

�����Ƶ����

����

ͬ������

����

ɨ��ά�����������ֻ��汾��

ɨ��ά����������΢�Ź��ںţ�

��վ�������������������ϴ��������ַ���İ�Ȩ���뷢�ʼ�[email protected]

��ICP��2022002427��-10 �湫��������43070202000427��© 2013~2025 haote.com ������