����λ�ã���ҳ > �����̳� > �̳� > iceoryxԴ���Ķ����ģ����������ڴ�ͨ�ţ�����
Ŀ¼0 ����1 �������ݽṹ2 �����ڴ��ȡ2.1 PublisherImpl::loan2.2 PublisherImpl::loanSample2.3 PublisherPortUser::tryAllocateChunk2.4 ChunkSender::tryAllocate3 ��Ϣ�����߼�3
iceoryxԴ���Ķ���һ������ȫ�ָ���
iceoryxԴ���Ķ����������������ڴ����
iceoryxԴ���Ķ����������������ڴ������һ��
iceoryxԴ���Ķ����ģ����������ڴ�ͨ�ţ�����
iceoryxԴ���Ķ����壩���������ڴ�ͨ�ţ�����
iceoryxԴ���Ķ����������������ڴ洴��
iceoryxԴ���Ķ����ߣ����������ֻ���
iceoryxԴ���Ķ����ˣ�����IPCͨ�Ż���
�����Ķ��빲���ڴ�ͨ����ص��߼������������Ȼ�ȡһ�鹲���ڴ棬������д�����ݣ�Ȼ������Ϣ������������Ϣ�������ݣ������ߴ���Ϣ�����ж�ȡ��Ϣ�������ݡ����Ĵ��ķ�����н�����������ݽṹ�������ڴ��ȡ����Ϣ�����߼�����Ϣ�����߼���
����ǰ��֪��������Ԫ��Ϊ
ShmSafeUnmanagedChunk
�����д�ŵ���
ChunkManagement
���ڹ����ڴ�ε�id����Ըù����ڴ��׵�ַ��ƫ�ƣ�����������ʾ��
��Ϣ���������´��붨�壺
struct ChunkQueueData : public LockingPolicy
{
// ...
static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
cxx::VariantQueue m_queue;
// ...
};
struct ChunkDistributorData : public LockingPolicy
{
// ...
using QueueContainer_t =
cxx::vector, ChunkDistributorDataProperties_t::MAX_QUEUES>;
QueueContainer_t m_queues;
// ...
};
struct ChunkReceiverData : public ChunkQueueDataType
{
// ...
};
ChunkDistributorData
�Ƿ����������еĶ������ݽṹ������һ�������߻�ַ���������Ķˣ����Գ��ж�����С�
ChunkReceiverData
�Ƕ����ߵ���������̳���
ChunkQueueData
���ڲ�ֻ��һ�����У�����Ԫ������Ϊ
ShmSafeUnmanagedChunk
��
���������У��������ݽṹ������Ϊ
cxx::VariantQueue
��������������һ���䳤���飬��ʵ��������һ���������飬������������ݽṹ���壺
enum class VariantQueueTypes : uint64_t
{
FiFo_SingleProducerSingleConsumer = 0,
SoFi_SingleProducerSingleConsumer = 1,
FiFo_MultiProducerSingleConsumer = 2,
SoFi_MultiProducerSingleConsumer = 3
};
template
class VariantQueue
{
public:
using fifo_t = variant,
concurrent::SoFi,
concurrent::ResizeableLockFreeQueue,
concurrent::ResizeableLockFreeQueue>;
// ...
private:
VariantQueueTypes m_type;
fifo_t m_fifo;
};
fifo_t
�Ƕ��еײ�ṹ���ͣ�������
concurrent::FiFo
��
concurrent::SoFi
��
concurrent::ResizeableLockFreeQueue
֮һ������ʹ����һ�֣���ö��ֵ
m_type
ȷ�����������ڲ��������������ݽṹ��
template
struct NonZeroedBuffer
{
struct alignas(ElementType) element_t
{
cxx::byte_t data[sizeof(ElementType)];
};
element_t value[Capacity];
};
������һ�ṹ���ʾ���һ�����飬��Ԫ����������ΪElement��
��������ǰ��Ӧ�ó���������Ҫ�Ȼ�ȡһ����ʴ�С��Chunk��������д�����ݣ�Ȼ�������Ϣ���ͽӿڽ��з��͡�
ְ��
��ȡһ�鹲���ڴ棬�����ù��캯�����г�ʼ����
���
args��ģ���Σ����ڵ��ô������͵Ĺ��캯����Ҳ���Բ�����
���أ�
Sample����ʵ���������Ƕ��û��ɲ����Ĺ����ڴ�εķ�װ��
template
template
inline cxx::expected, AllocationError>
PublisherImpl::loan(Args&&... args) noexcept
{
return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward(args)...); }));
}
������������
���ȵ���loanSample������ȡ�����ڴ棬Ȼ����ù��캯�����г�ʼ��������ʹ��Placement new�﷨����Ҫָ�����ǣ�loanSample���ص��ǽ����ڴ���û����ݵ��׵�ַ��������Chunk���׵�ַ��
ְ��
���乲���ڴ棬������ת��ΪSample���ͣ������ء�
���أ�
Sample����ʵ����
template
inline cxx::expected, AllocationError> PublisherImpl::loanSample() noexcept
{
static constexpr uint32_t USER_HEADER_SIZE{std::is_same::value ? 0U : sizeof(H)};
auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
if (result.has_error())
{
return cxx::error(result.get_error());
}
else
{
return cxx::success>(convertChunkHeaderToSample(result.value()));
}
}
������������
���ȵ���
tryAllocateChunk
���һ�鹲���ڴ棬������Sampleʵ����
ְ��
���乲���ڴ棬������ת��ΪSample���ͣ������ء�
���
4�����ڼ������蹲���ڴ��С�IJ��������ﲻչ�������ˡ�
����ֵ��
�����ڴ��׵�ַ������Ϊ
ChunkHeader *
����
4.1 Chunk�����ṹ
��
cxx::expected
PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
const uint32_t userPayloadAlignment,
const uint32_t userHeaderSize,
const uint32_t userHeaderAlignment) noexcept
{
return m_chunkSender.tryAllocate(
getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
}
������������
��������ֻ�Ǽ򵥵ص���
ChunkSender
��
tryAllocate
������
ְ��
���� MemoryManager�ij�Ա����getChunk �õ������ڴ��������һ��ʹ�õĹ����ڴ�顣
���
ͬ�ϣ��ԣ�
����ֵ��
ָ�����ڴ���׵�ַ��ָ�룬����Ϊ
ChunkHeader
��
template
inline cxx::expected
ChunkSender::tryAllocate(const UniquePortId originId,
const uint32_t userPayloadSize,
const uint32_t userPayloadAlignment,
const uint32_t userHeaderSize,
const uint32_t userHeaderAlignment) noexcept
{
const auto chunkSettingsResult =
mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
if (chunkSettingsResult.has_error())
{
return cxx::error(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
}
const auto& chunkSettings = chunkSettingsResult.value();
const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();
auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
mepoo::ChunkHeader* lastChunkChunkHeader =
lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;
if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
{
/* * * * * �������2-4-1���������һ�η���Ĺ����ڴ� * * * * */
}
else
{
/* * * * * �������2-4-2������һ���µ�δʹ�õĹ����ڴ� * * * * */
}
}
���������
LINE 09 �� LINE 17�� �������蹲���ڴ��С��
LINE 19 �� LINE 30�� �ж����һ�η���Ĺ����ڴ���Ƿ����ж����߶��Ѷ�ȡ�����Ҵ�С���������С���������һ�η���Ĺ����ڴ�飬�����·��乲���ڴ�顣
�����2-4-1���������һ�η���Ĺ����ڴ�
auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
auto chunkSize = lastChunkChunkHeader->chunkSize();
lastChunkChunkHeader->~ChunkHeader();
new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
lastChunkChunkHeader->setOriginId(originId);
return cxx::success(lastChunkChunkHeader);
}
else
{
return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}
������������
�������ʹ�õĹ����ڴ��δ��������룬������֮ǰ�����ݣ�ͬʱ������ڴ��Ϲ����µ�
ChunkHeader
�����򷵻ش���
�����2-4-2������һ���µ�δʹ�õĹ����ڴ�
auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);
if (!getChunkResult.has_error())
{
auto& chunk = getChunkResult.value();
// if the application allocated too much chunks, return no more chunks
if (getMembers()->m_chunksInUse.insert(chunk))
{
// END of critical section
chunk.getChunkHeader()->setOriginId(originId);
return cxx::success(chunk.getChunkHeader());
}
else
{
// release the allocated chunk
chunk = nullptr;
return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}
}
else
{
/// @todo iox-#1012 use cxx::error::from(E1); once available
return cxx::error(cxx::into(getChunkResult.get_error()));
}
������������
����MemoryManager�ij�Ա����getChunk��ȡ�����ڴ�飬�����ȡ�ɹ�����������
m_chunksInUse
�������ȡʧ�ܻ������������ò·µ»Ø»ï¿½È¡Ê§ï¿½Ü£ï¿½ï¿½ï¿½Ê±ï¿½ï¿½ï¿½ï¿½RAIIÔ����
SharedChunk
�������������Զ��������ڴ�鷵����
MemPool
��
m_chunksInUse
�ڲ���װ������Ԫ�ص�����Ϊ������
��һƪ����
�н��ܵ�
ShmSafeUnmanagedChunk
��������Ͳ��������ü�����Ϊʲô�˳������򲻻ᱻ������
ΪʲôҪ��
m_chunksInUse
���飿Ô�����£����ǿ���
tryAllocate
���ص�����Ϣ�ڴ���ָ�룬����Ϣ���͵�ʱ����Ҫʹ��
SharedChunk
�������޷���ǰ��ת��Ϊ���ߡ����ԣ��˴��������飬��Ϣ���ͺ�����ͨ����Ϣ�ڴ���ָ����Ҷ�Ӧ����Ԫ�أ��ָ���
SharedChunk
ʵ����
�����3.3
��
����������Ϣ����������Ϣ�����ṹ
ShmSafeUnmanagedChunk
��
ְ��
�ϲ�Ӧ�ó�����ô˷���������Ϣ��
���
sample
���û��������ݵķ�װʵ����
template
inline void PublisherImpl::publish(Sample&& sample) noexcept
{
auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
port().sendChunk(chunkHeader);
}
������������
���������
sample
��ȡ���û���������ָ�룬�ݴ˼���
Chunk
�׵�ַ��Ȼ�����
sendChunk
���з��͡�
�����û���������ָ�����
Chunk
�׵�ַ��ʵ���Ǽ�ȥһ��ƫ������������㷽�����£�
ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
{
if (userPayload == nullptr)
{
return nullptr;
}
uint64_t userPayloadAddress = reinterpret_cast(userPayload);
auto backOffset = reinterpret_cast(userPayloadAddress - sizeof(UserPayloadOffset_t));
return reinterpret_cast(userPayloadAddress - *backOffset);
}
����ƫ�Ʒ���payload֮ǰ������
*backOffset
��
ְ��
�����û����ݡ�
���
chunkHeader
��
ChunkHeader
���͵�ָ�룬
Chunk
���׵�ַ��
void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
{
const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);
if (offerRequested)
{
m_chunkSender.send(chunkHeader);
}
else
{
m_chunkSender.pushToHistory(chunkHeader);
}
}
������������
ְ��
�����û����ݡ�
���
chunkHeader
��
ChunkHeader
ָ�룬
Chunk
���׵�ַ��
template
inline uint64_t ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
uint64_t numberOfReceiverTheChunkWasDelivered{0};
mepoo::SharedChunk chunk(nullptr);
// BEGIN of critical section, chunk will be lost if the process terminates in this section
if (getChunkReadyForSend(chunkHeader, chunk))
{
numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);
getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
getMembers()->m_lastChunkUnmanaged = chunk;
}
// END of critical section
return numberOfReceiverTheChunkWasDelivered;
}
���������
LINE 05 �� LINE 07��
����
chunkHeader
ָ���
m_chunksInUse
���飬�ָ�
SharedChunk
ʵ����
LINE 09 �� LINE 09��
���û���ij�Ա����
deliverToAllStoredQueues
������з��ͣ����룩��Ϣ��
LINE 11 �� LINE 12��
����
m_lastChunkUnmanaged
ʵ�������������ܡ�
template
inline uint64_t ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
typename ChunkDistributorDataType::QueueContainer_t remainingQueues;
/* * * * * �������3-3-1������з�����Ϣ��ʧ����remainingQueues * * * * */
/* * * * * �������3-3-2������ʧ�ܵIJ��ϳ������·��� * * * * */
addToHistoryWithoutDelivery(chunk);
return numberOfQueuesTheChunkWasDeliveredTo;
}
������������
�ⲿ��û��ʲô���ݣ���Ҫʵ���ڴ����3-3-1�ʹ����3-3-2��
�����3-3-1��
{
{
typename MemberType_t::LockGuard_t lock(*getMembers());
bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
// send to all the queues
for (auto& queue : getMembers()->m_queues)
{
bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);
if (pushToQueue(queue.get(), chunk))
{
++numberOfQueuesTheChunkWasDeliveredTo;
}
else
{
if (isBlockingQueue)
{
remainingQueues.emplace_back(queue);
}
else
{
++numberOfQueuesTheChunkWasDeliveredTo;
ChunkQueuePusher_t(queue.get()).lostAChunk();
}
}
}
}
������������
��δ����������DZ������ж����߶��У�����
pushToQueue
����Ϣ����������Ϣ��ʵ����Ϣ���͡�������Ϣ���еij��������޵ģ�������ڶ����ߴ����ٶ�̫������������Ӧ����ô�������������ã�����ѡ������Ӧ�Բ��ԣ�
�����б���������LINE 17 �� LINE 20������������Щ���в��ϳ��Է��ͣ�ֱ�����ж������ͳɹ����������3-3-2��
�����б��Ϊ ����Ϣ��ʧ ��LINE 22 �� LINE 25����
template
inline void ChunkQueuePusher::lostAChunk() noexcept
{
getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
}
�����3-3-2�����ϳ��Է��ͣ�ֱ��������Ϣ���ͳɹ�
cxx::internal::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
{
adaptiveWait.wait();
{
typename MemberType_t::LockGuard_t lock(*getMembers());
/* * * * * �������3-3-3�����Ծ������ * * * * */
for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
{
if (pushToQueue(remainingQueues[i].get(), chunk))
{
remainingQueues.erase(remainingQueues.begin() + i);
++numberOfQueuesTheChunkWasDeliveredTo;
}
if (i == 0U)
{
break;
}
}
}
}
������������
�ⲿ�ִ�����Ƕ�ʣ��δ���ͳɹ��Ķ��н������·��ͣ�ֱ�����ж��з��ͳɹ���ÿ�ֳ����м��ʹ��yield��sleep�����ȴ�һ��ʱ�䣬���ⲻ��Ҫ�������˷ѡ�ͬʱ�����͹����У������뵱ǰ��Ծ�����󽻣����£�
�����3-3-3�����Ծ������
typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](memory::RelativePointer& a,
memory::RelativePointer& b) -> bool {
return reinterpret_cast(a.get()) > reinterpret_cast(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);
auto iter = std::set_intersection(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
remainingQueues.begin(),
remainingQueues.end(),
queueIntersection.begin(),
greaterThan);
queueIntersection.resize(static_cast(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;
������������
��������������
remainingQueues
�͵�ǰ��Ծ����
m_queues
���������ⷢ������Ñ����
set_intersection
��C++��׼�⺯���������
https://en.cppreference.com/w/cpp/algorithm/set_intersection
���ˣ���Ϣ���͵����̷�����ϡ�
���Ľ�������Ϣ�����߻�ȡ�����ڴ��ͷ����߼������Ľ�������Ϣ�����ߵĽ����߼���
ʹ��Blender���ɳ���ģ��
�Ķ�ȫ����������ERA5�����ط���
�Ķ�Xpath���������﷨
�Ķ�����ѧϰ�������繹�����£�
�Ķ���ΪMateƷ��ʢ�䣺HarmonyOS NEXT�ӳ�����Ϸ���ܵõ�����ͷ�
�Ķ�ʵ�ֶ��󼯺���DataTable���໥ת��
�Ķ�Ӳ�̵Ļ���֪ʶ��ѡ��ָ��
�Ķ�������й��ƶ��ı�ͼ��ײ�
�Ķ�����NEXTԪ�����������ѿ����ϼ���Ʒ
�Ķ��ᳲ���С������������Ƽ��رշ���
�Ķ������ArcMap�����н���դ��ͼ���ز�������
�Ķ��㷨�����ݽṹ 1 - ģ��
�Ķ���Ѷ�����߿ͷ���Ӫ��ϵͳ����
�Ķ���Ѷ��Ƶҹ��ģʽ���ý̳�
�Ķ����ں���NEXT��Ѫ���Ŵ���������������
�Ķ�5. Spring Cloud OpenFeign ����ʽ WebService �ͻ��˵ij���ϸʹ��
�Ķ�Java����ģʽ����̬�����Ͷ�̬�����ĶԱȷ���
�Ķ�Win11�ʼDZ����Զ�����Ӧ�õ���ɫ����ʾ����
�Ķ�˼�� V1.5.6 ��׿��
��ս�귨 V7.5.0 ��׿��
У��������������׵������� V1.0 ��׿��
��˸֮�� V1.9.7 ��׿��
������Ե����� v1.0.4 ��׿��
������֮ŠV5.2.3 ��׿��
��������������Դ V1.0 ��׿��
���֮Ϣ V1.0 ��׿��
��ħ������������䣩 V1.0 ��׿��
���ں�������ϵ�����������������վ�����������������Ƽ�����
Ƶ�� ����Ƶ��������ר������������׿�������app����
�Ƽ� ��Ô���������°��������ܿ������ز���
���� ����ɫ������������ ���������ս������������
ɨ��ά�����������ֻ��汾��
ɨ��ά����������΢�Ź��ںţ�
��վ�������������������ϴ��������ַ���İ�Ȩ���뷢�ʼ�[email protected]
��ICP��2022002427��-10 �湫��������43070202000427��© 2013~2025 haote.com ������