队列
队列里有多个消息块,每个消息块大小一致。
写:
- 有空间,成功。
- 无空间:返回Err;等待一段时间。
队列里面会有两个链表:发送链表和接收链表
struct rt_messagequeue
{struct rt_ipc_object parent; /**< inherit from ipc_object */void *msg_pool; /**< start address of message queue */rt_uint16_t msg_size; /**< message size of each message */rt_uint16_t max_msgs; /**< max number of messages */rt_uint16_t entry; /**< index of messages in the queue */void *msg_queue_head; /**< list head */void *msg_queue_tail; /**< list tail */void *msg_queue_free; /**< pointer indicated the free node of queue */rt_list_t suspend_sender_thread; //挂起的发送线程链表
};
struct rt_ipc_object
{struct rt_object parent; /**< inherit from rt_object */rt_list_t suspend_thread; //挂起的接收线程链表
};
线程2读取mq的数据,如果没有数据,愿意等待5个tick。
- 判断队列是否为空?
- 判断是否等待?
- 线程挂起->从ReadyList移除,放入mq->parent->suspend_thread。
- 启动线程自己的定时器
- 被唤醒:(1)其它线程写队列,从mq->parent->suspend_thread取出、唤醒(2)超时
struct rt_messagequeue
{struct rt_ipc_object parent; /**< inherit from ipc_object */void *msg_pool; /**< start address of message queue */rt_uint16_t msg_size; /**< message size of each message */rt_uint16_t max_msgs; /**< max number of messages */rt_uint16_t entry; //队列中消息的索引void *msg_queue_head; /**< list head */void *msg_queue_tail; /**< list tail */void *msg_queue_free; /**< pointer indicated the free node of queue */rt_list_t suspend_sender_thread; /**< sender thread suspended on this message queue */
};
while (mq->entry == 0) //如果消息队列为空{RT_DEBUG_IN_THREAD_CONTEXT;/* reset error number in thread */thread->error = RT_EOK;/* no waiting, return timeout */if (timeout == 0)//不愿意等待{/* enable interrupt */rt_hw_interrupt_enable(temp);thread->error = -RT_ETIMEOUT;return -RT_ETIMEOUT;//立即返回超时错误}/* suspend current thread */rt_ipc_list_suspend(&(mq->parent.suspend_thread),thread,mq->parent.parent.flag); //挂在等待接收链表/* has waiting time, start thread timer */if (timeout > 0){/* get the start tick of timer */tick_delta = rt_tick_get();RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",thread->name));/* reset the timeout of thread timer and start it */rt_timer_control(&(thread->thread_timer),RT_TIMER_CTRL_SET_TIME,&timeout);rt_timer_start(&(thread->thread_timer));}/* enable interrupt */rt_hw_interrupt_enable(temp);/* re-schedule */rt_schedule();/* recv message */if (thread->error != RT_EOK){/* return error */return thread->error;}/* disable interrupt */temp = rt_hw_interrupt_disable();/* if it's not waiting forever and then re-calculate timeout tick */if (timeout > 0){tick_delta = rt_tick_get() - tick_delta;timeout -= tick_delta;if (timeout < 0)timeout = 0;}
//挂起线程
rt_err_t rt_thread_suspend(rt_thread_t thread)
{register rt_base_t stat;register rt_base_t temp;/* thread check */RT_ASSERT(thread != RT_NULL);RT_ASSERT(rt_object_get_type((rt_object_t)thread) == RT_Object_Class_Thread);RT_DEBUG_LOG(RT_DEBUG_THREAD, ("thread suspend: %s\n", thread->name));stat = thread->stat & RT_THREAD_STAT_MASK;if ((stat != RT_THREAD_READY) && (stat != RT_THREAD_RUNNING)){RT_DEBUG_LOG(RT_DEBUG_THREAD, ("thread suspend: thread disorder, 0x%2x\n",thread->stat));return -RT_ERROR;}/* disable interrupt */temp = rt_hw_interrupt_disable();if (stat == RT_THREAD_RUNNING){/* not suspend running status thread on other core */RT_ASSERT(thread == rt_thread_self());}/* change thread stat */rt_schedule_remove_thread(thread); //从就绪链表中移出thread->stat = RT_THREAD_SUSPEND | (thread->stat & ~RT_THREAD_STAT_MASK);/* stop thread timer anyway */rt_timer_stop(&(thread->thread_timer));/* enable interrupt */rt_hw_interrupt_enable(temp);RT_OBJECT_HOOK_CALL(rt_thread_suspend_hook, (thread));return RT_EOK;
}
RTM_EXPORT(rt_thread_suspend);
由FLAG决定挂起的位置
rt_inline rt_err_t rt_ipc_list_suspend(rt_list_t *list,struct rt_thread *thread,rt_uint8_t flag)
{/* suspend thread */rt_thread_suspend(thread);switch (flag){case RT_IPC_FLAG_FIFO: //先进先出,按时间排队rt_list_insert_before(list, &(thread->tlist));break;case RT_IPC_FLAG_PRIO://优先级高的优先{struct rt_list_node *n;struct rt_thread *sthread;/* find a suitable position */for (n = list->next; n != list; n = n->next){sthread = rt_list_entry(n, struct rt_thread, tlist);/* find out */if (thread->current_priority < sthread->current_priority){/* insert this thread before the sthread */rt_list_insert_before(&(sthread->tlist), &(thread->tlist));break;}}/** not found a suitable position,* append to the end of suspend_thread list*/if (n == list)rt_list_insert_before(list, &(thread->tlist));}break;}return RT_EOK;
}
/* has waiting time, start thread timer */if (timeout > 0) //超时时间大于0{/* get the start tick of timer */tick_delta = rt_tick_get();RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",thread->name));/* reset the timeout of thread timer and start it */rt_timer_control(&(thread->thread_timer),RT_TIMER_CTRL_SET_TIME,&timeout); rt_timer_start(&(thread->thread_timer)); //启动定时器}
rt-schedule();//重新调度,运行其它线程写数据
/* recv message */if (thread->error != RT_EOK) //判断唤醒线程的原因,若是超时错误-ETIMEOUT,则直接返回错误{/* return error */return thread->error;}
写队列
while ((msg = mq->msg_queue_free) == RT_NULL) //队列满了{}
rt_memcpy(msg + 1, buffer, size); //将消息拷贝进Buffer/* resume suspended thread */if (!rt_list_isempty(&mq->parent.suspend_thread)){rt_ipc_list_resume(&(mq->parent.suspend_thread));//唤醒线程/* enable interrupt */rt_hw_interrupt_enable(temp);rt_schedule();return RT_EOK;}
/* disable interrupt */temp = rt_hw_interrupt_disable();/* remove from suspend list */rt_list_remove(&(thread->tlist));rt_timer_stop(&thread->thread_timer);/* enable interrupt */rt_hw_interrupt_enable(temp);/* insert to schedule ready list */rt_schedule_insert_thread(thread);