3 #include "lwp_messages.h"
4 #include "lwp_wkspace.h"
/*
 * __lwpmq_msg_insert
 *
 * Adds `msg` to the pending-message list of `mqueue` and, when it becomes
 * the only pending message, invokes the queue's notify handler.
 *
 * mqueue - message queue control block receiving the message.
 * msg    - message buffer control block to link into the pending list.
 * type   - LWP_MQ_SEND_REQUEST or LWP_MQ_SEND_URGENT select the append /
 *          prepend paths; the remaining path does a prio-ordered walk
 *          (presumably for any other value of `type` - TODO confirm).
 *
 * NOTE(review): only part of this function is visible in this view. Braces,
 * local declarations, the statement that dispatches on `type`, and the body
 * of the ordering loop are not shown; the comments below describe only the
 * visible statements.
 */
6 void __lwpmq_msg_insert(mq_cntrl
*mqueue
,mq_buffercntrl
*msg
,u32 type
)
/* Account for the new message before it is linked in. */
8 ++mqueue
->num_pendingmsgs
;
/* Trace the call arguments (any conditional compilation around this is not visible here). */
12 printf("__lwpmq_msg_insert(%p,%p,%d)\n",mqueue
,msg
,type
);
/* LWP_MQ_SEND_REQUEST: hand the message to __lwpmq_msg_append(). */
16 case LWP_MQ_SEND_REQUEST
:
17 __lwpmq_msg_append(mqueue
,msg
);
/* LWP_MQ_SEND_URGENT: hand the message to __lwpmq_msg_prepend(). */
19 case LWP_MQ_SEND_URGENT
:
20 __lwpmq_msg_prepend(mqueue
,msg
);
/* Remaining path: search the pending list for the insertion point. */
28 header
= &mqueue
->pending_msgs
;
/* Keep visiting nodes until `node` is the tail of the pending list. */
30 while(!__lwp_queue_istail(header
,node
)) {
/* View the current list node as a message buffer control block. */
31 tmsg
= (mq_buffercntrl
*)node
;
/* Branch taken while the visited message's prio is <= the new message's prio (branch body not visible here). */
32 if(tmsg
->prio
<=msg
->prio
) {
/* Link the new message's node using the predecessor of `node` as the reference position. */
38 __lwp_queue_insert(node
->prev
,&msg
->node
);
/* Exactly one pending message and a handler installed: call it with the stored argument. */
43 if(mqueue
->num_pendingmsgs
==1 && mqueue
->notify_handler
)
44 mqueue
->notify_handler(mqueue
->notify_arg
);
/*
 * __lwpmq_initialize
 *
 * Sets up a message queue control block: records the limits, allocates the
 * message buffer area from the workspace, and initialises the inactive list,
 * the pending list and the thread wait queue.
 *
 * mqueue          - control block to initialise.
 * attrs           - attributes; consulted via __lwpmq_is_priority() to pick
 *                   the wait-queue discipline.
 * max_pendingmsgs - number of message buffers to allocate.
 * max_msgsize     - largest message payload in bytes.
 *
 * Returns 0 when the workspace allocation fails. The return statement for
 * the success path is not visible in this view.
 */
47 u32
__lwpmq_initialize(mq_cntrl
*mqueue
,mq_attr
*attrs
,u32 max_pendingmsgs
,u32 max_msgsize
)
/* Trace the call arguments. */
53 printf("__lwpmq_initialize(%p,%p,%d,%d)\n",mqueue
,attrs
,max_pendingmsgs
,max_msgsize
);
/* Record the configured limits and start with no pending messages. */
55 mqueue
->max_pendingmsgs
= max_pendingmsgs
;
56 mqueue
->num_pendingmsgs
= 0;
57 mqueue
->max_msgsize
= max_msgsize
;
/* Start with a NULL notify handler and NULL notify argument. */
58 __lwpmq_set_notify(mqueue
,NULL
,NULL
);
/*
 * Round the per-message payload size up to a multiple of sizeof(u32):
 * only when low bits are set, add sizeof(u32) and mask them off.
 */
60 alloc_msgsize
= max_msgsize
;
61 if(alloc_msgsize
&(sizeof(u32
)-1))
62 alloc_msgsize
= (alloc_msgsize
+sizeof(u32
))&~(sizeof(u32
)-1);
/*
 * Total bytes needed: one control header plus one rounded payload per message.
 * NOTE(review): the multiplication is not checked for overflow.
 */
64 buffering_req
= max_pendingmsgs
*(alloc_msgsize
+sizeof(mq_buffercntrl
));
/* Obtain the buffer area from the workspace allocator. */
65 mqueue
->msq_buffers
= (mq_buffer
*)__lwp_wkspace_allocate(buffering_req
);
/* Allocation failed: report 0 to the caller. */
67 if(!mqueue
->msq_buffers
) return 0;
/* Build the inactive list over the buffer area: max_pendingmsgs elements of (payload + header) bytes each. */
69 __lwp_queue_initialize(&mqueue
->inactive_msgs
,mqueue
->msq_buffers
,max_pendingmsgs
,(alloc_msgsize
+sizeof(mq_buffercntrl
)));
/* No messages are pending yet. */
70 __lwp_queue_init_empty(&mqueue
->pending_msgs
);
/*
 * Initialise the wait queue in priority or FIFO mode depending on `attrs`,
 * passing LWP_STATES_WAITING_FOR_MESSAGE and LWP_MQ_STATUS_TIMEOUT as the
 * remaining arguments.
 */
71 __lwp_threadqueue_init(&mqueue
->wait_queue
,__lwpmq_is_priority(attrs
)?LWP_THREADQ_MODEPRIORITY
:LWP_THREADQ_MODEFIFO
,LWP_STATES_WAITING_FOR_MESSAGE
,LWP_MQ_STATUS_TIMEOUT
);
/*
 * __lwpmq_seize
 *
 * Receives a message from `mqueue` into `buffer`, storing its length in
 * `*size`. If a message is pending it is delivered immediately; otherwise
 * the visible code either returns LWP_MQ_STATUS_UNSATISFIED_NOWAIT or
 * enqueues the executing thread on the queue's wait queue.
 *
 * mqueue  - message queue to receive from.
 * id      - caller-supplied identifier (its use is not visible in this view).
 * buffer  - destination for the message payload.
 * size    - out: length of the delivered message.
 * wait    - presumably selects blocking vs. non-blocking behaviour; the
 *           test on it is not visible here - TODO confirm.
 * timeout - passed to __lwp_threadqueue_enqueue() on the blocking path.
 *
 * NOTE(review): braces, some local declarations and several guarding
 * conditions are not visible in this view; comments describe only the
 * visible statements.
 */
76 u32
__lwpmq_seize(mq_cntrl
*mqueue
,u32 id
,void *buffer
,u32
*size
,u32 wait
,u64 timeout
)
80 lwp_cntrl
*exec
,*thread
;
/* Work on the currently executing thread and preset its wait return code to success. */
82 exec
= _thr_executing
;
83 exec
->wait
.ret_code
= LWP_MQ_STATUS_SUCCESSFUL
;
/* Trace the call; the last value printed is the current pending-message count. */
85 printf("__lwpmq_seize(%p,%d,%p,%p,%d,%d)\n",mqueue
,id
,buffer
,size
,wait
,mqueue
->num_pendingmsgs
);
/* Interrupts stay disabled while the pending count and list are examined. */
88 _CPU_ISR_Disable(level
);
/* A message is pending: claim it and re-enable interrupts before copying. */
89 if(mqueue
->num_pendingmsgs
!=0) {
90 --mqueue
->num_pendingmsgs
;
91 msg
= __lwpmq_get_pendingmsg(mqueue
);
92 _CPU_ISR_Restore(level
);
/* Hand the message to the caller: length, prio (kept in wait.cnt) and payload. */
94 *size
= msg
->contents
.size
;
95 exec
->wait
.cnt
= msg
->prio
;
96 __lwpmq_buffer_copy(buffer
,msg
->contents
.buffer
,*size
);
/* Take one thread off the wait queue (presumably a blocked sender - TODO confirm). */
98 thread
= __lwp_threadqueue_dequeue(&mqueue
->wait_queue
);
/* Release the message buffer and finish (the condition guarding this path is not visible here). */
100 __lwpmq_free_msg(mqueue
,msg
);
101 return LWP_MQ_STATUS_SUCCESSFUL
;
/*
 * Reuse the message buffer for the dequeued thread's message: prio comes
 * from its wait.cnt, the length is carried by value in wait.ret_arg_1 (as
 * stored by the blocking path of __lwpmq_submit), and the payload is copied
 * from wait.ret_arg.
 */
104 msg
->prio
= thread
->wait
.cnt
;
105 msg
->contents
.size
= (u32
)thread
->wait
.ret_arg_1
;
106 __lwpmq_buffer_copy(msg
->contents
.buffer
,thread
->wait
.ret_arg
,msg
->contents
.size
);
/* Queue that message as pending, using its prio as the insert type. */
108 __lwpmq_msg_insert(mqueue
,msg
,msg
->prio
);
109 return LWP_MQ_STATUS_SUCCESSFUL
;
/* Non-blocking exit: re-enable interrupts and report that nothing was available (guarding condition not visible here). */
113 _CPU_ISR_Restore(level
);
114 exec
->wait
.ret_code
= LWP_MQ_STATUS_UNSATISFIED_NOWAIT
;
115 return LWP_MQ_STATUS_UNSATISFIED_NOWAIT
;
/*
 * Blocking path: enter the wait queue's critical section and record in the
 * executing thread's wait info which queue it waits on and where the
 * payload and length must be delivered.
 */
118 __lwp_threadqueue_csenter(&mqueue
->wait_queue
);
119 exec
->wait
.queue
= &mqueue
->wait_queue
;
121 exec
->wait
.ret_arg
= (void*)buffer
;
122 exec
->wait
.ret_arg_1
= (void*)size
;
123 _CPU_ISR_Restore(level
);
/* Enqueue the executing thread on the wait queue with the caller's timeout. */
125 __lwp_threadqueue_enqueue(&mqueue
->wait_queue
,timeout
);
126 return LWP_MQ_STATUS_SUCCESSFUL
;
/*
 * __lwpmq_submit
 *
 * Sends `size` bytes from `buffer` to `mqueue`. The visible code hands the
 * message straight to a waiting thread when nothing is pending, otherwise
 * stores it in a free message buffer, and when the queue is full either
 * fails or enqueues the executing thread on the wait queue.
 *
 * mqueue  - destination message queue.
 * id      - caller-supplied identifier (its use is not visible in this view).
 * buffer  - payload to send.
 * size    - payload length in bytes; must not exceed mqueue->max_msgsize.
 * type    - insert type forwarded to __lwpmq_msg_insert() / stored in wait.cnt.
 * wait    - zero means fail with LWP_MQ_STATUS_TOO_MANY when the queue is full.
 * timeout - passed to __lwp_threadqueue_enqueue() on the blocking path.
 *
 * Visible return values: LWP_MQ_STATUS_INVALID_SIZE, LWP_MQ_STATUS_SUCCESSFUL,
 * LWP_MQ_STATUS_UNSATISFIED, LWP_MQ_STATUS_TOO_MANY,
 * LWP_MQ_STATUS_UNSATISFIED_WAIT.
 *
 * NOTE(review): braces, local declarations and some guarding conditions are
 * not visible in this view; comments describe only the visible statements.
 */
129 u32
__lwpmq_submit(mq_cntrl
*mqueue
,u32 id
,void *buffer
,u32 size
,u32 type
,u32 wait
,u64 timeout
)
/* Trace the call arguments. */
136 printf("__lwpmq_submit(%p,%p,%d,%d,%d,%d)\n",mqueue
,buffer
,size
,id
,type
,wait
);
/* Reject payloads larger than the queue's configured maximum. */
138 if(size
>mqueue
->max_msgsize
)
139 return LWP_MQ_STATUS_INVALID_SIZE
;
/* Nothing pending: try to deliver directly to a thread taken off the wait queue. */
141 if(mqueue
->num_pendingmsgs
==0) {
142 thread
= __lwp_threadqueue_dequeue(&mqueue
->wait_queue
);
/*
 * Copy the payload into that thread's buffer, store the length through its
 * size pointer and pass `type` back in wait.cnt (the check that a thread
 * was actually dequeued is not visible here).
 */
144 __lwpmq_buffer_copy(thread
->wait
.ret_arg
,buffer
,size
);
145 *(u32
*)thread
->wait
.ret_arg_1
= size
;
146 thread
->wait
.cnt
= type
;
147 return LWP_MQ_STATUS_SUCCESSFUL
;
/* Room left in the queue: take a message buffer, fill it and queue it as pending. */
151 if(mqueue
->num_pendingmsgs
<mqueue
->max_pendingmsgs
) {
152 msg
= __lwpmq_allocate_msg(mqueue
);
153 if(!msg
) return LWP_MQ_STATUS_UNSATISFIED
;
155 __lwpmq_buffer_copy(msg
->contents
.buffer
,buffer
,size
);
156 msg
->contents
.size
= size
;
158 __lwpmq_msg_insert(mqueue
,msg
,type
);
159 return LWP_MQ_STATUS_SUCCESSFUL
;
/* Queue full: fail for non-waiting callers, and when an ISR is in progress. */
162 if(!wait
) return LWP_MQ_STATUS_TOO_MANY
;
163 if(__lwp_isr_in_progress()) return LWP_MQ_STATUS_UNSATISFIED
;
/* Blocking path: operate on the currently executing thread. */
166 lwp_cntrl
*exec
= _thr_executing
;
/*
 * With interrupts disabled, enter the wait queue's critical section and
 * record the pending send in the thread's wait info. The length is stored
 * by value in the pointer-typed ret_arg_1 field.
 */
168 _CPU_ISR_Disable(level
);
169 __lwp_threadqueue_csenter(&mqueue
->wait_queue
);
170 exec
->wait
.queue
= &mqueue
->wait_queue
;
172 exec
->wait
.ret_arg
= (void*)buffer
;
173 exec
->wait
.ret_arg_1
= (void*)size
;
174 exec
->wait
.cnt
= type
;
175 _CPU_ISR_Restore(level
);
/* Enqueue the executing thread on the wait queue with the caller's timeout. */
177 __lwp_threadqueue_enqueue(&mqueue
->wait_queue
,timeout
);
179 return LWP_MQ_STATUS_UNSATISFIED_WAIT
;
/*
 * __lwpmq_broadcast
 *
 * Delivers one message to every thread dequeued from the wait queue of
 * `mqueue` and reports the number of deliveries through `*count`.
 *
 * mqueue - message queue whose waiters receive the message.
 * buffer - payload to deliver.
 * size   - payload length in bytes.
 * id     - caller-supplied identifier (only used in the trace in the visible code).
 * count  - out: receives num_broadcast after the delivery loop.
 *
 * Returns LWP_MQ_STATUS_SUCCESSFUL on both visible exit paths.
 *
 * NOTE(review): braces, local declarations and the statements that
 * initialise/update `rsize` and `num_broadcast` are not visible in this view.
 */
182 u32
__lwpmq_broadcast(mq_cntrl
*mqueue
,void *buffer
,u32 size
,u32 id
,u32
*count
)
/* Trace the call arguments. */
189 printf("__lwpmq_broadcast(%p,%p,%d,%d,%p)\n",mqueue
,buffer
,size
,id
,count
);
/* Messages are already pending: return early (what is stored in *count on this path is not visible here). */
191 if(mqueue
->num_pendingmsgs
!=0) {
193 return LWP_MQ_STATUS_SUCCESSFUL
;
/* Take waiting threads off the wait queue one at a time until none remain. */
197 while((thread
=__lwp_threadqueue_dequeue(&mqueue
->wait_queue
))) {
198 waitp
= &thread
->wait
;
/* Never copy more than the queue's maximum message size. */
202 if(size
>mqueue
->max_msgsize
)
203 rsize
= mqueue
->max_msgsize
;
/*
 * Copy rsize bytes into the waiter's buffer and store the length through
 * its size pointer.
 * NOTE(review): the stored length is `size`, not `rsize`, so when the copy
 * was clamped the waiter is told a larger length than was copied - confirm
 * whether this is intended.
 */
205 __lwpmq_buffer_copy(waitp
->ret_arg
,buffer
,rsize
);
206 *(u32
*)waitp
->ret_arg_1
= size
;
/* Report the number of deliveries to the caller. */
208 *count
= num_broadcast
;
209 return LWP_MQ_STATUS_SUCCESSFUL
;
212 void __lwpmq_close(mq_cntrl
*mqueue
,u32 status
)
214 __lwp_threadqueue_flush(&mqueue
->wait_queue
,status
);
215 __lwpmq_flush_support(mqueue
);
216 __lwp_wkspace_free(mqueue
->msq_buffers
);
/*
 * __lwpmq_flush
 *
 * Discards the pending messages of `mqueue`, if there are any.
 *
 * mqueue - message queue to flush.
 *
 * Returns the result of __lwpmq_flush_support() when at least one message
 * is pending. The return statement for the empty case is not visible in
 * this view.
 */
219 u32
__lwpmq_flush(mq_cntrl
*mqueue
)
/* Only call the support routine when there is something to flush. */
221 if(mqueue
->num_pendingmsgs
!=0)
222 return __lwpmq_flush_support(mqueue
);
/*
 * __lwpmq_flush_support
 *
 * Moves the whole pending-message chain of `mqueue` to the front of its
 * inactive list in one splice, empties the pending list and zeroes the
 * pending count. The splice and the count update run with interrupts
 * disabled.
 *
 * mqueue - message queue whose pending messages are discarded.
 *
 * The pending count before the flush is saved in `cnt`; the return
 * statement is not visible in this view (presumably it returns `cnt` -
 * TODO confirm).
 *
 * NOTE(review): no check for an empty pending list is visible here. The
 * splice dereferences pending_msgs.first/.last unconditionally, so callers
 * appear to be expected to check num_pendingmsgs first, as __lwpmq_flush()
 * does.
 */
227 u32
__lwpmq_flush_support(mq_cntrl
*mqueue
)
231 lwp_node
*mqueue_first
;
232 lwp_node
*mqueue_last
;
235 _CPU_ISR_Disable(level
);
/* Snapshot the current head of the inactive list and both ends of the pending chain. */
237 inactive
= mqueue
->inactive_msgs
.first
;
238 mqueue_first
= mqueue
->pending_msgs
.first
;
239 mqueue_last
= mqueue
->pending_msgs
.last
;
/*
 * Splice: the inactive list now starts with the first pending node, the
 * last pending node is followed by the old inactive head, and the
 * back-links are fixed up to match.
 */
241 mqueue
->inactive_msgs
.first
= mqueue_first
;
242 mqueue_last
->next
= inactive
;
243 inactive
->prev
= mqueue_last
;
244 mqueue_first
->prev
= __lwp_queue_head(&mqueue
->inactive_msgs
);
/* The pending list no longer owns any nodes. */
246 __lwp_queue_init_empty(&mqueue
->pending_msgs
);
/* Remember how many messages were flushed, then reset the counter. */
248 cnt
= mqueue
->num_pendingmsgs
;
249 mqueue
->num_pendingmsgs
= 0;
251 _CPU_ISR_Restore(level
);
255 void __lwpmq_flush_waitthreads(mq_cntrl
*mqueue
)
257 __lwp_threadqueue_flush(&mqueue
->wait_queue
,LWP_MQ_STATUS_UNSATISFIED_NOWAIT
);