1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
9 #include "media/cast/test/utility/udp_proxy.h"
11 #include "base/logging.h"
12 #include "base/rand_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/thread.h"
15 #include "base/time/default_tick_clock.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/udp/udp_socket.h"
// Size handed to the Packet constructor when UDPProxyImpl allocates the
// buffer that each RecvFrom() call reads into.
24 const size_t kMaxPacketSize
= 65536;
// PacketPipe's constructor and destructor both have empty bodies.
26 PacketPipe::PacketPipe() {}
27 PacketPipe::~PacketPipe() {}
// Records |task_runner| in task_runner_ and hands |task_runner| and |clock|
// on to the next stage via pipe_->InitOnIOThread().
// NOTE(review): this listing skips source lines (the embedded numbers jump
// 31 -> 34), so any storing of |clock| or any guard around the pipe_ call is
// not visible here -- confirm against the full file.
28 void PacketPipe::InitOnIOThread(
29 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
30 base::TickClock
* clock
) {
31 task_runner_
= task_runner
;
34 pipe_
->InitOnIOThread(task_runner
, clock
);
// Passes ownership of |pipe| down the chain by calling
// pipe_->AppendToPipe().
// NOTE(review): lines are missing from this listing around the call, so the
// handling of the case where pipe_ is not yet set is not visible here.
37 void PacketPipe::AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) {
39 pipe_
->AppendToPipe(pipe
.Pass());
45 // Roughly emulates a buffer inside a device.
46 // If the buffer is full, packets are dropped.
47 // Packets are output at a maximum bandwidth.
// NOTE(review): this listing omits some source lines (the embedded numbers
// jump, e.g. 63 -> 71); the comments below describe only what is visible.
48 class Buffer
: public PacketPipe
{
// |buffer_size| is the limit that queued packet sizes are checked against in
// Send(); |max_megabits_per_second| is the drain rate. Both are CHECKed to
// be greater than zero.
50 Buffer(size_t buffer_size
, double max_megabits_per_second
)
52 max_buffer_size_(buffer_size
),
53 max_megabits_per_second_(max_megabits_per_second
),
55 CHECK_GT(max_buffer_size_
, 0UL);
56 CHECK_GT(max_megabits_per_second
, 0);
// Queues |packet| only if its size still fits under max_buffer_size_ on top
// of what is already queued (buffer_size_).
59 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
{
60 if (packet
->size() + buffer_size_
<= max_buffer_size_
) {
61 buffer_size_
+= packet
->size();
62 buffer_
.push_back(linked_ptr
<Packet
>(packet
.release()));
// The queue just went from empty to holding one packet.
63 if (buffer_
.size() == 1) {
// Remember when this drain period started, then work out how long the front
// packet takes at the configured rate: bytes * 8 / 1e6 gives megabits, and
// dividing by megabits-per-second gives seconds.
71 last_schedule_
= clock_
->NowTicks();
72 double megabits
= buffer_
.front()->size() * 8 / 1000000.0;
73 double seconds
= megabits
/ max_megabits_per_second_
;
74 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
// Run ProcessBuffer after that delay; the weak pointer is bound as the
// receiver of the posted callback.
75 task_runner_
->PostDelayedTask(
77 base::Bind(&Buffer::ProcessBuffer
, weak_factory_
.GetWeakPtr()),
78 base::TimeDelta::FromMicroseconds(microseconds
));
// Forwards as many queued packets as the elapsed time allows.
81 void ProcessBuffer() {
// Byte budget = seconds elapsed since last_schedule_ * rate, where
// megabits-per-second * 1e6 / 8 is bytes per second.
82 int64 bytes_to_send
= static_cast<int64
>(
83 (clock_
->NowTicks() - last_schedule_
).InSecondsF() *
84 max_megabits_per_second_
* 1E6
/ 8);
// Raise the budget so that at least the front packet always fits.
85 if (bytes_to_send
< static_cast<int64
>(buffer_
.front()->size())) {
86 bytes_to_send
= buffer_
.front()->size();
// Forward packets from the front while each one fits in the remaining budget.
88 while (!buffer_
.empty() &&
89 static_cast<int64
>(buffer_
.front()->size()) <= bytes_to_send
) {
90 CHECK(!buffer_
.empty());
91 scoped_ptr
<Packet
> packet(buffer_
.front().release());
92 bytes_to_send
-= packet
->size();
93 buffer_size_
-= packet
->size();
// NOTE(review): the statement that removes the front element from buffer_
// is on a line not shown in this listing.
95 pipe_
->Send(packet
.Pass());
// Packets are still queued; the follow-up action is on lines not shown here.
97 if (!buffer_
.empty()) {
// Queued packets, oldest at the front.
102 std::deque
<linked_ptr
<Packet
> > buffer_
;
// Set to clock_->NowTicks() right before a ProcessBuffer task is posted.
103 base::TimeTicks last_schedule_
;
105 size_t max_buffer_size_
;
106 double max_megabits_per_second_
; // megabits per second
107 base::WeakPtrFactory
<Buffer
> weak_factory_
;
// Factory: allocates a Buffer(buffer_size, bandwidth) and returns it as a
// scoped_ptr<PacketPipe>. |bandwidth| is passed as Buffer's
// max_megabits_per_second argument.
110 scoped_ptr
<PacketPipe
> NewBuffer(size_t buffer_size
, double bandwidth
) {
111 return scoped_ptr
<PacketPipe
>(new Buffer(buffer_size
, bandwidth
)).Pass();
// Pipe stage that forwards a packet only when rand() exceeds a threshold
// derived from |drop_fraction|.
114 class RandomDrop
: public PacketPipe
{
// Scales |drop_fraction| by RAND_MAX so Send() can compare the result
// directly with the integer returned by rand().
116 RandomDrop(double drop_fraction
)
117 : drop_fraction_(static_cast<int>(drop_fraction
* RAND_MAX
)) {}
119 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
{
// Pass the packet to the next stage only when the draw is above the
// threshold.
120 if (rand() > drop_fraction_
) {
121 pipe_
->Send(packet
.Pass());
// Factory: allocates a RandomDrop(drop_fraction) and returns it as a
// scoped_ptr<PacketPipe>.
129 scoped_ptr
<PacketPipe
> NewRandomDrop(double drop_fraction
) {
130 return scoped_ptr
<PacketPipe
>(new RandomDrop(drop_fraction
)).Pass();
// Base class for stages that delay each packet independently: Send() asks
// the subclass for a delay via GetDelay() and posts SendInternal() to run
// after that delay.
133 class SimpleDelayBase
: public PacketPipe
{
135 SimpleDelayBase() : weak_factory_(this) {}
136 virtual ~SimpleDelayBase() {}
138 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
{
// GetDelay() returns seconds; it is converted to whole microseconds below.
139 double seconds
= GetDelay();
// |packet| is moved into the callback with base::Passed, and the callback
// is bound to a weak pointer to this object.
140 task_runner_
->PostDelayedTask(
142 base::Bind(&SimpleDelayBase::SendInternal
,
143 weak_factory_
.GetWeakPtr(),
144 base::Passed(&packet
)),
145 base::TimeDelta::FromMicroseconds(static_cast<int64
>(seconds
* 1E6
)));
// Delay in seconds to apply to the packet currently being sent.
148 virtual double GetDelay() = 0;
// Runs when the delayed task fires: hands the packet to the next stage.
151 virtual void SendInternal(scoped_ptr
<Packet
> packet
) {
152 pipe_
->Send(packet
.Pass());
155 base::WeakPtrFactory
<SimpleDelayBase
> weak_factory_
;
// SimpleDelayBase whose GetDelay() always returns the |delay_seconds| value
// given at construction.
158 class ConstantDelay
: public SimpleDelayBase
{
160 ConstantDelay(double delay_seconds
) : delay_seconds_(delay_seconds
) {}
161 virtual double GetDelay() OVERRIDE
{
162 return delay_seconds_
;
// Fixed delay, in seconds (SimpleDelayBase::Send multiplies it by 1E6 to get
// microseconds).
166 double delay_seconds_
;
// Factory: allocates a ConstantDelay(delay_seconds) and returns it as a
// scoped_ptr<PacketPipe>.
169 scoped_ptr
<PacketPipe
> NewConstantDelay(double delay_seconds
) {
170 return scoped_ptr
<PacketPipe
>(new ConstantDelay(delay_seconds
)).Pass();
// SimpleDelayBase whose per-packet delay is random_delay_ scaled by a fresh
// base::RandDouble() draw. Each packet is delayed independently by
// SimpleDelayBase::Send, so there is nothing here that keeps packets in
// their original order.
173 class RandomUnsortedDelay
: public SimpleDelayBase
{
175 RandomUnsortedDelay(double random_delay
) : random_delay_(random_delay
) {}
177 virtual double GetDelay() OVERRIDE
{
178 return random_delay_
* base::RandDouble();
// Scale factor, in seconds, applied to each random draw.
182 double random_delay_
;
// Factory: allocates a RandomUnsortedDelay(random_delay) and returns it as a
// scoped_ptr<PacketPipe>.
185 scoped_ptr
<PacketPipe
> NewRandomUnsortedDelay(double random_delay
) {
186 return scoped_ptr
<PacketPipe
>(new RandomUnsortedDelay(random_delay
)).Pass();
// Sends every packet twice: a copy goes to the next stage immediately, and
// the original follows after a delay of |delay_min| plus a random amount.
189 class DuplicateAndDelay
: public RandomUnsortedDelay
{
191 DuplicateAndDelay(double delay_min
,
192 double random_delay
) :
193 RandomUnsortedDelay(random_delay
),
194 delay_min_(delay_min
) {
196 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
{
// Forward a copy right away ...
197 pipe_
->Send(scoped_ptr
<Packet
>(new Packet(*packet
.get())));
// ... then let the base class schedule the original for delayed delivery.
198 RandomUnsortedDelay::Send(packet
.Pass());
// Base-class random delay plus the fixed minimum.
200 virtual double GetDelay() OVERRIDE
{
201 return RandomUnsortedDelay::GetDelay() + delay_min_
;
// Factory: allocates a DuplicateAndDelay(delay_min, random_delay) and
// returns it as a scoped_ptr<PacketPipe>.
207 scoped_ptr
<PacketPipe
> NewDuplicateAndDelay(double delay_min
,
208 double random_delay
) {
209 return scoped_ptr
<PacketPipe
>(
210 new DuplicateAndDelay(delay_min
, random_delay
)).Pass();
// Delays packets by random amounts while keeping them in order: packets wait
// in a FIFO (buffer_) and are released from the front once next_send_ has
// been reached. Extra delays are injected at random intervals through
// CauseExtraDelay().
// NOTE(review): this listing omits a number of source lines (parameter and
// member declarations, parts of expressions); the comments below describe
// only the visible statements.
213 class RandomSortedDelay
: public PacketPipe
{
215 RandomSortedDelay(double random_delay
,
217 double seconds_between_extra_delay
)
218 : random_delay_(random_delay
),
219 extra_delay_(extra_delay
),
220 seconds_between_extra_delay_(seconds_between_extra_delay
),
221 weak_factory_(this) {}
// Appends |packet| to the FIFO. When it is the only queued packet, a new
// next_send_ is computed using a random delay of up to random_delay_.
223 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
{
224 buffer_
.push_back(linked_ptr
<Packet
>(packet
.release()));
225 if (buffer_
.size() == 1) {
226 next_send_
= std::max(
228 base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_
),
// Performs the base-class initialization, then schedules the first extra
// delay.
233 virtual void InitOnIOThread(
234 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
235 base::TickClock
* clock
) OVERRIDE
{
236 PacketPipe::InitOnIOThread(task_runner
, clock
);
237 // As we start the stream, assume that we are in a random
238 // place between two extra delays, thus multiplier = 1.0;
239 ScheduleExtraDelay(1.0);
// Posts CauseExtraDelay() to run after a random time of up to
// seconds_between_extra_delay_ * |mult| seconds.
243 void ScheduleExtraDelay(double mult
) {
244 double seconds
= seconds_between_extra_delay_
* mult
* base::RandDouble();
245 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
246 task_runner_
->PostDelayedTask(
248 base::Bind(&RandomSortedDelay::CauseExtraDelay
,
249 weak_factory_
.GetWeakPtr()),
250 base::TimeDelta::FromMicroseconds(microseconds
));
// Updates next_send_ using extra_delay_ (seconds, converted to microseconds)
// and then schedules the next extra delay.
253 void CauseExtraDelay() {
254 next_send_
= std::max
<base::TimeTicks
>(
256 base::TimeDelta::FromMicroseconds(
257 static_cast<int64
>(extra_delay_
* 1E6
)),
259 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
260 // before scheduling another one to make the average equal to
261 // seconds_between_extra_delay_.
262 ScheduleExtraDelay(2.0);
// Releases queued packets whose send time has arrived.
265 void ProcessBuffer() {
266 base::TimeTicks now
= clock_
->NowTicks();
// Forward packets from the front for as long as next_send_ is not in the
// future.
267 while (!buffer_
.empty() && next_send_
<= now
) {
268 scoped_ptr
<Packet
> packet(buffer_
.front().release());
269 pipe_
->Send(packet
.Pass());
// Advance next_send_ by a fresh random delay of up to random_delay_.
272 next_send_
+= base::TimeDelta::FromSecondsD(
273 base::RandDouble() * random_delay_
);
// Packets remain: post ProcessBuffer again (the delay argument is on a line
// not shown in this listing).
276 if (!buffer_
.empty()) {
277 task_runner_
->PostDelayedTask(
279 base::Bind(&RandomSortedDelay::ProcessBuffer
,
280 weak_factory_
.GetWeakPtr()),
// NOTE(review): block_until_ is not referenced by any line visible in this
// listing.
285 base::TimeTicks block_until_
;
// FIFO of packets waiting to be released.
286 std::deque
<linked_ptr
<Packet
> > buffer_
;
287 double random_delay_
;
289 double seconds_between_extra_delay_
;
290 base::WeakPtrFactory
<RandomSortedDelay
> weak_factory_
;
// The front packet is released once this time is at or before now (see
// ProcessBuffer).
291 base::TimeTicks next_send_
;
// Factory: allocates a RandomSortedDelay(random_delay, extra_delay,
// seconds_between_extra_delay) wrapped in a scoped_ptr<PacketPipe>.
// NOTE(review): two of the parameter declarations are on lines missing from
// this listing.
294 scoped_ptr
<PacketPipe
> NewRandomSortedDelay(
297 double seconds_between_extra_delay
) {
298 return scoped_ptr
<PacketPipe
>(
299 new RandomSortedDelay(
300 random_delay
, extra_delay
, seconds_between_extra_delay
))
// Pipe stage that alternates between a working state and an outage state
// (tracked by works_), switching via the Flip() callback after random
// intervals.
// NOTE(review): several lines are missing from this listing, including the
// condition guarding the forward in Send() and the signature of the method
// that contains the scheduling code below.
304 class NetworkGlitchPipe
: public PacketPipe
{
// The maxima are twice the requested averages because each interval is
// drawn as base::RandDouble() * max (see below).
306 NetworkGlitchPipe(double average_work_time
, double average_outage_time
)
308 max_work_time_(average_work_time
* 2),
309 max_outage_time_(average_outage_time
* 2),
310 weak_factory_(this) {}
312 virtual void InitOnIOThread(
313 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
314 base::TickClock
* clock
) OVERRIDE
{
315 PacketPipe::InitOnIOThread(task_runner
, clock
);
319 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
{
321 pipe_
->Send(packet
.Pass());
// Pick how long the current state lasts: a random fraction of the maximum
// for the state indicated by works_, in seconds.
328 double seconds
= base::RandDouble() *
329 (works_
? max_work_time_
: max_outage_time_
);
330 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
// Schedule the next Flip() after that interval.
331 task_runner_
->PostDelayedTask(
333 base::Bind(&NetworkGlitchPipe::Flip
, weak_factory_
.GetWeakPtr()),
334 base::TimeDelta::FromMicroseconds(microseconds
));
// Upper bounds, in seconds, for the random length of a working period and
// of an outage.
338 double max_work_time_
;
339 double max_outage_time_
;
340 base::WeakPtrFactory
<NetworkGlitchPipe
> weak_factory_
;
// Factory: allocates a NetworkGlitchPipe(average_work_time,
// average_outage_time) wrapped in a scoped_ptr<PacketPipe>.
343 scoped_ptr
<PacketPipe
> NewNetworkGlitchPipe(double average_work_time
,
344 double average_outage_time
) {
345 return scoped_ptr
<PacketPipe
>(
346 new NetworkGlitchPipe(average_work_time
, average_outage_time
))
351 // Internal buffer object for a client of the IPP model.
// Holds packets together with their arrival times; the owning
// InterruptedPoissonProcess decides when SendOnePacket() is called.
// NOTE(review): several lines are missing from this listing (constructor
// parameters and initializers, early returns, a method signature).
352 class InterruptedPoissonProcess::InternalBuffer
: public PacketPipe
{
354 InternalBuffer(base::WeakPtr
<InterruptedPoissonProcess
> ipp
,
360 weak_factory_(this) {
// Stores |packet| and the time it arrived.
363 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
{
364 // Drop if buffer is full.
// The test uses the size already stored, before this packet is added.
365 if (stored_size_
>= stored_limit_
)
367 stored_size_
+= packet
->size();
368 buffer_
.push_back(linked_ptr
<Packet
>(packet
.release()));
// buffer_time_ runs parallel to buffer_: one arrival timestamp per packet.
369 buffer_time_
.push_back(clock_
->NowTicks());
370 DCHECK(buffer_
.size() == buffer_time_
.size());
// Initializes the shared InterruptedPoissonProcess as well as this pipe.
373 virtual void InitOnIOThread(
374 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
375 base::TickClock
* clock
) OVERRIDE
{
378 ipp_
->InitOnIOThread(task_runner
, clock
);
379 PacketPipe::InitOnIOThread(task_runner
, clock
);
// Hands the oldest stored packet to the next stage and updates the
// bookkeeping.
382 void SendOnePacket() {
383 scoped_ptr
<Packet
> packet(buffer_
.front().release());
384 stored_size_
-= packet
->size();
// NOTE(review): the removal of the front element of buffer_ is on a line not
// shown in this listing; the DCHECK below relies on it.
386 buffer_time_
.pop_front();
387 pipe_
->Send(packet
.Pass());
388 DCHECK(buffer_
.size() == buffer_time_
.size());
// Reports whether no packets are stored (the enclosing method's signature is
// on a line not shown; presumably the Empty() that SendPacket() calls).
392 return buffer_
.empty();
// Arrival time of the oldest stored packet; must not be called when empty.
395 base::TimeTicks
FirstPacketTime() const {
396 DCHECK(!buffer_time_
.empty());
397 return buffer_time_
.front();
400 base::WeakPtr
<InternalBuffer
> GetWeakPtr() {
401 return weak_factory_
.GetWeakPtr();
// Weak reference to the process object this buffer was created with.
406 const base::WeakPtr
<InterruptedPoissonProcess
> ipp_
;
// Threshold compared against stored_size_ in Send().
408 const size_t stored_limit_
;
409 std::deque
<linked_ptr
<Packet
> > buffer_
;
410 std::deque
<base::TimeTicks
> buffer_time_
;
411 base::TickClock
* clock_
;
412 base::WeakPtrFactory
<InternalBuffer
> weak_factory_
;
414 DISALLOW_COPY_AND_ASSIGN(InternalBuffer
);
// Stores the model parameters and seeds the Mersenne Twister generator with
// |rand_seed|. |average_rates| must not be empty (DCHECKed).
// NOTE(review): the |rand_seed| parameter declaration and some member
// initializers are on lines missing from this listing.
417 InterruptedPoissonProcess::InterruptedPoissonProcess(
418 const std::vector
<double>& average_rates
,
419 double coef_burstiness
,
420 double coef_variance
,
423 average_rates_(average_rates
),
424 coef_burstiness_(coef_burstiness
),
425 coef_variance_(coef_variance
),
428 weak_factory_(this) {
429 mt_rand_
.init_genrand(rand_seed
);
430 DCHECK(!average_rates
.empty());
// Destructor; no statements of its body are visible in this listing.
434 InterruptedPoissonProcess::~InterruptedPoissonProcess() {
// Called by each InternalBuffer during its own InitOnIOThread(), so it can
// run more than once; the check below detects the repeat calls.
// NOTE(review): the statement executed when already initialized and the
// start-up work after the assignment are on lines missing from this listing.
437 void InterruptedPoissonProcess::InitOnIOThread(
438 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
439 base::TickClock
* clock
) {
440 // Already initialized and started.
441 if (task_runner_
.get() && clock_
)
443 task_runner_
= task_runner
;
// Creates an InternalBuffer with limit |size| that holds a weak pointer back
// to this process, records a weak pointer to the buffer in send_buffers_,
// and returns ownership of the buffer to the caller as a PacketPipe.
450 scoped_ptr
<PacketPipe
> InterruptedPoissonProcess::NewBuffer(size_t size
) {
451 scoped_ptr
<InternalBuffer
> buffer(
452 new InternalBuffer(weak_factory_
.GetWeakPtr(), size
));
453 send_buffers_
.push_back(buffer
->GetWeakPtr());
454 return buffer
.PassAs
<PacketPipe
>();
// Draws the waiting time until the next event for a process with the given
// |rate|, using -log(1 - u) / rate with u taken from RandDouble().
457 base::TimeDelta
InterruptedPoissonProcess::NextEvent(double rate
) {
458 // Rate is per millisecond.
459 // The time until next event is exponentially distributed to the
460 // inverse of |rate|.
461 return base::TimeDelta::FromMillisecondsD(
462 fabs(-log(1.0 - RandDouble()) / rate
));
// Returns a random double produced from two 32-bit draws of mt_rand_.
465 double InterruptedPoissonProcess::RandDouble() {
466 // Generate a 64-bits random number from MT19937 and then convert
// it to a double with base::BitsToOpenEndedUnitInterval().
468 uint64 rand
= mt_rand_
.genrand_int32();
// NOTE(review): a source line is missing from this listing between the two
// draws; presumably it shifts the first draw into the upper 32 bits before
// the OR below -- confirm against the full file.
470 rand
|= mt_rand_
.genrand_int32();
471 return base::BitsToOpenEndedUnitInterval(rand
);
// Derives the per-state rates from the average rate selected by rate_index_
// together with coef_burstiness_ and coef_variance_.
// NOTE(review): the left-hand sides of the two expressions after send_rate_
// are on lines missing from this listing, so which member each one assigns
// is not visible here.
474 void InterruptedPoissonProcess::ComputeRates() {
475 double avg_rate
= average_rates_
[rate_index_
];
477 send_rate_
= avg_rate
/ coef_burstiness_
;
479 2 * avg_rate
* (1 - coef_burstiness_
) * (1 - coef_burstiness_
) /
480 coef_burstiness_
/ (coef_variance_
- 1);
482 2 * avg_rate
* (1 - coef_burstiness_
) / (coef_variance_
- 1);
// Advances rate_index_ to the next entry of average_rates_, wrapping around
// at the end, and re-posts itself to run again in one second.
// NOTE(review): lines before the index update are missing from this listing.
485 void InterruptedPoissonProcess::UpdateRates() {
488 // Rates are updated once per second.
489 rate_index_
= (rate_index_
+ 1) % average_rates_
.size();
490 task_runner_
->PostDelayedTask(
492 base::Bind(&InterruptedPoissonProcess::UpdateRates
,
493 weak_factory_
.GetWeakPtr()),
494 base::TimeDelta::FromSeconds(1));
// Schedules SwitchOn() after a random wait drawn with switch_on_rate_.
// NOTE(review): the first statement of the body is on a line missing from
// this listing.
497 void InterruptedPoissonProcess::SwitchOff() {
499 task_runner_
->PostDelayedTask(
501 base::Bind(&InterruptedPoissonProcess::SwitchOn
,
502 weak_factory_
.GetWeakPtr()),
503 NextEvent(switch_on_rate_
));
// Schedules SwitchOff() after a random wait drawn with switch_off_rate_.
// NOTE(review): the first statement of the body is on a line missing from
// this listing.
506 void InterruptedPoissonProcess::SwitchOn() {
508 task_runner_
->PostDelayedTask(
510 base::Bind(&InterruptedPoissonProcess::SwitchOff
,
511 weak_factory_
.GetWeakPtr()),
512 NextEvent(switch_off_rate_
));
// One send event of the process: first re-posts itself for the next event
// (wait drawn with send_rate_), then releases a packet from the client
// buffer whose oldest packet arrived earliest.
// NOTE(review): the bodies of the single-line if statements below (and the
// OFF-state check) are on lines missing from this listing.
515 void InterruptedPoissonProcess::SendPacket() {
516 task_runner_
->PostDelayedTask(
518 base::Bind(&InterruptedPoissonProcess::SendPacket
,
519 weak_factory_
.GetWeakPtr()),
520 NextEvent(send_rate_
));
522 // If OFF then don't send.
526 // Find the earliest packet to send.
// A null earliest_time means no candidate has been seen yet.
527 base::TimeTicks earliest_time
;
528 for (size_t i
= 0; i
< send_buffers_
.size(); ++i
) {
// send_buffers_ holds weak pointers; this tests whether the buffer is gone.
529 if (!send_buffers_
[i
])
531 if (send_buffers_
[i
]->Empty())
533 if (earliest_time
.is_null() ||
534 send_buffers_
[i
]->FirstPacketTime() < earliest_time
)
535 earliest_time
= send_buffers_
[i
]->FirstPacketTime();
// Second pass: locate a buffer whose first packet has that earliest time and
// send one packet from it.
537 for (size_t i
= 0; i
< send_buffers_
.size(); ++i
) {
538 if (!send_buffers_
[i
])
540 if (send_buffers_
[i
]->Empty())
542 if (send_buffers_
[i
]->FirstPacketTime() != earliest_time
)
544 send_buffers_
[i
]->SendOnePacket();
// Final stage of a pipe: Send() (defined after UDPProxyImpl) hands the
// packet to the UDPProxyImpl for transmission to *destination_.
551 class PacketSender
: public PacketPipe
{
// Stores both pointers as given. |destination| is dereferenced on every
// Send(), so it is the endpoint value at send time that is used.
553 PacketSender(UDPProxyImpl
* udp_proxy
, const net::IPEndPoint
* destination
)
554 : udp_proxy_(udp_proxy
), destination_(destination
) {}
555 virtual void Send(scoped_ptr
<Packet
> packet
) OVERRIDE
;
// NOTE(review): the body of this override is on lines missing from this
// listing.
556 virtual void AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) OVERRIDE
{
561 UDPProxyImpl
* udp_proxy_
;
562 const net::IPEndPoint
* destination_
; // not owned
// Appends |next| to the end of the chain in |*pipe|, taking ownership of
// |next| by wrapping it in a scoped_ptr.
// NOTE(review): callers in this file start with a default-constructed
// scoped_ptr; the handling of that empty case is on lines missing from this
// listing.
566 void BuildPipe(scoped_ptr
<PacketPipe
>* pipe
, PacketPipe
* next
) {
568 (*pipe
)->AppendToPipe(scoped_ptr
<PacketPipe
>(next
).Pass());
// Builds a pipe chain of Buffer (2 << 20, 50) -> ConstantDelay (1E-3) ->
// RandomSortedDelay (1E-3, 2E-3, 3) -> Buffer (2 << 20, 50).
// NOTE(review): the return statement is on a line missing from this listing.
575 scoped_ptr
<PacketPipe
> GoodNetwork() {
576 // This represents the buffer on the sender.
577 scoped_ptr
<PacketPipe
> pipe
;
578 BuildPipe(&pipe
, new Buffer(2 << 20, 50));
579 BuildPipe(&pipe
, new ConstantDelay(1E-3));
580 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 2E-3, 3));
581 // This represents the buffer on the receiving device.
582 BuildPipe(&pipe
, new Buffer(2 << 20, 50));
// Builds a pipe chain with three Buffer stages (256 << 10, 20), two
// RandomDrop stages (0.005), and two ConstantDelay + RandomSortedDelay pairs
// between them, in the order listed below.
// NOTE(review): the return statement is on a line missing from this listing.
586 scoped_ptr
<PacketPipe
> WifiNetwork() {
587 // This represents the buffer on the sender.
588 scoped_ptr
<PacketPipe
> pipe
;
589 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
590 BuildPipe(&pipe
, new RandomDrop(0.005));
591 // This represents the buffer on the router.
592 BuildPipe(&pipe
, new ConstantDelay(1E-3));
593 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
594 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
595 BuildPipe(&pipe
, new ConstantDelay(1E-3));
596 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
597 BuildPipe(&pipe
, new RandomDrop(0.005));
598 // This represents the buffer on the receiving device.
599 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
// Builds a pipe chain with three Buffer stages (64 << 10, 5), a 5% RandomDrop,
// a RandomSortedDelay, a ConstantDelay and a RandomUnsortedDelay, in the
// order listed below.
// NOTE(review): the return statement is on a line missing from this listing.
603 scoped_ptr
<PacketPipe
> BadNetwork() {
604 scoped_ptr
<PacketPipe
> pipe
;
605 // This represents the buffer on the sender.
606 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
607 BuildPipe(&pipe
, new RandomDrop(0.05)); // 5% packet drop
608 BuildPipe(&pipe
, new RandomSortedDelay(2E-3, 20E-3, 1));
609 // This represents the buffer on the router.
610 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
611 BuildPipe(&pipe
, new ConstantDelay(1E-3));
612 // Random 40ms every other second
// NOTE(review): the disabled call below passes 40E-1, i.e. 4.0, and
// NetworkGlitchPipe treats its arguments as seconds -- that does not match
// the "40ms" in the comment above; confirm the intent before re-enabling.
613 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
614 BuildPipe(&pipe
, new RandomUnsortedDelay(5E-3));
615 // This represents the buffer on the receiving device.
616 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
// Builds a pipe chain with three small Buffer stages (4 << 10), two 10%
// RandomDrop stages, a RandomSortedDelay, a ConstantDelay, a
// NetworkGlitchPipe and a RandomUnsortedDelay, in the order listed below.
// NOTE(review): the return statement is on a line missing from this listing.
621 scoped_ptr
<PacketPipe
> EvilNetwork() {
622 // This represents the buffer on the sender.
623 scoped_ptr
<PacketPipe
> pipe
;
// NOTE(review): this Buffer is built with rate 5 while the other two in this
// function use 2 -- confirm which value is intended.
624 BuildPipe(&pipe
, new Buffer(4 << 10, 5)); // 4 kb buf, 5mbit/s
625 // This represents the buffer on the router.
626 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
627 BuildPipe(&pipe
, new RandomSortedDelay(20E-3, 60E-3, 1));
628 BuildPipe(&pipe
, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
629 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
630 BuildPipe(&pipe
, new ConstantDelay(1E-3));
631 BuildPipe(&pipe
, new NetworkGlitchPipe(2.0, 0.3));
632 BuildPipe(&pipe
, new RandomUnsortedDelay(20E-3));
633 // This represents the buffer on the receiving device.
634 BuildPipe(&pipe
, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
// UDPProxy implementation. It owns a dedicated thread (proxy_thread_) on
// which the socket is created and the two packet pipes are initialized.
// Packets received from destination_ go through from_dest_pipe_; packets
// from any other address go through to_dest_pipe_.
// NOTE(review): this listing omits many source lines (waits on the events,
// early returns, the signature of the read-polling method, else branches);
// the comments below describe only the visible statements.
638 class UDPProxyImpl
: public UDPProxy
{
// Stores the endpoints and pipes, starts proxy_thread_ with an IO message
// loop, and posts Start() to it. An all-empty |destination| address marks
// the destination as mutable.
640 UDPProxyImpl(const net::IPEndPoint
& local_port
,
641 const net::IPEndPoint
& destination
,
642 scoped_ptr
<PacketPipe
> to_dest_pipe
,
643 scoped_ptr
<PacketPipe
> from_dest_pipe
,
644 net::NetLog
* net_log
)
645 : local_port_(local_port
),
646 destination_(destination
),
647 destination_is_mutable_(destination
.address().empty()),
648 proxy_thread_("media::cast::test::UdpProxy Thread"),
649 to_dest_pipe_(to_dest_pipe
.Pass()),
650 from_dest_pipe_(from_dest_pipe
.Pass()),
652 weak_factory_(this) {
653 proxy_thread_
.StartWithOptions(
654 base::Thread::Options(base::MessageLoop::TYPE_IO
, 0));
// start_event is passed to Start(), which signals it when setup is done.
655 base::WaitableEvent
start_event(false, false);
656 proxy_thread_
.message_loop_proxy()->PostTask(
658 base::Bind(&UDPProxyImpl::Start
,
659 base::Unretained(this),
660 base::Unretained(&start_event
),
// Posts Stop() to the proxy thread so the pipes are released there, then
// stops the thread.
665 virtual ~UDPProxyImpl() {
666 base::WaitableEvent
stop_event(false, false);
667 proxy_thread_
.message_loop_proxy()->PostTask(
669 base::Bind(&UDPProxyImpl::Stop
,
670 base::Unretained(this),
671 base::Unretained(&stop_event
)));
673 proxy_thread_
.Stop();
// Writes |packet| to |destination| on socket_. Called by PacketSender at the
// end of each pipe.
676 void Send(scoped_ptr
<Packet
> packet
,
677 const net::IPEndPoint
& destination
) {
679 LOG(ERROR
) << "Cannot write packet right now: blocked";
683 VLOG(1) << "Sending packet, len = " << packet
->size();
684 // We ignore all problems, callbacks and errors.
685 // If it didn't work we just drop the packet and call it a day.
// The IO buffer is built from a pointer to the packet's first byte.
686 scoped_refptr
<net::IOBuffer
> buf
=
687 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet
->front()));
688 size_t buf_size
= packet
->size();
// Nothing can be sent until a destination address is known.
690 if (destination
.address().empty()) {
691 VLOG(1) << "Destination has not been set yet.";
692 result
= net::ERR_INVALID_ARGUMENT
;
694 VLOG(1) << "Destination:" << destination
.ToString();
// |packet| is moved into the AllowWrite completion callback.
695 result
= socket_
->SendTo(buf
.get(),
696 static_cast<int>(buf_size
),
698 base::Bind(&UDPProxyImpl::AllowWrite
,
699 weak_factory_
.GetWeakPtr(),
701 base::Passed(&packet
)));
703 if (result
== net::ERR_IO_PENDING
) {
705 } else if (result
< 0) {
706 LOG(ERROR
) << "Failed to write packet.";
// Posted from the constructor: creates the socket, terminates both pipes
// with a PacketSender, initializes the pipes on the current message loop,
// binds the socket to local_port_, and signals |start_event|.
711 void Start(base::WaitableEvent
* start_event
,
712 net::NetLog
* net_log
) {
713 socket_
.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND
,
714 net::RandIntCallback(),
716 net::NetLog::Source()));
// Each sender is given the address of a member, so later changes to
// destination_ / return_address_ are seen when it sends.
717 BuildPipe(&to_dest_pipe_
, new PacketSender(this, &destination_
));
718 BuildPipe(&from_dest_pipe_
, new PacketSender(this, &return_address_
));
719 to_dest_pipe_
->InitOnIOThread(base::MessageLoopProxy::current(),
721 from_dest_pipe_
->InitOnIOThread(base::MessageLoopProxy::current(),
724 VLOG(0) << "From:" << local_port_
.ToString();
725 if (!destination_is_mutable_
)
726 VLOG(0) << "To:" << destination_
.ToString();
// Failing to bind is fatal.
728 CHECK_GE(socket_
->Bind(local_port_
), 0);
730 start_event
->Signal();
// Posted from the destructor: releases both pipes and signals |stop_event|.
734 void Stop(base::WaitableEvent
* stop_event
) {
735 to_dest_pipe_
.reset(NULL
);
736 from_dest_pipe_
.reset(NULL
);
738 stop_event
->Signal();
// Routes one received datagram of |len| bytes, held in packet_, according
// to the address it came from (recv_address_).
741 void ProcessPacket(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
742 DCHECK_NE(len
, net::ERR_IO_PENDING
);
743 VLOG(1) << "Got packet, len = " << len
;
745 LOG(WARNING
) << "Socket read error: " << len
;
// packet_ was allocated at kMaxPacketSize; trim it to the received length.
748 packet_
->resize(len
);
// With a mutable destination, a packet from an address that is neither the
// current return address nor the current destination becomes the new
// destination, but only when set_destination_next_ is set.
749 if (destination_is_mutable_
&& set_destination_next_
&&
750 !(recv_address_
== return_address_
) &&
751 !(recv_address_
== destination_
)) {
752 destination_
= recv_address_
;
// From the destination: send back towards the return address.
754 if (recv_address_
== destination_
) {
755 set_destination_next_
= false;
756 from_dest_pipe_
->Send(packet_
.Pass());
// From anywhere else: remember the sender as the return address and forward
// towards the destination.
758 set_destination_next_
= true;
759 VLOG(1) << "Return address = " << recv_address_
.ToString();
760 return_address_
= recv_address_
;
761 to_dest_pipe_
->Send(packet_
.Pass());
// Completion callback for an asynchronous RecvFrom().
765 void ReadCallback(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
766 ProcessPacket(recv_buf
, len
);
// Read step (its enclosing signature is on a line not shown): allocates a
// fresh kMaxPacketSize packet and reads into its storage.
772 packet_
.reset(new Packet(kMaxPacketSize
));
773 scoped_refptr
<net::IOBuffer
> recv_buf
=
774 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_
->front()));
775 int len
= socket_
->RecvFrom(
780 &UDPProxyImpl::ReadCallback
, base::Unretained(this), recv_buf
));
// ERR_IO_PENDING means ReadCallback will be invoked later; any other result
// is handled immediately.
781 if (len
== net::ERR_IO_PENDING
)
783 ProcessPacket(recv_buf
, len
);
// Completion callback bound in Send(); receives the IO buffer and the packet
// that were in flight.
787 void AllowWrite(scoped_refptr
<net::IOBuffer
> buf
,
788 scoped_ptr
<Packet
> packet
,
// Endpoint the socket is bound to in Start().
795 net::IPEndPoint local_port_
;
// Where to_dest_pipe_ delivers; may be replaced in ProcessPacket() when
// destination_is_mutable_ is true.
797 net::IPEndPoint destination_
;
798 bool destination_is_mutable_
;
// Where from_dest_pipe_ delivers: the last non-destination sender seen.
800 net::IPEndPoint return_address_
;
801 bool set_destination_next_
;
803 base::DefaultTickClock tick_clock_
;
804 base::Thread proxy_thread_
;
805 scoped_ptr
<net::UDPSocket
> socket_
;
806 scoped_ptr
<PacketPipe
> to_dest_pipe_
;
807 scoped_ptr
<PacketPipe
> from_dest_pipe_
;
// Source address and storage for the packet currently being received.
810 net::IPEndPoint recv_address_
;
811 scoped_ptr
<Packet
> packet_
;
816 base::WeakPtrFactory
<UDPProxyImpl
> weak_factory_
;
// Hands |packet| to the proxy for transmission to the endpoint currently
// stored in *destination_.
819 void PacketSender::Send(scoped_ptr
<Packet
> packet
) {
820 udp_proxy_
->Send(packet
.Pass(), *destination_
);
823 scoped_ptr
<UDPProxy
> UDPProxy::Create(
824 const net::IPEndPoint
& local_port
,
825 const net::IPEndPoint
& destination
,
826 scoped_ptr
<PacketPipe
> to_dest_pipe
,
827 scoped_ptr
<PacketPipe
> from_dest_pipe
,
828 net::NetLog
* net_log
) {
829 scoped_ptr
<UDPProxy
> ret(new UDPProxyImpl(local_port
,
832 from_dest_pipe
.Pass(),