build-sys: bump soname
[pulseaudio-mirror.git] / src / pulsecore / pstream.c
blob3e0bfa3beb58da9b6277f0d1867a93615296b418
1 /***
2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34 #ifdef HAVE_SYS_UN_H
35 #include <sys/un.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
42 #include <pulse/xmalloc.h>
44 #include <pulsecore/winsock.h>
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49 #include <pulsecore/refcnt.h>
50 #include <pulsecore/flist.h>
51 #include <pulsecore/macro.h>
53 #include "pstream.h"
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
62 /* The sequence descriptor header consists of 5 32bit integers: */
63 enum {
64 PA_PSTREAM_DESCRIPTOR_LENGTH,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
68 PA_PSTREAM_DESCRIPTOR_FLAGS,
69 PA_PSTREAM_DESCRIPTOR_MAX
72 /* If we have an SHM block, this info follows the descriptor */
73 enum {
74 PA_PSTREAM_SHM_BLOCKID,
75 PA_PSTREAM_SHM_SHMID,
76 PA_PSTREAM_SHM_INDEX,
77 PA_PSTREAM_SHM_LENGTH,
78 PA_PSTREAM_SHM_MAX
81 typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
86 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
88 struct item_info {
89 enum {
90 PA_PSTREAM_ITEM_PACKET,
91 PA_PSTREAM_ITEM_MEMBLOCK,
92 PA_PSTREAM_ITEM_SHMRELEASE,
93 PA_PSTREAM_ITEM_SHMREVOKE
94 } type;
96 /* packet info */
97 pa_packet *packet;
98 #ifdef HAVE_CREDS
99 pa_bool_t with_creds;
100 pa_creds creds;
101 #endif
103 /* memblock info */
104 pa_memchunk chunk;
105 uint32_t channel;
106 int64_t offset;
107 pa_seek_mode_t seek_mode;
109 /* release/revoke info */
110 uint32_t block_id;
113 struct pa_pstream {
114 PA_REFCNT_DECLARE;
116 pa_mainloop_api *mainloop;
117 pa_defer_event *defer_event;
118 pa_iochannel *io;
120 pa_queue *send_queue;
122 pa_bool_t dead;
124 struct {
125 pa_pstream_descriptor descriptor;
126 struct item_info* current;
127 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
128 void *data;
129 size_t index;
130 pa_memchunk memchunk;
131 } write;
133 struct {
134 pa_pstream_descriptor descriptor;
135 pa_memblock *memblock;
136 pa_packet *packet;
137 uint32_t shm_info[PA_PSTREAM_SHM_MAX];
138 void *data;
139 size_t index;
140 } read;
142 pa_bool_t use_shm;
143 pa_memimport *import;
144 pa_memexport *export;
146 pa_pstream_packet_cb_t recieve_packet_callback;
147 void *recieve_packet_callback_userdata;
149 pa_pstream_memblock_cb_t recieve_memblock_callback;
150 void *recieve_memblock_callback_userdata;
152 pa_pstream_notify_cb_t drain_callback;
153 void *drain_callback_userdata;
155 pa_pstream_notify_cb_t die_callback;
156 void *die_callback_userdata;
158 pa_pstream_block_id_cb_t revoke_callback;
159 void *revoke_callback_userdata;
161 pa_pstream_block_id_cb_t release_callback;
162 void *release_callback_userdata;
164 pa_mempool *mempool;
166 #ifdef HAVE_CREDS
167 pa_creds read_creds, write_creds;
168 pa_bool_t read_creds_valid, send_creds_now;
169 #endif
172 static int do_write(pa_pstream *p);
173 static int do_read(pa_pstream *p);
175 static void do_something(pa_pstream *p) {
176 pa_assert(p);
177 pa_assert(PA_REFCNT_VALUE(p) > 0);
179 pa_pstream_ref(p);
181 p->mainloop->defer_enable(p->defer_event, 0);
183 if (!p->dead && pa_iochannel_is_readable(p->io)) {
184 if (do_read(p) < 0)
185 goto fail;
186 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
187 goto fail;
189 if (!p->dead && pa_iochannel_is_writable(p->io)) {
190 if (do_write(p) < 0)
191 goto fail;
194 pa_pstream_unref(p);
195 return;
197 fail:
199 if (p->die_callback)
200 p->die_callback(p, p->die_callback_userdata);
202 pa_pstream_unlink(p);
203 pa_pstream_unref(p);
206 static void io_callback(pa_iochannel*io, void *userdata) {
207 pa_pstream *p = userdata;
209 pa_assert(p);
210 pa_assert(PA_REFCNT_VALUE(p) > 0);
211 pa_assert(p->io == io);
213 do_something(p);
216 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
217 pa_pstream *p = userdata;
219 pa_assert(p);
220 pa_assert(PA_REFCNT_VALUE(p) > 0);
221 pa_assert(p->defer_event == e);
222 pa_assert(p->mainloop == m);
224 do_something(p);
227 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
229 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
230 pa_pstream *p;
232 pa_assert(m);
233 pa_assert(io);
234 pa_assert(pool);
236 p = pa_xnew(pa_pstream, 1);
237 PA_REFCNT_INIT(p);
238 p->io = io;
239 pa_iochannel_set_callback(io, io_callback, p);
240 p->dead = FALSE;
242 p->mainloop = m;
243 p->defer_event = m->defer_new(m, defer_callback, p);
244 m->defer_enable(p->defer_event, 0);
246 p->send_queue = pa_queue_new();
248 p->write.current = NULL;
249 p->write.index = 0;
250 pa_memchunk_reset(&p->write.memchunk);
251 p->read.memblock = NULL;
252 p->read.packet = NULL;
253 p->read.index = 0;
255 p->recieve_packet_callback = NULL;
256 p->recieve_packet_callback_userdata = NULL;
257 p->recieve_memblock_callback = NULL;
258 p->recieve_memblock_callback_userdata = NULL;
259 p->drain_callback = NULL;
260 p->drain_callback_userdata = NULL;
261 p->die_callback = NULL;
262 p->die_callback_userdata = NULL;
263 p->revoke_callback = NULL;
264 p->revoke_callback_userdata = NULL;
265 p->release_callback = NULL;
266 p->release_callback_userdata = NULL;
268 p->mempool = pool;
270 p->use_shm = FALSE;
271 p->export = NULL;
273 /* We do importing unconditionally */
274 p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
276 pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
277 pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));
279 #ifdef HAVE_CREDS
280 p->send_creds_now = FALSE;
281 p->read_creds_valid = FALSE;
282 #endif
283 return p;
286 static void item_free(void *item, void *q) {
287 struct item_info *i = item;
288 pa_assert(i);
290 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
291 pa_assert(i->chunk.memblock);
292 pa_memblock_unref(i->chunk.memblock);
293 } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
294 pa_assert(i->packet);
295 pa_packet_unref(i->packet);
298 if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
299 pa_xfree(i);
302 static void pstream_free(pa_pstream *p) {
303 pa_assert(p);
305 pa_pstream_unlink(p);
307 pa_queue_free(p->send_queue, item_free, NULL);
309 if (p->write.current)
310 item_free(p->write.current, NULL);
312 if (p->write.memchunk.memblock)
313 pa_memblock_unref(p->write.memchunk.memblock);
315 if (p->read.memblock)
316 pa_memblock_unref(p->read.memblock);
318 if (p->read.packet)
319 pa_packet_unref(p->read.packet);
321 pa_xfree(p);
324 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
325 struct item_info *i;
327 pa_assert(p);
328 pa_assert(PA_REFCNT_VALUE(p) > 0);
329 pa_assert(packet);
331 if (p->dead)
332 return;
334 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
335 i = pa_xnew(struct item_info, 1);
337 i->type = PA_PSTREAM_ITEM_PACKET;
338 i->packet = pa_packet_ref(packet);
340 #ifdef HAVE_CREDS
341 if ((i->with_creds = !!creds))
342 i->creds = *creds;
343 #endif
345 pa_queue_push(p->send_queue, i);
347 p->mainloop->defer_enable(p->defer_event, 1);
350 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
351 size_t length, idx;
352 size_t bsm;
354 pa_assert(p);
355 pa_assert(PA_REFCNT_VALUE(p) > 0);
356 pa_assert(channel != (uint32_t) -1);
357 pa_assert(chunk);
359 if (p->dead)
360 return;
362 idx = 0;
363 length = chunk->length;
365 bsm = pa_mempool_block_size_max(p->mempool);
367 while (length > 0) {
368 struct item_info *i;
369 size_t n;
371 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
372 i = pa_xnew(struct item_info, 1);
373 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
375 n = PA_MIN(length, bsm);
376 i->chunk.index = chunk->index + idx;
377 i->chunk.length = n;
378 i->chunk.memblock = pa_memblock_ref(chunk->memblock);
380 i->channel = channel;
381 i->offset = offset;
382 i->seek_mode = seek_mode;
383 #ifdef HAVE_CREDS
384 i->with_creds = FALSE;
385 #endif
387 pa_queue_push(p->send_queue, i);
389 idx += n;
390 length -= n;
393 p->mainloop->defer_enable(p->defer_event, 1);
396 void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
397 struct item_info *item;
398 pa_assert(p);
399 pa_assert(PA_REFCNT_VALUE(p) > 0);
401 if (p->dead)
402 return;
404 /* pa_log("Releasing block %u", block_id); */
406 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
407 item = pa_xnew(struct item_info, 1);
408 item->type = PA_PSTREAM_ITEM_SHMRELEASE;
409 item->block_id = block_id;
410 #ifdef HAVE_CREDS
411 item->with_creds = FALSE;
412 #endif
414 pa_queue_push(p->send_queue, item);
415 p->mainloop->defer_enable(p->defer_event, 1);
418 /* might be called from thread context */
419 static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
420 pa_pstream *p = userdata;
422 pa_assert(p);
423 pa_assert(PA_REFCNT_VALUE(p) > 0);
425 if (p->dead)
426 return;
428 if (p->release_callback)
429 p->release_callback(p, block_id, p->release_callback_userdata);
430 else
431 pa_pstream_send_release(p, block_id);
434 void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
435 struct item_info *item;
436 pa_assert(p);
437 pa_assert(PA_REFCNT_VALUE(p) > 0);
439 if (p->dead)
440 return;
441 /* pa_log("Revoking block %u", block_id); */
443 if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
444 item = pa_xnew(struct item_info, 1);
445 item->type = PA_PSTREAM_ITEM_SHMREVOKE;
446 item->block_id = block_id;
447 #ifdef HAVE_CREDS
448 item->with_creds = FALSE;
449 #endif
451 pa_queue_push(p->send_queue, item);
452 p->mainloop->defer_enable(p->defer_event, 1);
455 /* might be called from thread context */
456 static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
457 pa_pstream *p = userdata;
459 pa_assert(p);
460 pa_assert(PA_REFCNT_VALUE(p) > 0);
462 if (p->revoke_callback)
463 p->revoke_callback(p, block_id, p->revoke_callback_userdata);
464 else
465 pa_pstream_send_revoke(p, block_id);
468 static void prepare_next_write_item(pa_pstream *p) {
469 pa_assert(p);
470 pa_assert(PA_REFCNT_VALUE(p) > 0);
472 p->write.current = pa_queue_pop(p->send_queue);
474 if (!p->write.current)
475 return;
477 p->write.index = 0;
478 p->write.data = NULL;
479 pa_memchunk_reset(&p->write.memchunk);
481 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
482 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
483 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
484 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
485 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
487 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
489 pa_assert(p->write.current->packet);
490 p->write.data = p->write.current->packet->data;
491 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);
493 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
495 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
496 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
498 } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
500 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
501 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
503 } else {
504 uint32_t flags;
505 pa_bool_t send_payload = TRUE;
507 pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
508 pa_assert(p->write.current->chunk.memblock);
510 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
511 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
512 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
514 flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);
516 if (p->use_shm) {
517 uint32_t block_id, shm_id;
518 size_t offset, length;
520 pa_assert(p->export);
522 if (pa_memexport_put(p->export,
523 p->write.current->chunk.memblock,
524 &block_id,
525 &shm_id,
526 &offset,
527 &length) >= 0) {
529 flags |= PA_FLAG_SHMDATA;
530 send_payload = FALSE;
532 p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
533 p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
534 p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
535 p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
537 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
538 p->write.data = p->write.shm_info;
540 /* else */
541 /* pa_log_warn("Failed to export memory block."); */
544 if (send_payload) {
545 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
546 p->write.memchunk = p->write.current->chunk;
547 pa_memblock_ref(p->write.memchunk.memblock);
548 p->write.data = NULL;
551 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
554 #ifdef HAVE_CREDS
555 if ((p->send_creds_now = p->write.current->with_creds))
556 p->write_creds = p->write.current->creds;
557 #endif
560 static int do_write(pa_pstream *p) {
561 void *d;
562 size_t l;
563 ssize_t r;
564 pa_memblock *release_memblock = NULL;
566 pa_assert(p);
567 pa_assert(PA_REFCNT_VALUE(p) > 0);
569 if (!p->write.current)
570 prepare_next_write_item(p);
572 if (!p->write.current)
573 return 0;
575 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
576 d = (uint8_t*) p->write.descriptor + p->write.index;
577 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
578 } else {
579 pa_assert(p->write.data || p->write.memchunk.memblock);
581 if (p->write.data)
582 d = p->write.data;
583 else {
584 d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
585 release_memblock = p->write.memchunk.memblock;
588 d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
589 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
592 pa_assert(l > 0);
594 #ifdef HAVE_CREDS
595 if (p->send_creds_now) {
597 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
598 goto fail;
600 p->send_creds_now = FALSE;
601 } else
602 #endif
604 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
605 goto fail;
607 if (release_memblock)
608 pa_memblock_release(release_memblock);
610 p->write.index += (size_t) r;
612 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
613 pa_assert(p->write.current);
614 item_free(p->write.current, NULL);
615 p->write.current = NULL;
617 if (p->write.memchunk.memblock)
618 pa_memblock_unref(p->write.memchunk.memblock);
620 pa_memchunk_reset(&p->write.memchunk);
622 if (p->drain_callback && !pa_pstream_is_pending(p))
623 p->drain_callback(p, p->drain_callback_userdata);
626 return 0;
628 fail:
630 if (release_memblock)
631 pa_memblock_release(release_memblock);
633 return -1;
636 static int do_read(pa_pstream *p) {
637 void *d;
638 size_t l;
639 ssize_t r;
640 pa_memblock *release_memblock = NULL;
641 pa_assert(p);
642 pa_assert(PA_REFCNT_VALUE(p) > 0);
644 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
645 d = (uint8_t*) p->read.descriptor + p->read.index;
646 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
647 } else {
648 pa_assert(p->read.data || p->read.memblock);
650 if (p->read.data)
651 d = p->read.data;
652 else {
653 d = pa_memblock_acquire(p->read.memblock);
654 release_memblock = p->read.memblock;
657 d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
658 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
661 #ifdef HAVE_CREDS
663 pa_bool_t b = 0;
665 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
666 goto fail;
668 p->read_creds_valid = p->read_creds_valid || b;
670 #else
671 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
672 goto fail;
673 #endif
675 if (release_memblock)
676 pa_memblock_release(release_memblock);
678 p->read.index += (size_t) r;
680 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
681 uint32_t flags, length, channel;
682 /* Reading of frame descriptor complete */
684 flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
686 if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
687 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
688 return -1;
691 if (flags == PA_FLAG_SHMRELEASE) {
693 /* This is a SHM memblock release frame with no payload */
695 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
697 pa_assert(p->export);
698 pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
700 goto frame_done;
702 } else if (flags == PA_FLAG_SHMREVOKE) {
704 /* This is a SHM memblock revoke frame with no payload */
706 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
708 pa_assert(p->import);
709 pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
711 goto frame_done;
714 length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
716 if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
717 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
718 return -1;
721 pa_assert(!p->read.packet && !p->read.memblock);
723 channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
725 if (channel == (uint32_t) -1) {
727 if (flags != 0) {
728 pa_log_warn("Received packet frame with invalid flags value.");
729 return -1;
732 /* Frame is a packet frame */
733 p->read.packet = pa_packet_new(length);
734 p->read.data = p->read.packet->data;
736 } else {
738 if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
739 pa_log_warn("Received memblock frame with invalid seek mode.");
740 return -1;
743 if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
745 if (length != sizeof(p->read.shm_info)) {
746 pa_log_warn("Received SHM memblock frame with Invalid frame length.");
747 return -1;
750 /* Frame is a memblock frame referencing an SHM memblock */
751 p->read.data = p->read.shm_info;
753 } else if ((flags & PA_FLAG_SHMMASK) == 0) {
755 /* Frame is a memblock frame */
757 p->read.memblock = pa_memblock_new(p->mempool, length);
758 p->read.data = NULL;
759 } else {
761 pa_log_warn("Received memblock frame with invalid flags value.");
762 return -1;
766 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
767 /* Frame payload available */
769 if (p->read.memblock && p->recieve_memblock_callback) {
771 /* Is this memblock data? Than pass it to the user */
772 l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
774 if (l > 0) {
775 pa_memchunk chunk;
777 chunk.memblock = p->read.memblock;
778 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
779 chunk.length = l;
781 if (p->recieve_memblock_callback) {
782 int64_t offset;
784 offset = (int64_t) (
785 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
786 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
788 p->recieve_memblock_callback(
790 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
791 offset,
792 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
793 &chunk,
794 p->recieve_memblock_callback_userdata);
797 /* Drop seek info for following callbacks */
798 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
799 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
800 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
804 /* Frame complete */
805 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
807 if (p->read.memblock) {
809 /* This was a memblock frame. We can unref the memblock now */
810 pa_memblock_unref(p->read.memblock);
812 } else if (p->read.packet) {
814 if (p->recieve_packet_callback)
815 #ifdef HAVE_CREDS
816 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
817 #else
818 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
819 #endif
821 pa_packet_unref(p->read.packet);
822 } else {
823 pa_memblock *b;
825 pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
827 pa_assert(p->import);
829 if (!(b = pa_memimport_get(p->import,
830 ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
831 ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
832 ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
833 ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
835 if (pa_log_ratelimit(PA_LOG_DEBUG))
836 pa_log_debug("Failed to import memory block.");
839 if (p->recieve_memblock_callback) {
840 int64_t offset;
841 pa_memchunk chunk;
843 chunk.memblock = b;
844 chunk.index = 0;
845 chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
847 offset = (int64_t) (
848 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
849 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
851 p->recieve_memblock_callback(
853 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
854 offset,
855 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
856 &chunk,
857 p->recieve_memblock_callback_userdata);
860 if (b)
861 pa_memblock_unref(b);
864 goto frame_done;
868 return 0;
870 frame_done:
871 p->read.memblock = NULL;
872 p->read.packet = NULL;
873 p->read.index = 0;
874 p->read.data = NULL;
876 #ifdef HAVE_CREDS
877 p->read_creds_valid = FALSE;
878 #endif
880 return 0;
882 fail:
883 if (release_memblock)
884 pa_memblock_release(release_memblock);
886 return -1;
889 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
890 pa_assert(p);
891 pa_assert(PA_REFCNT_VALUE(p) > 0);
893 p->die_callback = cb;
894 p->die_callback_userdata = userdata;
897 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
898 pa_assert(p);
899 pa_assert(PA_REFCNT_VALUE(p) > 0);
901 p->drain_callback = cb;
902 p->drain_callback_userdata = userdata;
905 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
906 pa_assert(p);
907 pa_assert(PA_REFCNT_VALUE(p) > 0);
909 p->recieve_packet_callback = cb;
910 p->recieve_packet_callback_userdata = userdata;
913 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
914 pa_assert(p);
915 pa_assert(PA_REFCNT_VALUE(p) > 0);
917 p->recieve_memblock_callback = cb;
918 p->recieve_memblock_callback_userdata = userdata;
921 void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
922 pa_assert(p);
923 pa_assert(PA_REFCNT_VALUE(p) > 0);
925 p->release_callback = cb;
926 p->release_callback_userdata = userdata;
929 void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
930 pa_assert(p);
931 pa_assert(PA_REFCNT_VALUE(p) > 0);
933 p->release_callback = cb;
934 p->release_callback_userdata = userdata;
937 pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
938 pa_bool_t b;
940 pa_assert(p);
941 pa_assert(PA_REFCNT_VALUE(p) > 0);
943 if (p->dead)
944 b = FALSE;
945 else
946 b = p->write.current || !pa_queue_isempty(p->send_queue);
948 return b;
951 void pa_pstream_unref(pa_pstream*p) {
952 pa_assert(p);
953 pa_assert(PA_REFCNT_VALUE(p) > 0);
955 if (PA_REFCNT_DEC(p) <= 0)
956 pstream_free(p);
959 pa_pstream* pa_pstream_ref(pa_pstream*p) {
960 pa_assert(p);
961 pa_assert(PA_REFCNT_VALUE(p) > 0);
963 PA_REFCNT_INC(p);
964 return p;
967 void pa_pstream_unlink(pa_pstream *p) {
968 pa_assert(p);
970 if (p->dead)
971 return;
973 p->dead = TRUE;
975 if (p->import) {
976 pa_memimport_free(p->import);
977 p->import = NULL;
980 if (p->export) {
981 pa_memexport_free(p->export);
982 p->export = NULL;
985 if (p->io) {
986 pa_iochannel_free(p->io);
987 p->io = NULL;
990 if (p->defer_event) {
991 p->mainloop->defer_free(p->defer_event);
992 p->defer_event = NULL;
995 p->die_callback = NULL;
996 p->drain_callback = NULL;
997 p->recieve_packet_callback = NULL;
998 p->recieve_memblock_callback = NULL;
1001 void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
1002 pa_assert(p);
1003 pa_assert(PA_REFCNT_VALUE(p) > 0);
1005 p->use_shm = enable;
1007 if (enable) {
1009 if (!p->export)
1010 p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
1012 } else {
1014 if (p->export) {
1015 pa_memexport_free(p->export);
1016 p->export = NULL;
1021 pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
1022 pa_assert(p);
1023 pa_assert(PA_REFCNT_VALUE(p) > 0);
1025 return p->use_shm;