Some doc path fixes from Anders
[pkg-k5-afs_openafs.git] / src / ubik / recovery.c
blob0f3e714a7060c5588c8a5e053b75999ffd39108c
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
10 #include <afsconfig.h>
11 #include <afs/param.h>
13 #include <roken.h>
15 #include <afs/opr.h>
17 #ifdef AFS_PTHREAD_ENV
18 # include <opr/lock.h>
19 #else
20 # include <opr/lockstub.h>
21 #endif
23 #include <rx/rx.h>
24 #include <afs/afsutil.h>
25 #include <afs/cellconfig.h>
28 #define UBIK_INTERNALS
29 #include "ubik.h"
30 #include "ubik_int.h"
32 /*! \file
33 * This module is responsible for determining when the system has
34 * recovered to the point that it can handle new transactions. It
35 * replays logs, polls to determine the current dbase after a crash,
36 * and distributes the new database to the others.
38 * The sync site associates a version number with each database. It
39 * broadcasts the version associated with its current dbase in every
40 * one of its beacon messages. When the sync site send a dbase to a
41 * server, it also sends the db's version. A non-sync site server can
42 * tell if it has the right dbase version by simply comparing the
43 * version from the beacon message \p uvote_dbVersion with the version
44 * associated with the database \p ubik_dbase->version. The sync site
45 * itself simply has one counter to keep track of all of this (again
46 * \p ubik_dbase->version).
48 * sync site: routine called when the sync site loses its quorum; this
49 * procedure is called "up" from the beacon package. It resyncs the
50 * dbase and nudges the recovery daemon to try to propagate out the
51 * changes. It also resets the recovery daemon's state, since
52 * recovery must potentially find a new dbase to propagate out. This
53 * routine should not do anything with variables used by non-sync site
54 * servers.
57 /*!
58 * if this flag is set, then ubik will use only the primary address
59 * (the address specified in the CellServDB) to contact other
60 * ubik servers. Ubik recovery will not try opening connections
61 * to the alternate interface addresses.
63 int ubikPrimaryAddrOnly;
65 int
66 urecovery_ResetState(void)
68 urecovery_state = 0;
69 #if !defined(AFS_PTHREAD_ENV)
70 /* No corresponding LWP_WaitProcess found anywhere for this -- klm */
71 LWP_NoYieldSignal(&urecovery_state);
72 #endif
73 return 0;
76 /*!
77 * \brief sync site
79 * routine called when a non-sync site server goes down; restarts recovery
80 * process to send missing server the new db when it comes back up for
81 * non-sync site servers.
83 * \note This routine should not do anything with variables used by non-sync site servers.
85 int
86 urecovery_LostServer(struct ubik_server *ts)
88 ubeacon_ReinitServer(ts);
89 #if !defined(AFS_PTHREAD_ENV)
90 /* No corresponding LWP_WaitProcess found anywhere for this -- klm */
91 LWP_NoYieldSignal(&urecovery_state);
92 #endif
93 return 0;
96 /*!
97 * return true iff we have a current database (called by both sync
98 * sites and non-sync sites) How do we determine this? If we're the
99 * sync site, we wait until recovery has finished fetching and
100 * re-labelling its dbase (it may still be trying to propagate it out
101 * to everyone else; that's THEIR problem). If we're not the sync
102 * site, then we must have a dbase labelled with the right version,
103 * and we must have a currently-good sync site.
106 urecovery_AllBetter(struct ubik_dbase *adbase, int areadAny)
108 afs_int32 rcode;
110 ubik_dprint_25("allbetter checking\n");
111 rcode = 0;
114 if (areadAny) {
115 if (ubik_dbase->version.epoch > 1)
116 rcode = 1; /* Happy with any good version of database */
119 /* Check if we're sync site and we've got the right data */
120 else if (ubeacon_AmSyncSite() && (urecovery_state & UBIK_RECHAVEDB)) {
121 rcode = 1;
124 /* next, check if we're aux site, and we've ever been sent the
125 * right data (note that if a dbase update fails, we won't think
126 * that the sync site is still the sync site, 'cause it won't talk
127 * to us until a timeout period has gone by. When we recover, we
128 * leave this clear until we get a new dbase */
129 else if (uvote_HaveSyncAndVersion(ubik_dbase->version)) {
130 rcode = 1;
133 ubik_dprint_25("allbetter: returning %d\n", rcode);
134 return rcode;
138 * \brief abort all transactions on this database
141 urecovery_AbortAll(struct ubik_dbase *adbase)
143 struct ubik_trans *tt;
144 for (tt = adbase->activeTrans; tt; tt = tt->next) {
145 udisk_abort(tt);
147 return 0;
151 * \brief this routine aborts the current remote transaction, if any, if the tid is wrong
154 urecovery_CheckTid(struct ubik_tid *atid, int abortalways)
156 if (ubik_currentTrans) {
157 /* there is remote write trans, see if we match, see if this
158 * is a new transaction */
159 if (atid->epoch != ubik_currentTrans->tid.epoch
160 || atid->counter > ubik_currentTrans->tid.counter || abortalways) {
161 /* don't match, abort it */
162 /* If the thread is not waiting for lock - ok to end it */
163 if (ubik_currentTrans->locktype != LOCKWAIT) {
164 udisk_end(ubik_currentTrans);
166 ubik_currentTrans = (struct ubik_trans *)0;
169 return 0;
173 * \brief replay logs
175 * log format is defined here, and implicitly in disk.c
177 * 4 byte opcode, followed by parameters, each 4 bytes long. All integers
178 * are in logged in network standard byte order, in case we want to move logs
179 * from machine-to-machine someday.
181 * Begin transaction: opcode \n
182 * Commit transaction: opcode, version (8 bytes) \n
183 * Truncate file: opcode, file number, length \n
184 * Abort transaction: opcode \n
185 * Write data: opcode, file, position, length, <length> data bytes \n
187 * A very simple routine, it just replays the log. Note that this is a new-value only log, which
188 * implies that no uncommitted data is written to the dbase: one writes data to the log, including
189 * the commit record, then we allow data to be written through to the dbase. In our particular
190 * implementation, once a transaction is done, we write out the pages to the database, so that
191 * our buffer package doesn't have to know about stable and uncommitted data in the memory buffers:
192 * any changed data while there is an uncommitted write transaction can be zapped during an
193 * abort and the remaining dbase on the disk is exactly the right dbase, without having to read
194 * the log.
196 static int
197 ReplayLog(struct ubik_dbase *adbase)
199 afs_int32 opcode;
200 afs_int32 code, tpos;
201 int logIsGood;
202 afs_int32 len, thisSize, tfile, filePos;
203 afs_int32 buffer[4];
204 afs_int32 syncFile = -1;
205 afs_int32 data[1024];
207 /* read the lock twice, once to see whether we have a transaction to deal
208 * with that committed, (theoretically, we should support more than one
209 * trans in the log at once, but not yet), and once replaying the
210 * transactions. */
211 tpos = 0;
212 logIsGood = 0;
213 /* for now, assume that all ops in log pertain to one transaction; see if there's a commit */
214 while (1) {
215 code =
216 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
217 sizeof(afs_int32));
218 if (code != sizeof(afs_int32))
219 break;
220 opcode = ntohl(opcode);
221 if (opcode == LOGNEW) {
222 /* handle begin trans */
223 tpos += sizeof(afs_int32);
224 } else if (opcode == LOGABORT)
225 break;
226 else if (opcode == LOGEND) {
227 logIsGood = 1;
228 break;
229 } else if (opcode == LOGTRUNCATE) {
230 tpos += 4;
231 code =
232 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
233 2 * sizeof(afs_int32));
234 if (code != 2 * sizeof(afs_int32))
235 break; /* premature eof or io error */
236 tpos += 2 * sizeof(afs_int32);
237 } else if (opcode == LOGDATA) {
238 tpos += 4;
239 code =
240 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
241 3 * sizeof(afs_int32));
242 if (code != 3 * sizeof(afs_int32))
243 break;
244 /* otherwise, skip over the data bytes, too */
245 tpos += ntohl(buffer[2]) + 3 * sizeof(afs_int32);
246 } else {
247 ubik_print("corrupt log opcode (%d) at position %d\n", opcode,
248 tpos);
249 break; /* corrupt log! */
252 if (logIsGood) {
253 /* actually do the replay; log should go all the way through the commit record, since
254 * we just read it above. */
255 tpos = 0;
256 logIsGood = 0;
257 syncFile = -1;
258 while (1) {
259 code =
260 (*adbase->read) (adbase, LOGFILE, (char *)&opcode, tpos,
261 sizeof(afs_int32));
262 if (code != sizeof(afs_int32))
263 break;
264 opcode = ntohl(opcode);
265 if (opcode == LOGNEW) {
266 /* handle begin trans */
267 tpos += sizeof(afs_int32);
268 } else if (opcode == LOGABORT)
269 panic("log abort\n");
270 else if (opcode == LOGEND) {
271 struct ubik_version version;
272 tpos += 4;
273 code =
274 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
275 2 * sizeof(afs_int32));
276 if (code != 2 * sizeof(afs_int32))
277 return UBADLOG;
278 version.epoch = ntohl(buffer[0]);
279 version.counter = ntohl(buffer[1]);
280 code = (*adbase->setlabel) (adbase, 0, &version);
281 if (code)
282 return code;
283 ubik_print("Successfully replayed log for interrupted "
284 "transaction; db version is now %ld.%ld\n",
285 (long) version.epoch, (long) version.counter);
286 logIsGood = 1;
287 break; /* all done now */
288 } else if (opcode == LOGTRUNCATE) {
289 tpos += 4;
290 code =
291 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
292 2 * sizeof(afs_int32));
293 if (code != 2 * sizeof(afs_int32))
294 break; /* premature eof or io error */
295 tpos += 2 * sizeof(afs_int32);
296 code =
297 (*adbase->truncate) (adbase, ntohl(buffer[0]),
298 ntohl(buffer[1]));
299 if (code)
300 return code;
301 } else if (opcode == LOGDATA) {
302 tpos += 4;
303 code =
304 (*adbase->read) (adbase, LOGFILE, (char *)buffer, tpos,
305 3 * sizeof(afs_int32));
306 if (code != 3 * sizeof(afs_int32))
307 break;
308 tpos += 3 * sizeof(afs_int32);
309 /* otherwise, skip over the data bytes, too */
310 len = ntohl(buffer[2]); /* total number of bytes to copy */
311 filePos = ntohl(buffer[1]);
312 tfile = ntohl(buffer[0]);
313 /* try to minimize file syncs */
314 if (syncFile != tfile) {
315 if (syncFile >= 0)
316 code = (*adbase->sync) (adbase, syncFile);
317 else
318 code = 0;
319 syncFile = tfile;
320 if (code)
321 return code;
323 while (len > 0) {
324 thisSize = (len > sizeof(data) ? sizeof(data) : len);
325 /* copy sizeof(data) buffer bytes at a time */
326 code =
327 (*adbase->read) (adbase, LOGFILE, (char *)data, tpos,
328 thisSize);
329 if (code != thisSize)
330 return UBADLOG;
331 code =
332 (*adbase->write) (adbase, tfile, (char *)data, filePos,
333 thisSize);
334 if (code != thisSize)
335 return UBADLOG;
336 filePos += thisSize;
337 tpos += thisSize;
338 len -= thisSize;
340 } else {
341 ubik_print("corrupt log opcode (%d) at position %d\n",
342 opcode, tpos);
343 break; /* corrupt log! */
346 if (logIsGood) {
347 if (syncFile >= 0)
348 code = (*adbase->sync) (adbase, syncFile);
349 if (code)
350 return code;
351 } else {
352 ubik_print("Log read error on pass 2\n");
353 return UBADLOG;
357 /* now truncate the log, we're done with it */
358 code = (*adbase->truncate) (adbase, LOGFILE, 0);
359 return code;
362 /*! \brief
363 * Called at initialization to figure out version of the dbase we really have.
365 * This routine is called after replaying the log; it reads the restored labels.
367 static int
368 InitializeDB(struct ubik_dbase *adbase)
370 afs_int32 code;
372 code = (*adbase->getlabel) (adbase, 0, &adbase->version);
373 if (code) {
374 /* try setting the label to a new value */
375 UBIK_VERSION_LOCK;
376 adbase->version.epoch = 1; /* value for newly-initialized db */
377 adbase->version.counter = 1;
378 code = (*adbase->setlabel) (adbase, 0, &adbase->version);
379 if (code) {
380 /* failed, try to set it back */
381 adbase->version.epoch = 0;
382 adbase->version.counter = 0;
383 (*adbase->setlabel) (adbase, 0, &adbase->version);
385 #ifdef AFS_PTHREAD_ENV
386 opr_cv_broadcast(&adbase->version_cond);
387 #else
388 LWP_NoYieldSignal(&adbase->version);
389 #endif
390 UBIK_VERSION_UNLOCK;
392 return 0;
396 * \brief initialize the local ubik_dbase
398 * We replay the logs and then read the resulting file to figure out what version we've really got.
401 urecovery_Initialize(struct ubik_dbase *adbase)
403 afs_int32 code;
405 DBHOLD(adbase);
406 code = ReplayLog(adbase);
407 if (code)
408 goto done;
409 code = InitializeDB(adbase);
410 done:
411 DBRELE(adbase);
412 return code;
416 * \brief Main interaction loop for the recovery manager
418 * The recovery light-weight process only runs when you're the
419 * synchronization site. It performs the following tasks, if and only
420 * if the prerequisite tasks have been performed successfully (it
421 * keeps track of which ones have been performed in its bit map,
422 * \p urecovery_state).
424 * First, it is responsible for probing that all servers are up. This
425 * is the only operation that must be performed even if this is not
426 * yet the sync site, since otherwise this site may not notice that
427 * enough other machines are running to even elect this guy to be the
428 * sync site.
430 * After that, the recovery process does nothing until the beacon and
431 * voting modules manage to get this site elected sync site.
433 * After becoming sync site, recovery first attempts to find the best
434 * database available in the network (it must do this in order to
435 * ensure finding the latest committed data). After finding the right
436 * database, it must fetch this dbase to the sync site.
438 * After fetching the dbase, it relabels it with a new version number,
439 * to ensure that everyone recognizes this dbase as the most recent
440 * dbase.
442 * One the dbase has been relabelled, this machine can start handling
443 * requests. However, the recovery module still has one more task:
444 * propagating the dbase out to everyone who is up in the network.
446 void *
447 urecovery_Interact(void *dummy)
449 afs_int32 code;
450 struct ubik_server *bestServer = NULL;
451 struct ubik_server *ts;
452 int dbok, doingRPC, now;
453 afs_int32 lastProbeTime;
454 /* if we're the sync site, the best db version we've found yet */
455 static struct ubik_version bestDBVersion;
456 struct ubik_version tversion;
457 struct timeval tv;
458 int length, tlen, offset, file, nbytes;
459 struct rx_call *rxcall;
460 char tbuffer[1024];
461 struct ubik_stat ubikstat;
462 struct in_addr inAddr;
463 char hoststr[16];
464 char pbuffer[1028];
465 int fd = -1;
466 afs_int32 pass;
468 afs_pthread_setname_self("recovery");
470 /* otherwise, begin interaction */
471 urecovery_state = 0;
472 lastProbeTime = 0;
473 while (1) {
474 /* Run through this loop every 4 seconds */
475 tv.tv_sec = 4;
476 tv.tv_usec = 0;
477 #ifdef AFS_PTHREAD_ENV
478 select(0, 0, 0, 0, &tv);
479 #else
480 IOMGR_Select(0, 0, 0, 0, &tv);
481 #endif
483 ubik_dprint("recovery running in state %x\n", urecovery_state);
485 /* Every 30 seconds, check all the down servers and mark them
486 * as up if they respond. When a server comes up or found to
487 * not be current, then re-find the the best database and
488 * propogate it.
490 if ((now = FT_ApproxTime()) > 30 + lastProbeTime) {
492 for (ts = ubik_servers, doingRPC = 0; ts; ts = ts->next) {
493 UBIK_BEACON_LOCK;
494 if (!ts->up) {
495 UBIK_BEACON_UNLOCK;
496 doingRPC = 1;
497 code = DoProbe(ts);
498 if (code == 0) {
499 UBIK_BEACON_LOCK;
500 ts->up = 1;
501 UBIK_BEACON_UNLOCK;
502 DBHOLD(ubik_dbase);
503 urecovery_state &= ~UBIK_RECFOUNDDB;
504 DBRELE(ubik_dbase);
506 } else {
507 UBIK_BEACON_UNLOCK;
508 DBHOLD(ubik_dbase);
509 if (!ts->currentDB)
510 urecovery_state &= ~UBIK_RECFOUNDDB;
511 DBRELE(ubik_dbase);
515 if (doingRPC)
516 now = FT_ApproxTime();
517 lastProbeTime = now;
520 /* Mark whether we are the sync site */
521 DBHOLD(ubik_dbase);
522 if (!ubeacon_AmSyncSite()) {
523 urecovery_state &= ~UBIK_RECSYNCSITE;
524 DBRELE(ubik_dbase);
525 continue; /* nothing to do */
527 urecovery_state |= UBIK_RECSYNCSITE;
529 /* If a server has just come up or if we have not found the
530 * most current database, then go find the most current db.
532 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
533 int okcalls = 0;
534 DBRELE(ubik_dbase);
535 bestServer = (struct ubik_server *)0;
536 bestDBVersion.epoch = 0;
537 bestDBVersion.counter = 0;
538 for (ts = ubik_servers; ts; ts = ts->next) {
539 UBIK_BEACON_LOCK;
540 if (!ts->up) {
541 UBIK_BEACON_UNLOCK;
542 continue; /* don't bother with these guys */
544 UBIK_BEACON_UNLOCK;
545 if (ts->isClone)
546 continue;
547 UBIK_ADDR_LOCK;
548 code = DISK_GetVersion(ts->disk_rxcid, &ts->version);
549 UBIK_ADDR_UNLOCK;
550 if (code == 0) {
551 okcalls++;
552 /* perhaps this is the best version */
553 if (vcmp(ts->version, bestDBVersion) > 0) {
554 /* new best version */
555 bestDBVersion = ts->version;
556 bestServer = ts;
561 DBHOLD(ubik_dbase);
563 if (okcalls + 1 >= ubik_quorum) {
564 /* If we've asked a majority of sites about their db version,
565 * then we can say with confidence that we've found the best db
566 * version. If we haven't contacted most sites (because
567 * GetVersion failed or because we already know the server is
568 * down), then we don't really know if we know about the best
569 * db version. So we can only proceed in here if 'okcalls'
570 * indicates we managed to contact a majority of sites. */
572 /* take into consideration our version. Remember if we,
573 * the sync site, have the best version. Also note that
574 * we may need to send the best version out.
576 if (vcmp(ubik_dbase->version, bestDBVersion) >= 0) {
577 bestDBVersion = ubik_dbase->version;
578 bestServer = (struct ubik_server *)0;
579 urecovery_state |= UBIK_RECHAVEDB;
580 } else {
581 /* Clear the flag only when we know we have to retrieve
582 * the db. Because urecovery_AllBetter() looks at it.
584 urecovery_state &= ~UBIK_RECHAVEDB;
586 urecovery_state |= UBIK_RECFOUNDDB;
587 urecovery_state &= ~UBIK_RECSENTDB;
590 if (!(urecovery_state & UBIK_RECFOUNDDB)) {
591 DBRELE(ubik_dbase);
592 continue; /* not ready */
595 /* If we, the sync site, do not have the best db version, then
596 * go and get it from the server that does.
598 if ((urecovery_state & UBIK_RECHAVEDB) || !bestServer) {
599 urecovery_state |= UBIK_RECHAVEDB;
600 } else {
601 /* we don't have the best version; we should fetch it. */
602 urecovery_AbortAll(ubik_dbase);
604 /* Rx code to do the Bulk fetch */
605 file = 0;
606 offset = 0;
607 UBIK_ADDR_LOCK;
608 rxcall = rx_NewCall(bestServer->disk_rxcid);
610 ubik_print("Ubik: Synchronize database with server %s\n",
611 afs_inet_ntoa_r(bestServer->addr[0], hoststr));
612 UBIK_ADDR_UNLOCK;
614 code = StartDISK_GetFile(rxcall, file);
615 if (code) {
616 ubik_dprint("StartDiskGetFile failed=%d\n", code);
617 goto FetchEndCall;
619 nbytes = rx_Read(rxcall, (char *)&length, sizeof(afs_int32));
620 length = ntohl(length);
621 if (nbytes != sizeof(afs_int32)) {
622 ubik_dprint("Rx-read length error=%d\n", code = BULK_ERROR);
623 code = EIO;
624 goto FetchEndCall;
627 /* give invalid label during file transit */
628 UBIK_VERSION_LOCK;
629 tversion.epoch = 0;
630 code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion);
631 UBIK_VERSION_UNLOCK;
632 if (code) {
633 ubik_dprint("setlabel io error=%d\n", code);
634 goto FetchEndCall;
636 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
637 ubik_dbase->pathName, (file<0)?"SYS":"",
638 (file<0)?-file:file);
639 fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600);
640 if (fd < 0) {
641 code = errno;
642 goto FetchEndCall;
644 code = lseek(fd, HDRSIZE, 0);
645 if (code != HDRSIZE) {
646 close(fd);
647 goto FetchEndCall;
650 pass = 0;
651 while (length > 0) {
652 tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
653 #ifndef AFS_PTHREAD_ENV
654 if (pass % 4 == 0)
655 IOMGR_Poll();
656 #endif
657 nbytes = rx_Read(rxcall, tbuffer, tlen);
658 if (nbytes != tlen) {
659 ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR);
660 code = EIO;
661 close(fd);
662 goto FetchEndCall;
664 nbytes = write(fd, tbuffer, tlen);
665 pass++;
666 if (nbytes != tlen) {
667 code = UIOERROR;
668 close(fd);
669 goto FetchEndCall;
671 offset += tlen;
672 length -= tlen;
674 code = close(fd);
675 if (code)
676 goto FetchEndCall;
677 code = EndDISK_GetFile(rxcall, &tversion);
678 FetchEndCall:
679 code = rx_EndCall(rxcall, code);
680 if (!code) {
681 /* we got a new file, set up its header */
682 urecovery_state |= UBIK_RECHAVEDB;
683 UBIK_VERSION_LOCK;
684 memcpy(&ubik_dbase->version, &tversion,
685 sizeof(struct ubik_version));
686 snprintf(tbuffer, sizeof(tbuffer), "%s.DB%s%d",
687 ubik_dbase->pathName, (file<0)?"SYS":"",
688 (file<0)?-file:file);
689 #ifdef AFS_NT40_ENV
690 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
691 ubik_dbase->pathName, (file<0)?"SYS":"",
692 (file<0)?-file:file);
693 code = unlink(pbuffer);
694 if (!code)
695 code = rename(tbuffer, pbuffer);
696 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.TMP",
697 ubik_dbase->pathName, (file<0)?"SYS":"",
698 (file<0)?-file:file);
699 #endif
700 if (!code)
701 code = rename(pbuffer, tbuffer);
702 if (!code) {
703 (*ubik_dbase->open) (ubik_dbase, file);
704 /* after data is good, sync disk with correct label */
705 code =
706 (*ubik_dbase->setlabel) (ubik_dbase, 0,
707 &ubik_dbase->version);
709 UBIK_VERSION_UNLOCK;
710 #ifdef AFS_NT40_ENV
711 snprintf(pbuffer, sizeof(pbuffer), "%s.DB%s%d.OLD",
712 ubik_dbase->pathName, (file<0)?"SYS":"",
713 (file<0)?-file:file);
714 unlink(pbuffer);
715 #endif
717 if (code) {
718 unlink(pbuffer);
720 * We will effectively invalidate the old data forever now.
721 * Unclear if we *should* but we do.
723 UBIK_VERSION_LOCK;
724 ubik_dbase->version.epoch = 0;
725 ubik_dbase->version.counter = 0;
726 UBIK_VERSION_UNLOCK;
727 ubik_print("Ubik: Synchronize database failed (error = %d)\n",
728 code);
729 } else {
730 ubik_print("Ubik: Synchronize database completed\n");
731 urecovery_state |= UBIK_RECHAVEDB;
733 udisk_Invalidate(ubik_dbase, 0); /* data has changed */
734 #ifdef AFS_PTHREAD_ENV
735 opr_cv_broadcast(&ubik_dbase->version_cond);
736 #else
737 LWP_NoYieldSignal(&ubik_dbase->version);
738 #endif
740 if (!(urecovery_state & UBIK_RECHAVEDB)) {
741 DBRELE(ubik_dbase);
742 continue; /* not ready */
745 /* If the database was newly initialized, then when we establish quorum, write
746 * a new label. This allows urecovery_AllBetter() to allow access for reads.
747 * Setting it to 2 also allows another site to come along with a newer
748 * database and overwrite this one.
750 if (ubik_dbase->version.epoch == 1) {
751 urecovery_AbortAll(ubik_dbase);
752 UBIK_VERSION_LOCK;
753 ubik_dbase->version.epoch = 2;
754 ubik_dbase->version.counter = 1;
755 code =
756 (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
757 UBIK_VERSION_UNLOCK;
758 udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
759 #ifdef AFS_PTHREAD_ENV
760 opr_cv_broadcast(&ubik_dbase->version_cond);
761 #else
762 LWP_NoYieldSignal(&ubik_dbase->version);
763 #endif
766 /* Check the other sites and send the database to them if they
767 * do not have the current db.
769 if (!(urecovery_state & UBIK_RECSENTDB)) {
770 /* now propagate out new version to everyone else */
771 dbok = 1; /* start off assuming they all worked */
774 * Check if a write transaction is in progress. We can't send the
775 * db when a write is in progress here because the db would be
776 * obsolete as soon as it goes there. Also, ops after the begin
777 * trans would reach the recepient and wouldn't find a transaction
778 * pending there. Frankly, I don't think it's possible to get past
779 * the write-lock above if there is a write transaction in progress,
780 * but then, it won't hurt to check, will it?
782 if (ubik_dbase->flags & DBWRITING) {
783 struct timeval tv;
784 int safety = 0;
785 long cur_usec = 50000;
786 while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
787 DBRELE(ubik_dbase);
788 /* sleep for a little while */
789 tv.tv_sec = 0;
790 tv.tv_usec = cur_usec;
791 #ifdef AFS_PTHREAD_ENV
792 select(0, 0, 0, 0, &tv);
793 #else
794 IOMGR_Select(0, 0, 0, 0, &tv);
795 #endif
796 cur_usec += 10000;
797 safety++;
798 DBHOLD(ubik_dbase);
802 for (ts = ubik_servers; ts; ts = ts->next) {
803 UBIK_ADDR_LOCK;
804 inAddr.s_addr = ts->addr[0];
805 UBIK_ADDR_UNLOCK;
806 UBIK_BEACON_LOCK;
807 if (!ts->up) {
808 UBIK_BEACON_UNLOCK;
809 ubik_dprint("recovery cannot send version to %s\n",
810 afs_inet_ntoa_r(inAddr.s_addr, hoststr));
811 dbok = 0;
812 continue;
814 UBIK_BEACON_UNLOCK;
815 ubik_dprint("recovery sending version to %s\n",
816 afs_inet_ntoa_r(inAddr.s_addr, hoststr));
817 if (vcmp(ts->version, ubik_dbase->version) != 0) {
818 ubik_dprint("recovery stating local database\n");
820 /* Rx code to do the Bulk Store */
821 code = (*ubik_dbase->stat) (ubik_dbase, 0, &ubikstat);
822 if (!code) {
823 length = ubikstat.size;
824 file = offset = 0;
825 UBIK_ADDR_LOCK;
826 rxcall = rx_NewCall(ts->disk_rxcid);
827 UBIK_ADDR_UNLOCK;
828 code =
829 StartDISK_SendFile(rxcall, file, length,
830 &ubik_dbase->version);
831 if (code) {
832 ubik_dprint("StartDiskSendFile failed=%d\n",
833 code);
834 goto StoreEndCall;
836 while (length > 0) {
837 tlen =
838 (length >
839 sizeof(tbuffer) ? sizeof(tbuffer) : length);
840 nbytes =
841 (*ubik_dbase->read) (ubik_dbase, file,
842 tbuffer, offset, tlen);
843 if (nbytes != tlen) {
844 ubik_dprint("Local disk read error=%d\n",
845 code = UIOERROR);
846 goto StoreEndCall;
848 nbytes = rx_Write(rxcall, tbuffer, tlen);
849 if (nbytes != tlen) {
850 ubik_dprint("Rx-write bulk error=%d\n", code =
851 BULK_ERROR);
852 goto StoreEndCall;
854 offset += tlen;
855 length -= tlen;
857 code = EndDISK_SendFile(rxcall);
858 StoreEndCall:
859 code = rx_EndCall(rxcall, code);
861 if (code == 0) {
862 /* we set a new file, process its header */
863 ts->version = ubik_dbase->version;
864 ts->currentDB = 1;
865 } else
866 dbok = 0;
867 } else {
868 /* mark file up to date */
869 ts->currentDB = 1;
872 if (dbok)
873 urecovery_state |= UBIK_RECSENTDB;
875 DBRELE(ubik_dbase);
877 return NULL;
881 * \brief send a Probe to all the network address of this server
883 * \return 0 if success, else return 1
886 DoProbe(struct ubik_server *server)
888 struct rx_connection *conns[UBIK_MAX_INTERFACE_ADDR];
889 struct rx_connection *connSuccess = 0;
890 int i, nconns, success_i = -1;
891 afs_uint32 addr;
892 char buffer[32];
893 char hoststr[16];
895 UBIK_ADDR_LOCK;
896 for (i = 0; (i < UBIK_MAX_INTERFACE_ADDR) && (addr = server->addr[i]);
897 i++) {
898 conns[i] =
899 rx_NewConnection(addr, ubik_callPortal, DISK_SERVICE_ID,
900 addr_globals.ubikSecClass, addr_globals.ubikSecIndex);
902 /* user requirement to use only the primary interface */
903 if (ubikPrimaryAddrOnly) {
904 i = 1;
905 break;
908 UBIK_ADDR_UNLOCK;
909 nconns = i;
910 opr_Assert(nconns); /* at least one interface address for this server */
912 multi_Rx(conns, nconns) {
913 multi_DISK_Probe();
914 if (!multi_error) { /* first success */
915 success_i = multi_i;
917 multi_Abort;
919 } multi_End_Ignore;
921 if (success_i >= 0) {
922 UBIK_ADDR_LOCK;
923 addr = server->addr[success_i]; /* successful interface addr */
925 if (server->disk_rxcid) /* destroy existing conn */
926 rx_DestroyConnection(server->disk_rxcid);
927 if (server->vote_rxcid)
928 rx_DestroyConnection(server->vote_rxcid);
930 /* make new connections */
931 server->disk_rxcid = conns[success_i];
932 server->vote_rxcid = rx_NewConnection(addr, ubik_callPortal,
933 VOTE_SERVICE_ID, addr_globals.ubikSecClass,
934 addr_globals.ubikSecIndex);
936 connSuccess = conns[success_i];
937 strcpy(buffer, afs_inet_ntoa_r(server->addr[0], hoststr));
939 ubik_print("ubik:server %s is back up: will be contacted through %s\n",
940 buffer, afs_inet_ntoa_r(addr, hoststr));
941 UBIK_ADDR_UNLOCK;
944 /* Destroy all connections except the one on which we succeeded */
945 for (i = 0; i < nconns; i++)
946 if (conns[i] != connSuccess)
947 rx_DestroyConnection(conns[i]);
949 if (!connSuccess)
950 ubik_dprint("ubik:server %s still down\n",
951 afs_inet_ntoa_r(server->addr[0], hoststr));
953 if (connSuccess)
954 return 0; /* success */
955 else
956 return 1; /* failure */