1 // Copyright 2006 Alp Toker <alp@atoker.com>
2 // Copyright 2007 Versabanq (Adrian Dewhurst <adewhurst@versabanq.com>)
3 // This software is made available under the MIT License
4 // See COPYING for details
7 using System
.Collections
.Generic
;
9 using System
.Threading
;
10 using System
.Reflection
;
17 public class Connection
19 //TODO: reconsider this field
20 public Stream ns
= null;
23 public Transport Transport
{
31 // FIXME: There should be a better way to hack in a socket
33 public Connection () {
34 OnMessage
= HandleMessage
;
37 internal Connection (Transport transport
)
39 OnMessage
= HandleMessage
;
40 this.transport
= transport
;
41 transport
.Connection
= this;
43 //TODO: clean this bit up
44 ns
= transport
.Stream
;
47 //should this be public?
48 internal Connection (string address
)
50 OnMessage
= HandleMessage
;
51 OpenPrivate (address
);
55 //should we do connection sharing here?
56 public static Connection
Open (string address
)
58 Connection conn
= new Connection ();
59 conn
.OpenPrivate (address
);
65 public static Connection
Open (Transport transport
)
67 Connection conn
= new Connection (transport
);
73 internal void OpenPrivate (string address
)
76 throw new ArgumentNullException ("address");
78 AddressEntry
[] entries
= Address
.Parse (address
);
79 if (entries
.Length
== 0)
80 throw new Exception ("No addresses were found");
82 //TODO: try alternative addresses if needed
83 AddressEntry entry
= entries
[0];
85 transport
= Transport
.Create (entry
);
87 //TODO: clean this bit up
88 ns
= transport
.Stream
;
91 internal void Authenticate ()
93 if (transport
!= null)
94 transport
.WriteCred ();
96 SaslProcess auth
= new ExternalAuthClient (this);
98 isAuthenticated
= true;
101 bool isAuthenticated
= false;
102 internal bool IsAuthenticated
105 return isAuthenticated
;
109 //Interlocked.Increment() handles the overflow condition for uint correctly, so it's ok to store the value as an int but cast it to uint
111 uint GenerateSerial ()
114 return (uint)Interlocked
.Increment (ref serial
);
117 public Message
SendWithReplyAndBlock (Message msg
)
119 PendingCall pending
= SendWithReply (msg
);
120 return pending
.Reply
;
123 internal PendingCall
SendWithReply (Message msg
)
125 msg
.ReplyExpected
= true;
126 msg
.Header
.Serial
= GenerateSerial ();
128 //TODO: throttle the maximum number of concurrent PendingCalls
129 PendingCall pending
= new PendingCall (this);
130 pendingCalls
[msg
.Header
.Serial
] = pending
;
137 public uint Send (Message msg
)
139 msg
.Header
.Serial
= GenerateSerial ();
143 return msg
.Header
.Serial
;
146 object writeLock
= new object ();
147 internal void WriteMessage (Message msg
)
149 byte[] HeaderData
= msg
.GetHeaderData ();
151 long msgLength
= HeaderData
.Length
+ (msg
.Body
!= null ? msg
.Body
.Length
: 0);
152 if (msgLength
> Protocol
.MaxMessageLength
)
153 throw new Exception ("Message length " + msgLength
+ " exceeds maximum allowed " + Protocol
.MaxMessageLength
+ " bytes");
156 ns
.Write (HeaderData
, 0, HeaderData
.Length
);
157 if (msg
.Body
!= null && msg
.Body
.Length
!= 0)
158 ns
.Write (msg
.Body
, 0, msg
.Body
.Length
);
162 Queue
<Message
> Inbound
= new Queue
<Message
> ();
164 // Given the first 16 bytes of the header, returns the full
165 // header and body lengths (including the 16 bytes of the
166 // header already read) Positions the stream after
167 // execution at the point where it began
168 internal static void GetMessageSize(Stream s
, out int headerSize
,
173 byte[] buf
= new byte[16];
174 read
= s
.Read (buf
, 0, 16);
176 s
.Seek(-read
, SeekOrigin
.Current
);
179 throw new Exception ("Header read length mismatch: " + read
+ " of expected " + "16");
181 EndianFlag endianness
= (EndianFlag
)buf
[0];
182 MessageReader reader
= new MessageReader (endianness
, buf
);
184 //discard the endian byte as we've already read it
187 //discard message type and flags, which we don't care about here
191 byte version
= reader
.ReadByte();
193 if (version
< Protocol
.MinVersion
|| version
> Protocol
.MaxVersion
)
194 throw new NotSupportedException ("Protocol version '" + version
.ToString () + "' is not supported");
196 if (Protocol
.Verbose
)
197 if (version
!= Protocol
.Version
)
198 Console
.Error
.WriteLine ("Warning: Protocol version '" + version
.ToString () + "' is not explicitly supported but may be compatible");
200 uint bodyLength
= reader
.ReadUInt32 ();
201 reader
.ReadUInt32 (); // serial
202 uint headerLength
= reader
.ReadUInt32 ();
204 //TODO: remove this limitation
205 if (bodyLength
> Int32
.MaxValue
|| headerLength
> Int32
.MaxValue
)
206 throw new NotImplementedException ("Long messages are not yet supported");
208 bodySize
= (int)bodyLength
;
209 headerSize
= Protocol
.Padded ((int)headerLength
, 8) + 16;
212 internal Message
BuildMessage (Stream s
,
213 int headerSize
, int bodySize
)
215 if (s
.Length
-s
.Position
< headerSize
+ bodySize
)
216 throw new Exception("Buffer is not header + body sizes");
218 Message msg
= new Message ();
219 msg
.Connection
= this;
223 byte[] header
= new byte[headerSize
];
224 len
= s
.Read(header
, 0, headerSize
);
226 if (len
!= headerSize
)
227 throw new Exception ("Read length mismatch: " + len
+ " of expected " + headerSize
);
229 msg
.SetHeaderData (header
);
232 byte[] body
= new byte[bodySize
];
233 len
= s
.Read(body
, 0, bodySize
);
236 throw new Exception ("Read length mismatch: " + len
+ " of expected " + bodySize
);
244 public Message
ReadMessage()
251 //16 bytes is the size of the fixed part of the header
252 byte[] hbuf
= new byte[16];
253 read
= ns
.Read (hbuf
, 0, 16);
259 throw new Exception ("Header read length mismatch: " + read
+ " of expected " + "16");
261 EndianFlag endianness
= (EndianFlag
)hbuf
[0];
262 MessageReader reader
= new MessageReader (endianness
, hbuf
);
264 //discard the endian byte as we've already read it
267 //discard message type and flags, which we don't care about here
271 byte version
= reader
.ReadByte ();
273 if (version
< Protocol
.MinVersion
|| version
> Protocol
.MaxVersion
)
274 throw new NotSupportedException ("Protocol version '" + version
.ToString () + "' is not supported");
276 if (Protocol
.Verbose
)
277 if (version
!= Protocol
.Version
)
278 Console
.Error
.WriteLine ("Warning: Protocol version '" + version
.ToString () + "' is not explicitly supported but may be compatible");
280 uint bodyLength
= reader
.ReadUInt32 ();
282 reader
.ReadUInt32 ();
283 uint headerLength
= reader
.ReadUInt32 ();
285 int bodyLen
= (int)bodyLength
;
286 int toRead
= (int)headerLength
;
288 //we fixup to include the padding following the header
289 toRead
= Protocol
.Padded (toRead
, 8);
291 long msgLength
= toRead
+ bodyLen
;
292 if (msgLength
> Protocol
.MaxMessageLength
)
293 throw new Exception ("Message length " + msgLength
+ " exceeds maximum allowed " + Protocol
.MaxMessageLength
+ " bytes");
295 header
= new byte[16 + toRead
];
296 Array
.Copy (hbuf
, header
, 16);
298 read
= ns
.Read (header
, 16, toRead
);
301 throw new Exception ("Message header length mismatch: " + read
+ " of expected " + toRead
);
305 body
= new byte[bodyLen
];
309 while (numRead
< bodyLen
&& lastRead
!= 0)
311 lastRead
= ns
.Read (body
, numRead
, bodyLen
- numRead
);
315 if (numRead
!= bodyLen
)
316 throw new Exception (String
.Format(
317 "Message body size mismatch: numRead={0}, bodyLen={1}",
321 Message msg
= new Message ();
322 msg
.Connection
= this;
324 msg
.SetHeaderData (header
);
330 internal void DispatchSignals ()
333 while (Inbound
.Count
!= 0) {
334 Message msg
= Inbound
.Dequeue ();
341 public delegate void MessageHandler(Message msg
);
342 public MessageHandler OnMessage
;
344 MemoryStream msgbuf
= new MemoryStream();
345 public long ReceiveBuffer(byte[] buf
, int offset
, int count
)
347 msgbuf
.Seek(0, SeekOrigin
.End
);
348 msgbuf
.Write(buf
, offset
, count
);
350 msgbuf
.Seek(0, SeekOrigin
.Begin
);
352 long left
= msgbuf
.Length
;
356 int headerSize
, bodySize
;
357 GetMessageSize(msgbuf
, out headerSize
, out bodySize
);
359 if (left
>= headerSize
+ bodySize
) {
360 Message msg
= BuildMessage(msgbuf
, headerSize
,
365 left
-= headerSize
+ bodySize
;
367 want
= headerSize
+ bodySize
- left
;
372 if (left
> 0 && msgbuf
.Length
!= left
) {
373 byte[] tmp
= new byte[left
];
375 msgbuf
.Read(tmp
, 0, tmp
.Length
);
377 msgbuf
.SetLength(tmp
.Length
);
379 msgbuf
.Seek(0, SeekOrigin
.Begin
);
380 msgbuf
.Write(tmp
, 0, tmp
.Length
);
381 } else if (left
== 0) {
391 internal Thread mainThread
= Thread
.CurrentThread
;
394 public void Iterate ()
396 mainThread
= Thread
.CurrentThread
;
398 //Message msg = Inbound.Dequeue ();
399 Message msg
= ReadMessage ();
405 public Message
GetNext()
407 return ReadMessage();
410 internal void HandleMessage (Message msg
)
412 //TODO: support disconnection situations properly and move this check elsewhere
414 throw new ArgumentNullException ("msg", "Cannot handle a null message; maybe the bus was disconnected");
418 if (msg
.Header
.Fields
.TryGetValue (FieldCode
.ReplySerial
, out field_value
)) {
419 uint reply_serial
= (uint)field_value
;
422 if (pendingCalls
.TryGetValue (reply_serial
, out pending
)) {
423 if (pendingCalls
.Remove (reply_serial
))
429 //we discard reply messages with no corresponding PendingCall
430 if (Protocol
.Verbose
)
431 Console
.Error
.WriteLine ("Unexpected reply message received: MessageType='" + msg
.Header
.MessageType
+ "', ReplySerial=" + reply_serial
);
437 switch (msg
.Header
.MessageType
) {
438 case MessageType
.MethodCall
:
439 MethodCall method_call
= new MethodCall (msg
);
440 HandleMethodCall (method_call
);
442 case MessageType
.Signal
:
443 //HandleSignal (msg);
445 Inbound
.Enqueue (msg
);
447 case MessageType
.Error
:
448 //TODO: better exception handling
449 Error error
= new Error (msg
);
450 string errMsg
= String
.Empty
;
451 if (msg
.Signature
.Value
.StartsWith ("s")) {
452 MessageReader reader
= new MessageReader (msg
);
453 errMsg
= reader
.ReadString ();
455 //throw new Exception ("Remote Error: Signature='" + msg.Signature.Value + "' " + error.ErrorName + ": " + errMsg);
456 //if (Protocol.Verbose)
457 Console
.Error
.WriteLine ("Remote Error: Signature='" + msg
.Signature
.Value
+ "' " + error
.ErrorName
+ ": " + errMsg
);
459 case MessageType
.Invalid
:
461 throw new Exception ("Invalid message received: MessageType='" + msg
.Header
.MessageType
+ "'");
465 Dictionary
<uint,PendingCall
> pendingCalls
= new Dictionary
<uint,PendingCall
> ();
467 //this might need reworking with MulticastDelegate
468 internal void HandleSignal (Message msg
)
470 Signal signal
= new Signal (msg
);
472 //TODO: this is a hack, not necessary when MatchRule is complete
473 MatchRule rule
= new MatchRule ();
474 rule
.MessageType
= MessageType
.Signal
;
475 rule
.Interface
= signal
.Interface
;
476 rule
.Member
= signal
.Member
;
477 rule
.Path
= signal
.Path
;
480 if (Handlers
.TryGetValue (rule
, out dlg
)) {
481 //dlg.DynamicInvoke (GetDynamicValues (msg));
483 MethodInfo mi
= dlg
.Method
;
484 //signals have no return value
485 dlg
.DynamicInvoke (MessageHelper
.GetDynamicValues (msg
, mi
.GetParameters ()));
488 //TODO: how should we handle this condition? sending an Error may not be appropriate in this case
489 if (Protocol
.Verbose
)
490 Console
.Error
.WriteLine ("Warning: No signal handler for " + signal
.Member
);
494 internal Dictionary
<MatchRule
,Delegate
> Handlers
= new Dictionary
<MatchRule
,Delegate
> ();
497 internal void MaybeSendUnknownMethodError (MethodCall method_call
)
499 Message msg
= MessageHelper
.CreateUnknownMethodError (method_call
);
504 //not particularly efficient and needs to be generalized
505 internal void HandleMethodCall (MethodCall method_call
)
507 //TODO: Ping and Introspect need to be abstracted and moved somewhere more appropriate once message filter infrastructure is complete
509 //FIXME: these special cases are slightly broken for the case where the member but not the interface is specified in the message
510 if (method_call
.Interface
== "org.freedesktop.DBus.Peer" && method_call
.Member
== "Ping") {
511 Message reply
= MessageHelper
.ConstructReply (method_call
);
516 if (method_call
.Interface
== "org.freedesktop.DBus.Introspectable" && method_call
.Member
== "Introspect") {
517 Introspector intro
= new Introspector ();
518 intro
.root_path
= method_call
.Path
;
521 //FIXME: do this properly
522 //this is messy and inefficient
523 List
<string> linkNodes
= new List
<string> ();
524 int depth
= method_call
.Path
.Decomposed
.Length
;
525 foreach (ObjectPath pth
in RegisteredObjects
.Keys
) {
526 if (pth
.Value
== (method_call
.Path
.Value
)) {
527 ExportObject exo
= (ExportObject
)RegisteredObjects
[pth
];
528 intro
.WriteType (exo
.obj
.GetType ());
530 for (ObjectPath cur
= pth
; cur
!= null ; cur
= cur
.Parent
) {
531 if (cur
.Value
== method_call
.Path
.Value
) {
532 string linkNode
= pth
.Decomposed
[depth
];
533 if (!linkNodes
.Contains (linkNode
)) {
534 intro
.WriteNode (linkNode
);
535 linkNodes
.Add (linkNode
);
544 Message reply
= MessageHelper
.ConstructReply (method_call
, intro
.xml
);
550 if (RegisteredObjects
.TryGetValue (method_call
.Path
, out bo
)) {
551 ExportObject eo
= (ExportObject
)bo
;
552 eo
.HandleMethodCall (method_call
);
554 MaybeSendUnknownMethodError (method_call
);
558 Dictionary
<ObjectPath
,BusObject
> RegisteredObjects
= new Dictionary
<ObjectPath
,BusObject
> ();
560 //FIXME: this shouldn't be part of the core API
561 //that also applies to much of the other object mapping code
563 public object GetObject (Type type
, string bus_name
, ObjectPath path
)
566 // return GetObject (bus_name, path);
568 //if the requested type is an interface, we can implement it efficiently
569 //otherwise we fall back to using a transparent proxy
570 if (type
.IsInterface
) {
571 return BusObject
.GetObject (this, bus_name
, path
, type
);
573 if (Protocol
.Verbose
)
574 Console
.Error
.WriteLine ("Warning: Note that MarshalByRefObject use is not recommended; for best performance, define interfaces");
576 BusObject busObject
= new BusObject (this, bus_name
, path
);
577 DProxy prox
= new DProxy (busObject
, type
);
578 return prox
.GetTransparentProxy ();
582 public T GetObject
<T
> (string bus_name
, ObjectPath path
)
584 return (T
)GetObject (typeof (T
), bus_name
, path
);
587 [Obsolete ("Use the overload of Register() which does not take a bus_name parameter")]
588 public void Register (string bus_name
, ObjectPath path
, object obj
)
590 Register (path
, obj
);
593 [Obsolete ("Use the overload of Unregister() which does not take a bus_name parameter")]
594 public object Unregister (string bus_name
, ObjectPath path
)
596 return Unregister (path
);
599 public void Register (ObjectPath path
, object obj
)
601 ExportObject eo
= new ExportObject (this, path
, obj
);
602 eo
.Registered
= true;
604 //TODO: implement some kind of tree data structure or internal object hierarchy. right now we are ignoring the name and putting all object paths in one namespace, which is bad
605 RegisteredObjects
[path
] = eo
;
608 public object Unregister (ObjectPath path
)
612 if (!RegisteredObjects
.TryGetValue (path
, out bo
))
613 throw new Exception ("Cannot unregister " + path
+ " as it isn't registered");
615 RegisteredObjects
.Remove (path
);
617 ExportObject eo
= (ExportObject
)bo
;
618 eo
.Registered
= false;
623 //these look out of place, but are useful
624 internal protected virtual void AddMatch (string rule
)
628 internal protected virtual void RemoveMatch (string rule
)
634 if (BitConverter
.IsLittleEndian
)
635 NativeEndianness
= EndianFlag
.Little
;
637 NativeEndianness
= EndianFlag
.Big
;
640 public static readonly EndianFlag NativeEndianness
;