2 // TcpDuplexSessionChannel.cs
5 // Marcos Cobena (marcoscobena@gmail.com)
6 // Atsushi Enomoto <atsushi@ximian.com>
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 using System
.Collections
.Generic
;
36 using System
.Net
.Sockets
;
37 using System
.Runtime
.Serialization
;
38 using System
.Runtime
.Serialization
.Formatters
.Binary
;
39 using System
.ServiceModel
.Channels
;
41 using System
.Threading
;
44 namespace System
.ServiceModel
.Channels
46 internal class TcpDuplexSessionChannel
: DuplexChannelBase
, IDuplexSessionChannel
48 class TcpDuplexSession
: DuplexSessionBase
50 TcpDuplexSessionChannel owner
;
52 internal TcpDuplexSession (TcpDuplexSessionChannel owner
)
57 public override TimeSpan DefaultCloseTimeout
{
58 get { return owner.DefaultCloseTimeout; }
61 public override void Close (TimeSpan timeout
)
63 owner
.DiscardSession ();
70 EndpointAddress local_address
;
71 TcpBinaryFrameManager frame
;
72 TcpDuplexSession session
; // do not use this directly. Use Session instead.
74 public TcpDuplexSessionChannel (ChannelFactoryBase factory
, TcpChannelInfo info
, EndpointAddress address
, Uri via
)
75 : base (factory
, address
, via
)
77 is_service_side
= false;
81 public TcpDuplexSessionChannel (ChannelListenerBase listener
, TcpChannelInfo info
, TcpClient client
)
84 is_service_side
= true;
89 public MessageEncoder Encoder
{
90 get { return info.MessageEncoder; }
93 public override EndpointAddress LocalAddress
{
94 get { return local_address; }
97 public IDuplexSession Session
{
100 session
= new TcpDuplexSession (this);
105 void DiscardSession ()
107 frame
.ProcessEndRecordInitiator ();
111 public override void Send (Message message
)
113 Send (message
, DefaultSendTimeout
);
116 public override void Send (Message message
, TimeSpan timeout
)
118 if (timeout
<= TimeSpan
.Zero
)
119 throw new ArgumentException (String
.Format ("Timeout value must be positive value. It was {0}", timeout
));
121 if (!is_service_side
) {
122 if (message
.Headers
.To
== null)
123 message
.Headers
.To
= RemoteAddress
.Uri
;
124 if (message
.Headers
.ReplyTo
== null)
125 message
.Headers
.ReplyTo
= new EndpointAddress (Constants
.WsaAnonymousUri
);
127 if (message
.Headers
.RelatesTo
== null)
128 message
.Headers
.RelatesTo
= OperationContext
.Current
.IncomingMessageHeaders
.MessageId
;
131 client
.SendTimeout
= (int) timeout
.TotalMilliseconds
;
132 frame
.WriteSizedMessage (message
);
133 // FIXME: should EndRecord be sent here?
134 //if (is_service_side && client.Available > 0)
135 // frame.ProcessEndRecordRecipient ();
138 public override Message
Receive ()
140 return Receive (DefaultReceiveTimeout
);
143 public override Message
Receive (TimeSpan timeout
)
145 if (timeout
<= TimeSpan
.Zero
)
146 throw new ArgumentException (String
.Format ("Timeout value must be positive value. It was {0}", timeout
));
147 client
.ReceiveTimeout
= (int) timeout
.TotalMilliseconds
;
148 return frame
.ReadSizedMessage ();
151 public override bool TryReceive (TimeSpan timeout
, out Message message
)
154 DateTime start
= DateTime
.Now
;
155 message
= Receive (timeout
);
158 // received EndRecord, so close the session and return false instead.
159 // (Closing channel here might not be a good idea, but right now I have no better way.)
160 Close (timeout
- (DateTime
.Now
- start
));
162 } catch (TimeoutException
) {
168 public override bool WaitForMessage (TimeSpan timeout
)
170 if (client
.Available
> 0)
173 DateTime start
= DateTime
.Now
;
176 if (client
.Available
> 0)
178 } while (DateTime
.Now
- start
< timeout
);
182 // CommunicationObject
185 protected override void OnAbort ()
187 Close (TimeSpan
.FromTicks (1));
191 protected override IAsyncResult
OnBeginClose (TimeSpan timeout
,
192 AsyncCallback callback
, object state
)
194 throw new NotImplementedException ();
198 protected override IAsyncResult
OnBeginOpen (TimeSpan timeout
,
199 AsyncCallback callback
, object state
)
201 throw new NotImplementedException ();
205 protected override void OnClose (TimeSpan timeout
)
207 if (!is_service_side
)
209 session
.Close (timeout
);
216 protected override void OnEndClose (IAsyncResult result
)
218 throw new NotImplementedException ();
222 protected override void OnEndOpen (IAsyncResult result
)
224 throw new NotImplementedException ();
228 protected override void OnOpen (TimeSpan timeout
)
230 if (! is_service_side
) {
231 int explicitPort
= RemoteAddress
.Uri
.Port
;
232 client
= new TcpClient (RemoteAddress
.Uri
.Host
, explicitPort
<= 0 ? TcpTransportBindingElement
.DefaultPort
: explicitPort
);
233 //RemoteAddress.Uri.Port);
235 NetworkStream ns
= client
.GetStream ();
236 frame
= new TcpBinaryFrameManager (TcpBinaryFrameManager
.DuplexMode
, ns
, is_service_side
) {
237 Encoder
= this.Encoder
,
238 Via
= RemoteAddress
.Uri
};
239 frame
.ProcessPreambleInitiator ();
240 frame
.ProcessPreambleAckInitiator ();
243 Stream s
= client
.GetStream ();
245 frame
= new TcpBinaryFrameManager (TcpBinaryFrameManager
.DuplexMode
, s
, is_service_side
) { Encoder = this.Encoder }
;
247 // FIXME: use retrieved record properties in the request processing.
249 frame
.ProcessPreambleRecipient ();
250 frame
.ProcessPreambleAckRecipient ();
254 class MyBinaryWriter
: BinaryWriter
256 public MyBinaryWriter (Stream s
)
261 public void WriteBytes (byte [] bytes
)
263 Write7BitEncodedInt (bytes
.Length
);
269 // seealso: [MC-NMF] Windows Protocol document.
270 class TcpBinaryFrameManager
272 class MyBinaryReader
: BinaryReader
274 public MyBinaryReader (Stream s
)
279 public int ReadVariableInt ()
281 return Read7BitEncodedInt ();
285 class MyBinaryWriter
: BinaryWriter
287 public MyBinaryWriter (Stream s
)
292 public void WriteVariableInt (int value)
294 Write7BitEncodedInt (value);
297 public int GetSizeOfLength (int value)
303 } while (value != 0);
308 class MyXmlBinaryWriterSession
: XmlBinaryWriterSession
310 public override bool TryAdd (XmlDictionaryString
value, out int key
)
312 if (!base.TryAdd (value, out key
))
318 public List
<XmlDictionaryString
> List
= new List
<XmlDictionaryString
> ();
321 public const byte VersionRecord
= 0;
322 public const byte ModeRecord
= 1;
323 public const byte ViaRecord
= 2;
324 public const byte KnownEncodingRecord
= 3;
325 public const byte ExtendingEncodingRecord
= 4;
326 public const byte UnsizedEnvelopeRecord
= 5;
327 public const byte SizedEnvelopeRecord
= 6;
328 public const byte EndRecord
= 7;
329 public const byte FaultRecord
= 8;
330 public const byte UpgradeRequestRecord
= 9;
331 public const byte UpgradeResponseRecord
= 0xA;
332 public const byte PreambleAckRecord
= 0xB;
333 public const byte PreambleEndRecord
= 0xC;
335 public const byte UnsizedMessageTerminator
= 0;
336 public const byte SingletonUnsizedMode
= 1;
337 public const byte DuplexMode
= 2;
338 public const byte SimplexMode
= 3;
339 public const byte SingletonSizedMode
= 4;
340 MyBinaryReader reader
;
341 MyBinaryWriter writer
;
343 public TcpBinaryFrameManager (int mode
, Stream s
, bool isServiceSide
)
347 this.is_service_side
= isServiceSide
;
348 reader
= new MyBinaryReader (s
);
351 EncodingRecord
= 8; // FIXME: it should depend on mode.
356 bool is_service_side
;
360 public byte EncodingRecord { get; set; }
362 public Uri Via { get; set; }
364 public MessageEncoder Encoder { get; set; }
366 void ResetWriteBuffer ()
368 this.buffer
= new MemoryStream ();
369 writer
= new MyBinaryWriter (buffer
);
372 public byte [] ReadSizedChunk ()
374 int length
= reader
.ReadVariableInt ();
377 throw new InvalidOperationException ("The message is too large.");
379 byte [] buffer
= new byte [length
];
380 for (int readSize
= 0; readSize
< length
; )
381 readSize
+= reader
.Read (buffer
, readSize
, length
- readSize
);
385 public void WriteSizedChunk (byte [] data
)
387 writer
.WriteVariableInt (data
.Length
);
388 writer
.Write (data
, 0, data
.Length
);
391 public void ProcessPreambleInitiator ()
395 buffer
.WriteByte (VersionRecord
);
396 buffer
.WriteByte (1);
397 buffer
.WriteByte (0);
398 buffer
.WriteByte (ModeRecord
);
399 buffer
.WriteByte ((byte) mode
);
400 buffer
.WriteByte (ViaRecord
);
401 writer
.Write (Via
.ToString ());
402 buffer
.WriteByte (KnownEncodingRecord
); // FIXME
403 buffer
.WriteByte ((byte) EncodingRecord
);
404 buffer
.WriteByte (PreambleEndRecord
);
406 s
.Write (buffer
.GetBuffer (), 0, (int) buffer
.Position
);
410 public void ProcessPreambleAckInitiator ()
412 int b
= s
.ReadByte ();
414 case PreambleAckRecord
:
417 throw new FaultException (reader
.ReadString ());
419 throw new ProtocolException (String
.Format ("Preamble Ack Record is expected, got {0:X}", b
));
423 public void ProcessPreambleAckRecipient ()
425 s
.WriteByte (PreambleAckRecord
);
428 public void ProcessPreambleRecipient ()
430 bool preambleEnd
= false;
431 while (!preambleEnd
) {
432 int b
= s
.ReadByte ();
435 if (s
.ReadByte () != 1)
436 throw new ProtocolException ("Major version must be 1");
437 if (s
.ReadByte () != 0)
438 throw new ProtocolException ("Minor version must be 0");
441 if (s
.ReadByte () != mode
)
442 throw new ProtocolException (String
.Format ("Duplex mode is expected to be {0:X}", mode
));
445 Via
= new Uri (reader
.ReadString ());
447 case KnownEncodingRecord
:
448 EncodingRecord
= (byte) s
.ReadByte ();
450 case ExtendingEncodingRecord
:
451 throw new NotImplementedException ("ExtendingEncodingRecord");
452 case UpgradeRequestRecord
:
453 throw new NotImplementedException ("UpgradeRequetRecord");
454 case UpgradeResponseRecord
:
455 throw new NotImplementedException ("UpgradeResponseRecord");
456 case PreambleEndRecord
:
460 throw new ProtocolException (String
.Format ("Unexpected record type {0:X2}", b
));
465 XmlBinaryReaderSession reader_session
;
466 int reader_session_items
;
468 public Message
ReadSizedMessage ()
470 // FIXME: implement full [MC-NMF].
472 var packetType
= s
.ReadByte ();
473 if (packetType
== EndRecord
)
475 if (packetType
!= SizedEnvelopeRecord
)
476 throw new NotImplementedException (String
.Format ("Packet type {0:X} is not implemented", packetType
));
478 byte [] buffer
= ReadSizedChunk ();
480 var ms
= new MemoryStream (buffer
, 0, buffer
.Length
);
482 // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
483 if (EncodingRecord
!= 8)
484 throw new NotImplementedException (String
.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord
));
487 // the returned buffer consists of a serialized reader
488 // session and the binary xml body.
490 var session
= reader_session
?? new XmlBinaryReaderSession ();
491 reader_session
= session
;
492 byte [] rsbuf
= new TcpBinaryFrameManager (0, ms
, is_service_side
).ReadSizedChunk ();
493 using (var rms
= new MemoryStream (rsbuf
, 0, rsbuf
.Length
)) {
494 var rbr
= new BinaryReader (rms
, Encoding
.UTF8
);
495 while (rms
.Position
< rms
.Length
)
496 session
.Add (reader_session_items
++, rbr
.ReadString ());
498 var benc
= Encoder
as BinaryMessageEncoder
;
500 benc
.CurrentReaderSession
= session
;
502 // FIXME: supply maxSizeOfHeaders.
503 Message msg
= Encoder
.ReadMessage (ms
, 0x10000);
505 benc
.CurrentReaderSession
= null;
510 // FIXME: support timeout
511 public Message
ReadUnsizedMessage (TimeSpan timeout
)
513 var packetType
= s
.ReadByte ();
514 if (packetType
== EndRecord
)
516 if (packetType
!= UnsizedEnvelopeRecord
)
517 throw new NotImplementedException (String
.Format ("Packet type {0:X} is not implemented", packetType
));
519 // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
520 if (EncodingRecord
!= 8)
521 throw new NotImplementedException (String
.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord
));
524 // the returned buffer consists of a serialized reader
525 // session and the binary xml body.
527 var session
= reader_session
?? new XmlBinaryReaderSession ();
528 reader_session
= session
;
529 byte [] rsbuf
= new TcpBinaryFrameManager (0, s
, is_service_side
).ReadSizedChunk ();
531 using (var rms
= new MemoryStream (rsbuf
, 0, rsbuf
.Length
)) {
532 var rbr
= new BinaryReader (rms
, Encoding
.UTF8
);
533 while (rms
.Position
< rms
.Length
)
534 session
.Add (reader_session_items
++, rbr
.ReadString ());
536 var benc
= Encoder
as BinaryMessageEncoder
;
538 benc
.CurrentReaderSession
= session
;
540 // FIXME: supply maxSizeOfHeaders.
541 Message msg
= Encoder
.ReadMessage (s
, 0x10000);
543 benc
.CurrentReaderSession
= null;
544 if (s
.ReadByte () != UnsizedMessageTerminator
)
545 throw new InvalidOperationException ("Unsized message terminator is expected");
550 byte [] eof_buffer
= new byte [1];
551 MyXmlBinaryWriterSession writer_session
;
553 public void WriteSizedMessage (Message message
)
557 if (EncodingRecord
!= 8)
558 throw new NotImplementedException (String
.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord
));
560 buffer
.WriteByte (SizedEnvelopeRecord
);
562 MemoryStream ms
= new MemoryStream ();
563 var session
= writer_session
?? new MyXmlBinaryWriterSession ();
564 writer_session
= session
;
565 int writer_session_count
= session
.List
.Count
;
566 var benc
= Encoder
as BinaryMessageEncoder
;
569 benc
.CurrentWriterSession
= session
;
570 Encoder
.WriteMessage (message
, ms
);
572 benc
.CurrentWriterSession
= null;
576 MemoryStream msd
= new MemoryStream ();
577 BinaryWriter dw
= new BinaryWriter (msd
);
578 for (int i
= writer_session_count
; i
< session
.List
.Count
; i
++)
579 dw
.Write (session
.List
[i
].Value
);
582 int length
= (int) (msd
.Position
+ ms
.Position
);
583 var msda
= msd
.ToArray ();
584 int sizeOfLength
= writer
.GetSizeOfLength (msda
.Length
);
586 writer
.WriteVariableInt (length
+ sizeOfLength
); // dictionary array also involves the size of itself.
587 WriteSizedChunk (msda
);
589 var arr
= ms
.GetBuffer ();
590 writer
.Write (arr
, 0, (int) ms
.Position
);
594 s
.Write (buffer
.GetBuffer (), 0, (int) buffer
.Position
);
598 // FIXME: support timeout
599 public void WriteUnsizedMessage (Message message
, TimeSpan timeout
)
603 if (EncodingRecord
!= 8)
604 throw new NotImplementedException (String
.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord
));
606 buffer
.WriteByte (UnsizedEnvelopeRecord
);
608 MemoryStream ms
= new MemoryStream ();
609 var session
= writer_session
?? new MyXmlBinaryWriterSession ();
610 writer_session
= session
;
611 int writer_session_count
= session
.List
.Count
;
612 var benc
= Encoder
as BinaryMessageEncoder
;
615 benc
.CurrentWriterSession
= session
;
616 Encoder
.WriteMessage (message
, ms
);
618 benc
.CurrentWriterSession
= null;
622 MemoryStream msd
= new MemoryStream ();
623 BinaryWriter dw
= new BinaryWriter (msd
);
624 for (int i
= writer_session_count
; i
< session
.List
.Count
; i
++)
625 dw
.Write (session
.List
[i
].Value
);
628 int length
= (int) (msd
.Position
+ ms
.Position
);
629 var msda
= msd
.ToArray ();
631 writer
.Write (msda
, 0, msda
.Length
);
633 var arr
= ms
.GetBuffer ();
634 writer
.Write (arr
, 0, (int) ms
.Position
);
636 writer
.Write (UnsizedMessageTerminator
); // terminator
639 // FIXME: it should be rewritten to directly write to the stream.
640 s
.Write (buffer
.GetBuffer (), 0, (int) buffer
.Position
);
644 public void ProcessEndRecordInitiator ()
646 s
.WriteByte (EndRecord
); // it is required
650 public void ProcessEndRecordRecipient ()
653 if ((b
= s
.ReadByte ()) != EndRecord
)
654 throw new ProtocolException (String
.Format ("EndRecord message was expected, got {0:X}", b
));