1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
20 // The Windows CancelIo API cancels all outstanding IO for a particular
21 // socket on the current thread. To overcome that limitation, we run a
22 // special goroutine, locked to a single OS thread, that both starts
23 // and cancels IO. This means there are 2 unavoidable thread switches
25 // Some newer versions of Windows have a new CancelIoEx API that does
26 // not have that limitation and can be used from any thread. This
27 // package uses the CancelIoEx API if present; otherwise it falls back
30 var canCancelIO
bool // determines if CancelIoEx API is present
34 e
:= syscall
.WSAStartup(uint32(0x202), &d
)
36 initErr
= os
.NewSyscallError("WSAStartup", e
)
38 canCancelIO
= syscall
.LoadCancelIoEx() == nil
39 if syscall
.LoadGetAddrInfo() == nil {
40 lookupPort
= newLookupPort
41 lookupIP
= newLookupIP
45 func closesocket(s syscall
.Handle
) error
{
46 return syscall
.Closesocket(s
)
49 func canUseConnectEx(net
string) bool {
50 if net
== "udp" || net
== "udp4" || net
== "udp6" {
51 // The ConnectEx Windows API does not support connectionless sockets.
54 return syscall
.LoadConnectEx() == nil
57 func resolveAndDial(net
, addr
string, localAddr Addr
, deadline time
.Time
) (Conn
, error
) {
58 if !canUseConnectEx(net
) {
59 // Use the relatively inefficient goroutine-racing
60 // implementation of DialTimeout.
61 return resolveAndDialChannel(net
, addr
, localAddr
, deadline
)
63 ra
, err
:= resolveAddr("dial", net
, addr
, deadline
)
67 return dial(net
, addr
, localAddr
, ra
, deadline
)
70 // Interface for all IO operations.
71 type anOpIface
interface {
77 // IO completion result parameters.
78 type ioResult
struct {
83 // anOp implements functionality common to all IO operations.
85 // Used by the IOCP interface; it must be the first field
86 // of the struct, as our code relies on it.
94 func (o
*anOp
) Init(fd
*netFD
, mode
int) {
102 if fd
.resultc
[i
] == nil {
103 fd
.resultc
[i
] = make(chan ioResult
, 1)
105 o
.resultc
= fd
.resultc
[i
]
106 if fd
.errnoc
[i
] == nil {
107 fd
.errnoc
[i
] = make(chan error
)
109 o
.errnoc
= fd
.errnoc
[i
]
112 func (o
*anOp
) Op() *anOp
{
116 // bufOp is used by IO operations that read / write
117 // data from / to client buffer.
123 func (o
*bufOp
) Init(fd
*netFD
, buf
[]byte, mode
int) {
124 o
.anOp
.Init(fd
, mode
)
125 o
.buf
.Len
= uint32(len(buf
))
129 o
.buf
.Buf
= (*byte)(unsafe
.Pointer(&buf
[0]))
133 // resultSrv retrieves all IO completion results from the
134 // iocp and sends them to the corresponding waiting client
135 // goroutine via the channel supplied in the request.
136 type resultSrv
struct {
140 func runtime_blockingSyscallHint()
142 func (s
*resultSrv
) Run() {
143 var o
*syscall
.Overlapped
147 r
.err
= syscall
.GetQueuedCompletionStatus(s
.iocp
, &(r
.qty
), &key
, &o
, 0)
148 if r
.err
== syscall
.Errno(syscall
.WAIT_TIMEOUT
) && o
== nil {
149 runtime_blockingSyscallHint()
150 r
.err
= syscall
.GetQueuedCompletionStatus(s
.iocp
, &(r
.qty
), &key
, &o
, syscall
.INFINITE
)
154 // Dequeued successfully completed IO packet.
155 case r
.err
== syscall
.Errno(syscall
.WAIT_TIMEOUT
) && o
== nil:
156 // Wait has timed out (should not happen now, but might be used in the future).
157 panic("GetQueuedCompletionStatus timed out")
159 // Failed to dequeue anything -> report the error.
160 panic("GetQueuedCompletionStatus failed " + r
.err
.Error())
162 // Dequeued failed IO packet.
164 (*anOp
)(unsafe
.Pointer(o
)).resultc
<- r
168 // ioSrv executes net IO requests.
170 submchan
chan anOpIface
// submit IO requests
171 canchan
chan anOpIface
// cancel IO requests
174 // ProcessRemoteIO executes submitted IO requests on behalf
175 // of other goroutines, all on a single OS thread, so it can
176 // cancel them later. Results of all operations are sent
177 // back to their requesters via the channel supplied in the request.
178 // It is used only when the CancelIoEx API is unavailable.
179 func (s
*ioSrv
) ProcessRemoteIO() {
180 runtime
.LockOSThread()
181 defer runtime
.UnlockOSThread()
184 case o
:= <-s
.submchan
:
185 o
.Op().errnoc
<- o
.Submit()
186 case o
:= <-s
.canchan
:
187 o
.Op().errnoc
<- syscall
.CancelIo(syscall
.Handle(o
.Op().fd
.sysfd
))
192 // ExecIO executes a single IO operation oi. It submits and cancels
193 // IO in the current thread on systems where the Windows CancelIoEx API
194 // is available. Otherwise, it passes the request on to
195 // a special goroutine and waits for completion or cancels the request.
196 // deadline is in Unix nanoseconds.
197 func (s
*ioSrv
) ExecIO(oi anOpIface
, deadline
int64) (int, error
) {
200 // Calculate timeout delta.
203 delta
= deadline
- time
.Now().UnixNano()
205 return 0, &OpError
{oi
.Name(), o
.fd
.net
, o
.fd
.laddr
, errTimeout
}
212 // Send the request to a special dedicated thread,
213 // so it can stop the IO with CancelIo later.
219 // IO completed immediately, but we need to get our completion message anyway.
220 case syscall
.ERROR_IO_PENDING
:
221 // IO started, and we have to wait for its completion.
224 return 0, &OpError
{oi
.Name(), o
.fd
.net
, o
.fd
.laddr
, err
}
226 // Setup timer, if deadline is given.
227 var timer
<-chan time
.Time
229 t
:= time
.NewTimer(time
.Duration(delta
) * time
.Nanosecond
)
233 // Wait for our request to complete.
235 var cancelled
, timeout
bool
237 case r
= <-o
.resultc
:
247 err
:= syscall
.CancelIoEx(syscall
.Handle(o
.Op().fd
.sysfd
), &o
.o
)
248 // Assume ERROR_NOT_FOUND is returned if the IO has already completed.
249 if err
!= nil && err
!= syscall
.ERROR_NOT_FOUND
{
250 // TODO(brainman): maybe do something other than panic.
257 // Wait for IO to be canceled or complete successfully.
259 if r
.err
== syscall
.ERROR_OPERATION_ABORTED
{ // IO Canceled
268 err
= &OpError
{oi
.Name(), o
.fd
.net
, o
.fd
.laddr
, r
.err
}
270 return int(r
.qty
), err
273 // Start helper goroutines.
274 var resultsrv
*resultSrv
276 var onceStartServer sync
.Once
279 resultsrv
= new(resultSrv
)
281 resultsrv
.iocp
, err
= syscall
.CreateIoCompletionPort(syscall
.InvalidHandle
, 0, 0, 1)
283 panic("CreateIoCompletionPort: " + err
.Error())
289 // Only the CancelIo API is available. Let's start a special goroutine,
290 // locked to an OS thread, that both starts and cancels IO.
291 iosrv
.submchan
= make(chan anOpIface
)
292 iosrv
.canchan
= make(chan anOpIface
)
293 go iosrv
.ProcessRemoteIO()
297 // Network file descriptor.
299 // locking/lifetime of sysfd
304 // immutable until Close
312 resultc
[2]chan ioResult
// read/write completion results
313 errnoc
[2]chan error
// read/write submit or cancel operation errors
314 closec
chan bool // used by Close to cancel pending IO
316 // serialize access to Read and Write methods
319 // read and write deadlines
320 rdeadline
, wdeadline deadline
323 func allocFD(fd syscall
.Handle
, family
, sotype
int, net
string) *netFD
{
329 closec
: make(chan bool),
334 func newFD(fd syscall
.Handle
, family
, proto
int, net
string) (*netFD
, error
) {
338 onceStartServer
.Do(startServer
)
339 // Associate our socket with resultsrv.iocp.
340 if _
, err
:= syscall
.CreateIoCompletionPort(syscall
.Handle(fd
), resultsrv
.iocp
, 0, 0); err
!= nil {
343 return allocFD(fd
, family
, proto
, net
), nil
346 func (fd
*netFD
) setAddr(laddr
, raddr Addr
) {
349 runtime
.SetFinalizer(fd
, (*netFD
).closesocket
)
352 // Make new connection.
354 type connectOp
struct {
359 func (o
*connectOp
) Submit() error
{
360 return syscall
.ConnectEx(o
.fd
.sysfd
, o
.ra
, nil, 0, nil, &o
.o
)
363 func (o
*connectOp
) Name() string {
367 func (fd
*netFD
) connect(la
, ra syscall
.Sockaddr
) error
{
368 if !canUseConnectEx(fd
.net
) {
369 return syscall
.Connect(fd
.sysfd
, ra
)
371 // The ConnectEx Windows API requires an unconnected, previously bound socket.
374 case *syscall
.SockaddrInet4
:
375 la
= &syscall
.SockaddrInet4
{}
376 case *syscall
.SockaddrInet6
:
377 la
= &syscall
.SockaddrInet6
{}
379 panic("unexpected type in connect")
381 if err
:= syscall
.Bind(fd
.sysfd
, la
); err
!= nil {
385 // Call ConnectEx API.
389 _
, err
:= iosrv
.ExecIO(&o
, fd
.wdeadline
.value())
393 // Refresh socket properties.
394 return syscall
.Setsockopt(fd
.sysfd
, syscall
.SOL_SOCKET
, syscall
.SO_UPDATE_CONNECT_CONTEXT
, (*byte)(unsafe
.Pointer(&fd
.sysfd
)), int32(unsafe
.Sizeof(fd
.sysfd
)))
397 // Add a reference to this fd.
398 // If closing==true, mark the fd as closing.
399 // Returns an error if the fd cannot be used.
400 func (fd
*netFD
) incref(closing
bool) error
{
418 // Remove a reference to this FD and close it if we've been asked to do so (and
419 // there are no references left).
420 func (fd
*netFD
) decref() {
426 if fd
.closing
&& fd
.sysref
== 0 && fd
.sysfd
!= syscall
.InvalidHandle
{
427 closesocket(fd
.sysfd
)
428 fd
.sysfd
= syscall
.InvalidHandle
429 // no need for a finalizer anymore
430 runtime
.SetFinalizer(fd
, nil)
435 func (fd
*netFD
) Close() error
{
436 if err
:= fd
.incref(true); err
!= nil {
440 // unblock pending reader and writer
442 // wait for both reader and writer to exit
444 defer fd
.rio
.Unlock()
446 defer fd
.wio
.Unlock()
450 func (fd
*netFD
) shutdown(how
int) error
{
451 if err
:= fd
.incref(false); err
!= nil {
455 err
:= syscall
.Shutdown(fd
.sysfd
, how
)
457 return &OpError
{"shutdown", fd
.net
, fd
.laddr
, err
}
462 func (fd
*netFD
) CloseRead() error
{
463 return fd
.shutdown(syscall
.SHUT_RD
)
466 func (fd
*netFD
) CloseWrite() error
{
467 return fd
.shutdown(syscall
.SHUT_WR
)
470 func (fd
*netFD
) closesocket() error
{
471 return closesocket(fd
.sysfd
)
474 // Read from network.
480 func (o
*readOp
) Submit() error
{
482 return syscall
.WSARecv(syscall
.Handle(o
.fd
.sysfd
), &o
.buf
, 1, &d
, &f
, &o
.o
, nil)
485 func (o
*readOp
) Name() string {
489 func (fd
*netFD
) Read(buf
[]byte) (int, error
) {
490 if err
:= fd
.incref(false); err
!= nil {
495 defer fd
.rio
.Unlock()
498 n
, err
:= iosrv
.ExecIO(&o
, fd
.rdeadline
.value())
499 if err
== nil && n
== 0 {
505 // ReadFrom from network.
507 type readFromOp
struct {
509 rsa syscall
.RawSockaddrAny
513 func (o
*readFromOp
) Submit() error
{
515 return syscall
.WSARecvFrom(o
.fd
.sysfd
, &o
.buf
, 1, &d
, &f
, &o
.rsa
, &o
.rsan
, &o
.o
, nil)
518 func (o
*readFromOp
) Name() string {
522 func (fd
*netFD
) ReadFrom(buf
[]byte) (n
int, sa syscall
.Sockaddr
, err error
) {
526 if err
:= fd
.incref(false); err
!= nil {
531 defer fd
.rio
.Unlock()
534 o
.rsan
= int32(unsafe
.Sizeof(o
.rsa
))
535 n
, err
= iosrv
.ExecIO(&o
, fd
.rdeadline
.value())
539 sa
, _
= o
.rsa
.Sockaddr()
545 type writeOp
struct {
549 func (o
*writeOp
) Submit() error
{
551 return syscall
.WSASend(o
.fd
.sysfd
, &o
.buf
, 1, &d
, 0, &o
.o
, nil)
554 func (o
*writeOp
) Name() string {
558 func (fd
*netFD
) Write(buf
[]byte) (int, error
) {
559 if err
:= fd
.incref(false); err
!= nil {
564 defer fd
.wio
.Unlock()
567 return iosrv
.ExecIO(&o
, fd
.wdeadline
.value())
570 // WriteTo to network.
572 type writeToOp
struct {
577 func (o
*writeToOp
) Submit() error
{
579 return syscall
.WSASendto(o
.fd
.sysfd
, &o
.buf
, 1, &d
, 0, o
.sa
, &o
.o
, nil)
582 func (o
*writeToOp
) Name() string {
586 func (fd
*netFD
) WriteTo(buf
[]byte, sa syscall
.Sockaddr
) (int, error
) {
590 if err
:= fd
.incref(false); err
!= nil {
595 defer fd
.wio
.Unlock()
599 return iosrv
.ExecIO(&o
, fd
.wdeadline
.value())
602 // Accept new network connections.
604 type acceptOp
struct {
606 newsock syscall
.Handle
607 attrs
[2]syscall
.RawSockaddrAny
// space for local and remote address only
610 func (o
*acceptOp
) Submit() error
{
612 l
:= uint32(unsafe
.Sizeof(o
.attrs
[0]))
613 return syscall
.AcceptEx(o
.fd
.sysfd
, o
.newsock
,
614 (*byte)(unsafe
.Pointer(&o
.attrs
[0])), 0, l
, l
, &d
, &o
.o
)
617 func (o
*acceptOp
) Name() string {
621 func (fd
*netFD
) accept(toAddr
func(syscall
.Sockaddr
) Addr
) (*netFD
, error
) {
622 if err
:= fd
.incref(false); err
!= nil {
628 s
, err
:= sysSocket(fd
.family
, fd
.sotype
, 0)
630 return nil, &OpError
{"socket", fd
.net
, fd
.laddr
, err
}
633 // Associate our new socket with IOCP.
634 onceStartServer
.Do(startServer
)
635 if _
, err
:= syscall
.CreateIoCompletionPort(s
, resultsrv
.iocp
, 0, 0); err
!= nil {
637 return nil, &OpError
{"CreateIoCompletionPort", fd
.net
, fd
.laddr
, err
}
640 // Submit accept request.
644 _
, err
= iosrv
.ExecIO(&o
, fd
.rdeadline
.value())
650 // Inherit properties of the listening socket.
651 err
= syscall
.Setsockopt(s
, syscall
.SOL_SOCKET
, syscall
.SO_UPDATE_ACCEPT_CONTEXT
, (*byte)(unsafe
.Pointer(&fd
.sysfd
)), int32(unsafe
.Sizeof(fd
.sysfd
)))
654 return nil, &OpError
{"Setsockopt", fd
.net
, fd
.laddr
, err
}
657 // Get local and peer addr out of AcceptEx buffer.
658 var lrsa
, rrsa
*syscall
.RawSockaddrAny
660 l
:= uint32(unsafe
.Sizeof(*lrsa
))
661 syscall
.GetAcceptExSockaddrs((*byte)(unsafe
.Pointer(&o
.attrs
[0])),
662 0, l
, l
, &lrsa
, &llen
, &rrsa
, &rlen
)
663 lsa
, _
:= lrsa
.Sockaddr()
664 rsa
, _
:= rrsa
.Sockaddr()
666 netfd
:= allocFD(s
, fd
.family
, fd
.sotype
, fd
.net
)
667 netfd
.setAddr(toAddr(lsa
), toAddr(rsa
))
671 // Unimplemented functions.
673 func (fd
*netFD
) dup() (*os
.File
, error
) {
674 // TODO: Implement this
675 return nil, os
.NewSyscallError("dup", syscall
.EWINDOWS
)
678 var errNoSupport
= errors
.New("address family not supported")
680 func (fd
*netFD
) ReadMsg(p
[]byte, oob
[]byte) (n
, oobn
, flags
int, sa syscall
.Sockaddr
, err error
) {
681 return 0, 0, 0, nil, errNoSupport
684 func (fd
*netFD
) WriteMsg(p
[]byte, oob
[]byte, sa syscall
.Sockaddr
) (n
int, oobn
int, err error
) {
685 return 0, 0, errNoSupport