2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
34 #ifdef HAVE_NETINET_IN_H
35 #include <netinet/in.h>
38 #include <pulse/xmalloc.h>
40 #include <pulsecore/socket.h>
41 #include <pulsecore/queue.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-scache.h>
44 #include <pulsecore/creds.h>
45 #include <pulsecore/refcnt.h>
46 #include <pulsecore/flist.h>
47 #include <pulsecore/macro.h>
51 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
52 #define PA_FLAG_SHMDATA 0x80000000LU
53 #define PA_FLAG_SHMRELEASE 0x40000000LU
54 #define PA_FLAG_SHMREVOKE 0xC0000000LU
55 #define PA_FLAG_SHMMASK 0xFF000000LU
56 #define PA_FLAG_SEEKMASK 0x000000FFLU
58 /* The sequence descriptor header consists of 5 32bit integers: */
60 PA_PSTREAM_DESCRIPTOR_LENGTH
,
61 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
62 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
63 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
64 PA_PSTREAM_DESCRIPTOR_FLAGS
,
65 PA_PSTREAM_DESCRIPTOR_MAX
68 /* If we have an SHM block, this info follows the descriptor */
70 PA_PSTREAM_SHM_BLOCKID
,
73 PA_PSTREAM_SHM_LENGTH
,
77 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
79 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
80 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
82 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
86 PA_PSTREAM_ITEM_PACKET
,
87 PA_PSTREAM_ITEM_MEMBLOCK
,
88 PA_PSTREAM_ITEM_SHMRELEASE
,
89 PA_PSTREAM_ITEM_SHMREVOKE
103 pa_seek_mode_t seek_mode
;
105 /* release/revoke info */
112 pa_mainloop_api
*mainloop
;
113 pa_defer_event
*defer_event
;
116 pa_queue
*send_queue
;
121 pa_pstream_descriptor descriptor
;
122 struct item_info
* current
;
123 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
126 pa_memchunk memchunk
;
130 pa_pstream_descriptor descriptor
;
131 pa_memblock
*memblock
;
133 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
139 pa_memimport
*import
;
140 pa_memexport
*export
;
142 pa_pstream_packet_cb_t recieve_packet_callback
;
143 void *recieve_packet_callback_userdata
;
145 pa_pstream_memblock_cb_t recieve_memblock_callback
;
146 void *recieve_memblock_callback_userdata
;
148 pa_pstream_notify_cb_t drain_callback
;
149 void *drain_callback_userdata
;
151 pa_pstream_notify_cb_t die_callback
;
152 void *die_callback_userdata
;
154 pa_pstream_block_id_cb_t revoke_callback
;
155 void *revoke_callback_userdata
;
157 pa_pstream_block_id_cb_t release_callback
;
158 void *release_callback_userdata
;
163 pa_creds read_creds
, write_creds
;
164 pa_bool_t read_creds_valid
, send_creds_now
;
168 static int do_write(pa_pstream
*p
);
169 static int do_read(pa_pstream
*p
);
171 static void do_something(pa_pstream
*p
) {
173 pa_assert(PA_REFCNT_VALUE(p
) > 0);
177 p
->mainloop
->defer_enable(p
->defer_event
, 0);
179 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
182 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
185 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
196 p
->die_callback(p
, p
->die_callback_userdata
);
198 pa_pstream_unlink(p
);
202 static void io_callback(pa_iochannel
*io
, void *userdata
) {
203 pa_pstream
*p
= userdata
;
206 pa_assert(PA_REFCNT_VALUE(p
) > 0);
207 pa_assert(p
->io
== io
);
212 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
213 pa_pstream
*p
= userdata
;
216 pa_assert(PA_REFCNT_VALUE(p
) > 0);
217 pa_assert(p
->defer_event
== e
);
218 pa_assert(p
->mainloop
== m
);
223 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
225 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
232 p
= pa_xnew(pa_pstream
, 1);
235 pa_iochannel_set_callback(io
, io_callback
, p
);
239 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
240 m
->defer_enable(p
->defer_event
, 0);
242 p
->send_queue
= pa_queue_new();
244 p
->write
.current
= NULL
;
246 pa_memchunk_reset(&p
->write
.memchunk
);
247 p
->read
.memblock
= NULL
;
248 p
->read
.packet
= NULL
;
251 p
->recieve_packet_callback
= NULL
;
252 p
->recieve_packet_callback_userdata
= NULL
;
253 p
->recieve_memblock_callback
= NULL
;
254 p
->recieve_memblock_callback_userdata
= NULL
;
255 p
->drain_callback
= NULL
;
256 p
->drain_callback_userdata
= NULL
;
257 p
->die_callback
= NULL
;
258 p
->die_callback_userdata
= NULL
;
259 p
->revoke_callback
= NULL
;
260 p
->revoke_callback_userdata
= NULL
;
261 p
->release_callback
= NULL
;
262 p
->release_callback_userdata
= NULL
;
269 /* We do importing unconditionally */
270 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
272 pa_iochannel_socket_set_rcvbuf(io
, pa_mempool_block_size_max(p
->mempool
));
273 pa_iochannel_socket_set_sndbuf(io
, pa_mempool_block_size_max(p
->mempool
));
276 p
->send_creds_now
= FALSE
;
277 p
->read_creds_valid
= FALSE
;
282 static void item_free(void *item
, void *q
) {
283 struct item_info
*i
= item
;
286 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
287 pa_assert(i
->chunk
.memblock
);
288 pa_memblock_unref(i
->chunk
.memblock
);
289 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
290 pa_assert(i
->packet
);
291 pa_packet_unref(i
->packet
);
294 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
298 static void pstream_free(pa_pstream
*p
) {
301 pa_pstream_unlink(p
);
303 pa_queue_free(p
->send_queue
, item_free
, NULL
);
305 if (p
->write
.current
)
306 item_free(p
->write
.current
, NULL
);
308 if (p
->write
.memchunk
.memblock
)
309 pa_memblock_unref(p
->write
.memchunk
.memblock
);
311 if (p
->read
.memblock
)
312 pa_memblock_unref(p
->read
.memblock
);
315 pa_packet_unref(p
->read
.packet
);
320 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
324 pa_assert(PA_REFCNT_VALUE(p
) > 0);
330 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
331 i
= pa_xnew(struct item_info
, 1);
333 i
->type
= PA_PSTREAM_ITEM_PACKET
;
334 i
->packet
= pa_packet_ref(packet
);
337 if ((i
->with_creds
= !!creds
))
341 pa_queue_push(p
->send_queue
, i
);
343 p
->mainloop
->defer_enable(p
->defer_event
, 1);
346 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
351 pa_assert(PA_REFCNT_VALUE(p
) > 0);
352 pa_assert(channel
!= (uint32_t) -1);
359 length
= chunk
->length
;
361 bsm
= pa_mempool_block_size_max(p
->mempool
);
367 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
368 i
= pa_xnew(struct item_info
, 1);
369 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
371 n
= PA_MIN(length
, bsm
);
372 i
->chunk
.index
= chunk
->index
+ idx
;
374 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
376 i
->channel
= channel
;
378 i
->seek_mode
= seek_mode
;
380 i
->with_creds
= FALSE
;
383 pa_queue_push(p
->send_queue
, i
);
389 p
->mainloop
->defer_enable(p
->defer_event
, 1);
392 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
393 struct item_info
*item
;
395 pa_assert(PA_REFCNT_VALUE(p
) > 0);
400 /* pa_log("Releasing block %u", block_id); */
402 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
403 item
= pa_xnew(struct item_info
, 1);
404 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
405 item
->block_id
= block_id
;
407 item
->with_creds
= FALSE
;
410 pa_queue_push(p
->send_queue
, item
);
411 p
->mainloop
->defer_enable(p
->defer_event
, 1);
414 /* might be called from thread context */
415 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
416 pa_pstream
*p
= userdata
;
419 pa_assert(PA_REFCNT_VALUE(p
) > 0);
424 if (p
->release_callback
)
425 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
427 pa_pstream_send_release(p
, block_id
);
430 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
431 struct item_info
*item
;
433 pa_assert(PA_REFCNT_VALUE(p
) > 0);
437 /* pa_log("Revoking block %u", block_id); */
439 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
440 item
= pa_xnew(struct item_info
, 1);
441 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
442 item
->block_id
= block_id
;
444 item
->with_creds
= FALSE
;
447 pa_queue_push(p
->send_queue
, item
);
448 p
->mainloop
->defer_enable(p
->defer_event
, 1);
451 /* might be called from thread context */
452 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
453 pa_pstream
*p
= userdata
;
456 pa_assert(PA_REFCNT_VALUE(p
) > 0);
458 if (p
->revoke_callback
)
459 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
461 pa_pstream_send_revoke(p
, block_id
);
464 static void prepare_next_write_item(pa_pstream
*p
) {
466 pa_assert(PA_REFCNT_VALUE(p
) > 0);
468 p
->write
.current
= pa_queue_pop(p
->send_queue
);
470 if (!p
->write
.current
)
474 p
->write
.data
= NULL
;
475 pa_memchunk_reset(&p
->write
.memchunk
);
477 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
478 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
479 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
480 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
481 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
483 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
485 pa_assert(p
->write
.current
->packet
);
486 p
->write
.data
= p
->write
.current
->packet
->data
;
487 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->packet
->length
);
489 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
491 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
492 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
494 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
496 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
497 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
501 pa_bool_t send_payload
= TRUE
;
503 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
504 pa_assert(p
->write
.current
->chunk
.memblock
);
506 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
507 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
508 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
510 flags
= (uint32_t) (p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
);
513 uint32_t block_id
, shm_id
;
514 size_t offset
, length
;
516 pa_assert(p
->export
);
518 if (pa_memexport_put(p
->export
,
519 p
->write
.current
->chunk
.memblock
,
525 flags
|= PA_FLAG_SHMDATA
;
526 send_payload
= FALSE
;
528 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
529 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
530 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
531 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
533 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
534 p
->write
.data
= p
->write
.shm_info
;
537 /* pa_log_warn("Failed to export memory block."); */
541 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
542 p
->write
.memchunk
= p
->write
.current
->chunk
;
543 pa_memblock_ref(p
->write
.memchunk
.memblock
);
544 p
->write
.data
= NULL
;
547 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
551 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
552 p
->write_creds
= p
->write
.current
->creds
;
556 static int do_write(pa_pstream
*p
) {
560 pa_memblock
*release_memblock
= NULL
;
563 pa_assert(PA_REFCNT_VALUE(p
) > 0);
565 if (!p
->write
.current
)
566 prepare_next_write_item(p
);
568 if (!p
->write
.current
)
571 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
572 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
573 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
575 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
580 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
581 release_memblock
= p
->write
.memchunk
.memblock
;
584 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
585 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
591 if (p
->send_creds_now
) {
593 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
596 p
->send_creds_now
= FALSE
;
600 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
603 if (release_memblock
)
604 pa_memblock_release(release_memblock
);
606 p
->write
.index
+= (size_t) r
;
608 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
609 pa_assert(p
->write
.current
);
610 item_free(p
->write
.current
, NULL
);
611 p
->write
.current
= NULL
;
613 if (p
->write
.memchunk
.memblock
)
614 pa_memblock_unref(p
->write
.memchunk
.memblock
);
616 pa_memchunk_reset(&p
->write
.memchunk
);
618 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
619 p
->drain_callback(p
, p
->drain_callback_userdata
);
626 if (release_memblock
)
627 pa_memblock_release(release_memblock
);
632 static int do_read(pa_pstream
*p
) {
636 pa_memblock
*release_memblock
= NULL
;
638 pa_assert(PA_REFCNT_VALUE(p
) > 0);
640 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
641 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
642 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
644 pa_assert(p
->read
.data
|| p
->read
.memblock
);
649 d
= pa_memblock_acquire(p
->read
.memblock
);
650 release_memblock
= p
->read
.memblock
;
653 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
654 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
661 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
664 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
667 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
671 if (release_memblock
)
672 pa_memblock_release(release_memblock
);
674 p
->read
.index
+= (size_t) r
;
676 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
677 uint32_t flags
, length
, channel
;
678 /* Reading of frame descriptor complete */
680 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
682 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
683 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
687 if (flags
== PA_FLAG_SHMRELEASE
) {
689 /* This is a SHM memblock release frame with no payload */
691 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
693 pa_assert(p
->export
);
694 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
698 } else if (flags
== PA_FLAG_SHMREVOKE
) {
700 /* This is a SHM memblock revoke frame with no payload */
702 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
704 pa_assert(p
->import
);
705 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
710 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
712 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
713 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length
);
717 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
719 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
721 if (channel
== (uint32_t) -1) {
724 pa_log_warn("Received packet frame with invalid flags value.");
728 /* Frame is a packet frame */
729 p
->read
.packet
= pa_packet_new(length
);
730 p
->read
.data
= p
->read
.packet
->data
;
734 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
735 pa_log_warn("Received memblock frame with invalid seek mode.");
739 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
741 if (length
!= sizeof(p
->read
.shm_info
)) {
742 pa_log_warn("Received SHM memblock frame with Invalid frame length.");
746 /* Frame is a memblock frame referencing an SHM memblock */
747 p
->read
.data
= p
->read
.shm_info
;
749 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
751 /* Frame is a memblock frame */
753 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
757 pa_log_warn("Received memblock frame with invalid flags value.");
762 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
763 /* Frame payload available */
765 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
767 /* Is this memblock data? Than pass it to the user */
768 l
= (p
->read
.index
- (size_t) r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? (size_t) (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
) : (size_t) r
;
773 chunk
.memblock
= p
->read
.memblock
;
774 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
777 if (p
->recieve_memblock_callback
) {
781 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
782 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
784 p
->recieve_memblock_callback(
786 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
788 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
790 p
->recieve_memblock_callback_userdata
);
793 /* Drop seek info for following callbacks */
794 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
795 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
796 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
801 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
803 if (p
->read
.memblock
) {
805 /* This was a memblock frame. We can unref the memblock now */
806 pa_memblock_unref(p
->read
.memblock
);
808 } else if (p
->read
.packet
) {
810 if (p
->recieve_packet_callback
)
812 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
814 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
817 pa_packet_unref(p
->read
.packet
);
821 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
823 pa_assert(p
->import
);
825 if (!(b
= pa_memimport_get(p
->import
,
826 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
827 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
828 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
829 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
831 if (pa_log_ratelimit(PA_LOG_DEBUG
))
832 pa_log_debug("Failed to import memory block.");
835 if (p
->recieve_memblock_callback
) {
841 chunk
.length
= b
? pa_memblock_get_length(b
) : ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
]);
844 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
845 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
847 p
->recieve_memblock_callback(
849 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
851 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
853 p
->recieve_memblock_callback_userdata
);
857 pa_memblock_unref(b
);
867 p
->read
.memblock
= NULL
;
868 p
->read
.packet
= NULL
;
873 p
->read_creds_valid
= FALSE
;
879 if (release_memblock
)
880 pa_memblock_release(release_memblock
);
885 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
887 pa_assert(PA_REFCNT_VALUE(p
) > 0);
889 p
->die_callback
= cb
;
890 p
->die_callback_userdata
= userdata
;
893 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
895 pa_assert(PA_REFCNT_VALUE(p
) > 0);
897 p
->drain_callback
= cb
;
898 p
->drain_callback_userdata
= userdata
;
901 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
903 pa_assert(PA_REFCNT_VALUE(p
) > 0);
905 p
->recieve_packet_callback
= cb
;
906 p
->recieve_packet_callback_userdata
= userdata
;
909 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
911 pa_assert(PA_REFCNT_VALUE(p
) > 0);
913 p
->recieve_memblock_callback
= cb
;
914 p
->recieve_memblock_callback_userdata
= userdata
;
917 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
919 pa_assert(PA_REFCNT_VALUE(p
) > 0);
921 p
->release_callback
= cb
;
922 p
->release_callback_userdata
= userdata
;
925 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
927 pa_assert(PA_REFCNT_VALUE(p
) > 0);
929 p
->release_callback
= cb
;
930 p
->release_callback_userdata
= userdata
;
933 pa_bool_t
pa_pstream_is_pending(pa_pstream
*p
) {
937 pa_assert(PA_REFCNT_VALUE(p
) > 0);
942 b
= p
->write
.current
|| !pa_queue_isempty(p
->send_queue
);
947 void pa_pstream_unref(pa_pstream
*p
) {
949 pa_assert(PA_REFCNT_VALUE(p
) > 0);
951 if (PA_REFCNT_DEC(p
) <= 0)
955 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
957 pa_assert(PA_REFCNT_VALUE(p
) > 0);
963 void pa_pstream_unlink(pa_pstream
*p
) {
972 pa_memimport_free(p
->import
);
977 pa_memexport_free(p
->export
);
982 pa_iochannel_free(p
->io
);
986 if (p
->defer_event
) {
987 p
->mainloop
->defer_free(p
->defer_event
);
988 p
->defer_event
= NULL
;
991 p
->die_callback
= NULL
;
992 p
->drain_callback
= NULL
;
993 p
->recieve_packet_callback
= NULL
;
994 p
->recieve_memblock_callback
= NULL
;
997 void pa_pstream_enable_shm(pa_pstream
*p
, pa_bool_t enable
) {
999 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1001 p
->use_shm
= enable
;
1006 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1011 pa_memexport_free(p
->export
);
1017 pa_bool_t
pa_pstream_get_shm(pa_pstream
*p
) {
1019 pa_assert(PA_REFCNT_VALUE(p
) > 0);