您现在的位置是:亿华云 > 系统运维
鸿蒙内核源码分析(消息队列篇) | 进程间如何异步解耦传递大数据
亿华云2025-10-09 08:44:33【系统运维】6人已围观
简介想了解更多内容,请访问:和华为官方合作共建的鸿蒙技术社区https://harmonyos.51cto.com基本概念● 队列又称消息队列,是一种常用于任务间通信的数据结构。队列接收来自任务或中断的不
想了解更多内容,鸿蒙何异请访问:
和华为官方合作共建的内核鸿蒙技术社区
https://harmonyos.51cto.com
基本概念
● 队列又称消息队列,是源码一种常用于任务间通信的数据结构。队列接收来自任务或中断的分析不固定长度消息,并根据不同的消息接口确定传递的消息是否存放在队列空间中。
● 任务能够从队列里面读取消息,队列递当队列中的篇进消息为空时,挂起读取任务;当队列中有新消息时,程间挂起的步解读取任务被唤醒并处理新消息。任务也能够往队列里写入消息,耦传当队列已经写满消息时,数据挂起写入任务;当队列中有空闲消息节点时,鸿蒙何异挂起的内核写入任务被唤醒并写入消息。如果将读队列和写队列的源码超时时间设置为0,则不会挂起任务,分析接口会直接返回,这就是非阻塞模式。
● 消息队列提供了异步处理机制,允许将一个消息放入队列,但不立即处理。同时队列还有缓冲消息的作用。企商汇
● 队列用于任务间通信,可以实现消息的异步处理。同时消息的发送方和接收方不需要彼此联系,两者间是解耦的。
队列特性
● 消息以先进先出的方式排队,支持异步读写。
● 读队列和写队列都支持超时机制。
● 每读取一条消息,就会将该消息节点设置为空闲。
● 发送消息类型由通信双方约定,可以允许不同长度(不超过队列的消息节点大小)的消息。
● 一个任务能够从任意一个消息队列接收和发送消息。
● 多个任务能够从同一个消息队列接收和发送消息。
● 创建队列时所需的队列空间,默认支持接口内系统自行动态申请内存的方式,同时也支持将用户分配的队列空间作为接口入参传入的方式。
消息队列长什么样?
#ifndef LOSCFG_BASE_IPC_QUEUE_LIMIT #define LOSCFG_BASE_IPC_QUEUE_LIMIT 1024 //队列个数 #endif LITE_OS_SEC_BSS LosQueueCB *g_allQueue = NULL;//消息队列池 LITE_OS_SEC_BSS STATIC LOS_DL_LIST g_freeQueueList;//空闲队列链表,管分配的,需要队列从这里申请 typedef struct { UINT8 *queueHandle; /**< Pointer to a queue handle */ //指向队列句柄的指针 UINT16 queueState; /**< Queue state */ //队列状态 UINT16 queueLen; /**< Queue length */ //队列中消息总数的上限值,由创建时确定,不再改变 UINT16 queueSize; /**< Node size */ //消息节点大小,由创建时确定,不再改变,即定义了每个消息长度的上限. UINT32 queueID; /**< queueID */ //队列ID UINT16 queueHead; /**< Node head */ //消息头节点位置(数组下标) UINT16 queueTail; /**< Node tail */ //消息尾节点位置(数组下标) UINT16 readWriteableCnt[OS_QUEUE_N_RW]; /**< Count of readable or writable resources, 0:readable, 1:writable */ //队列中可写或可读消息数,0表示可读,b2b信息网1表示可写 LOS_DL_LIST readWriteList[OS_QUEUE_N_RW]; /**< the linked list to be read or written, 0:readlist, 1:writelist */ //挂的都是等待读/写消息的任务链表,0表示读消息的链表,1表示写消息的任务链表 LOS_DL_LIST memList; /**< Pointer to the memory linked list */ //@note_why 这里尚未搞明白是啥意思 ,是共享内存吗? } LosQueueCB;//读写队列分离解读
● 和进程,线程,定时器一样,消息队列也由全局统一的消息队列池管理,池有多大?默认是1024
● 鸿蒙内核对消息的总个数有限制,queueLen消息总数的上限,在创建队列的时候需指定,不能更改.
● 对每个消息的长度也有限制, queueSize 规定了消息的大小,也是在创建的时候指定.
● 为啥要指定总个数和每个的总大小,是因为内核一次性会把队列的总内存(queueLen*queueSize)申请下来,确保不会出现后续使用过程中内存不够的问题出现,但同时也带来了内存的浪费,因为很可能大部分时间队列并没有跑满.
● 队列的读取由queueHead,queueTail管理,Head表示队列中被占用的消息节点的起始位置。Tail表示被占用的消息节点的结束位置,也是空闲消息节点的亿华云计算起始位置。队列刚创建时,Head和Tail均指向队列起始位置
● 写队列时,根据readWriteableCnt[1]判断队列是否可以写入,不能对已满(readWriteableCnt[1]为0)队列进行写操作。写队列支持两种写入方式:向队列尾节点写入,也可以向队列头节点写入。尾节点写入时,根据Tail找到起始空闲消息节点作为数据写入对象,如果Tail已经指向队列尾部则采用回卷方式。头节点写入时,将Head的前一个节点作为数据写入对象,如果Head指向队列起始位置则采用回卷方式。
● 读队列时,根据readWriteableCnt[0]判断队列是否有消息需要读取,对全部空闲(readWriteableCnt[0]为0)队列进行读操作会引起任务挂起。如果队列可以读取消息,则根据Head找到最先写入队列的消息节点进行读取。如果Head已经指向队列尾部则采用回卷方式。
● 删除队列时,根据队列ID找到对应队列,把队列状态置为未使用,把队列控制块置为初始状态。如果是通过系统动态申请内存方式创建的队列,还会释放队列所占内存。
● 留意readWriteList,这又是两个双向链表, 双向链表是内核最重要的结构体,牢牢的寄生在宿主结构体上.readWriteList上挂的是未来读/写消息队列的任务列表.
初始化队列
LITE_OS_SEC_TEXT_INIT UINT32 OsQueueInit(VOID)//消息队列模块初始化 { LosQueueCB *queueNode = NULL; UINT32 index; UINT32 size; size = LOSCFG_BASE_IPC_QUEUE_LIMIT * sizeof(LosQueueCB);//支持1024个IPC队列 /* system resident memory, dont free */ g_allQueue = (LosQueueCB *)LOS_MemAlloc(m_aucSysMem0, size);//常驻内存 if (g_allQueue == NULL) { return LOS_ERRNO_QUEUE_NO_MEMORY; } (VOID)memset_s(g_allQueue, size, 0, size);//清0 LOS_ListInit(&g_freeQueueList);//初始化空闲链表 for (index = 0; index < LOSCFG_BASE_IPC_QUEUE_LIMIT; index++) { //循环初始化每个消息队列 queueNode = ((LosQueueCB *)g_allQueue) + index;//一个一个来 queueNode->queueID = index;//这可是 队列的身份证 LOS_ListTailInsert(&g_freeQueueList, &queueNode->readWriteList[OS_QUEUE_WRITE]);//通过写节点挂到空闲队列链表上 }//这里要注意是用 readWriteList 挂到 g_freeQueueList链上的,所以要通过 GET_QUEUE_LIST 来找到 LosQueueCB if (OsQueueDbgInitHook() != LOS_OK) { //调试队列使用的. return LOS_ERRNO_QUEUE_NO_MEMORY; } return LOS_OK; }解读
● 初始队列模块,对几个全局变量赋值,创建消息队列池,所有池都是常驻内存,关于池后续有专门的文章整理,到目前为止已经解除到了进程池,任务池,定时器池,队列池,==
● 将LOSCFG_BASE_IPC_QUEUE_LIMIT个队列挂到空闲链表g_freeQueueList上,供后续分配和回收.熟悉内核全局资源管理的对这种方式应该不会再陌生.
创建队列
//创建一个队列,根据用户传入队列长度和消息节点大小来开辟相应的内存空间以供该队列使用,参数queueID带走队列ID LITE_OS_SEC_TEXT_INIT UINT32 LOS_QueueCreate(CHAR *queueName, UINT16 len, UINT32 *queueID, UINT32 flags, UINT16 maxMsgSize) { LosQueueCB *queueCB = NULL; UINT32 intSave; LOS_DL_LIST *unusedQueue = NULL; UINT8 *queue = NULL; UINT16 msgSize; (VOID)queueName; (VOID)flags; if (queueID == NULL) { return LOS_ERRNO_QUEUE_CREAT_PTR_NULL; } if (maxMsgSize > (OS_NULL_SHORT - sizeof(UINT32))) { // maxMsgSize上限 为啥要减去 sizeof(UINT32) ,因为前面存的是队列的大小 return LOS_ERRNO_QUEUE_SIZE_TOO_BIG; } if ((len == 0) || (maxMsgSize == 0)) { return LOS_ERRNO_QUEUE_PARA_ISZERO; } msgSize = maxMsgSize + sizeof(UINT32);//总size = 消息体内容长度 + 消息大小(UINT32) /* * Memory allocation is time-consuming, to shorten the time of disable interrupt, * move the memory allocation to here. *///内存分配非常耗时,为了缩短禁用中断的时间,将内存分配移到此处,用的时候分配队列内存 queue = (UINT8 *)LOS_MemAlloc(m_aucSysMem1, (UINT32)len * msgSize);//从系统内存池中分配,由这里提供读写队列的内存 if (queue == NULL) { //这里是一次把队列要用到的所有最大内存都申请下来了,能保证不会出现后续使用过程中内存不够的问题出现 return LOS_ERRNO_QUEUE_CREATE_NO_MEMORY;//调用处有 OsSwtmrInit sys_mbox_new DoMqueueCreate == } SCHEDULER_LOCK(intSave); if (LOS_ListEmpty(&g_freeQueueList)) { //没有空余的队列ID的处理,注意软时钟定时器是由 g_swtmrCBArray统一管理的,里面有正在使用和可分配空闲的队列 SCHEDULER_UNLOCK(intSave);//g_freeQueueList是管理可用于分配的队列链表,申请消息队列的ID需要向它要 OsQueueCheckHook(); (VOID)LOS_MemFree(m_aucSysMem1, queue);//没有就要释放 queue申请的内存 return LOS_ERRNO_QUEUE_CB_UNAVAILABLE; } unusedQueue = LOS_DL_LIST_FIRST(&g_freeQueueList);//找到一个没有被使用的队列 LOS_ListDelete(unusedQueue);//将自己从g_freeQueueList中摘除, unusedQueue只是个 LOS_DL_LIST 结点. queueCB = GET_QUEUE_LIST(unusedQueue);//通过unusedQueue找到整个消息队列(LosQueueCB) queueCB->queueLen = len; //队列中消息的总个数,注意这个一旦创建是不能变的. queueCB->queueSize = msgSize;//消息节点的大小,注意这个一旦创建也是不能变的. queueCB->queueHandle = queue; //队列句柄,队列内容存储区. queueCB->queueState = OS_QUEUE_INUSED; //队列状态使用中 queueCB->readWriteableCnt[OS_QUEUE_READ] = 0;//可读资源计数,OS_QUEUE_READ(0):可读. queueCB->readWriteableCnt[OS_QUEUE_WRITE] = len;//可些资源计数 OS_QUEUE_WRITE(1):可写, 默认len可写. queueCB->queueHead = 0;//队列头节点 queueCB->queueTail = 0;//队列尾节点 LOS_ListInit(&queueCB->readWriteList[OS_QUEUE_READ]);//初始化可读队列链表 LOS_ListInit(&queueCB->readWriteList[OS_QUEUE_WRITE]);//初始化可写队列链表 LOS_ListInit(&queueCB->memList);// OsQueueDbgUpdateHook(queueCB->queueID, OsCurrTaskGet()->taskEntry);//在创建或删除队列调试信息时更新任务条目 SCHEDULER_UNLOCK(intSave); *queueID = queueCB->queueID;//带走队列ID return LOS_OK; }解读
● 创建和初始化一个LosQueueCB
● 动态分配内存来保存消息内容,LOS_MemAlloc(m_aucSysMem1, (UINT32)len * msgSize);
● msgSize = maxMsgSize + sizeof(UINT32);头四个字节放消息的长度,但消息最大长度不能超过maxMsgSize
● readWriteableCnt记录读/写队列的数量,独立计算
● readWriteList挂的是等待读取队列的任务链表 将在OsTaskWait(&queueCB->readWriteList[readWrite], timeout, TRUE);中将任务挂到链表上.
关键函数OsQueueOperate
队列的读写都要经过 OsQueueOperate
很赞哦!(22)