Oops, my fix to subdir.mk didn't work with existing symlinks. Fixed.
[wvapps.git] / wvsync / wvsyncprotocol.cc
blob490fce2a4efe4fda0cda91a0bd0b64235a8fc1fc
1 /*
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2004 Net Integration Technologies, Inc.
5 * See wvsyncprotocol.h for details.
7 */
9 #include "wvsyncprotocol.h"
10 #include "wvsyncobj.h"
11 #include "wvsyncarbiter.h"
12 #include "wvsyncver.h"
13 #include "wvsynclister.h"
14 #include "wvsyncstring.h"
16 #include <wvcallback.h>
18 char *WvSyncProtocol::cmdstr[] = {
19 "HELLO", "OK", "ERROR", "!!", "help", "chdir", "ihave", "sig", "delta",
20 "donewith", "abort", "yourturn", "quit", "haverep", "done",
21 NULL
25 WvSyncProtocol::WvSyncProtocol(WvStream *_cloned, WvLog *_debuglog,
26 WvSyncArbiter &_arb, UniConf &_cfgroot,
27 bool _server)
28 : WvProtoStream(_cloned, _debuglog), arb(_arb), cfgroot(_cfgroot),
29 log("Net", WvLog::Debug1), err(log.split(WvLog::Error))
31 server = _server;
32 myturn = !server;
33 myturn_soon = false;
34 cwd = "";
35 greeted = false;
36 lister = 0;
37 binaryobj = 0;
38 uid = 0;
39 locdone = remdone = false;
41 if (server)
42 send_greeting();
46 void WvSyncProtocol::set_lister(WvSyncLister *_lister)
48 lister = _lister;
52 void WvSyncProtocol::do_state(Token &t1)
54 int cmd = tokanal(t1, cmdstr);
56 if (cmd < 0)
58 print("ERROR Unknown command \"%s\"\n", t1.data);
59 return;
62 // some commands always work, regardless of state
63 if (cmd == cTrivia)
64 return; // always ignore trivia
65 else if (cmd == cHelp)
67 print("OK Known commands: chdir, ihave, sig, delta, donewith, "
68 "abort, yourturn, quit\n");
69 return;
71 else if (cmd == cAbort)
72 do_cabort();
74 else if (cmd == cQuit)
75 do_cquit();
77 else if (cmd == cChdir)
78 do_cchdir();
80 else if (cmd == cIHave)
81 do_cihave();
83 else if (cmd == cHaveRep)
84 do_chaverep();
86 else if (cmd == cSig || cmd == cDelta)
87 do_cbinary((Commands)cmd);
89 else if (cmd == cDonewith)
90 do_cdonewith();
92 else if (cmd == cYourturn)
93 do_cyourturn();
95 else if (cmd == cHello)
96 greeted = true;
98 else if (cmd == cDone)
99 do_cdone();
101 if (!greeted)
103 err("Server didn't say hello. Dying...\n");
104 close();
109 void WvSyncProtocol::switch_state(int newstate)
111 state = newstate;
115 void WvSyncProtocol::execute()
117 log(WvLog::Debug5, "Started execute state=%s\n", state);
119 if (state == ReceiveSig || state == ReceiveDelta)
121 assert(binaryobj);
122 log(WvLog::Debug, "Loading into %s\n", binaryobj->name);
123 WvDynBuf &buf = (state==ReceiveSig) ?
124 binaryobj->sigbuf : binaryobj->deltabuf;
125 off_t &need = (state==ReceiveSig) ?
126 binaryobj->signeed : binaryobj->deltaneed;
127 log(WvLog::Debug5, "Waiting on %s bytes\n", need);
128 int n = read(buf, need);
129 log(WvLog::Debug5, "AA Buf now has %s bytes\n", buf.used());
131 if (n == 0) { // closed? NO!
132 return;
135 need -= n;
137 // write the delta to a file... although it'd be great if we didn't
138 // have to. But it might be big. So I guess we do.
139 if (state == ReceiveDelta)
141 log(WvLog::Debug5, "It's delta time\n");
142 if (binaryobj->tmpfile.isok() == false)
143 binaryobj->tmpfile.open(WvString("/tmp/delta-%s",
144 binaryobj->id), // FIXME
145 O_WRONLY | O_CREAT | O_TRUNC);
146 binaryobj->tmpfile.write(buf, buf.used());
149 if (need != 0) // still need more?
151 WvStreamClone::execute();
152 return;
155 // done.
156 print("!! Done receiving\n");
157 log(WvLog::Debug5, "Buf now has %s bytes\n", buf.used());
158 if (state == ReceiveSig)
159 makedelta(*binaryobj);
160 else
162 binaryobj->tmpfile.close();
163 applydelta(*binaryobj);
167 if (locdone && remdone && in_flight.isempty())
169 log("Exiting because everybody's done.\n");
170 cleanup();
171 close();
174 WvProtoStream::execute();
175 log(WvLog::Debug5, "Ending execute\n");
179 void WvSyncProtocol::do_chaverep()
181 WvString s;
182 int remarbremote;
183 WvString remact, remmd5sig, remmeta, remname, id;
184 time_t remmtime;
185 bool ret;
187 ProtoObj *curobj;
188 id = next_token_str();
189 curobj = in_flight[id];
191 remarbremote = next_token_str().num();
192 remact = next_token_str();
193 remmd5sig = next_token_str();
194 remmtime = next_token_str().num();
195 remmeta = next_token_str();
196 tokbuf.get(1);
197 remname = tokbuf.getstr();
199 // if the other side doesn't have the file, mtime will be -1.
200 // Disregard everything else.
201 if (remmtime < 0)
202 remact = remmd5sig = remmeta = "";
204 if (remname != curobj->name)
206 err("Weird! Remote end replied about the wrong file. (%s) != (%s)\n",
207 remname, curobj->name);
208 send_abort(curobj, "Weird! Remote end replied about the wrong file.");
209 return;
212 // decide what to do and send local status to other end
213 ret = arb.arbremote(cfgroot, *curobj->obj, remact, remmeta,
214 remmtime, remmd5sig);
215 log(WvLog::Debug5, "arbremote returned %s\n", ret);
217 // both sides better not be requesting...
218 if (remarbremote && ret)
220 err("Weird! Both sides want to request the same file.\n");
221 send_abort(curobj, "Both sides want to request the same file");
222 return;
225 // both sides are allowed to *not* request...
226 if (!remarbremote && !ret)
228 log(WvLog::Debug5, "Neither side wants to request.\n");
229 print("DONEWITH %s (%s)\n", id, curobj->name);
230 remove_obj(curobj);
233 if (ret)
235 log(WvLog::Debug5, "Sending the sig\n");
236 send_sig(*curobj);
241 void WvSyncProtocol::send_sig(ProtoObj &curobj)
243 print("SIG %s %s %s\n",
244 curobj.id, curobj.mysigbuf.used(), curobj.name);
245 cloned->write(curobj.mysigbuf, curobj.mysigbuf.used());
249 bool WvSyncProtocol::newcwd(WvStringParm dir)
251 bool ret = lister->newcwd(dir);
252 lister->cleardone();
254 log(WvLog::Debug5, "lister returned: %s\n", ret);
255 if (ret)
257 cwd = dir;
260 return ret;
264 void deltacb(WvSyncProtocol::ProtoObj &curobj, WvSyncObj &obj, WvBuf &out)
266 assert(curobj.tmpfile.isok()); // FIXME?
268 curobj.tmpfile.write(out, out.used());
272 // FIXME: I would prefer it if this didn't use a temporary file. It'd be
273 // great to have deltacb send the delta across in chunks as it is produced.
274 // But this is quick and hacky, just for now...
275 void WvSyncProtocol::makedelta(ProtoObj &curobj)
277 WvString fname("/tmp/mmdelta-%s", curobj.id); // FIXME
278 curobj.tmpfile.open(fname, O_WRONLY | O_TRUNC | O_CREAT);
280 if (curobj.tmpfile.isok())
282 int ret = curobj.obj->makedelta(curobj.sigbuf, curobj.deltabuf,
283 WvBoundCallback<WvSyncCallback, ProtoObj &>
284 (&deltacb, curobj));
285 deltacb(curobj, *curobj.obj, curobj.deltabuf); // write the last bit
286 curobj.tmpfile.close();
288 if (ret == 0)
290 log(WvLog::Debug5, "Starting to dump file.\n");
291 struct stat st;
292 stat(fname, &st);
293 print("DELTA %s %s %s\n", curobj.id, st.st_size, curobj.name);
294 WvFile f(fname, O_RDONLY); // FIXME
295 f.autoforward(*(WvStream *) this->cloned);
296 while (f.isok())
298 if (f.select(-1, true, false))
299 f.callback();
301 f.close();
302 log(WvLog::Debug5, "Finished dumping file.\n");
303 switch_state(Ready);
304 ::unlink(fname);
305 return;
308 send_abort(&curobj, "couldn't make delta, let's skip");
312 void WvSyncProtocol::applydelta(ProtoObj &curobj)
314 WvString infname("/tmp/delta-%s", curobj.id); // FIXME
315 WvString outfname("/tmp/patched-%s", curobj.id); // FIXME
316 curobj.tmpfile.open(outfname, O_WRONLY | O_TRUNC | O_CREAT);
318 if (curobj.tmpfile.isok())
320 WvDynBuf buf;
322 int ret = curobj.obj->applydelta(infname, buf,
323 WvBoundCallback<WvSyncCallback, ProtoObj &>
324 (&deltacb, curobj));
325 deltacb(curobj, *curobj.obj, buf); // write the last bit
326 curobj.tmpfile.close();
327 ::unlink(infname);
329 if (ret == 0)
331 WvString meta = curobj.cfg["meta"].getme();
332 time_t mtime = curobj.cfg["mtime"].getmeint();
333 if (curobj.obj->installnew(outfname, meta, mtime))
335 print("DONEWITH %s (%s)\n", curobj.id, curobj.name);
336 remove_obj(&curobj);
337 switch_state(Ready);
338 ::unlink(outfname);
339 return;
341 else
342 log(WvLog::Debug5, "installnew just didn't work\n");
344 else
345 log(WvLog::Debug5, "librsync ret'ed badly\n");
347 else
348 log(WvLog::Debug5, "tmpfile was not ok\n");
349 send_abort(&curobj, "couldn't apply delta, let's skip");
353 void WvSyncProtocol::send_abort(ProtoObj *curobj, WvStringParm msg)
355 curobj->obj->revert(curobj->cfg);
356 print("ABORT %s (%s)\n", curobj->id, msg);
357 remove_obj(curobj);
361 void WvSyncProtocol::do_ihave(WvSyncObj *obj)
363 ProtoObj *curobj = new ProtoObj;
364 curobj->id = gen_id();
365 curobj->obj = obj;
366 curobj->name = obj->name;
367 curobj->cfg = cfgroot[obj->name];
369 log(WvLog::Debug5, "Running do_ihave on %s\n", obj->name);
370 int ret = arb.arblocal(cfgroot, *curobj->obj, curobj->mysigbuf);
371 log(WvLog::Debug5, "arblocal returned %s\n", ret);
373 WvString act = curobj->cfg["act"].getme();
374 WvString md5sig = curobj->cfg["md5sig"].getme();
375 time_t mtime = curobj->cfg["mtime"].getmeint();
376 WvString meta = curobj->cfg["meta"].getme();
378 print("IHAVE %s %s %s %s %s %s\n",
379 curobj->id, act, md5sig, mtime, meta, curobj->name);
380 in_flight.add(curobj, true);
384 void WvSyncProtocol::do_chdir(WvStringParm _cwd)
386 // FIXME: should only set this when we get an OK to the CHDIR command...
387 cwd = _cwd;
388 if (!cwd)
389 cwd = ".";
391 print("CHDIR %s\n", cwd);
395 void WvSyncProtocol::do_yourturn()
397 print("YOURTURN\n");
398 myturn = false;
402 void WvSyncProtocol::do_done()
404 print("DONE\n");
405 locdone = true;
409 void WvSyncProtocol::do_cchdir()
411 WvString s = next_token_str();
412 if (!!s)
414 if (newcwd(s))
415 print("OK Setting CWD\n");
416 else
418 print("ERROR Couldn't set CWD\n");
419 err("Couldn't change into %s\n. Dying.\n");
420 close();
423 else
424 print("ERROR Specify CWD\n");
425 return;
429 void WvSyncProtocol::do_cihave()
431 WvString id, remact, remmd5sig, remmeta, remname;
432 WvString act, md5sig, mtime, meta;
433 int lret, ret, remmtime;
435 id = next_token_str();
436 if (!cwd)
438 print("ABORT %s (cwd first)\n", id);
439 return;
441 remact = next_token_str();
442 remmd5sig = next_token_str();
443 remmtime = next_token_str().num();
444 remmeta = next_token_str();
445 tokbuf.get(1); // drop the space
446 remname = tokbuf.getstr();
448 // if the other side doesn't have the file but is requesting it
449 // anyway (such as in a conflict) mtime will be -1. Disregard
450 // everything else.
451 if (remmtime < 0)
452 remact = remmd5sig = remmeta = "";
454 if (!remname)
456 print("ABORT %s Use ihave id action md5sig mtime meta objname\n", id);
457 return;
460 ProtoObj *curobj = new ProtoObj;
462 curobj->id = id;
463 curobj->name = remname;
464 log(WvLog::Debug5, "remname: %s\n", remname);
465 curobj->cfg = cfgroot[remname];
466 curobj->obj = lister->makeobj(remname);
467 if (!curobj->obj)
469 print("ABORT %s skip\n", id);
470 return;
472 lret = arb.arblocal(cfgroot, *curobj->obj, curobj->mysigbuf);
473 log(WvLog::Debug5, "arblocal returned %s\n", lret);
475 // decide what to do and send local status to other end
476 act = curobj->cfg["act"].getme();
477 md5sig = curobj->cfg["md5sig"].getme();
478 mtime = curobj->cfg["mtime"].getmeint();
479 meta = curobj->cfg["meta"].getme();
480 ret = arb.arbremote(cfgroot, *curobj->obj, remact, remmeta,
481 remmtime, remmd5sig);
482 log(WvLog::Debug5, "arbremote returned %s\n", ret);
483 print("HAVEREP %s %s %s %s %s %s %s\n", id, ret,
484 act, md5sig, mtime, meta, curobj->name);
485 in_flight.add(curobj, true);
487 if (ret)
488 send_sig(*curobj);
492 void WvSyncProtocol::do_cbinary(Commands c)
494 assert(!in_flight.isempty());
495 WvString id = next_token_str();
496 off_t binarybytes = next_token_str().num();
497 ProtoObj *curobj = in_flight[id];
498 if (!curobj)
500 print("ABORT %s (Could not find object with proper id)\n", id);
501 return;
503 binaryobj = curobj;
504 log(WvLog::Debug5, "binaryobj.name %s\n", binaryobj->name);
505 if (c == cSig)
507 log(WvLog::Debug5, "Switching state to ReceiveSig\n");
508 curobj->signeed = binarybytes;
509 switch_state(ReceiveSig);
511 else
513 log(WvLog::Debug5, "Switching state to ReceiveDelta\n");
514 curobj->deltaneed = binarybytes;
515 switch_state(ReceiveDelta);
520 void WvSyncProtocol::do_cdonewith()
522 WvString id = next_token_str();
523 ProtoObj *obj = in_flight[id];
524 remove_obj(obj);
525 return;
529 void WvSyncProtocol::do_cyourturn()
531 if (in_flight.isempty())
532 myturn = true;
533 else
534 myturn_soon = true;
538 void WvSyncProtocol::do_cabort()
540 WvString id = next_token_str();
541 ProtoObj *obj = in_flight[id];
542 obj->obj->revert(obj->cfg);
543 remove_obj(obj);
547 void WvSyncProtocol::do_cquit()
549 cleanup();
550 close();
554 void WvSyncProtocol::do_cdone()
556 remdone = true;
560 void WvSyncProtocol::send_greeting()
562 print("HELLO Welcome to WvSync (" WVSYNC_VER_STRING ")\n");
563 greeted = true;
567 void WvSyncProtocol::cleanup()
569 ProtoObjDict::Iter i(in_flight);
571 i.rewind();
572 while (i.next())
574 ProtoObj *obj = i.ptr();
575 obj->obj->revert(obj->cfg);
576 remove_obj(obj);
577 i.rewind();
579 binaryobj = 0;
583 WvString WvSyncProtocol::gen_id()
585 return WvString("a%s", uid++);
589 void WvSyncProtocol::remove_obj(ProtoObj *obj)
591 lister->markdone(obj->name);
593 in_flight.remove(obj);
594 if (myturn_soon && in_flight.isempty())
595 myturn = true;