2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
42 #include <pulse/xmalloc.h>
44 #include <pulsecore/winsock.h>
45 #include <pulsecore/queue.h>
46 #include <pulsecore/log.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/creds.h>
49 #include <pulsecore/refcnt.h>
50 #include <pulsecore/flist.h>
51 #include <pulsecore/macro.h>
55 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
56 #define PA_FLAG_SHMDATA 0x80000000LU
57 #define PA_FLAG_SHMRELEASE 0x40000000LU
58 #define PA_FLAG_SHMREVOKE 0xC0000000LU
59 #define PA_FLAG_SHMMASK 0xFF000000LU
60 #define PA_FLAG_SEEKMASK 0x000000FFLU
62 /* The sequence descriptor header consists of 5 32bit integers: */
64 PA_PSTREAM_DESCRIPTOR_LENGTH
,
65 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
66 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
67 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
68 PA_PSTREAM_DESCRIPTOR_FLAGS
,
69 PA_PSTREAM_DESCRIPTOR_MAX
72 /* If we have an SHM block, this info follows the descriptor */
74 PA_PSTREAM_SHM_BLOCKID
,
77 PA_PSTREAM_SHM_LENGTH
,
81 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
83 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
84 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
86 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
90 PA_PSTREAM_ITEM_PACKET
,
91 PA_PSTREAM_ITEM_MEMBLOCK
,
92 PA_PSTREAM_ITEM_SHMRELEASE
,
93 PA_PSTREAM_ITEM_SHMREVOKE
107 pa_seek_mode_t seek_mode
;
109 /* release/revoke info */
116 pa_mainloop_api
*mainloop
;
117 pa_defer_event
*defer_event
;
120 pa_queue
*send_queue
;
125 pa_pstream_descriptor descriptor
;
126 struct item_info
* current
;
127 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
130 pa_memchunk memchunk
;
134 pa_pstream_descriptor descriptor
;
135 pa_memblock
*memblock
;
137 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
143 pa_memimport
*import
;
144 pa_memexport
*export
;
146 pa_pstream_packet_cb_t recieve_packet_callback
;
147 void *recieve_packet_callback_userdata
;
149 pa_pstream_memblock_cb_t recieve_memblock_callback
;
150 void *recieve_memblock_callback_userdata
;
152 pa_pstream_notify_cb_t drain_callback
;
153 void *drain_callback_userdata
;
155 pa_pstream_notify_cb_t die_callback
;
156 void *die_callback_userdata
;
158 pa_pstream_block_id_cb_t revoke_callback
;
159 void *revoke_callback_userdata
;
161 pa_pstream_block_id_cb_t release_callback
;
162 void *release_callback_userdata
;
167 pa_creds read_creds
, write_creds
;
168 pa_bool_t read_creds_valid
, send_creds_now
;
172 static int do_write(pa_pstream
*p
);
173 static int do_read(pa_pstream
*p
);
175 static void do_something(pa_pstream
*p
) {
177 pa_assert(PA_REFCNT_VALUE(p
) > 0);
181 p
->mainloop
->defer_enable(p
->defer_event
, 0);
183 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
186 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
189 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
200 p
->die_callback(p
, p
->die_callback_userdata
);
202 pa_pstream_unlink(p
);
206 static void io_callback(pa_iochannel
*io
, void *userdata
) {
207 pa_pstream
*p
= userdata
;
210 pa_assert(PA_REFCNT_VALUE(p
) > 0);
211 pa_assert(p
->io
== io
);
216 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
217 pa_pstream
*p
= userdata
;
220 pa_assert(PA_REFCNT_VALUE(p
) > 0);
221 pa_assert(p
->defer_event
== e
);
222 pa_assert(p
->mainloop
== m
);
227 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
229 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
236 p
= pa_xnew(pa_pstream
, 1);
239 pa_iochannel_set_callback(io
, io_callback
, p
);
243 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
244 m
->defer_enable(p
->defer_event
, 0);
246 p
->send_queue
= pa_queue_new();
248 p
->write
.current
= NULL
;
250 pa_memchunk_reset(&p
->write
.memchunk
);
251 p
->read
.memblock
= NULL
;
252 p
->read
.packet
= NULL
;
255 p
->recieve_packet_callback
= NULL
;
256 p
->recieve_packet_callback_userdata
= NULL
;
257 p
->recieve_memblock_callback
= NULL
;
258 p
->recieve_memblock_callback_userdata
= NULL
;
259 p
->drain_callback
= NULL
;
260 p
->drain_callback_userdata
= NULL
;
261 p
->die_callback
= NULL
;
262 p
->die_callback_userdata
= NULL
;
263 p
->revoke_callback
= NULL
;
264 p
->revoke_callback_userdata
= NULL
;
265 p
->release_callback
= NULL
;
266 p
->release_callback_userdata
= NULL
;
273 /* We do importing unconditionally */
274 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
276 pa_iochannel_socket_set_rcvbuf(io
, pa_mempool_block_size_max(p
->mempool
));
277 pa_iochannel_socket_set_sndbuf(io
, pa_mempool_block_size_max(p
->mempool
));
280 p
->send_creds_now
= FALSE
;
281 p
->read_creds_valid
= FALSE
;
286 static void item_free(void *item
, void *q
) {
287 struct item_info
*i
= item
;
290 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
291 pa_assert(i
->chunk
.memblock
);
292 pa_memblock_unref(i
->chunk
.memblock
);
293 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
294 pa_assert(i
->packet
);
295 pa_packet_unref(i
->packet
);
298 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
302 static void pstream_free(pa_pstream
*p
) {
305 pa_pstream_unlink(p
);
307 pa_queue_free(p
->send_queue
, item_free
, NULL
);
309 if (p
->write
.current
)
310 item_free(p
->write
.current
, NULL
);
312 if (p
->write
.memchunk
.memblock
)
313 pa_memblock_unref(p
->write
.memchunk
.memblock
);
315 if (p
->read
.memblock
)
316 pa_memblock_unref(p
->read
.memblock
);
319 pa_packet_unref(p
->read
.packet
);
324 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
328 pa_assert(PA_REFCNT_VALUE(p
) > 0);
334 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
335 i
= pa_xnew(struct item_info
, 1);
337 i
->type
= PA_PSTREAM_ITEM_PACKET
;
338 i
->packet
= pa_packet_ref(packet
);
341 if ((i
->with_creds
= !!creds
))
345 pa_queue_push(p
->send_queue
, i
);
347 p
->mainloop
->defer_enable(p
->defer_event
, 1);
350 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
355 pa_assert(PA_REFCNT_VALUE(p
) > 0);
356 pa_assert(channel
!= (uint32_t) -1);
363 length
= chunk
->length
;
365 bsm
= pa_mempool_block_size_max(p
->mempool
);
371 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
372 i
= pa_xnew(struct item_info
, 1);
373 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
375 n
= PA_MIN(length
, bsm
);
376 i
->chunk
.index
= chunk
->index
+ idx
;
378 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
380 i
->channel
= channel
;
382 i
->seek_mode
= seek_mode
;
384 i
->with_creds
= FALSE
;
387 pa_queue_push(p
->send_queue
, i
);
393 p
->mainloop
->defer_enable(p
->defer_event
, 1);
396 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
397 struct item_info
*item
;
399 pa_assert(PA_REFCNT_VALUE(p
) > 0);
404 /* pa_log("Releasing block %u", block_id); */
406 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
407 item
= pa_xnew(struct item_info
, 1);
408 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
409 item
->block_id
= block_id
;
411 item
->with_creds
= FALSE
;
414 pa_queue_push(p
->send_queue
, item
);
415 p
->mainloop
->defer_enable(p
->defer_event
, 1);
418 /* might be called from thread context */
419 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
420 pa_pstream
*p
= userdata
;
423 pa_assert(PA_REFCNT_VALUE(p
) > 0);
428 if (p
->release_callback
)
429 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
431 pa_pstream_send_release(p
, block_id
);
434 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
435 struct item_info
*item
;
437 pa_assert(PA_REFCNT_VALUE(p
) > 0);
441 /* pa_log("Revoking block %u", block_id); */
443 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
444 item
= pa_xnew(struct item_info
, 1);
445 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
446 item
->block_id
= block_id
;
448 item
->with_creds
= FALSE
;
451 pa_queue_push(p
->send_queue
, item
);
452 p
->mainloop
->defer_enable(p
->defer_event
, 1);
455 /* might be called from thread context */
456 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
457 pa_pstream
*p
= userdata
;
460 pa_assert(PA_REFCNT_VALUE(p
) > 0);
462 if (p
->revoke_callback
)
463 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
465 pa_pstream_send_revoke(p
, block_id
);
468 static void prepare_next_write_item(pa_pstream
*p
) {
470 pa_assert(PA_REFCNT_VALUE(p
) > 0);
472 p
->write
.current
= pa_queue_pop(p
->send_queue
);
474 if (!p
->write
.current
)
478 p
->write
.data
= NULL
;
479 pa_memchunk_reset(&p
->write
.memchunk
);
481 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
482 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
483 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
484 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
485 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
487 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
489 pa_assert(p
->write
.current
->packet
);
490 p
->write
.data
= p
->write
.current
->packet
->data
;
491 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->packet
->length
);
493 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
495 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
496 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
498 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
500 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
501 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
505 pa_bool_t send_payload
= TRUE
;
507 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
508 pa_assert(p
->write
.current
->chunk
.memblock
);
510 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
511 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
512 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
514 flags
= (uint32_t) (p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
);
517 uint32_t block_id
, shm_id
;
518 size_t offset
, length
;
520 pa_assert(p
->export
);
522 if (pa_memexport_put(p
->export
,
523 p
->write
.current
->chunk
.memblock
,
529 flags
|= PA_FLAG_SHMDATA
;
530 send_payload
= FALSE
;
532 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
533 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
534 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
535 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
537 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
538 p
->write
.data
= p
->write
.shm_info
;
541 /* pa_log_warn("Failed to export memory block."); */
545 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
546 p
->write
.memchunk
= p
->write
.current
->chunk
;
547 pa_memblock_ref(p
->write
.memchunk
.memblock
);
548 p
->write
.data
= NULL
;
551 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
555 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
556 p
->write_creds
= p
->write
.current
->creds
;
560 static int do_write(pa_pstream
*p
) {
564 pa_memblock
*release_memblock
= NULL
;
567 pa_assert(PA_REFCNT_VALUE(p
) > 0);
569 if (!p
->write
.current
)
570 prepare_next_write_item(p
);
572 if (!p
->write
.current
)
575 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
576 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
577 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
579 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
584 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
585 release_memblock
= p
->write
.memchunk
.memblock
;
588 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
589 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
595 if (p
->send_creds_now
) {
597 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
600 p
->send_creds_now
= FALSE
;
604 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
607 if (release_memblock
)
608 pa_memblock_release(release_memblock
);
610 p
->write
.index
+= (size_t) r
;
612 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
613 pa_assert(p
->write
.current
);
614 item_free(p
->write
.current
, NULL
);
615 p
->write
.current
= NULL
;
617 if (p
->write
.memchunk
.memblock
)
618 pa_memblock_unref(p
->write
.memchunk
.memblock
);
620 pa_memchunk_reset(&p
->write
.memchunk
);
622 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
623 p
->drain_callback(p
, p
->drain_callback_userdata
);
630 if (release_memblock
)
631 pa_memblock_release(release_memblock
);
636 static int do_read(pa_pstream
*p
) {
640 pa_memblock
*release_memblock
= NULL
;
642 pa_assert(PA_REFCNT_VALUE(p
) > 0);
644 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
645 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
646 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
648 pa_assert(p
->read
.data
|| p
->read
.memblock
);
653 d
= pa_memblock_acquire(p
->read
.memblock
);
654 release_memblock
= p
->read
.memblock
;
657 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
658 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
665 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
668 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
671 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
675 if (release_memblock
)
676 pa_memblock_release(release_memblock
);
678 p
->read
.index
+= (size_t) r
;
680 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
681 uint32_t flags
, length
, channel
;
682 /* Reading of frame descriptor complete */
684 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
686 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
687 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
691 if (flags
== PA_FLAG_SHMRELEASE
) {
693 /* This is a SHM memblock release frame with no payload */
695 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
697 pa_assert(p
->export
);
698 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
702 } else if (flags
== PA_FLAG_SHMREVOKE
) {
704 /* This is a SHM memblock revoke frame with no payload */
706 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
708 pa_assert(p
->import
);
709 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
714 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
716 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
717 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length
);
721 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
723 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
725 if (channel
== (uint32_t) -1) {
728 pa_log_warn("Received packet frame with invalid flags value.");
732 /* Frame is a packet frame */
733 p
->read
.packet
= pa_packet_new(length
);
734 p
->read
.data
= p
->read
.packet
->data
;
738 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
739 pa_log_warn("Received memblock frame with invalid seek mode.");
743 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
745 if (length
!= sizeof(p
->read
.shm_info
)) {
746 pa_log_warn("Received SHM memblock frame with Invalid frame length.");
750 /* Frame is a memblock frame referencing an SHM memblock */
751 p
->read
.data
= p
->read
.shm_info
;
753 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
755 /* Frame is a memblock frame */
757 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
761 pa_log_warn("Received memblock frame with invalid flags value.");
766 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
767 /* Frame payload available */
769 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
771 /* Is this memblock data? Then pass it to the user */
772 l
= (p
->read
.index
- (size_t) r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? (size_t) (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
) : (size_t) r
;
777 chunk
.memblock
= p
->read
.memblock
;
778 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
781 if (p
->recieve_memblock_callback
) {
785 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
786 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
788 p
->recieve_memblock_callback(
790 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
792 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
794 p
->recieve_memblock_callback_userdata
);
797 /* Drop seek info for following callbacks */
798 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
799 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
800 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
805 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
807 if (p
->read
.memblock
) {
809 /* This was a memblock frame. We can unref the memblock now */
810 pa_memblock_unref(p
->read
.memblock
);
812 } else if (p
->read
.packet
) {
814 if (p
->recieve_packet_callback
)
816 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
818 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
821 pa_packet_unref(p
->read
.packet
);
825 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
827 pa_assert(p
->import
);
829 if (!(b
= pa_memimport_get(p
->import
,
830 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
831 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
832 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
833 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
835 if (pa_log_ratelimit(PA_LOG_DEBUG
))
836 pa_log_debug("Failed to import memory block.");
839 if (p
->recieve_memblock_callback
) {
845 chunk
.length
= b
? pa_memblock_get_length(b
) : ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
]);
848 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
849 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
851 p
->recieve_memblock_callback(
853 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
855 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
857 p
->recieve_memblock_callback_userdata
);
861 pa_memblock_unref(b
);
871 p
->read
.memblock
= NULL
;
872 p
->read
.packet
= NULL
;
877 p
->read_creds_valid
= FALSE
;
883 if (release_memblock
)
884 pa_memblock_release(release_memblock
);
889 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
891 pa_assert(PA_REFCNT_VALUE(p
) > 0);
893 p
->die_callback
= cb
;
894 p
->die_callback_userdata
= userdata
;
897 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
899 pa_assert(PA_REFCNT_VALUE(p
) > 0);
901 p
->drain_callback
= cb
;
902 p
->drain_callback_userdata
= userdata
;
905 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
907 pa_assert(PA_REFCNT_VALUE(p
) > 0);
909 p
->recieve_packet_callback
= cb
;
910 p
->recieve_packet_callback_userdata
= userdata
;
913 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
915 pa_assert(PA_REFCNT_VALUE(p
) > 0);
917 p
->recieve_memblock_callback
= cb
;
918 p
->recieve_memblock_callback_userdata
= userdata
;
921 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
923 pa_assert(PA_REFCNT_VALUE(p
) > 0);
925 p
->release_callback
= cb
;
926 p
->release_callback_userdata
= userdata
;
929 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
931 pa_assert(PA_REFCNT_VALUE(p
) > 0);
933 p
->release_callback
= cb
;
934 p
->release_callback_userdata
= userdata
;
937 pa_bool_t
pa_pstream_is_pending(pa_pstream
*p
) {
941 pa_assert(PA_REFCNT_VALUE(p
) > 0);
946 b
= p
->write
.current
|| !pa_queue_isempty(p
->send_queue
);
951 void pa_pstream_unref(pa_pstream
*p
) {
953 pa_assert(PA_REFCNT_VALUE(p
) > 0);
955 if (PA_REFCNT_DEC(p
) <= 0)
959 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
961 pa_assert(PA_REFCNT_VALUE(p
) > 0);
967 void pa_pstream_unlink(pa_pstream
*p
) {
976 pa_memimport_free(p
->import
);
981 pa_memexport_free(p
->export
);
986 pa_iochannel_free(p
->io
);
990 if (p
->defer_event
) {
991 p
->mainloop
->defer_free(p
->defer_event
);
992 p
->defer_event
= NULL
;
995 p
->die_callback
= NULL
;
996 p
->drain_callback
= NULL
;
997 p
->recieve_packet_callback
= NULL
;
998 p
->recieve_memblock_callback
= NULL
;
1001 void pa_pstream_enable_shm(pa_pstream
*p
, pa_bool_t enable
) {
1003 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1005 p
->use_shm
= enable
;
1010 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1015 pa_memexport_free(p
->export
);
1021 pa_bool_t
pa_pstream_get_shm(pa_pstream
*p
) {
1023 pa_assert(PA_REFCNT_VALUE(p
) > 0);