2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System
.Collections
;
34 using System
.Net
.Sockets
;
35 using System
.Threading
;
39 using RabbitMQ
.Client
;
41 namespace Mono
.Messaging
.RabbitMQ
{
43 public class RabbitMQMessagingProvider
: IMessagingProvider
{
// Counter used to build transaction ids; advanced with Interlocked.Increment.
private int txCounter = 0;

// Identifier generated once per provider instance (Guid.NewGuid in the constructor).
private readonly Guid localId;

// Pool that hands out messaging contexts and is passed to every new transaction.
private readonly MessagingContextPool contextPool;
/// <summary>
/// Initializes the provider: assigns a freshly generated Guid to localId and
/// builds the context pool, passing it a MessageFactory bound to this instance.
/// </summary>
// NOTE(review): the MessagingContextPool argument list is cut off after its
// first argument in this excerpt; the remaining argument(s) are not visible.
49 public RabbitMQMessagingProvider ()
51 localId
= Guid
.NewGuid ();
52 contextPool
= new MessagingContextPool (new MessageFactory (this),
/// <summary>
/// Creates a new message object.
/// </summary>
/// <returns>A default-constructed <c>MessageBase</c>.</returns>
public IMessage CreateMessage ()
{
	IMessage message = new MessageBase ();
	return message;
}
/// <summary>
/// Creates a new message queue transaction with an id of the form
/// "&lt;localId&gt;_&lt;sequence&gt;".
/// </summary>
/// <returns>
/// A <c>RabbitMQMessageQueueTransaction</c> built from the generated id and
/// this provider's context pool.
/// </returns>
public IMessageQueueTransaction CreateMessageQueueTransaction ()
{
	// Use the value returned by Interlocked.Increment instead of re-reading
	// the field afterwards: a separate read can observe another thread's
	// increment, which would give two transactions the same id.
	int sequence = Interlocked.Increment (ref txCounter);
	string txId = localId.ToString () + "_" + sequence.ToString ();

	return new RabbitMQMessageQueueTransaction (txId, contextPool);
}
/// <summary>
/// Obtains a messaging context for the given host from the context pool.
/// </summary>
/// <param name="host">Host value forwarded unchanged to the pool.</param>
/// <returns>The context returned by <c>contextPool.GetContext</c>.</returns>
public IMessagingContext CreateContext (string host)
{
	IMessagingContext context = contextPool.GetContext (host);
	return context;
}
/// <summary>
/// Opens a connection through a newly constructed <c>ConnectionFactory</c>.
/// </summary>
/// <param name="host">Host the caller wants a connection for.</param>
/// <returns>The connection produced by the factory.</returns>
private IConnection CreateConnection (string host)
{
	// NOTE(review): host is not referenced by the statements visible here,
	// so the factory is used with its defaults — confirm this is intended.
	ConnectionFactory factory = new ConnectionFactory ();
	return factory.CreateConnection ();
}
/// <summary>
/// Deletes the queue identified by <paramref name="qRef"/> by delegating to
/// the static <c>RabbitMQMessageQueue.Delete</c>.
/// </summary>
/// <param name="qRef">Reference to the queue to delete.</param>
public void DeleteQueue (QueueReference qRef)
{
	RabbitMQMessageQueue.Delete (qRef);
}
// Queue table looked up by QueueReference; accesses in this class are
// bracketed by qLock.
private readonly IDictionary queues = new Hashtable ();

// Reader/writer lock guarding the queue table.
private readonly ReaderWriterLock qLock = new ReaderWriterLock ();

// Timeout, in milliseconds, passed to every qLock acquire/upgrade call.
private const int TIMEOUT = 15000;
/// <summary>
/// Returns a snapshot of every queue currently held in the queue table.
/// </summary>
/// <returns>
/// A new array containing the table's values at the time of the call; empty
/// when no queues are registered.
/// </returns>
/// <exception cref="System.ApplicationException">
/// The reader lock could not be acquired within TIMEOUT milliseconds.
/// </exception>
public IMessageQueue[] GetPublicQueues ()
{
	qLock.AcquireReaderLock (TIMEOUT);
	try {
		// Copy under the reader lock so the result is not affected by
		// later changes to the table.
		ICollection qCollection = queues.Values;
		IMessageQueue[] qs = new IMessageQueue[qCollection.Count];
		qCollection.CopyTo (qs, 0);
		return qs;
	} finally {
		// Release on every exit path, including a failing CopyTo.
		qLock.ReleaseReaderLock ();
	}
}
/// <summary>
/// Reports whether the queue table contains an entry for
/// <paramref name="qRef"/>.
/// </summary>
/// <param name="qRef">Queue reference used as the lookup key.</param>
/// <returns><c>true</c> when the table contains the key; otherwise <c>false</c>.</returns>
/// <exception cref="System.ApplicationException">
/// The reader lock could not be acquired within TIMEOUT milliseconds.
/// </exception>
public bool Exists (QueueReference qRef)
{
	qLock.AcquireReaderLock (TIMEOUT);
	try {
		return queues.Contains (qRef);
	} finally {
		// A release placed after the return would never run; the finally
		// block guarantees the reader lock is given back.
		qLock.ReleaseReaderLock ();
	}
}
/// <summary>
/// Constructs a RabbitMQMessageQueue for qRef while holding the writer lock
/// on qLock, then releases the writer lock.
/// </summary>
// NOTE(review): the parameter list is cut off after qRef in this excerpt. The
// body passes an identifier named `transactional` to the queue constructor, so
// presumably a second parameter of that name exists — confirm. Any statements
// that store or return the new queue are likewise not visible here.
114 public IMessageQueue
CreateMessageQueue (QueueReference qRef
,
// Take the writer lock, passing TIMEOUT as the acquire timeout.
117 qLock
.AcquireWriterLock (TIMEOUT
);
// Build the queue object bound to this provider.
119 IMessageQueue mq
= new RabbitMQMessageQueue (this, qRef
, transactional
);
// Give the writer lock back.
123 qLock
.ReleaseWriterLock ();
/// <summary>
/// Returns the queue stored under qRef when the queue table contains it;
/// otherwise upgrades to the writer lock and constructs a new
/// RabbitMQMessageQueue, passing false as the third constructor argument.
/// </summary>
// NOTE(review): the statements that store or return the newly built queue are
// not visible in this excerpt — confirm against the full file.
127 public IMessageQueue
GetMessageQueue (QueueReference qRef
)
// Take the reader lock, passing TIMEOUT as the acquire timeout.
129 qLock
.AcquireReaderLock (TIMEOUT
);
// Fast path: the table already has an entry for this reference.
131 if (queues
.Contains (qRef
))
132 return (IMessageQueue
) queues
[qRef
];
// NOTE(review): ReaderWriterLock.UpgradeToWriterLock may let other writers run
// before the upgrade is granted, and no second Contains check is visible after
// this call — confirm whether a duplicate queue can be created here.
134 LockCookie lc
= qLock
.UpgradeToWriterLock (TIMEOUT
);
136 IMessageQueue mq
= new RabbitMQMessageQueue (this, qRef
, false);
// Return to reader state using the cookie from the upgrade.
140 qLock
.DowngradeFromWriterLock (ref lc
);
// Give the reader lock back.
144 qLock
.ReleaseReaderLock ();