Allow schema files that are missing checksums on the !!SCHEMAMATIC line.
[versaplex.git] / versaplexd / versaplexd.cs
blob b20f56659a3f6d7105d1bdfddf7f92b862d069f1
1 /*
2 * Versaplex:
3 * Copyright (C)2007-2008 Versabanq Innovations Inc. and contributors.
4 * See the included file named LICENSE for license information.
5 */
6 using System;
7 using System.Collections.Generic;
8 using System.IO;
9 using System.Threading;
10 using System.Linq;
11 using Wv;
12 using Wv.Extensions;
/// <summary>
/// One entry of the daemon's action queue: the D-Bus connection and the
/// message an action belongs to, plus the delegate to run for it.
/// </summary>
public class VxActionTriple
{
    // Plain mutable fields: code in this file compares conn/src against an
    // incoming cancel request and overwrites action on a queued entry.
    public WvDbus conn;
    public WvDbusMsg src;
    public Action action;

    /// <summary>Stores the three values as given; nothing is validated.</summary>
    /// <param name="_conn">Connection the message arrived on.</param>
    /// <param name="_src">Message that caused this action to be created.</param>
    /// <param name="_action">Delegate to invoke when the entry is processed.</param>
    public VxActionTriple(WvDbus _conn, WvDbusMsg _src, Action _action)
    {
        conn = _conn;
        src = _src;
        action = _action;
    }
}
28 public static class Versaplexd
// Logger used by every method of this class.
private static WvLog log = new WvLog("Versaplex");

// Routes incoming vx.db method calls (everything except CancelQuery).
private static VxDbusRouter msgrouter = new VxDbusRouter();

// Embedded D-Bus server and the thread that pumps it; both stay unset
// when no listener monikers are supplied.
private static WvDBusServer dbusserver;
private static Thread dbusserver_thread = null;

// Signalled by the server thread once all listeners have been set up.
private static ManualResetEvent thread_ready = new ManualResetEvent(false);

// Set to make both the server thread loop and the main event loop exit.
public static bool want_to_die = false;

// Client connection to the bus on which requests are received.
public static WvDbus conn;

