export amount of active queries
[debiancodesearch.git] / index / write.go
blob239f87eee9435b4955b44b61971d2189b4671fe0
1 // Copyright 2011 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 package index
7 import (
8 "errors"
9 "io"
10 "io/ioutil"
11 "log"
12 "os"
13 "strings"
14 "unsafe"
16 "code.google.com/p/codesearch/sparse"
19 // Index writing. See read.go for details of on-disk format.
21 // It would suffice to make a single large list of (trigram, file#) pairs
22 // while processing the files one at a time, sort that list by trigram,
23 // and then create the posting lists from subsequences of the list.
24 // However, we do not assume that the entire index fits in memory.
25 // Instead, we sort and flush the list to a new temporary file each time
26 // it reaches its maximum in-memory size, and then at the end we
27 // create the final posting lists by merging the temporary files as we
28 // read them back in.
30 // It would also be useful to be able to create an index for a subset
31 // of the files and then merge that index into an existing one. This would
32 // allow incremental updating of an existing index when a directory changes.
33 // But we have not implemented that.
35 // sortPost sorts the postentry list.
36 // The list is already sorted by fileid (bottom 32 bits)
37 // and the top 8 bits are always zero, so there are only
38 // 24 bits to sort. Run two rounds of 12-bit radix sort.
39 const sortK = 12
41 // An IndexWriter creates an on-disk index corresponding to a set of files.
42 type IndexWriter struct {
43 LogSkip bool // log information about skipped files
44 Verbose bool // log status using package log
46 trigram *sparse.Set // trigrams for the current file
47 buf [8]byte // scratch buffer
49 paths []string
51 nameData *bufWriter // temp file holding list of names
52 nameLen uint32 // number of bytes written to nameData
53 nameIndex *bufWriter // temp file holding name index
54 numName int // number of names written
55 totalBytes int64
57 post []postEntry // list of (trigram, file#) pairs
58 postFile []*os.File // flushed post entries
59 postIndex *bufWriter // temp file holding posting list index
61 inbuf []byte // input buffer
62 main *bufWriter // main index file
64 sortTmp []postEntry
65 sortN [1 << sortK]int
68 const npost = 64 << 20 / 8 // 64 MB worth of post entries
70 // Create returns a new IndexWriter that will write the index to file.
71 func Create(file string) *IndexWriter {
72 return &IndexWriter{
73 // 1 << 24 = 16777216, how many numbers can be represented by 3 uint8_t’s.
74 trigram: sparse.NewSet(1 << 24),
75 nameData: bufCreate(""),
76 nameIndex: bufCreate(""),
77 postIndex: bufCreate(""),
78 main: bufCreate(file),
79 post: make([]postEntry, 0, npost),
80 inbuf: make([]byte, 16384),
84 // A postEntry is an in-memory (trigram, file#) pair.
85 type postEntry uint64
87 func (p postEntry) trigram() uint32 {
88 return uint32(p >> 32)
91 func (p postEntry) fileid() uint32 {
92 return uint32(p)
95 func makePostEntry(trigram, fileid uint32) postEntry {
96 return postEntry(trigram)<<32 | postEntry(fileid)
99 // Tuning constants for detecting text files.
100 // A file is assumed not to be text files (and thus not indexed)
101 // if it contains an invalid UTF-8 sequences, if it is longer than maxFileLength
102 // bytes, if it contains a line longer than maxLineLen bytes,
103 // or if it contains more than maxTextTrigrams distinct trigrams.
104 const (
105 maxFileLen = 1 << 30
106 maxLineLen = 2000
107 maxTextTrigrams = 20000
110 // AddPaths adds the given paths to the index's list of paths.
111 func (ix *IndexWriter) AddPaths(paths []string) {
112 ix.paths = append(ix.paths, paths...)
115 // AddFile adds the file with the given name (opened using os.Open)
116 // to the index. It logs errors using package log.
117 func (ix *IndexWriter) AddFile(name string, indexname string) error {
118 f, err := os.Open(name)
119 if err != nil {
120 log.Print(err)
121 return err
123 defer f.Close()
124 return ix.Add(indexname, f)
127 // Add adds the file f to the index under the given name.
128 // It logs errors using package log.
129 func (ix *IndexWriter) Add(name string, f io.Reader) error {
130 ix.trigram.Reset()
131 var (
132 c = byte(0)
133 i = 0
134 buf = ix.inbuf[:0]
135 tv = uint32(0)
136 n = int64(0)
137 linelen = 0
139 for {
140 tv = (tv << 8) & (1<<24 - 1)
141 if i >= len(buf) {
142 n, err := f.Read(buf[:cap(buf)])
143 if n == 0 {
144 if err != nil {
145 if err == io.EOF {
146 break
148 log.Printf("%s: %v\n", name, err)
149 return err
151 log.Printf("%s: 0-length read\n", name)
152 return errors.New("0-length read")
154 buf = buf[:n]
155 i = 0
157 c = buf[i]
159 tv |= uint32(c)
160 if n++; n >= 3 {
161 ix.trigram.Add(tv)
163 if !validUTF8((tv>>8)&0xFF, tv&0xFF) {
164 if ix.LogSkip {
165 log.Printf("%s: invalid UTF-8, ignoring\n", name)
167 return errors.New("invalid UTF-8, ignoring")
169 if n > maxFileLen {
170 if ix.LogSkip {
171 log.Printf("%s: too long, ignoring\n", name)
173 return errors.New("too long, ignoring")
175 if linelen++; linelen > maxLineLen {
176 if ix.LogSkip {
177 log.Printf("%s: very long lines, ignoring\n", name)
179 return errors.New("very long lines, ignoring")
181 if c == '\n' {
182 linelen = 0
185 if ix.trigram.Len() > maxTextTrigrams {
186 if ix.LogSkip {
187 log.Printf("%s: too many trigrams, probably not text, ignoring\n", name)
189 return errors.New("too many trigrams, probably not text, ignoring")
191 ix.totalBytes += n
193 if ix.Verbose {
194 log.Printf("%d %d %s\n", n, ix.trigram.Len(), name)
197 fileid := ix.addName(name)
198 for _, trigram := range ix.trigram.Dense() {
199 if len(ix.post) >= cap(ix.post) {
200 ix.flushPost()
202 ix.post = append(ix.post, makePostEntry(trigram, fileid))
205 return nil
208 // Flush flushes the index entry to the target file.
209 func (ix *IndexWriter) Flush() {
210 ix.addName("")
212 var off [5]uint32
213 ix.main.writeString(magic)
214 off[0] = ix.main.offset()
215 for _, p := range ix.paths {
216 ix.main.writeString(p)
217 ix.main.writeString("\x00")
219 ix.main.writeString("\x00")
220 off[1] = ix.main.offset()
221 copyFile(ix.main, ix.nameData)
222 off[2] = ix.main.offset()
223 ix.mergePost(ix.main)
224 off[3] = ix.main.offset()
225 copyFile(ix.main, ix.nameIndex)
226 off[4] = ix.main.offset()
227 copyFile(ix.main, ix.postIndex)
228 for _, v := range off {
229 ix.main.writeUint32(v)
231 ix.main.writeString(trailerMagic)
233 os.Remove(ix.nameData.name)
234 for _, f := range ix.postFile {
235 os.Remove(f.Name())
237 os.Remove(ix.nameIndex.name)
238 os.Remove(ix.postIndex.name)
240 log.Printf("%d data bytes, %d index bytes", ix.totalBytes, ix.main.offset())
242 ix.main.flush()
245 func copyFile(dst, src *bufWriter) {
246 dst.flush()
247 _, err := io.Copy(dst.file, src.finish())
248 if err != nil {
249 log.Fatalf("copying %s to %s: %v", src.name, dst.name, err)
253 // addName adds the file with the given name to the index.
254 // It returns the assigned file ID number.
255 func (ix *IndexWriter) addName(name string) uint32 {
256 if strings.Contains(name, "\x00") {
257 log.Fatalf("%q: file has NUL byte in name", name)
260 ix.nameIndex.writeUint32(ix.nameData.offset())
261 ix.nameData.writeString(name)
262 ix.nameData.writeByte(0)
263 id := ix.numName
264 ix.numName++
265 return uint32(id)
268 // flushPost writes ix.post to a new temporary file and
269 // clears the slice.
270 func (ix *IndexWriter) flushPost() {
271 w, err := ioutil.TempFile("", "csearch-index")
272 if err != nil {
273 log.Fatal(err)
275 if ix.Verbose {
276 log.Printf("flush %d entries to %s", len(ix.post), w.Name())
278 ix.sortPost(ix.post)
280 // Write the raw ix.post array to disk as is.
281 // This process is the one reading it back in, so byte order is not a concern.
282 data := (*[npost * 8]byte)(unsafe.Pointer(&ix.post[0]))[:len(ix.post)*8]
283 if n, err := w.Write(data); err != nil || n < len(data) {
284 if err != nil {
285 log.Fatal(err)
287 log.Fatalf("short write writing %s", w.Name())
290 ix.post = ix.post[:0]
291 w.Seek(0, 0)
292 ix.postFile = append(ix.postFile, w)
295 // mergePost reads the flushed index entries and merges them
296 // into posting lists, writing the resulting lists to out.
297 func (ix *IndexWriter) mergePost(out *bufWriter) {
298 var h postHeap
300 log.Printf("merge %d files + mem", len(ix.postFile))
301 for _, f := range ix.postFile {
302 h.addFile(f)
304 ix.sortPost(ix.post)
305 h.addMem(ix.post)
307 npost := 0
308 e := h.next()
309 offset0 := out.offset()
310 for {
311 npost++
312 offset := out.offset() - offset0
313 trigram := e.trigram()
314 ix.buf[0] = byte(trigram >> 16)
315 ix.buf[1] = byte(trigram >> 8)
316 ix.buf[2] = byte(trigram)
318 // posting list
319 fileid := ^uint32(0)
320 nfile := uint32(0)
321 out.write(ix.buf[:3])
322 for ; e.trigram() == trigram && trigram != 1<<24-1; e = h.next() {
323 out.writeUvarint(e.fileid() - fileid)
324 fileid = e.fileid()
325 nfile++
327 out.writeUvarint(0)
329 // index entry
330 ix.postIndex.write(ix.buf[:3])
331 ix.postIndex.writeUint32(nfile)
332 ix.postIndex.writeUint32(offset)
334 if trigram == 1<<24-1 {
335 break
340 // A postChunk represents a chunk of post entries flushed to disk or
341 // still in memory.
342 type postChunk struct {
343 e postEntry // next entry
344 m []postEntry // remaining entries after e
347 const postBuf = 4096
349 // A postHeap is a heap (priority queue) of postChunks.
350 type postHeap struct {
351 ch []*postChunk
354 func (h *postHeap) addFile(f *os.File) {
355 data := mmapFile(f).d
356 m := (*[npost]postEntry)(unsafe.Pointer(&data[0]))[:len(data)/8]
357 h.addMem(m)
360 func (h *postHeap) addMem(x []postEntry) {
361 h.add(&postChunk{m: x})
364 // step reads the next entry from ch and saves it in ch.e.
365 // It returns false if ch is over.
366 func (h *postHeap) step(ch *postChunk) bool {
367 old := ch.e
368 m := ch.m
369 if len(m) == 0 {
370 return false
372 ch.e = postEntry(m[0])
373 m = m[1:]
374 ch.m = m
375 if old >= ch.e {
376 panic("bad sort")
378 return true
381 // add adds the chunk to the postHeap.
382 // All adds must be called before the first call to next.
383 func (h *postHeap) add(ch *postChunk) {
384 if len(ch.m) > 0 {
385 ch.e = ch.m[0]
386 ch.m = ch.m[1:]
387 h.push(ch)
391 // empty reports whether the postHeap is empty.
392 func (h *postHeap) empty() bool {
393 return len(h.ch) == 0
396 // next returns the next entry from the postHeap.
397 // It returns a postEntry with trigram == 1<<24 - 1 if h is empty.
398 func (h *postHeap) next() postEntry {
399 if len(h.ch) == 0 {
400 return makePostEntry(1<<24-1, 0)
402 ch := h.ch[0]
403 e := ch.e
404 m := ch.m
405 if len(m) == 0 {
406 h.pop()
407 } else {
408 ch.e = m[0]
409 ch.m = m[1:]
410 h.siftDown(0)
412 return e
415 func (h *postHeap) pop() *postChunk {
416 ch := h.ch[0]
417 n := len(h.ch) - 1
418 h.ch[0] = h.ch[n]
419 h.ch = h.ch[:n]
420 if n > 1 {
421 h.siftDown(0)
423 return ch
426 func (h *postHeap) push(ch *postChunk) {
427 n := len(h.ch)
428 h.ch = append(h.ch, ch)
429 if len(h.ch) >= 2 {
430 h.siftUp(n)
434 func (h *postHeap) siftDown(i int) {
435 ch := h.ch
436 for {
437 j1 := 2*i + 1
438 if j1 >= len(ch) {
439 break
441 j := j1
442 if j2 := j1 + 1; j2 < len(ch) && ch[j1].e >= ch[j2].e {
443 j = j2
445 if ch[i].e < ch[j].e {
446 break
448 ch[i], ch[j] = ch[j], ch[i]
449 i = j
453 func (h *postHeap) siftUp(j int) {
454 ch := h.ch
455 for {
456 i := (j - 1) / 2
457 if i == j || ch[i].e < ch[j].e {
458 break
460 ch[i], ch[j] = ch[j], ch[i]
464 // A bufWriter is a convenience wrapper: a closeable bufio.Writer.
465 type bufWriter struct {
466 name string
467 file *os.File
468 buf []byte
469 tmp [8]byte
472 // bufCreate creates a new file with the given name and returns a
473 // corresponding bufWriter. If name is empty, bufCreate uses a
474 // temporary file.
475 func bufCreate(name string) *bufWriter {
476 var (
477 f *os.File
478 err error
480 if name != "" {
481 f, err = os.Create(name)
482 } else {
483 f, err = ioutil.TempFile("", "csearch")
485 if err != nil {
486 log.Fatal(err)
488 return &bufWriter{
489 name: f.Name(),
490 buf: make([]byte, 0, 256<<10),
491 file: f,
495 func (b *bufWriter) write(x []byte) {
496 n := cap(b.buf) - len(b.buf)
497 if len(x) > n {
498 b.flush()
499 if len(x) >= cap(b.buf) {
500 if _, err := b.file.Write(x); err != nil {
501 log.Fatalf("writing %s: %v", b.name, err)
503 return
506 b.buf = append(b.buf, x...)
509 func (b *bufWriter) writeByte(x byte) {
510 if len(b.buf) >= cap(b.buf) {
511 b.flush()
513 b.buf = append(b.buf, x)
516 func (b *bufWriter) writeString(s string) {
517 n := cap(b.buf) - len(b.buf)
518 if len(s) > n {
519 b.flush()
520 if len(s) >= cap(b.buf) {
521 if _, err := b.file.WriteString(s); err != nil {
522 log.Fatalf("writing %s: %v", b.name, err)
524 return
527 b.buf = append(b.buf, s...)
530 // offset returns the current write offset.
531 func (b *bufWriter) offset() uint32 {
532 off, _ := b.file.Seek(0, 1)
533 off += int64(len(b.buf))
534 if int64(uint32(off)) != off {
535 log.Fatalf("index is larger than 4GB")
537 return uint32(off)
540 func (b *bufWriter) flush() {
541 if len(b.buf) == 0 {
542 return
544 _, err := b.file.Write(b.buf)
545 if err != nil {
546 log.Fatalf("writing %s: %v", b.name, err)
548 b.buf = b.buf[:0]
551 // finish flushes the file to disk and returns an open file ready for reading.
552 func (b *bufWriter) finish() *os.File {
553 b.flush()
554 f := b.file
555 f.Seek(0, 0)
556 return f
559 func (b *bufWriter) writeTrigram(t uint32) {
560 if cap(b.buf)-len(b.buf) < 3 {
561 b.flush()
563 b.buf = append(b.buf, byte(t>>16), byte(t>>8), byte(t))
566 func (b *bufWriter) writeUint32(x uint32) {
567 if cap(b.buf)-len(b.buf) < 4 {
568 b.flush()
570 b.buf = append(b.buf, byte(x>>24), byte(x>>16), byte(x>>8), byte(x))
573 func (b *bufWriter) writeUvarint(x uint32) {
574 if cap(b.buf)-len(b.buf) < 5 {
575 b.flush()
577 switch {
578 case x < 1<<7:
579 b.buf = append(b.buf, byte(x))
580 case x < 1<<14:
581 b.buf = append(b.buf, byte(x|0x80), byte(x>>7))
582 case x < 1<<21:
583 b.buf = append(b.buf, byte(x|0x80), byte(x>>7|0x80), byte(x>>14))
584 case x < 1<<28:
585 b.buf = append(b.buf, byte(x|0x80), byte(x>>7|0x80), byte(x>>14|0x80), byte(x>>21))
586 default:
587 b.buf = append(b.buf, byte(x|0x80), byte(x>>7|0x80), byte(x>>14|0x80), byte(x>>21|0x80), byte(x>>28))
591 // validUTF8 reports whether the byte pair can appear in a
592 // valid sequence of UTF-8-encoded code points.
593 func validUTF8(c1, c2 uint32) bool {
594 switch {
595 case c1 < 0x80:
596 // 1-byte, must be followed by 1-byte or first of multi-byte
597 return c2 < 0x80 || 0xc0 <= c2 && c2 < 0xf8
598 case c1 < 0xc0:
599 // continuation byte, can be followed by nearly anything
600 return c2 < 0xf8
601 case c1 < 0xf8:
602 // first of multi-byte, must be followed by continuation byte
603 return 0x80 <= c2 && c2 < 0xc0
605 return false
608 func (ix *IndexWriter) sortPost(post []postEntry) {
609 if len(post) > len(ix.sortTmp) {
610 ix.sortTmp = make([]postEntry, len(post))
612 tmp := ix.sortTmp[:len(post)]
614 const k = sortK
615 for i := range ix.sortN {
616 ix.sortN[i] = 0
618 for _, p := range post {
619 r := uintptr(p>>32) & (1<<k - 1)
620 ix.sortN[r]++
622 tot := 0
623 for i, count := range ix.sortN {
624 ix.sortN[i] = tot
625 tot += count
627 for _, p := range post {
628 r := uintptr(p>>32) & (1<<k - 1)
629 o := ix.sortN[r]
630 ix.sortN[r]++
631 tmp[o] = p
633 tmp, post = post, tmp
635 for i := range ix.sortN {
636 ix.sortN[i] = 0
638 for _, p := range post {
639 r := uintptr(p>>(32+k)) & (1<<k - 1)
640 ix.sortN[r]++
642 tot = 0
643 for i, count := range ix.sortN {
644 ix.sortN[i] = tot
645 tot += count
647 for _, p := range post {
648 r := uintptr(p>>(32+k)) & (1<<k - 1)
649 o := ix.sortN[r]
650 ix.sortN[r]++
651 tmp[o] = p