2009-07-17 Atsushi Enomoto <atsushi@ximian.com>
[mcs.git] / class / System.ServiceModel / System.ServiceModel.Channels / TcpDuplexSessionChannel.cs
blob1d597894eb9196e37c9e8b280cd3c9028f9489dc
1 //
2 // TcpDuplexSessionChannel.cs
3 //
4 // Author:
5 // Marcos Cobena (marcoscobena@gmail.com)
6 // Atsushi Enomoto <atsushi@ximian.com>
7 //
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
9 //
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
19 //
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
22 //
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System;
33 using System.Collections.Generic;
34 using System.IO;
35 using System.Net;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
40 using System.Text;
41 using System.Threading;
42 using System.Xml;
44 namespace System.ServiceModel.Channels
46 internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
// Session object exposed through TcpDuplexSessionChannel.Session.
// It holds no state of its own and forwards to the channel that created it.
class TcpDuplexSession : DuplexSessionBase
{
	TcpDuplexSessionChannel channel;

	internal TcpDuplexSession (TcpDuplexSessionChannel owner)
	{
		channel = owner;
	}

	// Reports the close timeout of the owning channel.
	public override TimeSpan DefaultCloseTimeout {
		get {
			TimeSpan closeTimeout = channel.DefaultCloseTimeout;
			return closeTimeout;
		}
	}