// Pending actions in arrival order, and the one currently executing
// (null when idle). action_mutex guards curaction and the queue.
private static List<VxActionTriple> action_queue = new List<VxActionTriple>();
private static object action_mutex = new object();
private static VxActionTriple curaction = null;
/// <summary>
/// Handles a CancelQuery method call. The single argument is the serial of
/// the message to cancel, sent either as 'u' or (for Perl clients) as 's'.
/// If the serial matches the action currently running for the same
/// connection, the running query is cancelled; if it matches a queued
/// action, that action is replaced by one that sends an error reply.
/// The reply to the CancelQuery call itself is appended to the action queue.
/// </summary>
/// <param name="conn">Connection the request arrived on.</param>
/// <param name="msg">The CancelQuery method call.</param>
private static void HandleCancelQuery(WvDbus conn, WvDbusMsg msg)
{
    log.print(WvLog.L.Debug4, "Received CancelQuery request\n");
    //FIXME: Should I be in yet another thread?
    Action perform;

    if (msg.signature != "u" && msg.signature != "s")
    {
        //accept 's' signatures for Perl DBus, which is stupid and can't
        //send me 'u' parameters, even though the api accepts them.
        log.print(WvLog.L.Debug4, "CancelQuery: bad signature {0}\n",
                  msg.signature);
        perform = () => {
            conn.send(msg.err_reply(
                "org.freedesktop.DBus.Error.UnknownMethod",
                "No overload of {0} has signature '{1}'",
                "CancelQuery", msg.signature));
        };
    }
    else
    {
        var it = msg.iter();
        uint tokill;
        bool parsed = true;
        string temps = null;

        if (msg.signature == "s")
        {
            log.print(WvLog.L.Debug4,
                      "CancelQuery: converting arg from string\n");
            temps = it.pop();
            // The string comes straight from the client; a malformed value
            // must produce an error reply rather than an exception thrown
            // out of the message handler.
            parsed = uint.TryParse(temps, out tokill);
        }
        else
            tokill = it.pop();

        if (!parsed)
        {
            log.print(WvLog.L.Debug4,
                      "CancelQuery: bad message id '{0}'\n", temps);
            perform = () => {
                conn.send(msg.err_reply(
                    "org.freedesktop.DBus.Error.InvalidArgs",
                    "CancelQuery: '{0}' is not a valid message id",
                    temps));
            };
        }
        else
        {
            log.print(WvLog.L.Debug4,
                      "CancelQuery: try killing msg id {0}\n", tokill);

            lock (action_mutex)
            {
                if (curaction != null && curaction.conn == conn &&
                    curaction.src.serial == tokill)
                {
                    log.print(WvLog.L.Debug4,
                              "CancelQuery: killing current action!\n");

                    WvSqlRows_IDataReader.Cancel();
                    curaction = null;
                }
                else
                {
                    log.print(WvLog.L.Debug4,
                              "CancelQuery: traversing action queue...\n");

                    //Traverse the action queue, killing stuff
                    foreach (VxActionTriple t in action_queue)
                    {
                        if (t.conn == conn && t.src.serial == tokill)
                        {
                            log.print(WvLog.L.Debug4,
                                      "CancelQuery: found culprit, killing.\n");
                            // The entry stays in the queue so the caller
                            // still gets a reply; only its work is swapped.
                            //FIXME: What message should we really put here?
                            t.action = () => {
                                conn.send(t.src.err_reply("vx.db.sqlerror",
                                          "This message got canceled"));
                            };
                            break;
                        }
                    }
                }
            }

            //Pointless return to make Perl happy.
            perform = () => {
                WvDbusWriter writer = new WvDbusWriter();
                writer.Write("Cancel");
                conn.send(msg.reply("s").write(writer));
            };
            log.print(WvLog.L.Debug4, "CancelQuery: complete\n");
        }
    }

    // List<T> is not safe for concurrent modification, and every other
    // access to the queue happens under action_mutex, so append under it too.
    lock (action_mutex)
    {
        action_queue.Add(new VxActionTriple(conn, msg, perform));
    }
}
/// <summary>
/// Message handler installed on the bus connection. Method calls on the
/// "vx.db" interface are accepted: CancelQuery on path "/db" is handled
/// immediately, anything else is queued to be routed later from the
/// action queue. All other messages are declined.
/// </summary>
/// <param name="conn">Connection the message arrived on.</param>
/// <param name="msg">The incoming message.</param>
/// <returns>true if the message was taken by this handler, else false.</returns>
static bool WvDbusMsgReady(WvDbus conn, WvDbusMsg msg)
{
    // FIXME: This should really queue things to be run from the thread
    // pool and then the response would be sent back through the action
    // queue
    log.print(WvLog.L.Debug4, "WvDbusMsgReady\n");

    switch (msg.type)
    {
    case Wv.Dbus.MType.MethodCall:
        if (msg.ifc != "vx.db")
            return false;

        if (msg.path == "/db" && msg.method == "CancelQuery")
        {
            HandleCancelQuery(conn, msg);
            return true;
        }

        // Deferred work: route the call and send back whatever reply
        // the router produced.
        Action route = () => {
            WvDbusMsg reply;
            if (msgrouter.route(conn, msg, out reply))
            {
                if (reply == null) {
                    // FIXME: Do something if this happens, maybe?
                    log.print("Empty reply from RouteWvDbusMsg\n");
                } else {
                    // XXX: Should this be done further down rather
                    // than passing the reply out here?
                    conn.send(reply);
                }
            }
        };

        // The queue is drained and inspected under action_mutex elsewhere;
        // List<T> is not safe for concurrent modification, so append
        // under the same lock.
        lock (action_mutex)
        {
            action_queue.Add(new VxActionTriple(conn, msg, route));
        }
        return true;

    default:
        log.print(WvLog.L.Warning,
            "Unexpected DBus message received: #{0} {1}->{2} {3}:{4}.{5}\n",
            msg.serial, msg.sender, msg.dest,
            msg.path, msg.ifc, msg.method);
        return false;
    }
}
/// <summary>
/// Body of the D-Bus server thread: creates the server, listens on every
/// moniker, signals thread_ready, then pumps the server until want_to_die
/// is set. The server is disposed when the method leaves the using block.
/// </summary>
/// <param name="monikers">Addresses to listen on.</param>
static void _StartDBusServerThread(string[] monikers)
{
    using (dbusserver = new WvDBusServer())
    {
        for (int i = 0; i < monikers.Length; i++)
            dbusserver.listen(monikers[i]);

        // Listeners are up; release whoever is waiting in the starter.
        thread_ready.Set();

        for (;;)
        {
            if (want_to_die)
                break;
            dbusserver.runonce();
        }

        log.print("DBus thread ending.\n");
    }
}
/// <summary>
/// Starts the embedded D-Bus server on its own thread and blocks until
/// that thread reports its listeners are set up. Does nothing when no
/// monikers are given.
/// </summary>
/// <param name="monikers">Addresses the server should listen on.</param>
static void StartDBusServerThread(string[] monikers)
{
    if (monikers.Length != 0)
    {
        // Sanity check of the wvdbusd library before relying on it.
        wv.assert(WvDBusServer.check() == 42, "wvdbusd.dll test failed");

        thread_ready.Reset();

        Thread worker = new Thread(() => _StartDBusServerThread(monikers));
        dbusserver_thread = worker;
        worker.Start();

        thread_ready.WaitOne();
    }
}
/// <summary>
/// Requests shutdown by setting want_to_die and, if the server thread was
/// ever started, waits for it to finish.
/// </summary>
static void StopDBusServerThread()
{
    want_to_die = true;

    if (dbusserver_thread == null)
        return;

    log.print("Waiting for DBus thread.\n");
    dbusserver_thread.Join();
}
/// <summary>
/// Public entry point: runs the daemon via _Go and always stops the D-Bus
/// server thread afterwards, whether _Go returns or throws.
/// </summary>
/// <param name="cfgfile">Config file path passed through to _Go.</param>
/// <param name="bus">Bus moniker passed through to _Go.</param>
/// <param name="listeners">Listener monikers passed through to _Go.</param>
/// <returns>The exit code produced by _Go.</returns>
public static int Go(string cfgfile, string bus, string[] listeners)
{
    int exitcode;
    try
    {
        exitcode = _Go(cfgfile, bus, listeners);
    }
    finally
    {
        StopDBusServerThread();
    }
    return exitcode;
}
/// <summary>
/// Main body of the daemon: starts the embedded D-Bus server (if any
/// listeners were given), picks the config file, connects to the bus,
/// claims the "vx.versaplexd" name, and then alternates between pumping
/// streams and draining the action queue until want_to_die is set.
/// </summary>
/// <param name="cfgfile">
/// Config file path. When .e() reports it as unset (presumably null or
/// empty; see Wv.Extensions), "versaplexd.ini" in the current directory is
/// used if it exists, otherwise "/etc/versaplexd.ini".
/// </param>
/// <param name="bus">Moniker of the bus to connect to; asserted via .ne().</param>
/// <param name="listeners">Monikers for the embedded D-Bus server.</param>
/// <returns>0 after a normal shutdown, 2 if the bus name was not obtained.</returns>
static int _Go(string cfgfile, string bus, string[] listeners)
{
    StartDBusServerThread(listeners.ToArray());

    if (cfgfile.e() && File.Exists("versaplexd.ini"))
        cfgfile = "versaplexd.ini";
    if (cfgfile.e())
        cfgfile = "/etc/versaplexd.ini";

    log.print("Config file is '{0}'\n", cfgfile);
    VxSqlPool.SetIniFile(cfgfile);

    wv.assert(bus.ne());

    log.print("Connecting to '{0}'\n", bus);
    conn = new WvDbus(bus);

    RequestNameReply rnr = conn.RequestName("vx.versaplexd",
                                            NameFlag.DoNotQueue);

    switch (rnr) {
    case RequestNameReply.PrimaryOwner:
        log.print("Name registered, ready\n");
        break;
    default:
        // Value first, newline last, so the log line is terminated.
        log.print("Register name result: {0}\n", rnr.ToString());
        return 2;
    }

    conn.stream.onclose += () => {
        log.print(
          "***********************************************************\n"+
          "************ D-bus connection closed by server ************\n"+
          "***********************************************************\n");
        want_to_die = true;
    };

    conn.handlers.Insert(0, (m) => WvDbusMsgReady(conn, m));

    while (!want_to_die)
    {
        log.print(WvLog.L.Debug2, "Event loop.\n");

        // We can't wait infinitely here, because someone in another
        // thread might set want_to_die.
        // (FIXME: in that case it's lame that we might ignore it for
        // a timeout.  We should have a loopback stream of some sort
        // instead...)
        WvStream.runonce(1000);

        for (;;)
        {
            VxActionTriple next;

            // Check and dequeue atomically with respect to anyone else
            // touching the queue under action_mutex.
            lock (action_mutex)
            {
                if (action_queue.Count == 0)
                    break;
                next = action_queue[0];
                action_queue.RemoveAt(0);
                curaction = next;
            }
            log.print(WvLog.L.Debug2, "Action queue.\n");

            // Invoke through the local: a CancelQuery may null out the
            // shared curaction field while this action is pending or running.
            next.action();

            lock (action_mutex)
            {
                curaction = null;
            }
            log.print(WvLog.L.Debug2, "Action queue element done.\n");
        }
    }

    log.print("Done!\n");
    return 0;
}