Merged 0.51.1 branch
[zeroinstall/zeroinstall-afb.git] / zeroinstall / injector / sat.py
blob0ac6670bf21fb25ebe2f1cd1f8d32ac7087fe41e
1 """
2 Internal implementation of a SAT solver, used by L{solver.SATSolver}.
3 This is not part of the public API.
4 """
6 # Copyright (C) 2010, Thomas Leonard
7 # See the README file for details, or visit http://0install.net.
9 # The design of this solver is very heavily based on the one described in
10 # the MiniSat paper "An Extensible SAT-solver [extended version 1.2]"
11 # http://minisat.se/Papers.html
13 # The main differences are:
15 # - We care about which solution we find (not just "satisfiable" or "not").
16 # - We take care to be deterministic (always select the same versions given
17 # the same input). We do not do random restarts, etc.
18 # - We add an AtMostOneClause (the paper suggests this in the Excercises, and
19 # it's very useful for our purposes).
21 import tempfile, subprocess, os, sys
22 from logging import warn
24 def debug(msg, *args):
25 return
26 print "SAT:", msg % args
28 # variables are numbered from 0
29 # literals have the same number as the corresponding variable,
30 # except they for negatives they are (-1-v):
32 # Variable Literal not(Literal)
33 # 0 0 -1
34 # 1 1 -2
35 def neg(lit):
36 return -1 - lit
38 def watch_index(lit):
39 if lit >= 0:
40 return lit * 2
41 return neg(lit) * 2 + 1
43 def makeAtMostOneClause(solver):
44 class AtMostOneClause:
45 def __init__(self, lits):
46 """Preferred literals come first."""
47 self.lits = lits
49 # The single literal from our set that is True.
50 # We store this explicitly because the decider needs to know quickly.
51 self.current = None
53 def propagate(self, lit):
54 # Re-add ourselves to the watch list.
55 # (we we won't get any more notifications unless we backtrack,
56 # in which case we'd need to get back on the list anyway)
57 solver.watch_lit(lit, self)
59 # value[lit] has just become True
60 assert solver.lit_value(lit) == True
61 assert lit >= 0
63 #debug("%s: noticed %s has become True" % (self, solver.name_lit(lit)))
65 # If we already propagated successfully when the first
66 # one was set then we set all the others to False and
67 # anyone trying to set one True will get rejected. And
68 # if we didn't propagate yet, current will still be
69 # None, even if we now have a conflict (which we'll
70 # detect below).
71 assert self.current is None
73 self.current = lit
75 # If we later backtrace, call our undo function to unset current
76 solver.get_varinfo_for_lit(lit).undo.append(self)
78 count = 0
79 for l in self.lits:
80 value = solver.lit_value(l)
81 #debug("Value of %s is %s" % (solver.name_lit(l), value))
82 if value is True and l is not lit:
83 # Due to queuing, we might get called with current = None
84 # and two versions already selected.
85 debug("CONFLICT: already selected %s" % solver.name_lit(l))
86 return False
87 if value is None:
88 # Since one of our lits is already true, all unknown ones
89 # can be set to False.
90 if not solver.enqueue(neg(l), self):
91 debug("CONFLICT: enqueue failed for %s", solver.name_lit(neg(l)))
92 return False # Conflict; abort
94 return True
96 def undo(self, lit):
97 debug("(backtracking: no longer selected %s)" % solver.name_lit(lit))
98 assert lit == self.current
99 self.current = None
101 # Why is lit True?
102 # Or, why are we causing a conflict (if lit is None)?
103 def cacl_reason(self, lit):
104 if lit is None:
105 # Find two True literals
106 trues = []
107 for l in self.lits:
108 if solver.lit_value(l) is True:
109 trues.append(l)
110 if len(trues) == 2: return trues
111 else:
112 for l in self.lits:
113 if l is not lit and solver.lit_value(l) is True:
114 return [l]
115 # Find one True literal
116 assert 0 # don't know why!
118 def best_undecided(self):
119 debug("best_undecided: %s" % (solver.name_lits(self.lits)))
120 for lit in self.lits:
121 #debug("%s = %s" % (solver.name_lit(lit), solver.lit_value(lit)))
122 if solver.lit_value(lit) is None:
123 return lit
124 return None
126 def __repr__(self):
127 return "<lone: %s>" % (', '.join(solver.name_lits(self.lits)))
129 return AtMostOneClause
131 def makeUnionClause(solver):
132 class UnionClause:
133 def __init__(self, lits):
134 self.lits = lits
136 # Try to infer new facts.
137 # We can do this only when all of our literals are False except one,
138 # which is undecided. That is,
139 # False... or X or False... = True => X = True
141 # To get notified when this happens, we tell the solver to
142 # watch two of our undecided literals. Watching two undecided
143 # literals is sufficient. When one changes we check the state
144 # again. If we still have two or more undecided then we switch
145 # to watching them, otherwise we propagate.
147 # Returns False on conflict.
148 def propagate(self, lit):
149 # value[get(lit)] has just become False
151 #debug("%s: noticed %s has become False" % (self, solver.name_lit(neg(lit))))
153 # For simplicity, only handle the case where self.lits[1]
154 # is the one that just got set to False, so that:
155 # - value[lits[0]] = None | True
156 # - value[lits[1]] = False
157 # If it's the other way around, just swap them before we start.
158 if self.lits[0] == neg(lit):
159 self.lits[0], self.lits[1] = self.lits[1], self.lits[0]
161 if solver.lit_value(self.lits[0]) == True:
162 # We're already satisfied. Do nothing.
163 solver.watch_lit(lit, self)
164 return True
166 assert solver.lit_value(self.lits[1]) == False
168 # Find a new literal to watch now that lits[1] is resolved,
169 # swap it with lits[1], and start watching it.
170 for i in range(2, len(self.lits)):
171 value = solver.lit_value(self.lits[i])
172 if value != False:
173 # Could be None or True. If it's True then we've already done our job,
174 # so this means we don't get notified unless we backtrack, which is fine.
175 self.lits[1], self.lits[i] = self.lits[i], self.lits[1]
176 solver.watch_lit(neg(self.lits[1]), self)
177 return True
179 # Only lits[0], is now undefined.
180 solver.watch_lit(lit, self)
181 return solver.enqueue(self.lits[0], self)
183 def undo(self, lit): pass
185 # Why is lit True?
186 # Or, why are we causing a conflict (if lit is None)?
187 def cacl_reason(self, lit):
188 assert lit is None or lit is self.lits[0]
190 # The cause is everything except lit.
191 return [neg(l) for l in self.lits if l is not lit]
193 def __repr__(self):
194 return "<some: %s>" % (', '.join(solver.name_lits(self.lits)))
195 return UnionClause
197 # Using an array of VarInfo objects is less efficient than using multiple arrays, but
198 # easier for me to understand.
199 class VarInfo(object):
200 __slots__ = ['value', 'reason', 'level', 'undo', 'obj']
201 def __init__(self, obj):
202 self.value = None # True/False/None
203 self.reason = None # The constraint that implied our value, if True or False
204 self.level = -1 # The decision level at which we got a value (when not None)
205 self.undo = [] # Constraints to update if we become unbound (by backtracking)
206 self.obj = obj # The object this corresponds to (for our caller and for debugging)
208 def __repr__(self):
209 return '%s=%s' % (self.name, self.value)
211 @property
212 def name(self):
213 return str(self.obj)
215 class SATProblem(object):
216 def __init__(self):
217 # Propagation
218 self.watches = [] # watches[2i,2i+1] = constraints to check when literal[i] becomes True/False
219 self.propQ = [] # propagation queue
221 # Assignments
222 self.assigns = [] # [VarInfo]
223 self.trail = [] # order of assignments
224 self.trail_lim = [] # decision levels
226 self.toplevel_conflict = False
228 self.makeAtMostOneClause = makeAtMostOneClause(self)
229 self.makeUnionClause = makeUnionClause(self)
231 def get_decision_level(self):
232 return len(self.trail_lim)
234 def add_variable(self, obj):
235 debug("add_variable('%s')", obj)
236 index = len(self.assigns)
238 self.watches += [[], []] # Add watch lists for X and not(X)
239 self.assigns.append(VarInfo(obj))
240 return index
242 # lit is now True
243 # reason is the clause that is asserting this
244 # Returns False if this immediately causes a conflict.
245 def enqueue(self, lit, reason):
246 debug("%s => %s" % (reason, self.name_lit(lit)))
247 old_value = self.lit_value(lit)
248 if old_value is not None:
249 if old_value is False:
250 # Conflict
251 return False
252 else:
253 # Already set (shouldn't happen)
254 return True
256 if lit < 0:
257 var_info = self.assigns[neg(lit)]
258 var_info.value = False
259 else:
260 var_info = self.assigns[lit]
261 var_info.value = True
262 var_info.level = self.get_decision_level()
263 var_info.reason = reason
265 self.trail.append(lit)
266 self.propQ.append(lit)
268 return True
270 # Pop most recent assignment from self.trail
271 def undo_one(self):
272 lit = self.trail[-1]
273 debug("(pop %s)", self.name_lit(lit))
274 var_info = self.get_varinfo_for_lit(lit)
275 var_info.value = None
276 var_info.reason = None
277 var_info.level = -1
278 self.trail.pop()
280 while var_info.undo:
281 var_info.undo.pop().undo(lit)
283 def cancel(self):
284 n_this_level = len(self.trail) - self.trail_lim[-1]
285 debug("backtracking from level %d (%d assignments)" %
286 (self.get_decision_level(), n_this_level))
287 while n_this_level != 0:
288 self.undo_one()
289 n_this_level -= 1
290 self.trail_lim.pop()
292 def cancel_until(self, level):
293 while self.get_decision_level() > level:
294 self.cancel()
296 # Process the propQ.
297 # Returns None when done, or the clause that caused a conflict.
298 def propagate(self):
299 #debug("propagate: queue length = %d", len(self.propQ))
300 while self.propQ:
301 lit = self.propQ[0]
302 del self.propQ[0]
303 var_info = self.get_varinfo_for_lit(lit)
304 wi = watch_index(lit)
305 watches = self.watches[wi]
306 self.watches[wi] = []
308 debug("%s -> True : watches: %s" % (self.name_lit(lit), watches))
310 # Notifiy all watchers
311 for i in range(len(watches)):
312 clause = watches[i]
313 if not clause.propagate(lit):
314 # Conflict
316 # Re-add remaining watches
317 self.watches[wi] += watches[i+1:]
319 # No point processing the rest of the queue as
320 # we'll have to backtrack now.
321 self.propQ = []
323 return clause
324 return None
326 def impossible(self):
327 self.toplevel_conflict = True
329 def get_varinfo_for_lit(self, lit):
330 if lit >= 0:
331 return self.assigns[lit]
332 else:
333 return self.assigns[neg(lit)]
335 def lit_value(self, lit):
336 if lit >= 0:
337 value = self.assigns[lit].value
338 return value
339 else:
340 v = -1 - lit
341 value = self.assigns[v].value
342 if value is None:
343 return None
344 else:
345 return not value
347 # Call cb when lit becomes True
348 def watch_lit(self, lit, cb):
349 #debug("%s is watching for %s to become True" % (cb, self.name_lit(lit)))
350 self.watches[watch_index(lit)].append(cb)
352 # Returns the new clause if one was added, True if none was added
353 # because this clause is trivially True, or False if the clause is
354 # False.
355 def _add_clause(self, lits, learnt):
356 if not lits:
357 assert not learnt
358 self.toplevel_conflict = True
359 return False
360 elif len(lits) == 1:
361 # A clause with only a single literal is represented
362 # as an assignment rather than as a clause.
363 if learnt:
364 reason = "learnt"
365 else:
366 reason = "top-level"
367 return self.enqueue(lits[0], reason)
369 clause = self.makeUnionClause(lits)
370 clause.learnt = learnt
372 if learnt:
373 # lits[0] is None because we just backtracked.
374 # Start watching the next literal that we will
375 # backtrack over.
376 best_level = -1
377 best_i = 1
378 for i in range(1, len(lits)):
379 level = self.get_varinfo_for_lit(lits[i]).level
380 if level > best_level:
381 best_level = level
382 best_i = i
383 lits[1], lits[best_i] = lits[best_i], lits[1]
385 # Watch the first two literals in the clause (both must be
386 # undefined at this point).
387 for lit in lits[:2]:
388 self.watch_lit(neg(lit), clause)
390 return clause
392 def name_lits(self, lst):
393 return [self.name_lit(l) for l in lst]
395 # For nicer debug messages
396 def name_lit(self, lit):
397 if lit >= 0:
398 return self.assigns[lit].name
399 return "not(%s)" % self.assigns[neg(lit)].name
401 def add_clause(self, lits):
402 # Public interface. Only used before the solve starts.
403 assert lits
405 debug("add_clause([%s])" % ', '.join(self.name_lits(lits)))
407 if any(self.lit_value(l) == True for l in lits):
408 # Trivially true already.
409 return True
410 lit_set = set(lits)
411 for l in lits:
412 if neg(l) in lit_set:
413 # X or not(X) is always True.
414 return True
415 # Remove duplicates and values known to be False
416 lits = [l for l in lit_set if self.lit_value(l) != False]
418 retval = self._add_clause(lits, learnt = False)
419 if not retval:
420 self.toplevel_conflict = True
421 return retval
423 def at_most_one(self, lits):
424 assert lits
426 debug("at_most_one(%s)" % ', '.join(self.name_lits(lits)))
428 # If we have zero or one literals then we're trivially true
429 # and not really needed for the solve. However, Zero Install
430 # monitors these objects to find out what was selected, so
431 # keep even trivial ones around for that.
433 #if len(lits) < 2:
434 # return True # Trivially true
436 # Ensure no duplicates
437 assert len(set(lits)) == len(lits), lits
439 # Ignore any literals already known to be False.
440 # If any are True then they're enqueued and we'll process them
441 # soon.
442 lits = [l for l in lits if self.lit_value(l) != False]
444 clause = self.makeAtMostOneClause(lits)
446 for lit in lits:
447 self.watch_lit(lit, clause)
449 return clause
451 def analyse(self, cause):
452 # After trying some assignments, we've discovered a conflict.
453 # e.g.
454 # - we selected A then B then C
455 # - from A, B, C we got X, Y
456 # - we have a rule: not(A) or not(X) or not(Y)
458 # The simplest thing to do would be:
459 # 1. add the rule "not(A) or not(B) or not(C)"
460 # 2. unassign C
462 # Then we we'd deduce not(C) and we could try something else.
463 # However, that would be inefficient. We want to learn a more
464 # general rule that will help us with the rest of the problem.
466 # We take the clause that caused the conflict ("cause") and
467 # ask it for its cause. In this case:
469 # A and X and Y => conflict
471 # Since X and Y followed logically from A, B, C there's no
472 # point learning this rule; we need to know to avoid A, B, C
473 # *before* choosing C. We ask the two variables deduced at the
474 # current level (X and Y) what caused them, and work backwards.
475 # e.g.
477 # X: A and C => X
478 # Y: C => Y
480 # Combining these, we get the cause of the conflict in terms of
481 # things we knew before the current decision level:
483 # A and X and Y => conflict
484 # A and (A and C) and (C) => conflict
485 # A and C => conflict
487 # We can then learn (record) the more general rule:
489 # not(A) or not(C)
491 # Then, in future, whenever A is selected we can remove C and
492 # everything that depends on it from consideration.
495 learnt = [None] # The general rule we're learning
496 btlevel = 0 # The deepest decision in learnt
497 p = None # The literal we want to expand now
498 seen = set() # The variables involved in the conflict
500 counter = 0
502 while True:
503 # cause is the reason why p is True (i.e. it enqueued it).
504 # The first time, p is None, which requests the reason
505 # why it is conflicting.
506 if p is None:
507 debug("Why did %s make us fail?" % cause)
508 p_reason = cause.cacl_reason(p)
509 debug("Because: %s => conflict" % (' and '.join(self.name_lits(p_reason))))
510 else:
511 debug("Why did %s lead to %s?" % (cause, self.name_lit(p)))
512 p_reason = cause.cacl_reason(p)
513 debug("Because: %s => %s" % (' and '.join(self.name_lits(p_reason)), self.name_lit(p)))
515 # p_reason is in the form (A and B and ...)
516 # p_reason => p
518 # Check each of the variables in p_reason that we haven't
519 # already considered:
520 # - if the variable was assigned at the current level,
521 # mark it for expansion
522 # - otherwise, add it to learnt
524 for lit in p_reason:
525 var_info = self.get_varinfo_for_lit(lit)
526 if var_info not in seen:
527 seen.add(var_info)
528 if var_info.level == self.get_decision_level():
529 # We deduced this var since the last decision.
530 # It must be in self.trail, so we'll get to it
531 # soon. Remember not to stop until we've processed it.
532 counter += 1
533 elif var_info.level > 0:
534 # We won't expand lit, just remember it.
535 # (we could expand it if it's not a decision, but
536 # apparently not doing so is useful)
537 learnt.append(neg(lit))
538 btlevel = max(btlevel, var_info.level)
539 # else we already considered the cause of this assignment
541 # At this point, counter is the number of assigned
542 # variables in self.trail at the current decision level that
543 # we've seen. That is, the number left to process. Pop
544 # the next one off self.trail (as well as any unrelated
545 # variables before it; everything up to the previous
546 # decision has to go anyway).
548 # On the first time round the loop, we must find the
549 # conflict depends on at least one assignment at the
550 # current level. Otherwise, simply setting the decision
551 # variable caused a clause to conflict, in which case
552 # the clause should have asserted not(decision-variable)
553 # before we ever made the decision.
554 # On later times round the loop, counter was already >
555 # 0 before we started iterating over p_reason.
556 assert counter > 0
558 while True:
559 p = self.trail[-1]
560 var_info = self.get_varinfo_for_lit(p)
561 cause = var_info.reason
562 self.undo_one()
563 if var_info in seen:
564 break
565 debug("(irrelevant)")
566 counter -= 1
568 if counter <= 0:
569 assert counter == 0
570 # If counter = 0 then we still have one more
571 # literal (p) at the current level that we
572 # could expand. However, apparently it's best
573 # to leave this unprocessed (says the minisat
574 # paper).
575 break
577 # p is the literal we decided to stop processing on. It's either
578 # a derived variable at the current level, or the decision that
579 # led to this level. Since we're not going to expand it, add it
580 # directly to the learnt clause.
581 learnt[0] = neg(p)
583 debug("Learnt: %s" % (' or '.join(self.name_lits(learnt))))
585 return learnt, btlevel
587 def run_solver(self, decide):
588 # Check whether we detected a trivial problem
589 # during setup.
590 if self.toplevel_conflict:
591 debug("FAIL: toplevel_conflict before starting solve!")
592 return False
594 while True:
595 # Use logical deduction to simplify the clauses
596 # and assign literals where there is only one possibility.
597 conflicting_clause = self.propagate()
598 if not conflicting_clause:
599 debug("new state: %s", self.assigns)
600 if all(info.value != None for info in self.assigns):
601 # Everything is assigned without conflicts
602 debug("SUCCESS!")
603 return True
604 else:
605 # Pick a variable and try assigning it one way.
606 # If it leads to a conflict, we'll backtrack and
607 # try it the other way.
608 lit = decide()
609 #print "TRYING:", self.name_lit(lit)
610 assert lit is not None, "decide function returned None!"
611 assert self.lit_value(lit) is None
612 self.trail_lim.append(len(self.trail))
613 r = self.enqueue(lit, reason = "considering")
614 assert r is True
615 else:
616 if self.get_decision_level() == 0:
617 debug("FAIL: conflict found at top level")
618 return False
619 else:
620 # Figure out the root cause of this failure.
621 learnt, backtrack_level = self.analyse(conflicting_clause)
623 self.cancel_until(backtrack_level)
625 c = self._add_clause(learnt, learnt = True)
627 if c is not True:
628 # Everything except the first literal in learnt is known to
629 # be False, so the first must be True.
630 e = self.enqueue(learnt[0], c)
631 assert e is True