	// Closing the session asks the owning channel to discard it.
	// The timeout argument is not used here.
	public override void Close (TimeSpan timeout)
	{
		channel.DiscardSession ();
	}
}
67 TcpChannelInfo info;
68 TcpClient client;
69 bool is_service_side;
70 EndpointAddress local_address;
71 TcpBinaryFrameManager frame;
72 TcpDuplexSession session; // do not use this directly. Use Session instead.
// Creates a client-side (initiator) channel. No TcpClient is assigned here.
public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
	: base (factory, address, via)
{
	this.info = info;
	this.is_service_side = false;
}
// Creates a service-side (recipient) channel around an already supplied TcpClient.
public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient client)
	: base (listener)
{
	this.info = info;
	this.client = client;
	this.is_service_side = true;
}
// The message encoder taken from the channel info given at construction.
public MessageEncoder Encoder {
	get {
		MessageEncoder encoder = info.MessageEncoder;
		return encoder;
	}
}
// Returns the stored local address field as-is.
public override EndpointAddress LocalAddress {
	get {
		return this.local_address;
	}
}
// Lazily creates the session object on first access and caches it.
// DiscardSession() clears the cache, so a later access creates a new one.
public IDuplexSession Session {
	get {
		if (session != null)
			return session;
		session = new TcpDuplexSession (this);
		return session;
	}
}
// Sends the End record through the frame manager, then drops the cached session.
// The session field is only cleared if sending the End record did not throw.
void DiscardSession ()
{
	this.frame.ProcessEndRecordInitiator ();
	this.session = null;
}
// Sends a message using the channel's default send timeout.
public override void Send (Message message)
{
	TimeSpan sendTimeout = DefaultSendTimeout;
	Send (message, sendTimeout);
}
// Sends a message as a sized envelope within the given timeout.
//
// On the client side, missing To / ReplyTo headers are filled in with the
// remote address and the anonymous URI. On the service side, a missing
// RelatesTo header is taken from the current operation's incoming MessageId
// when an OperationContext is available.
//
// Throws ArgumentNullException when message is null and ArgumentException
// when timeout is not positive.
public override void Send (Message message, TimeSpan timeout)
{
	if (message == null)
		throw new ArgumentNullException ("message");
	if (timeout <= TimeSpan.Zero)
		throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));

	if (!is_service_side) {
		if (message.Headers.To == null)
			message.Headers.To = RemoteAddress.Uri;
		if (message.Headers.ReplyTo == null)
			message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
	} else {
		if (message.Headers.RelatesTo == null) {
			// There is no current OperationContext when the service sends outside
			// of an operation; leave RelatesTo unset rather than fail.
			OperationContext context = OperationContext.Current;
			if (context != null)
				message.Headers.RelatesTo = context.IncomingMessageHeaders.MessageId;
		}
	}

	// A plain (int) cast overflows for very large timeouts (e.g. TimeSpan.MaxValue)
	// and truncates sub-millisecond timeouts to 0, which TcpClient treats as
	// "infinite". Clamp into [1, int.MaxValue] instead.
	double totalMilliseconds = timeout.TotalMilliseconds;
	client.SendTimeout = totalMilliseconds >= int.MaxValue ? int.MaxValue : Math.Max (1, (int) totalMilliseconds);

	frame.WriteSizedMessage (message);
	// FIXME: should EndRecord be sent here?
	//if (is_service_side && client.Available > 0)
	//	frame.ProcessEndRecordRecipient ();
}
// Receives a message using the channel's default receive timeout.
public override Message Receive ()
{
	TimeSpan receiveTimeout = DefaultReceiveTimeout;
	return Receive (receiveTimeout);
}
// Reads the next sized message from the connection.
// Returns whatever the frame manager's ReadSizedMessage() returns.
// Throws ArgumentException when timeout is not positive.
public override Message Receive (TimeSpan timeout)
{
	if (timeout <= TimeSpan.Zero)
		throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));

	// A plain (int) cast overflows for very large timeouts (e.g. TimeSpan.MaxValue)
	// and truncates sub-millisecond timeouts to 0, which TcpClient treats as
	// "infinite". Clamp into [1, int.MaxValue] instead.
	double totalMilliseconds = timeout.TotalMilliseconds;
	client.ReceiveTimeout = totalMilliseconds >= int.MaxValue ? int.MaxValue : Math.Max (1, (int) totalMilliseconds);

	return frame.ReadSizedMessage ();
}
// Tries to receive a message within the timeout.
//
// Returns true with the message when one was read. Returns false with a null
// message when the receive timed out, or when Receive() returned null (the
// End record was read), in which case the channel is closed as well.
public override bool TryReceive (TimeSpan timeout, out Message message)
{
	try {
		// UTC avoids wrong elapsed-time results across local clock (DST) shifts.
		DateTime start = DateTime.UtcNow;
		message = Receive (timeout);
		if (message != null)
			return true;
		// received EndRecord, so close the session and return false instead.
		// (Closing channel here might not be a good idea, but right now I have no better way.)
		TimeSpan remaining = timeout - (DateTime.UtcNow - start);
		if (remaining <= TimeSpan.Zero)
			remaining = TimeSpan.FromTicks (1); // never hand Close() a non-positive timeout
		Close (remaining);
		return false;
	} catch (TimeoutException) {
		message = null;
		return false;
	} catch (IOException ex) {
		// A socket receive timeout is reported by the network stream as an
		// IOException wrapping a SocketException, not as a TimeoutException.
		// Anything else is a real I/O failure and is rethrown unchanged.
		SocketException socketError = ex.InnerException as SocketException;
		if (socketError == null || socketError.SocketErrorCode != SocketError.TimedOut)
			throw;
		message = null;
		return false;
	}
}
// Polls the TCP client until data is available or the timeout has elapsed.
// Returns true as soon as client.Available is positive, false otherwise.
// At least one 50 ms sleep happens when no data is available on entry.
public override bool WaitForMessage (TimeSpan timeout)
{
	if (client.Available > 0)
		return true;

	DateTime began = DateTime.Now;
	while (true) {
		Thread.Sleep (50);
		if (client.Available > 0)
			return true;
		if (DateTime.Now - began >= timeout)
			return false;
	}
}
182 // CommunicationObject
// Abort is currently implemented as a Close() with a one-tick timeout.
[MonoTODO]
protected override void OnAbort ()
{
	TimeSpan minimalTimeout = TimeSpan.FromTicks (1);
	Close (minimalTimeout);
}
// Asynchronous close is not implemented; always throws NotImplementedException.
[MonoTODO]
protected override IAsyncResult OnBeginClose (TimeSpan timeout, AsyncCallback callback, object state)
{
	throw new NotImplementedException ();
}
// Asynchronous open is not implemented; always throws NotImplementedException.
[MonoTODO]
protected override IAsyncResult OnBeginOpen (TimeSpan timeout, AsyncCallback callback, object state)
{
	throw new NotImplementedException ();
}
// Closes the channel: on the client side an existing session is closed first,
// then the TCP client (if any) is closed.
[MonoTODO]
protected override void OnClose (TimeSpan timeout)
{
	bool hasClientSession = !is_service_side && session != null;
	if (hasClientSession)
		session.Close (timeout);

	if (client == null)
		return;
	client.Close ();
}
// Asynchronous close is not implemented; always throws NotImplementedException.
[MonoTODO]
protected override void OnEndClose (IAsyncResult result)
{
	throw new NotImplementedException ();
}
// Asynchronous open is not implemented; always throws NotImplementedException.
[MonoTODO]
protected override void OnEndOpen (IAsyncResult result)
{
	throw new NotImplementedException ();
}
// Opens the channel and runs the preamble handshake.
// Service side: wraps the accepted client's stream, reads the preamble and
// sends the ack. Client side: connects to the remote address, sends the
// preamble and waits for the ack. The timeout argument is not used.
[MonoTODO]
protected override void OnOpen (TimeSpan timeout)
{
	if (is_service_side) {
		// server side
		Stream accepted = client.GetStream ();

		var recipient = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, accepted, is_service_side);
		recipient.Encoder = this.Encoder;
		frame = recipient;

		// FIXME: use retrieved record properties in the request processing.

		frame.ProcessPreambleRecipient ();
		frame.ProcessPreambleAckRecipient ();
		return;
	}

	// A non-positive URI port means none was given; use the transport default.
	int port = RemoteAddress.Uri.Port;
	if (port <= 0)
		port = TcpTransportBindingElement.DefaultPort;
	client = new TcpClient (RemoteAddress.Uri.Host, port);

	NetworkStream ns = client.GetStream ();
	var initiator = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns, is_service_side);
	initiator.Encoder = this.Encoder;
	initiator.Via = RemoteAddress.Uri;
	frame = initiator;

	frame.ProcessPreambleInitiator ();
	frame.ProcessPreambleAckInitiator ();
}
// BinaryWriter that can emit a byte array prefixed with its length.
class MyBinaryWriter : BinaryWriter
{
	public MyBinaryWriter (Stream s)
		: base (s)
	{
	}

