Remove implicit conversions from scoped_refptr to T* in media/
[chromium-blink-merge.git] / media / cast / test / utility / udp_proxy.cc
blob95640a364e3a4d9bea79c83ff3544977a52772ff
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include <math.h>
6 #include <stdlib.h>
7 #include <vector>
9 #include "media/cast/test/utility/udp_proxy.h"
11 #include "base/logging.h"
12 #include "base/rand_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/thread.h"
15 #include "base/time/default_tick_clock.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/udp/udp_socket.h"
20 namespace media {
21 namespace cast {
22 namespace test {
// Size, in bytes, of the buffer allocated for every datagram read from the
// socket (see UDPProxyImpl::PollRead).
const size_t kMaxPacketSize = 1 << 16;
26 PacketPipe::PacketPipe() {}
27 PacketPipe::~PacketPipe() {}
28 void PacketPipe::InitOnIOThread(
29 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
30 base::TickClock* clock) {
31 task_runner_ = task_runner;
32 clock_ = clock;
33 if (pipe_) {
34 pipe_->InitOnIOThread(task_runner, clock);
37 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
38 if (pipe_) {
39 pipe_->AppendToPipe(pipe.Pass());
40 } else {
41 pipe_ = pipe.Pass();
45 // Roughly emulates a buffer inside a device.
46 // If the buffer is full, packets are dropped.
47 // Packets are output at a maximum bandwidth.
48 class Buffer : public PacketPipe {
49 public:
50 Buffer(size_t buffer_size, double max_megabits_per_second)
51 : buffer_size_(0),
52 max_buffer_size_(buffer_size),
53 max_megabits_per_second_(max_megabits_per_second),
54 weak_factory_(this) {
55 CHECK_GT(max_buffer_size_, 0UL);
56 CHECK_GT(max_megabits_per_second, 0);
59 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
60 if (packet->size() + buffer_size_ <= max_buffer_size_) {
61 buffer_size_ += packet->size();
62 buffer_.push_back(linked_ptr<Packet>(packet.release()));
63 if (buffer_.size() == 1) {
64 Schedule();
69 private:
70 void Schedule() {
71 last_schedule_ = clock_->NowTicks();
72 double megabits = buffer_.front()->size() * 8 / 1000000.0;
73 double seconds = megabits / max_megabits_per_second_;
74 int64 microseconds = static_cast<int64>(seconds * 1E6);
75 task_runner_->PostDelayedTask(
76 FROM_HERE,
77 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
78 base::TimeDelta::FromMicroseconds(microseconds));
81 void ProcessBuffer() {
82 int64 bytes_to_send = static_cast<int64>(
83 (clock_->NowTicks() - last_schedule_).InSecondsF() *
84 max_megabits_per_second_ * 1E6 / 8);
85 if (bytes_to_send < static_cast<int64>(buffer_.front()->size())) {
86 bytes_to_send = buffer_.front()->size();
88 while (!buffer_.empty() &&
89 static_cast<int64>(buffer_.front()->size()) <= bytes_to_send) {
90 CHECK(!buffer_.empty());
91 scoped_ptr<Packet> packet(buffer_.front().release());
92 bytes_to_send -= packet->size();
93 buffer_size_ -= packet->size();
94 buffer_.pop_front();
95 pipe_->Send(packet.Pass());
97 if (!buffer_.empty()) {
98 Schedule();
102 std::deque<linked_ptr<Packet> > buffer_;
103 base::TimeTicks last_schedule_;
104 size_t buffer_size_;
105 size_t max_buffer_size_;
106 double max_megabits_per_second_; // megabits per second
107 base::WeakPtrFactory<Buffer> weak_factory_;
110 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
111 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
114 class RandomDrop : public PacketPipe {
115 public:
116 RandomDrop(double drop_fraction)
117 : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
119 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
120 if (rand() > drop_fraction_) {
121 pipe_->Send(packet.Pass());
125 private:
126 int drop_fraction_;
129 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
130 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
133 class SimpleDelayBase : public PacketPipe {
134 public:
135 SimpleDelayBase() : weak_factory_(this) {}
136 virtual ~SimpleDelayBase() {}
138 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
139 double seconds = GetDelay();
140 task_runner_->PostDelayedTask(
141 FROM_HERE,
142 base::Bind(&SimpleDelayBase::SendInternal,
143 weak_factory_.GetWeakPtr(),
144 base::Passed(&packet)),
145 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
147 protected:
148 virtual double GetDelay() = 0;
150 private:
151 virtual void SendInternal(scoped_ptr<Packet> packet) {
152 pipe_->Send(packet.Pass());
155 base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
158 class ConstantDelay : public SimpleDelayBase {
159 public:
160 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
161 virtual double GetDelay() OVERRIDE {
162 return delay_seconds_;
165 private:
166 double delay_seconds_;
169 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
170 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
173 class RandomUnsortedDelay : public SimpleDelayBase {
174 public:
175 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
177 virtual double GetDelay() OVERRIDE {
178 return random_delay_ * base::RandDouble();
181 private:
182 double random_delay_;
185 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
186 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
189 class DuplicateAndDelay : public RandomUnsortedDelay {
190 public:
191 DuplicateAndDelay(double delay_min,
192 double random_delay) :
193 RandomUnsortedDelay(random_delay),
194 delay_min_(delay_min) {
196 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
197 pipe_->Send(scoped_ptr<Packet>(new Packet(*packet.get())));
198 RandomUnsortedDelay::Send(packet.Pass());
200 virtual double GetDelay() OVERRIDE {
201 return RandomUnsortedDelay::GetDelay() + delay_min_;
203 private:
204 double delay_min_;
207 scoped_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min,
208 double random_delay) {
209 return scoped_ptr<PacketPipe>(
210 new DuplicateAndDelay(delay_min, random_delay)).Pass();
213 class RandomSortedDelay : public PacketPipe {
214 public:
215 RandomSortedDelay(double random_delay,
216 double extra_delay,
217 double seconds_between_extra_delay)
218 : random_delay_(random_delay),
219 extra_delay_(extra_delay),
220 seconds_between_extra_delay_(seconds_between_extra_delay),
221 weak_factory_(this) {}
223 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
224 buffer_.push_back(linked_ptr<Packet>(packet.release()));
225 if (buffer_.size() == 1) {
226 next_send_ = std::max(
227 clock_->NowTicks() +
228 base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_),
229 next_send_);
230 ProcessBuffer();
233 virtual void InitOnIOThread(
234 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
235 base::TickClock* clock) OVERRIDE {
236 PacketPipe::InitOnIOThread(task_runner, clock);
237 // As we start the stream, assume that we are in a random
238 // place between two extra delays, thus multiplier = 1.0;
239 ScheduleExtraDelay(1.0);
242 private:
243 void ScheduleExtraDelay(double mult) {
244 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
245 int64 microseconds = static_cast<int64>(seconds * 1E6);
246 task_runner_->PostDelayedTask(
247 FROM_HERE,
248 base::Bind(&RandomSortedDelay::CauseExtraDelay,
249 weak_factory_.GetWeakPtr()),
250 base::TimeDelta::FromMicroseconds(microseconds));
253 void CauseExtraDelay() {
254 next_send_ = std::max<base::TimeTicks>(
255 clock_->NowTicks() +
256 base::TimeDelta::FromMicroseconds(
257 static_cast<int64>(extra_delay_ * 1E6)),
258 next_send_);
259 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
260 // before scheduling another one to make the average equal to
261 // seconds_between_extra_delay_.
262 ScheduleExtraDelay(2.0);
265 void ProcessBuffer() {
266 base::TimeTicks now = clock_->NowTicks();
267 while (!buffer_.empty() && next_send_ <= now) {
268 scoped_ptr<Packet> packet(buffer_.front().release());
269 pipe_->Send(packet.Pass());
270 buffer_.pop_front();
272 next_send_ += base::TimeDelta::FromSecondsD(
273 base::RandDouble() * random_delay_);
276 if (!buffer_.empty()) {
277 task_runner_->PostDelayedTask(
278 FROM_HERE,
279 base::Bind(&RandomSortedDelay::ProcessBuffer,
280 weak_factory_.GetWeakPtr()),
281 next_send_ - now);
285 base::TimeTicks block_until_;
286 std::deque<linked_ptr<Packet> > buffer_;
287 double random_delay_;
288 double extra_delay_;
289 double seconds_between_extra_delay_;
290 base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
291 base::TimeTicks next_send_;
294 scoped_ptr<PacketPipe> NewRandomSortedDelay(
295 double random_delay,
296 double extra_delay,
297 double seconds_between_extra_delay) {
298 return scoped_ptr<PacketPipe>(
299 new RandomSortedDelay(
300 random_delay, extra_delay, seconds_between_extra_delay))
301 .Pass();
304 class NetworkGlitchPipe : public PacketPipe {
305 public:
306 NetworkGlitchPipe(double average_work_time, double average_outage_time)
307 : works_(false),
308 max_work_time_(average_work_time * 2),
309 max_outage_time_(average_outage_time * 2),
310 weak_factory_(this) {}
312 virtual void InitOnIOThread(
313 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
314 base::TickClock* clock) OVERRIDE {
315 PacketPipe::InitOnIOThread(task_runner, clock);
316 Flip();
319 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
320 if (works_) {
321 pipe_->Send(packet.Pass());
325 private:
326 void Flip() {
327 works_ = !works_;
328 double seconds = base::RandDouble() *
329 (works_ ? max_work_time_ : max_outage_time_);
330 int64 microseconds = static_cast<int64>(seconds * 1E6);
331 task_runner_->PostDelayedTask(
332 FROM_HERE,
333 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
334 base::TimeDelta::FromMicroseconds(microseconds));
337 bool works_;
338 double max_work_time_;
339 double max_outage_time_;
340 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
343 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
344 double average_outage_time) {
345 return scoped_ptr<PacketPipe>(
346 new NetworkGlitchPipe(average_work_time, average_outage_time))
347 .Pass();
351 // Internal buffer object for a client of the IPP model.
352 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
353 public:
354 InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp,
355 size_t size)
356 : ipp_(ipp),
357 stored_size_(0),
358 stored_limit_(size),
359 clock_(NULL),
360 weak_factory_(this) {
363 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
364 // Drop if buffer is full.
365 if (stored_size_ >= stored_limit_)
366 return;
367 stored_size_ += packet->size();
368 buffer_.push_back(linked_ptr<Packet>(packet.release()));
369 buffer_time_.push_back(clock_->NowTicks());
370 DCHECK(buffer_.size() == buffer_time_.size());
373 virtual void InitOnIOThread(
374 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
375 base::TickClock* clock) OVERRIDE {
376 clock_ = clock;
377 if (ipp_)
378 ipp_->InitOnIOThread(task_runner, clock);
379 PacketPipe::InitOnIOThread(task_runner, clock);
382 void SendOnePacket() {
383 scoped_ptr<Packet> packet(buffer_.front().release());
384 stored_size_ -= packet->size();
385 buffer_.pop_front();
386 buffer_time_.pop_front();
387 pipe_->Send(packet.Pass());
388 DCHECK(buffer_.size() == buffer_time_.size());
391 bool Empty() const {
392 return buffer_.empty();
395 base::TimeTicks FirstPacketTime() const {
396 DCHECK(!buffer_time_.empty());
397 return buffer_time_.front();
400 base::WeakPtr<InternalBuffer> GetWeakPtr() {
401 return weak_factory_.GetWeakPtr();
405 private:
406 const base::WeakPtr<InterruptedPoissonProcess> ipp_;
407 size_t stored_size_;
408 const size_t stored_limit_;
409 std::deque<linked_ptr<Packet> > buffer_;
410 std::deque<base::TimeTicks> buffer_time_;
411 base::TickClock* clock_;
412 base::WeakPtrFactory<InternalBuffer> weak_factory_;
414 DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
417 InterruptedPoissonProcess::InterruptedPoissonProcess(
418 const std::vector<double>& average_rates,
419 double coef_burstiness,
420 double coef_variance,
421 uint32 rand_seed)
422 : clock_(NULL),
423 average_rates_(average_rates),
424 coef_burstiness_(coef_burstiness),
425 coef_variance_(coef_variance),
426 rate_index_(0),
427 on_state_(true),
428 weak_factory_(this) {
429 mt_rand_.init_genrand(rand_seed);
430 DCHECK(!average_rates.empty());
431 ComputeRates();
434 InterruptedPoissonProcess::~InterruptedPoissonProcess() {
437 void InterruptedPoissonProcess::InitOnIOThread(
438 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
439 base::TickClock* clock) {
440 // Already initialized and started.
441 if (task_runner_.get() && clock_)
442 return;
443 task_runner_ = task_runner;
444 clock_ = clock;
445 UpdateRates();
446 SwitchOn();
447 SendPacket();
450 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
451 scoped_ptr<InternalBuffer> buffer(
452 new InternalBuffer(weak_factory_.GetWeakPtr(), size));
453 send_buffers_.push_back(buffer->GetWeakPtr());
454 return buffer.PassAs<PacketPipe>();
457 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
458 // Rate is per milliseconds.
459 // The time until next event is exponentially distributed to the
460 // inverse of |rate|.
461 return base::TimeDelta::FromMillisecondsD(
462 fabs(-log(1.0 - RandDouble()) / rate));
465 double InterruptedPoissonProcess::RandDouble() {
466 // Generate a 64-bits random number from MT19937 and then convert
467 // it to double.
468 uint64 rand = mt_rand_.genrand_int32();
469 rand <<= 32;
470 rand |= mt_rand_.genrand_int32();
471 return base::BitsToOpenEndedUnitInterval(rand);
474 void InterruptedPoissonProcess::ComputeRates() {
475 double avg_rate = average_rates_[rate_index_];
477 send_rate_ = avg_rate / coef_burstiness_;
478 switch_off_rate_ =
479 2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) /
480 coef_burstiness_ / (coef_variance_ - 1);
481 switch_on_rate_ =
482 2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1);
485 void InterruptedPoissonProcess::UpdateRates() {
486 ComputeRates();
488 // Rates are updated once per second.
489 rate_index_ = (rate_index_ + 1) % average_rates_.size();
490 task_runner_->PostDelayedTask(
491 FROM_HERE,
492 base::Bind(&InterruptedPoissonProcess::UpdateRates,
493 weak_factory_.GetWeakPtr()),
494 base::TimeDelta::FromSeconds(1));
497 void InterruptedPoissonProcess::SwitchOff() {
498 on_state_ = false;
499 task_runner_->PostDelayedTask(
500 FROM_HERE,
501 base::Bind(&InterruptedPoissonProcess::SwitchOn,
502 weak_factory_.GetWeakPtr()),
503 NextEvent(switch_on_rate_));
506 void InterruptedPoissonProcess::SwitchOn() {
507 on_state_ = true;
508 task_runner_->PostDelayedTask(
509 FROM_HERE,
510 base::Bind(&InterruptedPoissonProcess::SwitchOff,
511 weak_factory_.GetWeakPtr()),
512 NextEvent(switch_off_rate_));
515 void InterruptedPoissonProcess::SendPacket() {
516 task_runner_->PostDelayedTask(
517 FROM_HERE,
518 base::Bind(&InterruptedPoissonProcess::SendPacket,
519 weak_factory_.GetWeakPtr()),
520 NextEvent(send_rate_));
522 // If OFF then don't send.
523 if (!on_state_)
524 return;
526 // Find the earliest packet to send.
527 base::TimeTicks earliest_time;
528 for (size_t i = 0; i < send_buffers_.size(); ++i) {
529 if (!send_buffers_[i])
530 continue;
531 if (send_buffers_[i]->Empty())
532 continue;
533 if (earliest_time.is_null() ||
534 send_buffers_[i]->FirstPacketTime() < earliest_time)
535 earliest_time = send_buffers_[i]->FirstPacketTime();
537 for (size_t i = 0; i < send_buffers_.size(); ++i) {
538 if (!send_buffers_[i])
539 continue;
540 if (send_buffers_[i]->Empty())
541 continue;
542 if (send_buffers_[i]->FirstPacketTime() != earliest_time)
543 continue;
544 send_buffers_[i]->SendOnePacket();
545 break;
549 class UDPProxyImpl;
551 class PacketSender : public PacketPipe {
552 public:
553 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
554 : udp_proxy_(udp_proxy), destination_(destination) {}
555 virtual void Send(scoped_ptr<Packet> packet) OVERRIDE;
556 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
557 NOTREACHED();
560 private:
561 UDPProxyImpl* udp_proxy_;
562 const net::IPEndPoint* destination_; // not owned
565 namespace {
566 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
567 if (*pipe) {
568 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
569 } else {
570 pipe->reset(next);
573 } // namespace
575 scoped_ptr<PacketPipe> GoodNetwork() {
576 // This represents the buffer on the sender.
577 scoped_ptr<PacketPipe> pipe;
578 BuildPipe(&pipe, new Buffer(2 << 20, 50));
579 BuildPipe(&pipe, new ConstantDelay(1E-3));
580 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 2E-3, 3));
581 // This represents the buffer on the receiving device.
582 BuildPipe(&pipe, new Buffer(2 << 20, 50));
583 return pipe.Pass();
586 scoped_ptr<PacketPipe> WifiNetwork() {
587 // This represents the buffer on the sender.
588 scoped_ptr<PacketPipe> pipe;
589 BuildPipe(&pipe, new Buffer(256 << 10, 20));
590 BuildPipe(&pipe, new RandomDrop(0.005));
591 // This represents the buffer on the router.
592 BuildPipe(&pipe, new ConstantDelay(1E-3));
593 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
594 BuildPipe(&pipe, new Buffer(256 << 10, 20));
595 BuildPipe(&pipe, new ConstantDelay(1E-3));
596 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
597 BuildPipe(&pipe, new RandomDrop(0.005));
598 // This represents the buffer on the receiving device.
599 BuildPipe(&pipe, new Buffer(256 << 10, 20));
600 return pipe.Pass();
603 scoped_ptr<PacketPipe> BadNetwork() {
604 scoped_ptr<PacketPipe> pipe;
605 // This represents the buffer on the sender.
606 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
607 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop
608 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
609 // This represents the buffer on the router.
610 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 4mbit/s
611 BuildPipe(&pipe, new ConstantDelay(1E-3));
612 // Random 40ms every other second
613 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
614 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
615 // This represents the buffer on the receiving device.
616 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
617 return pipe.Pass();
621 scoped_ptr<PacketPipe> EvilNetwork() {
622 // This represents the buffer on the sender.
623 scoped_ptr<PacketPipe> pipe;
624 BuildPipe(&pipe, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s
625 // This represents the buffer on the router.
626 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
627 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
628 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
629 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
630 BuildPipe(&pipe, new ConstantDelay(1E-3));
631 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
632 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
633 // This represents the buffer on the receiving device.
634 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
635 return pipe.Pass();
638 class UDPProxyImpl : public UDPProxy {
639 public:
640 UDPProxyImpl(const net::IPEndPoint& local_port,
641 const net::IPEndPoint& destination,
642 scoped_ptr<PacketPipe> to_dest_pipe,
643 scoped_ptr<PacketPipe> from_dest_pipe,
644 net::NetLog* net_log)
645 : local_port_(local_port),
646 destination_(destination),
647 destination_is_mutable_(destination.address().empty()),
648 proxy_thread_("media::cast::test::UdpProxy Thread"),
649 to_dest_pipe_(to_dest_pipe.Pass()),
650 from_dest_pipe_(from_dest_pipe.Pass()),
651 blocked_(false),
652 weak_factory_(this) {
653 proxy_thread_.StartWithOptions(
654 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
655 base::WaitableEvent start_event(false, false);
656 proxy_thread_.message_loop_proxy()->PostTask(
657 FROM_HERE,
658 base::Bind(&UDPProxyImpl::Start,
659 base::Unretained(this),
660 base::Unretained(&start_event),
661 net_log));
662 start_event.Wait();
665 virtual ~UDPProxyImpl() {
666 base::WaitableEvent stop_event(false, false);
667 proxy_thread_.message_loop_proxy()->PostTask(
668 FROM_HERE,
669 base::Bind(&UDPProxyImpl::Stop,
670 base::Unretained(this),
671 base::Unretained(&stop_event)));
672 stop_event.Wait();
673 proxy_thread_.Stop();
676 void Send(scoped_ptr<Packet> packet,
677 const net::IPEndPoint& destination) {
678 if (blocked_) {
679 LOG(ERROR) << "Cannot write packet right now: blocked";
680 return;
683 VLOG(1) << "Sending packet, len = " << packet->size();
684 // We ignore all problems, callbacks and errors.
685 // If it didn't work we just drop the packet at and call it a day.
686 scoped_refptr<net::IOBuffer> buf =
687 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
688 size_t buf_size = packet->size();
689 int result;
690 if (destination.address().empty()) {
691 VLOG(1) << "Destination has not been set yet.";
692 result = net::ERR_INVALID_ARGUMENT;
693 } else {
694 VLOG(1) << "Destination:" << destination.ToString();
695 result = socket_->SendTo(buf.get(),
696 static_cast<int>(buf_size),
697 destination,
698 base::Bind(&UDPProxyImpl::AllowWrite,
699 weak_factory_.GetWeakPtr(),
700 buf,
701 base::Passed(&packet)));
703 if (result == net::ERR_IO_PENDING) {
704 blocked_ = true;
705 } else if (result < 0) {
706 LOG(ERROR) << "Failed to write packet.";
710 private:
711 void Start(base::WaitableEvent* start_event,
712 net::NetLog* net_log) {
713 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
714 net::RandIntCallback(),
715 net_log,
716 net::NetLog::Source()));
717 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
718 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
719 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
720 &tick_clock_);
721 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
722 &tick_clock_);
724 VLOG(0) << "From:" << local_port_.ToString();
725 if (!destination_is_mutable_)
726 VLOG(0) << "To:" << destination_.ToString();
728 CHECK_GE(socket_->Bind(local_port_), 0);
730 start_event->Signal();
731 PollRead();
734 void Stop(base::WaitableEvent* stop_event) {
735 to_dest_pipe_.reset(NULL);
736 from_dest_pipe_.reset(NULL);
737 socket_.reset(NULL);
738 stop_event->Signal();
741 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
742 DCHECK_NE(len, net::ERR_IO_PENDING);
743 VLOG(1) << "Got packet, len = " << len;
744 if (len < 0) {
745 LOG(WARNING) << "Socket read error: " << len;
746 return;
748 packet_->resize(len);
749 if (destination_is_mutable_ && set_destination_next_ &&
750 !(recv_address_ == return_address_) &&
751 !(recv_address_ == destination_)) {
752 destination_ = recv_address_;
754 if (recv_address_ == destination_) {
755 set_destination_next_ = false;
756 from_dest_pipe_->Send(packet_.Pass());
757 } else {
758 set_destination_next_ = true;
759 VLOG(1) << "Return address = " << recv_address_.ToString();
760 return_address_ = recv_address_;
761 to_dest_pipe_->Send(packet_.Pass());
765 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
766 ProcessPacket(recv_buf, len);
767 PollRead();
770 void PollRead() {
771 while (true) {
772 packet_.reset(new Packet(kMaxPacketSize));
773 scoped_refptr<net::IOBuffer> recv_buf =
774 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
775 int len = socket_->RecvFrom(
776 recv_buf.get(),
777 kMaxPacketSize,
778 &recv_address_,
779 base::Bind(
780 &UDPProxyImpl::ReadCallback, base::Unretained(this), recv_buf));
781 if (len == net::ERR_IO_PENDING)
782 break;
783 ProcessPacket(recv_buf, len);
787 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
788 scoped_ptr<Packet> packet,
789 int unused_len) {
790 DCHECK(blocked_);
791 blocked_ = false;
794 // Input
795 net::IPEndPoint local_port_;
797 net::IPEndPoint destination_;
798 bool destination_is_mutable_;
800 net::IPEndPoint return_address_;
801 bool set_destination_next_;
803 base::DefaultTickClock tick_clock_;
804 base::Thread proxy_thread_;
805 scoped_ptr<net::UDPSocket> socket_;
806 scoped_ptr<PacketPipe> to_dest_pipe_;
807 scoped_ptr<PacketPipe> from_dest_pipe_;
809 // For receiving.
810 net::IPEndPoint recv_address_;
811 scoped_ptr<Packet> packet_;
813 // For sending.
814 bool blocked_;
816 base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
819 void PacketSender::Send(scoped_ptr<Packet> packet) {
820 udp_proxy_->Send(packet.Pass(), *destination_);
823 scoped_ptr<UDPProxy> UDPProxy::Create(
824 const net::IPEndPoint& local_port,
825 const net::IPEndPoint& destination,
826 scoped_ptr<PacketPipe> to_dest_pipe,
827 scoped_ptr<PacketPipe> from_dest_pipe,
828 net::NetLog* net_log) {
829 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
830 destination,
831 to_dest_pipe.Pass(),
832 from_dest_pipe.Pass(),
833 net_log));
834 return ret.Pass();
837 } // namespace test
838 } // namespace cast
839 } // namespace media