virtual-sink: Fix a crash when moving the sink to a new master right after setup.
[pulseaudio-raopUDP/pulseaudio-raop-alac.git] / src / pulsecore / pstream.c
blob369e22ca82eb1943c9ea3da50785dbfcb43478de
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
31 #ifdef HAVE_SYS_UN_H
32 #include <sys/un.h>
33 #endif
34 #ifdef HAVE_NETINET_IN_H
35 #include <netinet/in.h>
36 #endif
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/socket.h>
41 #include <pulsecore/queue.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-scache.h>
44 #include <pulsecore/creds.h>
45 #include <pulsecore/refcnt.h>
46 #include <pulsecore/flist.h>
47 #include <pulsecore/macro.h>
49 #include "pstream.h"
51 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
52 #define PA_FLAG_SHMDATA 0x80000000LU
53 #define PA_FLAG_SHMRELEASE 0x40000000LU
54 #define PA_FLAG_SHMREVOKE 0xC0000000LU
55 #define PA_FLAG_SHMMASK 0xFF000000LU
56 #define PA_FLAG_SEEKMASK 0x000000FFLU
58 /* The sequence descriptor header consists of 5 32bit integers: */
59 enum {
60 PA_PSTREAM_DESCRIPTOR_LENGTH,
61 PA_PSTREAM_DESCRIPTOR_CHANNEL,
62 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
63 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
64 PA_PSTREAM_DESCRIPTOR_FLAGS,
65 PA_PSTREAM_DESCRIPTOR_MAX
68 /* If we have an SHM block, this info follows the descriptor */
69 enum {
70 PA_PSTREAM_SHM_BLOCKID,
71 PA_PSTREAM_SHM_SHMID,
72 PA_PSTREAM_SHM_INDEX,
73 PA_PSTREAM_SHM_LENGTH,
74 PA_PSTREAM_SHM_MAX
77 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
79 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
80 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
82 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
84 struct item_info {
85 enum {
86 PA_PSTREAM_ITEM_PACKET,
87 PA_PSTREAM_ITEM_MEMBLOCK,
88 PA_PSTREAM_ITEM_SHMRELEASE,
89 PA_PSTREAM_ITEM_SHMREVOKE
90 } type;
92 /* packet info */
93 pa_packet *packet;
94 #ifdef HAVE_CREDS
95 pa_bool_t with_creds;
96 pa_creds creds;
97 #endif
99 /* memblock info */
100 pa_memchunk chunk;
101 uint32_t channel;
102 int64_t offset;
103 pa_seek_mode_t seek_mode;
105 /* release/revoke info */
106 uint32_t block_id;
109 struct pa_pstream {
110 PA_REFCNT_DECLARE;
112 pa_mainloop_api *mainloop;
113 pa_defer_event *defer_event;
114 pa_iochannel *io;
116 pa_queue *send_queue;
118 pa_bool_t dead;
120 struct {
121 pa_pstream_descriptor descriptor;
122 struct item_info* current;
123 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
124 void *data;
125 size_t index;
126 pa_memchunk memchunk;
127 } write;
129 struct {
130 pa_pstream_descriptor descriptor;
131 pa_memblock *memblock;
132 pa_packet *packet;
133 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
134 void *data;
135 size_t index;
136 } read;
138 pa_bool_t use_shm;
139 pa_memimport *import;
140 pa_memexport *export;
142 pa_pstream_packet_cb_t recieve_packet_callback;
143 void *recieve_packet_callback_userdata;
145 pa_pstream_memblock_cb_t recieve_memblock_callback;
146 void *recieve_memblock_callback_userdata;
148 pa_pstream_notify_cb_t drain_callback;
149 void *drain_callback_userdata;
151 pa_pstream_notify_cb_t die_callback;
152 void *die_callback_userdata;
154 pa_pstream_block_id_cb_t revoke_callback;
155 void *revoke_callback_userdata;
157 pa_pstream_block_id_cb_t release_callback;
158 void *release_callback_userdata;
160 pa_mempool *mempool;
162 #ifdef HAVE_CREDS
163 pa_creds read_creds, write_creds;
164 pa_bool_t read_creds_valid, send_creds_now;
165 #endif
168 static int do_write(pa_pstream *p);
169 static int do_read(pa_pstream *p);
171 static void do_something(pa_pstream *p) {
172 pa_assert(p);
173 pa_assert(PA_REFCNT_VALUE(p) > 0);
175 pa_pstream_ref(p);
177 p->mainloop->defer_enable(p->defer_event, 0);
179 if (!p->dead && pa_iochannel_is_readable(p->io)) {
180 if (do_read(p) < 0)
181 goto fail;
182 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
183 goto fail;
185 if (!p->dead && pa_iochannel_is_writable(p->io)) {
186 if (do_write(p) < 0)
187 goto fail;
190 pa_pstream_unref(p);
191 return;
193 fail:
195 if (p->die_callback)
196 p->die_callback(p, p->die_callback_userdata);
198 pa_pstream_unlink(p);
199 pa_pstream_unref(p);
202 static void io_callback(pa_iochannel*io, void *userdata) {
203 pa_pstream *p = userdata;
205 pa_assert(p);
206 pa_assert(PA_REFCNT_VALUE(p) > 0);
207 pa_assert(p->io == io);
209 do_something(p);
212 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
213 pa_pstream *p = userdata;
215 pa_assert(p);
216 pa_assert(PA_REFCNT_VALUE(p) > 0);
217 pa_assert(p->defer_event == e);
218 pa_assert(p->mainloop == m);
220 do_something(p);
223 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
225 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
226 pa_pstream *p;
228 pa_assert(m);
229 pa_assert(io);
230 pa_assert(pool);
232 p = pa_xnew(pa_pstream, 1);
233 PA_REFCNT_INIT(p);
234 p->io = io;
235 pa_iochannel_set_callback(io, io_callback, p);
236 p->dead = FALSE;
238 p->mainloop = m;
239 p->defer_event = m->defer_new(m, defer_callback, p);
240 m->defer_enable(p->defer_event, 0);
242 p->send_queue = pa_queue_new();
244 p->write.current = NULL;
245 p->write.index = 0;
246 pa_memchunk_reset(&p->write.memchunk);
247 p->read.memblock = NULL;
248 p->read.packet = NULL;
249 p->read.index = 0;
251 p->recieve_packet_callback = NULL;
252 p->recieve_packet_callback_userdata = NULL;
253 p->recieve_memblock_callback = NULL;
254 p->recieve_memblock_callback_userdata = NULL;
255 p->drain_callback = NULL;
256 p->drain_callback_userdata = NULL;
257 p->die_callback = NULL;
258 p->die_callback_userdata = NULL;
259 p->revoke_callback = NULL;
260 p->revoke_callback_userdata = NULL;
261 p->release_callback = NULL;
262 p->release_callback_userdata = NULL;
264 p->mempool = pool;
266 p->use_shm = FALSE;
267 p->export = NULL;
269 /* We do importing unconditionally */
270 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
272 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
273 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
275 #ifdef HAVE_CREDS
276 p->send_creds_now = FALSE;
277 p->read_creds_valid = FALSE;
278 #endif
279 return p;
282 static void item_free(void *item, void *q) {
283 struct item_info *i = item;
284 pa_assert(i);
286 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
287 pa_assert(i->chunk.memblock);
288 pa_memblock_unref(i->chunk.memblock);
289 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
290 pa_assert(i->packet);
291 pa_packet_unref(i->packet);
294 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
295 pa_xfree(i);
298 static void pstream_free(pa_pstream *p) {
299 pa_assert(p);
301 pa_pstream_unlink(p);
303 pa_queue_free(p->send_queue, item_free, NULL);
305 if (p->write.current)
306 item_free(p->write.current, NULL);
308 if (p->write.memchunk.memblock)
309 pa_memblock_unref(p->write.memchunk.memblock);
311 if (p->read.memblock)
312 pa_memblock_unref(p->read.memblock);
314 if (p->read.packet)
315 pa_packet_unref(p->read.packet);
317 pa_xfree(p);
320 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
321 struct item_info *i;
323 pa_assert(p);
324 pa_assert(PA_REFCNT_VALUE(p) > 0);
325 pa_assert(packet);
327 if (p->dead)
328 return;
330 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
331 i = pa_xnew(struct item_info, 1);
333 i->type = PA_PSTREAM_ITEM_PACKET;
334 i->packet = pa_packet_ref(packet);
336 #ifdef HAVE_CREDS
337 if ((i->with_creds = !!creds))
338 i->creds = *creds;
339 #endif
341 pa_queue_push(p->send_queue, i);
343 p->mainloop->defer_enable(p->defer_event, 1);
346 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
347 size_t length, idx;
348 size_t bsm;
350 pa_assert(p);
351 pa_assert(PA_REFCNT_VALUE(p) > 0);
352 pa_assert(channel != (uint32_t) -1);
353 pa_assert(chunk);
355 if (p->dead)
356 return;
358 idx = 0;
359 length = chunk->length;
361 bsm = pa_mempool_block_size_max(p->mempool);
363 while (length > 0) {
364 struct item_info *i;
365 size_t n;
367 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
368 i = pa_xnew(struct item_info, 1);
369 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
371 n = PA_MIN(length, bsm);
372 i->chunk.index = chunk->index + idx;
373 i->chunk.length = n;
374 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
376 i->channel = channel;
377 i->offset = offset;
378 i->seek_mode = seek_mode;
379 #ifdef HAVE_CREDS
380 i->with_creds = FALSE;
381 #endif
383 pa_queue_push(p->send_queue, i);
385 idx += n;
386 length -= n;
389 p->mainloop->defer_enable(p->defer_event, 1);
392 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
393 struct item_info *item;
394 pa_assert(p);
395 pa_assert(PA_REFCNT_VALUE(p) > 0);
397 if (p->dead)
398 return;
400 /* pa_log("Releasing block %u", block_id); */
402 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
403 item = pa_xnew(struct item_info, 1);
404 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
405 item->block_id = block_id;
406 #ifdef HAVE_CREDS
407 item->with_creds = FALSE;
408 #endif
410 pa_queue_push(p->send_queue, item);
411 p->mainloop->defer_enable(p->defer_event, 1);
414 /* might be called from thread context */
415 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
416 pa_pstream *p = userdata;
418 pa_assert(p);
419 pa_assert(PA_REFCNT_VALUE(p) > 0);
421 if (p->dead)
422 return;
424 if (p->release_callback)
425 p->release_callback(p, block_id, p->release_callback_userdata);
426 else
427 pa_pstream_send_release(p, block_id);
430 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
431 struct item_info *item;
432 pa_assert(p);
433 pa_assert(PA_REFCNT_VALUE(p) > 0);
435 if (p->dead)
436 return;
437 /* pa_log("Revoking block %u", block_id); */
439 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
440 item = pa_xnew(struct item_info, 1);
441 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
442 item->block_id = block_id;
443 #ifdef HAVE_CREDS
444 item->with_creds = FALSE;
445 #endif
447 pa_queue_push(p->send_queue, item);
448 p->mainloop->defer_enable(p->defer_event, 1);
451 /* might be called from thread context */
452 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
453 pa_pstream *p = userdata;
455 pa_assert(p);
456 pa_assert(PA_REFCNT_VALUE(p) > 0);
458 if (p->revoke_callback)
459 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
460 else
461 pa_pstream_send_revoke(p, block_id);
464 static void prepare_next_write_item(pa_pstream *p) {
465 pa_assert(p);
466 pa_assert(PA_REFCNT_VALUE(p) > 0);
468 p->write.current = pa_queue_pop(p->send_queue);
470 if (!p->write.current)
471 return;
473 p->write.index = 0;
474 p->write.data = NULL;
475 pa_memchunk_reset(&p->write.memchunk);
477 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
478 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
479 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
480 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
483 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
485 pa_assert(p->write.current->packet);
486 p->write.data = p->write.current->packet->data;
487 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
489 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
492 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
494 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
496 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
497 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
499 } else {
500 uint32_t flags;
501 pa_bool_t send_payload = TRUE;
503 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
504 pa_assert(p->write.current->chunk.memblock);
506 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
507 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
508 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
510 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
512 if (p->use_shm) {
513 uint32_t block_id, shm_id;
514 size_t offset, length;
516 pa_assert(p->export);
518 if (pa_memexport_put(p->export,
519 p->write.current->chunk.memblock,
520 &block_id,
521 &shm_id,
522 &offset,
523 &length) >= 0) {
525 flags |= PA_FLAG_SHMDATA;
526 send_payload = FALSE;
528 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
529 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
530 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
531 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
533 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
534 p->write.data = p->write.shm_info;
536 /* else */
537 /* pa_log_warn("Failed to export memory block."); */
540 if (send_payload) {
541 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
542 p->write.memchunk = p->write.current->chunk;
543 pa_memblock_ref(p->write.memchunk.memblock);
544 p->write.data = NULL;
547 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
550 #ifdef HAVE_CREDS
551 if ((p->send_creds_now = p->write.current->with_creds))
552 p->write_creds = p->write.current->creds;
553 #endif
556 static int do_write(pa_pstream *p) {
557 void *d;
558 size_t l;
559 ssize_t r;
560 pa_memblock *release_memblock = NULL;
562 pa_assert(p);
563 pa_assert(PA_REFCNT_VALUE(p) > 0);
565 if (!p->write.current)
566 prepare_next_write_item(p);
568 if (!p->write.current)
569 return 0;
571 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
572 d = (uint8_t*) p->write.descriptor + p->write.index;
573 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
574 } else {
575 pa_assert(p->write.data || p->write.memchunk.memblock);
577 if (p->write.data)
578 d = p->write.data;
579 else {
580 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
581 release_memblock = p->write.memchunk.memblock;
584 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
585 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
588 pa_assert(l > 0);
590 #ifdef HAVE_CREDS
591 if (p->send_creds_now) {
593 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
594 goto fail;
596 p->send_creds_now = FALSE;
597 } else
598 #endif
600 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
601 goto fail;
603 if (release_memblock)
604 pa_memblock_release(release_memblock);
606 p->write.index += (size_t) r;
608 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
609 pa_assert(p->write.current);
610 item_free(p->write.current, NULL);
611 p->write.current = NULL;
613 if (p->write.memchunk.memblock)
614 pa_memblock_unref(p->write.memchunk.memblock);
616 pa_memchunk_reset(&p->write.memchunk);
618 if (p->drain_callback && !pa_pstream_is_pending(p))
619 p->drain_callback(p, p->drain_callback_userdata);
622 return 0;
624 fail:
626 if (release_memblock)
627 pa_memblock_release(release_memblock);
629 return -1;
632 static int do_read(pa_pstream *p) {
633 void *d;
634 size_t l;
635 ssize_t r;
636 pa_memblock *release_memblock = NULL;
637 pa_assert(p);
638 pa_assert(PA_REFCNT_VALUE(p) > 0);
640 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
641 d = (uint8_t*) p->read.descriptor + p->read.index;
642 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
643 } else {
644 pa_assert(p->read.data || p->read.memblock);
646 if (p->read.data)
647 d = p->read.data;
648 else {
649 d = pa_memblock_acquire(p->read.memblock);
650 release_memblock = p->read.memblock;
653 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
654 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
657 #ifdef HAVE_CREDS
659 pa_bool_t b = 0;
661 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
662 goto fail;
664 p->read_creds_valid = p->read_creds_valid || b;
666 #else
667 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
668 goto fail;
669 #endif
671 if (release_memblock)
672 pa_memblock_release(release_memblock);
674 p->read.index += (size_t) r;
676 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
677 uint32_t flags, length, channel;
678 /* Reading of frame descriptor complete */
680 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
682 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
683 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
684 return -1;
687 if (flags == PA_FLAG_SHMRELEASE) {
689 /* This is a SHM memblock release frame with no payload */
691 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
693 pa_assert(p->export);
694 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
696 goto frame_done;
698 } else if (flags == PA_FLAG_SHMREVOKE) {
700 /* This is a SHM memblock revoke frame with no payload */
702 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
704 pa_assert(p->import);
705 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
707 goto frame_done;
710 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
712 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
713 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
714 return -1;
717 pa_assert(!p->read.packet && !p->read.memblock);
719 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
721 if (channel == (uint32_t) -1) {
723 if (flags != 0) {
724 pa_log_warn("Received packet frame with invalid flags value.");
725 return -1;
728 /* Frame is a packet frame */
729 p->read.packet = pa_packet_new(length);
730 p->read.data = p->read.packet->data;
732 } else {
734 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
735 pa_log_warn("Received memblock frame with invalid seek mode.");
736 return -1;
739 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
741 if (length != sizeof(p->read.shm_info)) {
742 pa_log_warn("Received SHM memblock frame with Invalid frame length.");
743 return -1;
746 /* Frame is a memblock frame referencing an SHM memblock */
747 p->read.data = p->read.shm_info;
749 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
751 /* Frame is a memblock frame */
753 p->read.memblock = pa_memblock_new(p->mempool, length);
754 p->read.data = NULL;
755 } else {
757 pa_log_warn("Received memblock frame with invalid flags value.");
758 return -1;
762 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
763 /* Frame payload available */
765 if (p->read.memblock && p->recieve_memblock_callback) {
767 /* Is this memblock data? Than pass it to the user */
768 l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
770 if (l > 0) {
771 pa_memchunk chunk;
773 chunk.memblock = p->read.memblock;
774 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
775 chunk.length = l;
777 if (p->recieve_memblock_callback) {
778 int64_t offset;
780 offset = (int64_t) (
781 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
782 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
784 p->recieve_memblock_callback(
786 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
787 offset,
788 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
789 &chunk,
790 p->recieve_memblock_callback_userdata);
793 /* Drop seek info for following callbacks */
794 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
795 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
796 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
800 /* Frame complete */
801 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
803 if (p->read.memblock) {
805 /* This was a memblock frame. We can unref the memblock now */
806 pa_memblock_unref(p->read.memblock);
808 } else if (p->read.packet) {
810 if (p->recieve_packet_callback)
811 #ifdef HAVE_CREDS
812 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
813 #else
814 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
815 #endif
817 pa_packet_unref(p->read.packet);
818 } else {
819 pa_memblock *b;
821 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
823 pa_assert(p->import);
825 if (!(b = pa_memimport_get(p->import,
826 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
827 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
828 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
829 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
831 if (pa_log_ratelimit(PA_LOG_DEBUG))
832 pa_log_debug("Failed to import memory block.");
835 if (p->recieve_memblock_callback) {
836 int64_t offset;
837 pa_memchunk chunk;
839 chunk.memblock = b;
840 chunk.index = 0;
841 chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
843 offset = (int64_t) (
844 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
845 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
847 p->recieve_memblock_callback(
849 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
850 offset,
851 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
852 &chunk,
853 p->recieve_memblock_callback_userdata);
856 if (b)
857 pa_memblock_unref(b);
860 goto frame_done;
864 return 0;
866 frame_done:
867 p->read.memblock = NULL;
868 p->read.packet = NULL;
869 p->read.index = 0;
870 p->read.data = NULL;
872 #ifdef HAVE_CREDS
873 p->read_creds_valid = FALSE;
874 #endif
876 return 0;
878 fail:
879 if (release_memblock)
880 pa_memblock_release(release_memblock);
882 return -1;
885 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
886 pa_assert(p);
887 pa_assert(PA_REFCNT_VALUE(p) > 0);
889 p->die_callback = cb;
890 p->die_callback_userdata = userdata;
893 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
894 pa_assert(p);
895 pa_assert(PA_REFCNT_VALUE(p) > 0);
897 p->drain_callback = cb;
898 p->drain_callback_userdata = userdata;
901 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
902 pa_assert(p);
903 pa_assert(PA_REFCNT_VALUE(p) > 0);
905 p->recieve_packet_callback = cb;
906 p->recieve_packet_callback_userdata = userdata;
909 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
910 pa_assert(p);
911 pa_assert(PA_REFCNT_VALUE(p) > 0);
913 p->recieve_memblock_callback = cb;
914 p->recieve_memblock_callback_userdata = userdata;
917 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
918 pa_assert(p);
919 pa_assert(PA_REFCNT_VALUE(p) > 0);
921 p->release_callback = cb;
922 p->release_callback_userdata = userdata;
925 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
926 pa_assert(p);
927 pa_assert(PA_REFCNT_VALUE(p) > 0);
929 p->release_callback = cb;
930 p->release_callback_userdata = userdata;
933 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
934 pa_bool_t b;
936 pa_assert(p);
937 pa_assert(PA_REFCNT_VALUE(p) > 0);
939 if (p->dead)
940 b = FALSE;
941 else
942 b = p->write.current || !pa_queue_isempty(p->send_queue);
944 return b;
947 void pa_pstream_unref(pa_pstream*p) {
948 pa_assert(p);
949 pa_assert(PA_REFCNT_VALUE(p) > 0);
951 if (PA_REFCNT_DEC(p) <= 0)
952 pstream_free(p);
955 pa_pstream* pa_pstream_ref(pa_pstream*p) {
956 pa_assert(p);
957 pa_assert(PA_REFCNT_VALUE(p) > 0);
959 PA_REFCNT_INC(p);
960 return p;
963 void pa_pstream_unlink(pa_pstream *p) {
964 pa_assert(p);
966 if (p->dead)
967 return;
969 p->dead = TRUE;
971 if (p->import) {
972 pa_memimport_free(p->import);
973 p->import = NULL;
976 if (p->export) {
977 pa_memexport_free(p->export);
978 p->export = NULL;
981 if (p->io) {
982 pa_iochannel_free(p->io);
983 p->io = NULL;
986 if (p->defer_event) {
987 p->mainloop->defer_free(p->defer_event);
988 p->defer_event = NULL;
991 p->die_callback = NULL;
992 p->drain_callback = NULL;
993 p->recieve_packet_callback = NULL;
994 p->recieve_memblock_callback = NULL;
997 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
998 pa_assert(p);
999 pa_assert(PA_REFCNT_VALUE(p) > 0);
1001 p->use_shm = enable;
1003 if (enable) {
1005 if (!p->export)
1006 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1008 } else {
1010 if (p->export) {
1011 pa_memexport_free(p->export);
1012 p->export = NULL;
1017 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1018 pa_assert(p);
1019 pa_assert(PA_REFCNT_VALUE(p) > 0);
1021 return p->use_shm;