	// Writes the array length as a 7-bit encoded integer, then the raw bytes.
	public void WriteBytes (byte [] bytes)
	{
		int count = bytes.Length;
		Write7BitEncodedInt (count);
		Write (bytes);
	}
}
269 // seealso: [MC-NMF] Windows Protocol document.
270 class TcpBinaryFrameManager
// BinaryReader that exposes the base class's 7-bit encoded integer reader.
class MyBinaryReader : BinaryReader
{
	public MyBinaryReader (Stream s)
		: base (s)
	{
	}

	// Reads one 7-bit encoded (variable length) integer from the stream.
	public int ReadVariableInt ()
	{
		int decoded = Read7BitEncodedInt ();
		return decoded;
	}
}
// BinaryWriter that writes variable-length (7-bit encoded) integers and can
// report how many bytes such an integer occupies.
class MyBinaryWriter : BinaryWriter
{
	public MyBinaryWriter (Stream s)
		: base (s)
	{
	}

	// Writes value using the 7-bit variable length encoding.
	public void WriteVariableInt (int value)
	{
		Write7BitEncodedInt (value);
	}

	// Returns the number of bytes WriteVariableInt() emits for value.
	// Each output byte carries 7 payload bits, so the count is the number of
	// 7-bit groups (dividing by 0x100 under-counted, e.g. for 128..255).
	// The value is treated as unsigned, as the 7-bit encoder does, so a
	// negative value takes 5 bytes.
	public int GetSizeOfLength (int value)
	{
		uint remaining = (uint) value;
		int size = 0;
		do {
			remaining >>= 7;
			size++;
		} while (remaining != 0);
		return size;
	}
}
// Writer session that additionally remembers, in insertion order, every
// dictionary string the base session accepted.
class MyXmlBinaryWriterSession : XmlBinaryWriterSession
{
	// Strings successfully added so far, in the order they were added.
	public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();

	// Delegates to the base session and records the string only when it was added.
	public override bool TryAdd (XmlDictionaryString value, out int key)
	{
		bool added = base.TryAdd (value, out key);
		if (added)
			List.Add (value);
		return added;
	}
}
321 public const byte VersionRecord = 0;
322 public const byte ModeRecord = 1;
323 public const byte ViaRecord = 2;
324 public const byte KnownEncodingRecord = 3;
325 public const byte ExtendingEncodingRecord = 4;
326 public const byte UnsizedEnvelopeRecord = 5;
327 public const byte SizedEnvelopeRecord = 6;
328 public const byte EndRecord = 7;
329 public const byte FaultRecord = 8;
330 public const byte UpgradeRequestRecord = 9;
331 public const byte UpgradeResponseRecord = 0xA;
332 public const byte PreambleAckRecord = 0xB;
333 public const byte PreambleEndRecord = 0xC;
335 public const byte UnsizedMessageTerminator = 0;
336 public const byte SingletonUnsizedMode = 1;
337 public const byte DuplexMode = 2;
338 public const byte SimplexMode = 3;
339 public const byte SingletonSizedMode = 4;
340 MyBinaryReader reader;
341 MyBinaryWriter writer;
// Creates a frame manager over the given stream for the given framing mode.
// A reader is bound to the stream and a fresh in-memory write buffer is prepared.
public TcpBinaryFrameManager (int mode, Stream s, bool isServiceSide)
{
	this.s = s;
	this.mode = mode;
	is_service_side = isServiceSide;

	reader = new MyBinaryReader (s);
	ResetWriteBuffer ();

	EncodingRecord = 8; // FIXME: it should depend on mode.
}
354 Stream s;
355 MemoryStream buffer;
356 bool is_service_side;
358 int mode;
360 public byte EncodingRecord { get; set; }
362 public Uri Via { get; set; }
364 public MessageEncoder Encoder { get; set; }
// Replaces the in-memory write buffer with an empty one and points the writer at it.
void ResetWriteBuffer ()
{
	var fresh = new MemoryStream ();
	buffer = fresh;
	writer = new MyBinaryWriter (fresh);
}
// Reads a length-prefixed chunk: a variable-length integer followed by that
// many bytes.
//
// Throws InvalidOperationException when the announced length is negative or
// larger than 65536, and EndOfStreamException when the stream ends before the
// whole chunk has been read.
public byte [] ReadSizedChunk ()
{
	int length = reader.ReadVariableInt ();

	if (length < 0)
		throw new InvalidOperationException ("The message length is invalid.");
	if (length > 65536)
		throw new InvalidOperationException ("The message is too large.");

	byte [] chunk = new byte [length];
	int readSize = 0;
	while (readSize < length) {
		int got = reader.Read (chunk, readSize, length - readSize);
		// Read() returns 0 at end of stream; without this check the loop never ends.
		if (got <= 0)
			throw new EndOfStreamException (String.Format ("Unexpected end of stream: expected {0} bytes, got {1}", length, readSize));
		readSize += got;
	}
	return chunk;
}
// Writes data to the write buffer as a variable-length size followed by the bytes.
public void WriteSizedChunk (byte [] data)
{
	int count = data.Length;
	writer.WriteVariableInt (count);
	writer.Write (data, 0, count);
}
// Builds the initiator preamble in the write buffer and sends it:
// version 1.0, mode, Via (as a length-prefixed string), known encoding,
// and the preamble end record.
public void ProcessPreambleInitiator ()
{
	ResetWriteBuffer ();

	byte [] head = new byte [] {
		VersionRecord, 1, 0,
		ModeRecord, (byte) mode,
		ViaRecord
	};
	buffer.Write (head, 0, head.Length);
	writer.Write (Via.ToString ());

	byte [] tail = new byte [] {
		KnownEncodingRecord, // FIXME
		(byte) EncodingRecord,
		PreambleEndRecord
	};
	buffer.Write (tail, 0, tail.Length);
	buffer.Flush ();

	int count = (int) buffer.Position;
	s.Write (buffer.GetBuffer (), 0, count);
	s.Flush ();
}
// Reads the recipient's answer to the preamble.
// Returns on a preamble ack, throws FaultException with the transmitted
// string on a fault record, and ProtocolException on anything else.
public void ProcessPreambleAckInitiator ()
{
	int record = s.ReadByte ();
	if (record == PreambleAckRecord)
		return; // success
	if (record == FaultRecord)
		throw new FaultException (reader.ReadString ());
	throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", record));
}
// Sends the preamble ack record to the initiator.
public void ProcessPreambleAckRecipient ()
{
	s.WriteByte (PreambleAckRecord);
	// Flush like ProcessEndRecordInitiator() does, so the ack is not held back
	// if the underlying stream buffers writes while the initiator waits for it.
	s.Flush ();
}
// Reads preamble records sent by the initiator until the preamble end record.
//
// Validates the version (1.0) and the mode, stores the Via URI and the known
// encoding. Throws ProtocolException on a version/mode mismatch, an unknown
// record or a connection that ends mid-preamble, and NotImplementedException
// for the record types that are not supported yet.
public void ProcessPreambleRecipient ()
{
	bool preambleEnd = false;
	while (!preambleEnd) {
		int b = s.ReadByte ();
		switch (b) {
		case -1:
			// ReadByte() returns -1 at end of stream.
			throw new ProtocolException ("The connection was closed before the preamble was completed");
		case VersionRecord:
			if (s.ReadByte () != 1)
				throw new ProtocolException ("Major version must be 1");
			if (s.ReadByte () != 0)
				throw new ProtocolException ("Minor version must be 0");
			break;
		case ModeRecord:
			if (s.ReadByte () != mode)
				throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
			break;
		case ViaRecord:
			Via = new Uri (reader.ReadString ());
			break;
		case KnownEncodingRecord: {
			int encoding = s.ReadByte ();
			// Do not let an end-of-stream marker (-1) turn into encoding 0xFF.
			if (encoding < 0)
				throw new ProtocolException ("The connection was closed before the preamble was completed");
			EncodingRecord = (byte) encoding;
			break;
		}
		case ExtendingEncodingRecord:
			throw new NotImplementedException ("ExtendingEncodingRecord");
		case UpgradeRequestRecord:
			throw new NotImplementedException ("UpgradeRequestRecord");
		case UpgradeResponseRecord:
			throw new NotImplementedException ("UpgradeResponseRecord");
		case PreambleEndRecord:
			preambleEnd = true;
			break;
		default:
			throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
		}
	}
}
465 XmlBinaryReaderSession reader_session;
466 int reader_session_items;
// Reads one sized envelope record and decodes it into a Message.
//
// Returns null when an End record is read instead of an envelope. Throws
// NotImplementedException for any other record type and for encodings other
// than 8.
public Message ReadSizedMessage ()
{
	// FIXME: implement full [MC-NMF].

	var packetType = s.ReadByte ();
	if (packetType == EndRecord)
		return null;
	if (packetType != SizedEnvelopeRecord)
		throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));

	byte [] buffer = ReadSizedChunk ();

	var ms = new MemoryStream (buffer, 0, buffer.Length);

	// FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
	if (EncodingRecord != 8)
		throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));

	// Encoding type 8:
	// the returned buffer consists of a serialized reader
	// session and the binary xml body.

	var session = reader_session ?? new XmlBinaryReaderSession ();
	reader_session = session;
	// The leading chunk of the envelope holds the new dictionary strings;
	// they get consecutive ids continuing from the previous messages.
	byte [] rsbuf = new TcpBinaryFrameManager (0, ms, is_service_side).ReadSizedChunk ();
	using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
		var rbr = new BinaryReader (rms, Encoding.UTF8);
		while (rms.Position < rms.Length)
			session.Add (reader_session_items++, rbr.ReadString ());
	}

	var benc = Encoder as BinaryMessageEncoder;
	try {
		if (benc != null)
			benc.CurrentReaderSession = session;

		// FIXME: supply maxSizeOfHeaders.
		return Encoder.ReadMessage (ms, 0x10000);
	} finally {
		// Detach the session even when decoding throws, so the encoder is not
		// left pointing at it.
		if (benc != null)
			benc.CurrentReaderSession = null;
	}
}
// Reads one unsized envelope record and decodes it into a Message.
//
// Returns null when an End record is read instead of an envelope. Throws
// NotImplementedException for any other record type and for encodings other
// than 8, and InvalidOperationException when the terminator byte is missing.
// FIXME: support timeout
public Message ReadUnsizedMessage (TimeSpan timeout)
{
	var packetType = s.ReadByte ();
	if (packetType == EndRecord)
		return null;
	if (packetType != UnsizedEnvelopeRecord)
		throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));

	// FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
	if (EncodingRecord != 8)
		throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));

	// Encoding type 8:
	// the returned buffer consists of a serialized reader
	// session and the binary xml body.

	var session = reader_session ?? new XmlBinaryReaderSession ();
	reader_session = session;
	// The leading chunk holds the new dictionary strings; they get
	// consecutive ids continuing from the previous messages.
	byte [] rsbuf = new TcpBinaryFrameManager (0, s, is_service_side).ReadSizedChunk ();

	using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
		var rbr = new BinaryReader (rms, Encoding.UTF8);
		while (rms.Position < rms.Length)
			session.Add (reader_session_items++, rbr.ReadString ());
	}

	var benc = Encoder as BinaryMessageEncoder;
	Message msg;
	try {
		if (benc != null)
			benc.CurrentReaderSession = session;

		// FIXME: supply maxSizeOfHeaders.
		msg = Encoder.ReadMessage (s, 0x10000);
	} finally {
		// Detach the session even when decoding throws, so the encoder is not
		// left pointing at it.
		if (benc != null)
			benc.CurrentReaderSession = null;
	}
	if (s.ReadByte () != UnsizedMessageTerminator)
		throw new InvalidOperationException ("Unsized message terminator is expected");

	return msg;
}
550 byte [] eof_buffer = new byte [1];
551 MyXmlBinaryWriterSession writer_session;
// Encodes message and sends it as one sized envelope record.
//
// The envelope payload is the chunk of dictionary strings newly added by this
// message, followed by the encoded message body. Throws
// NotImplementedException for encodings other than 8.
public void WriteSizedMessage (Message message)
{
	ResetWriteBuffer ();

	if (EncodingRecord != 8)
		throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));

	buffer.WriteByte (SizedEnvelopeRecord);

	MemoryStream ms = new MemoryStream ();
	var session = writer_session ?? new MyXmlBinaryWriterSession ();
	writer_session = session;
	// Strings recorded before this message were already sent to the peer.
	int writer_session_count = session.List.Count;
	var benc = Encoder as BinaryMessageEncoder;
	try {
		if (benc != null)
			benc.CurrentWriterSession = session;
		Encoder.WriteMessage (message, ms);
	} finally {
		// benc is null when the encoder is not a BinaryMessageEncoder; the
		// unguarded reset used to throw NullReferenceException in that case.
		if (benc != null)
			benc.CurrentWriterSession = null;
	}

	// dictionary
	MemoryStream msd = new MemoryStream ();
	BinaryWriter dw = new BinaryWriter (msd);
	for (int i = writer_session_count; i < session.List.Count; i++)
		dw.Write (session.List [i].Value);
	dw.Flush ();

	int length = (int) (msd.Position + ms.Position);
	var msda = msd.ToArray ();
	int sizeOfLength = writer.GetSizeOfLength (msda.Length);

	writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
	WriteSizedChunk (msda);
	// message body
	var arr = ms.GetBuffer ();
	writer.Write (arr, 0, (int) ms.Position);

	writer.Flush ();

	s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
	s.Flush ();
}
// Encodes message and sends it as one unsized envelope record: the newly
// added dictionary strings, the encoded message body and a terminator byte.
// Throws NotImplementedException for encodings other than 8.
// FIXME: support timeout
public void WriteUnsizedMessage (Message message, TimeSpan timeout)
{
	ResetWriteBuffer ();

	if (EncodingRecord != 8)
		throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));

	buffer.WriteByte (UnsizedEnvelopeRecord);

	MemoryStream ms = new MemoryStream ();
	var session = writer_session ?? new MyXmlBinaryWriterSession ();
	writer_session = session;
	// Strings recorded before this message were already sent to the peer.
	int writer_session_count = session.List.Count;
	var benc = Encoder as BinaryMessageEncoder;
	try {
		if (benc != null)
			benc.CurrentWriterSession = session;
		Encoder.WriteMessage (message, ms);
	} finally {
		// benc is null when the encoder is not a BinaryMessageEncoder; the
		// unguarded reset used to throw NullReferenceException in that case.
		if (benc != null)
			benc.CurrentWriterSession = null;
	}

	// dictionary
	MemoryStream msd = new MemoryStream ();
	BinaryWriter dw = new BinaryWriter (msd);
	for (int i = writer_session_count; i < session.List.Count; i++)
		dw.Write (session.List [i].Value);
	dw.Flush ();

	var msda = msd.ToArray ();

	writer.Write (msda, 0, msda.Length);
	// message body
	var arr = ms.GetBuffer ();
	writer.Write (arr, 0, (int) ms.Position);

	writer.Write (UnsizedMessageTerminator); // terminator
	writer.Flush ();

	// FIXME: it should be rewritten to directly write to the stream.
	s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
	s.Flush ();
}
// Writes the End record straight to the stream and flushes it.
public void ProcessEndRecordInitiator ()
{
	byte endMarker = EndRecord;
	s.WriteByte (endMarker); // it is required
	s.Flush ();
}
// Reads one byte and requires it to be the End record;
// throws ProtocolException for anything else.
public void ProcessEndRecordRecipient ()
{
	int b = s.ReadByte ();
	if (b == EndRecord)
		return;
	throw new ProtocolException (String.Format ("EndRecord message was expected, got {0:X}", b));
}