avoid unnecessary struct when implementing countingWriter (Thanks Mero)
[debiancodesearch.git] / cmd / dcs-web / querymanager.go
blob10d84ba7640be289b6f111987f9c05c972fd0ec2
1 package main
3 import (
4 "bufio"
5 "bytes"
6 "encoding/json"
7 "flag"
8 "fmt"
9 "github.com/Debian/dcs/cmd/dcs-web/common"
10 "github.com/Debian/dcs/cmd/dcs-web/search"
11 "github.com/Debian/dcs/dpkgversion"
12 "github.com/Debian/dcs/proto"
13 "github.com/Debian/dcs/stringpool"
14 "github.com/Debian/dcs/varz"
15 "github.com/influxdb/influxdb-go"
16 "hash/fnv"
17 "io"
18 "log"
19 "math"
20 "net"
21 "net/http"
22 "net/url"
23 "os"
24 "path/filepath"
25 "regexp"
26 "sort"
27 "strconv"
28 "strings"
29 "sync"
30 "syscall"
31 "time"
33 capn "github.com/glycerine/go-capnproto"
36 var (
37 queryResultsPath = flag.String("query_results_path",
38 "/tmp/qr/",
39 "Path where query results files (page_0.json etc.) are stored")
40 influxDBHost = flag.String("influx_db_host",
41 "",
42 "host:port of the InfluxDB to store time series in")
43 influxDBDatabase = flag.String("influx_db_database",
44 "dcs",
45 "InfluxDB database name")
46 influxDBUsername = flag.String("influx_db_username",
47 "root",
48 "InfluxDB username")
49 influxDBPassword = flag.String("influx_db_password",
50 "root",
51 "InfluxDB password")
53 perPackagePathRe = regexp.MustCompile(`^/perpackage-results/([^/]+)/` +
54 strconv.Itoa(resultsPerPackage) + `/page_([0-9]+).json$`)
const (
	// NB: All of these constants needs to match those in static/instant.js.
	packagesPerPage   = 5
	resultsPerPackage = 2
	resultsPerPage    = 10
)
// Error is the message sent to clients when a query cannot proceed normally.
type Error struct {
	// This is set to “error” to distinguish the message type on the client.
	Type string

	// Currently only “backendunavailable”
	ErrorType string
}
// ProgressUpdate is the message sent to clients while a query is running,
// carrying aggregated progress counters and the current result count.
type ProgressUpdate struct {
	Type           string
	QueryId        string
	FilesProcessed int
	FilesTotal     int
	Results        int
}

// EventType returns the message type carried in the Type field.
func (p *ProgressUpdate) EventType() string {
	return p.Type
}
84 func (p *ProgressUpdate) ObsoletedBy(newEvent *obsoletableEvent) bool {
85 return (*newEvent).EventType() == p.Type
88 type ByRanking []proto.Match
90 func (s ByRanking) Len() int {
91 return len(s)
94 func (s ByRanking) Less(i, j int) bool {
95 if s[i].Ranking() == s[j].Ranking() {
96 // On a tie, we use the path to make the order of results stable over
97 // multiple queries (which can have different results depending on
98 // which index backend reacts quicker).
99 return s[i].Path() > s[j].Path()
101 return s[i].Ranking() > s[j].Ranking()
104 func (s ByRanking) Swap(i, j int) {
105 s[i], s[j] = s[j], s[i]
108 type resultPointer struct {
109 backendidx int
110 ranking float32
111 offset int64
113 // Used as a tie-breaker when sorting by ranking to guarantee stable
114 // results, independent of the order in which the results are returned from
115 // source backends.
116 pathHash uint64
118 // Used for per-package results. Points into a stringpool.StringPool
119 packageName *string
122 type pointerByRanking []resultPointer
124 func (s pointerByRanking) Len() int {
125 return len(s)
128 func (s pointerByRanking) Less(i, j int) bool {
129 if s[i].ranking == s[j].ranking {
130 return s[i].pathHash > s[j].pathHash
132 return s[i].ranking > s[j].ranking
135 func (s pointerByRanking) Swap(i, j int) {
136 s[i], s[j] = s[j], s[i]
139 type perBackendState struct {
140 // One file per backend, containing JSON-serialized results. When writing,
141 // we keep the offsets, so that we can later sort the pointers and write
142 // the resulting files.
143 tempFile *os.File
144 tempFileWriter *bufio.Writer
145 tempFileOffset int64
146 packagePool *stringpool.StringPool
147 resultPointers []resultPointer
148 allPackages map[string]bool
151 type queryState struct {
152 started time.Time
153 ended time.Time
154 events []event
155 newEvent *sync.Cond
156 done bool
157 query string
159 results [10]resultPointer
161 filesTotal []int
162 filesProcessed []int
163 filesMu *sync.Mutex
165 resultPages int
167 // This guards concurrent access to any perBackend[].tempFile.
168 tempFilesMu *sync.Mutex
169 perBackend []*perBackendState
171 resultPointers []resultPointer
172 resultPointersByPkg map[string][]resultPointer
174 allPackagesSorted []string
176 FirstPathRank float32
179 func (qs *queryState) numResults() int {
180 var result int
181 for _, bstate := range qs.perBackend {
182 result += len(bstate.resultPointers)
184 return result
187 var (
188 state = make(map[string]queryState)
189 stateMu sync.Mutex
192 func queryBackend(queryid string, backend string, backendidx int, sourceQuery []byte) {
193 // When exiting this function, check that all results were processed. If
194 // not, the backend query must have failed for some reason. Send a progress
195 // update to prevent the query from running forever.
196 defer func() {
197 filesTotal := state[queryid].filesTotal[backendidx]
199 if state[queryid].filesProcessed[backendidx] == filesTotal {
200 return
203 if filesTotal == -1 {
204 filesTotal = 0
207 seg := capn.NewBuffer(nil)
208 p := proto.NewProgressUpdate(seg)
209 p.SetFilesprocessed(uint64(filesTotal))
210 p.SetFilestotal(uint64(filesTotal))
211 storeProgress(queryid, backendidx, p)
213 addEventMarshal(queryid, &Error{
214 Type: "error",
215 ErrorType: "backendunavailable",
219 // TODO: switch in the config
220 log.Printf("[%s] [src:%s] connecting...\n", queryid, backend)
221 conn, err := net.DialTimeout("tcp", strings.Replace(backend, "28082", "26082", -1), 5*time.Second)
222 if err != nil {
223 log.Printf("[%s] [src:%s] Connection failed: %v\n", queryid, backend, err)
224 return
226 defer conn.Close()
227 if _, err := conn.Write(sourceQuery); err != nil {
228 log.Printf("[%s] [src:%s] could not send query: %v\n", queryid, backend, err)
229 return
232 bufferedReader := bufio.NewReaderSize(conn, 65536)
233 bstate := state[queryid].perBackend[backendidx]
234 tempFileWriter := bstate.tempFileWriter
235 var capnbuf bytes.Buffer
236 var written countingWriter
238 for !state[queryid].done {
239 conn.SetReadDeadline(time.Now().Add(10 * time.Second))
241 written = 0
242 tee := io.TeeReader(bufferedReader, io.MultiWriter(
243 tempFileWriter, &written))
245 seg, err := capn.ReadFromPackedStream(tee, &capnbuf)
246 if err != nil {
247 if err == io.EOF {
248 log.Printf("[%s] [src:%s] EOF\n", queryid, backend)
249 return
250 } else {
251 log.Printf("[%s] [src:%s] Error decoding result stream: %v\n", queryid, backend, err)
252 return
256 z := proto.ReadRootZ(seg)
257 if z.Which() == proto.Z_PROGRESSUPDATE {
258 storeProgress(queryid, backendidx, z.Progressupdate())
259 } else {
260 storeResult(queryid, backendidx, z.Match())
263 bstate.tempFileOffset += int64(written)
265 log.Printf("[%s] [src:%s] query done, disconnecting\n", queryid, backend)
268 func maybeStartQuery(queryid, src, query string) bool {
269 stateMu.Lock()
270 defer stateMu.Unlock()
271 querystate, running := state[queryid]
272 // XXX: Starting a new query while there may still be clients reading that
273 // query is not a great idea. Best fix may be to make getEvent() use a
274 // querystate instead of the string identifier.
275 if !running || time.Since(querystate.started) > 30*time.Minute {
276 // See if we can garbage-collect old queries.
277 if !running && len(state) >= 10 {
278 log.Printf("Trying to garbage collect queries (currently %d)\n", len(state))
279 for queryid, s := range state {
280 if len(state) < 10 {
281 break
283 if !s.done {
284 continue
286 for _, state := range s.perBackend {
287 state.tempFile.Close()
289 delete(state, queryid)
291 log.Printf("Garbage collection done. %d queries remaining", len(state))
293 backends := strings.Split(*common.SourceBackends, ",")
294 state[queryid] = queryState{
295 started: time.Now(),
296 query: query,
297 newEvent: sync.NewCond(&sync.Mutex{}),
298 filesTotal: make([]int, len(backends)),
299 filesProcessed: make([]int, len(backends)),
300 filesMu: &sync.Mutex{},
301 perBackend: make([]*perBackendState, len(backends)),
302 tempFilesMu: &sync.Mutex{},
305 varz.Increment("active-queries")
307 var err error
308 dir := filepath.Join(*queryResultsPath, queryid)
309 if err := os.MkdirAll(dir, os.FileMode(0755)); err != nil {
310 log.Printf("[%s] could not create %q: %v\n", queryid, dir, err)
311 failQuery(queryid)
312 return false
315 // TODO: it’d be so much better if we would correctly handle ESPACE errors
316 // in the code below (and above), but for that we need to carefully test it.
317 ensureEnoughSpaceAvailable()
319 for i := 0; i < len(backends); i++ {
320 state[queryid].filesTotal[i] = -1
321 path := filepath.Join(dir, fmt.Sprintf("unsorted_%d.capnproto", i))
322 f, err := os.Create(path)
323 if err != nil {
324 log.Printf("[%s] could not create %q: %v\n", queryid, path, err)
325 failQuery(queryid)
326 return false
328 state[queryid].perBackend[i] = &perBackendState{
329 packagePool: stringpool.NewStringPool(),
330 tempFile: f,
331 tempFileWriter: bufio.NewWriterSize(f, 65536),
332 allPackages: make(map[string]bool),
335 log.Printf("initial results = %v\n", state[queryid])
337 // Rewrite the query into a query for source backends.
338 fakeUrl, err := url.Parse("?" + query)
339 if err != nil {
340 log.Fatal(err)
342 rewritten := search.RewriteQuery(*fakeUrl)
343 type streamingRequest struct {
344 Query string
345 URL string
347 request := streamingRequest{
348 Query: rewritten.Query().Get("q"),
349 URL: rewritten.String(),
351 log.Printf("[%s] querying for %q\n", queryid, request.Query)
352 sourceQuery, err := json.Marshal(&request)
353 if err != nil {
354 log.Fatal(err)
357 for idx, backend := range backends {
358 go queryBackend(queryid, backend, idx, sourceQuery)
360 return false
363 return true
366 type queryStats struct {
367 Searchterm string
368 QueryId string
369 NumEvents int
370 NumResults int
371 NumResultPages int
372 NumPackages int
373 Done bool
374 Started time.Time
375 Ended time.Time
376 StartedFromNow time.Duration
377 Duration time.Duration
378 FilesTotal []int
379 FilesProcessed []int
382 type byStarted []queryStats
384 func (s byStarted) Len() int {
385 return len(s)
388 func (s byStarted) Less(i, j int) bool {
389 return s[i].Started.After(s[j].Started)
392 func (s byStarted) Swap(i, j int) {
393 s[i], s[j] = s[j], s[i]
396 func QueryzHandler(w http.ResponseWriter, r *http.Request) {
397 r.ParseForm()
398 if cancel := r.PostFormValue("cancel"); cancel != "" {
399 addEventMarshal(cancel, &Error{
400 Type: "error",
401 ErrorType: "cancelled",
403 finishQuery(cancel)
404 http.Redirect(w, r, "/queryz", http.StatusFound)
405 return
408 stateMu.Lock()
409 stats := make([]queryStats, len(state))
410 idx := 0
411 for queryid, s := range state {
412 stats[idx] = queryStats{
413 Searchterm: s.query,
414 QueryId: queryid,
415 NumEvents: len(s.events),
416 Done: s.done,
417 Started: s.started,
418 Ended: s.ended,
419 StartedFromNow: time.Since(s.started),
420 Duration: s.ended.Sub(s.started),
421 NumResults: s.numResults(),
422 NumResultPages: s.resultPages,
423 FilesTotal: s.filesTotal,
424 FilesProcessed: s.filesProcessed,
426 if stats[idx].NumResults == 0 && stats[idx].Done {
427 stats[idx].NumResults = s.numResults()
429 idx++
431 stateMu.Unlock()
433 sort.Sort(byStarted(stats))
435 if err := common.Templates.ExecuteTemplate(w, "queryz.html", map[string]interface{}{
436 "queries": stats,
437 }); err != nil {
438 http.Error(w, err.Error(), http.StatusInternalServerError)
439 return
443 // Caller needs to hold s.clientsMu
444 func sendPaginationUpdate(queryid string, s queryState) {
445 type Pagination struct {
446 // Set to “pagination”.
447 Type string
448 QueryId string
449 ResultPages int
452 if s.resultPages > 0 {
453 addEventMarshal(queryid, &Pagination{
454 Type: "pagination",
455 QueryId: queryid,
456 ResultPages: s.resultPages,
// countingWriter implements io.Writer, and increments *written with the amount
// of data written on each call. Handy in an io.MultiWriter
type countingWriter int64

// Write accumulates len(p) into the counter and never fails.
func (c *countingWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	*c += countingWriter(n)
	return n, nil
}
470 func storeResult(queryid string, backendidx int, result proto.Match) {
471 // Without acquiring a lock, just check if we need to consider this result
472 // for the top 10 at all.
473 s := state[queryid]
475 if s.FirstPathRank > 0 {
476 // Now store the combined ranking of PathRanking (pre) and Ranking (post).
477 // We add the values because they are both percentages.
478 // To make the Ranking (post) less significant, we multiply it with
479 // 1/10 * FirstPathRank. We used to use maxPathRanking here, but
480 // requiring that means delaying the search until all results are
481 // there. Instead, FirstPathRank is a good enough approximation (but
482 // different enough for each query that we can’t hardcode it).
483 result.SetRanking(result.Pathrank() + ((s.FirstPathRank * 0.1) * result.Ranking()))
484 } else {
485 // This code path (and lock acquisition) gets executed only on the
486 // first result.
487 stateMu.Lock()
488 s = state[queryid]
489 s.FirstPathRank = result.Pathrank()
490 state[queryid] = s
491 stateMu.Unlock()
494 h := fnv.New64()
495 io.WriteString(h, result.Path())
497 if result.Ranking() > s.results[9].ranking {
498 stateMu.Lock()
499 s = state[queryid]
500 if result.Ranking() <= s.results[9].ranking {
501 stateMu.Unlock()
502 } else {
503 // TODO: find the first s.result[] for the same package. then check again if the result is worthy of replacing that per-package result
504 // TODO: probably change the data structure so that we can do this more easily and also keep N results per package.
506 combined := append(s.results[:], resultPointer{
507 ranking: result.Ranking(),
508 pathHash: h.Sum64(),
510 sort.Sort(pointerByRanking(combined))
511 copy(s.results[:], combined[:10])
512 state[queryid] = s
513 stateMu.Unlock()
515 // The result entered the top 10, so send it to the client(s) for
516 // immediate display.
517 // TODO: make this satisfy obsoletableEvent in order to skip
518 // sending results to the client which are then overwritten by
519 // better top10 results.
520 bytes, err := result.MarshalJSON()
521 if err != nil {
522 log.Fatal("Could not marshal result as JSON: %v\n", err)
524 addEvent(queryid, bytes, &result)
528 bstate := s.perBackend[backendidx]
529 bstate.resultPointers = append(bstate.resultPointers, resultPointer{
530 backendidx: backendidx,
531 ranking: result.Ranking(),
532 offset: bstate.tempFileOffset,
533 pathHash: h.Sum64(),
534 packageName: bstate.packagePool.Get(result.Package())})
535 bstate.allPackages[result.Package()] = true
538 func failQuery(queryid string) {
539 varz.Increment("failed-queries")
540 addEventMarshal(queryid, &Error{
541 Type: "error",
542 ErrorType: "failed",
544 finishQuery(queryid)
547 func finishQuery(queryid string) {
548 log.Printf("[%s] done (in %v), closing all client channels.\n", queryid, time.Since(state[queryid].started))
549 addEvent(queryid, []byte{}, nil)
551 if *influxDBHost != "" {
552 go func() {
553 db, err := influxdb.NewClient(&influxdb.ClientConfig{
554 Host: *influxDBHost,
555 Database: *influxDBDatabase,
556 Username: *influxDBUsername,
557 Password: *influxDBPassword,
559 if err != nil {
560 log.Printf("Cannot log query-finished timeseries: %v\n", err)
561 return
564 var seriesBatch []*influxdb.Series
565 s := state[queryid]
566 series := influxdb.Series{
567 Name: "query-finished.int-dcsi-web",
568 Columns: []string{"queryid", "searchterm", "milliseconds", "results"},
569 Points: [][]interface{}{
570 []interface{}{
571 queryid,
572 s.query,
573 time.Since(s.started) / time.Millisecond,
574 s.numResults(),
578 seriesBatch = append(seriesBatch, &series)
580 if err := db.WriteSeries(seriesBatch); err != nil {
581 log.Printf("Cannot log query-finished timeseries: %v\n", err)
582 return
588 type ByModTime []os.FileInfo
590 func (s ByModTime) Len() int {
591 return len(s)
594 func (s ByModTime) Less(i, j int) bool {
595 return s[i].ModTime().Before(s[j].ModTime())
598 func (s ByModTime) Swap(i, j int) {
599 s[i], s[j] = s[j], s[i]
// fsBytes returns the number of available and total bytes on the file system
// on which path resides.
func fsBytes(path string) (available uint64, total uint64) {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(path, &stat); err != nil {
		// Fixed: this was log.Fatal with printf-style verbs, which does not
		// format its arguments; Fatalf does.
		log.Fatalf("Could not stat filesystem for %q: %v\n", path, err)
	}
	log.Printf("Available bytes on %q: %d\n", path, stat.Bavail*uint64(stat.Bsize))
	available = stat.Bavail * uint64(stat.Bsize)
	total = stat.Blocks * uint64(stat.Bsize)
	return
}
613 // Makes sure 20% of the filesystem backing -query_results_path are available,
614 // cleans up old query results otherwise.
615 func ensureEnoughSpaceAvailable() {
616 available, total := fsBytes(*queryResultsPath)
617 headroom := uint64(0.2 * float64(total))
618 log.Printf("%d bytes available, %d bytes headroom required (20%%)\n", available, headroom)
619 if available >= headroom {
620 return
623 log.Printf("Deleting an old query...\n")
624 dir, err := os.Open(*queryResultsPath)
625 if err != nil {
626 log.Fatal(err)
628 defer dir.Close()
629 infos, err := dir.Readdir(-1)
630 if err != nil {
631 log.Fatal(err)
633 sort.Sort(ByModTime(infos))
634 for _, info := range infos {
635 if !info.IsDir() {
636 continue
638 log.Printf("Removing query results for %q to make enough space\n", info.Name())
639 if err := os.RemoveAll(filepath.Join(*queryResultsPath, info.Name())); err != nil {
640 log.Fatal(err)
642 available, _ = fsBytes(*queryResultsPath)
643 if available >= headroom {
644 break
649 func writeFromPointers(queryid string, f io.Writer, pointers []resultPointer) error {
650 var capnbuf bytes.Buffer
651 firstPathRank := state[queryid].FirstPathRank
653 state[queryid].tempFilesMu.Lock()
654 defer state[queryid].tempFilesMu.Unlock()
656 if _, err := f.Write([]byte("[")); err != nil {
657 return err
659 for idx, pointer := range pointers {
660 src := state[queryid].perBackend[pointer.backendidx].tempFile
661 if _, err := src.Seek(pointer.offset, os.SEEK_SET); err != nil {
662 return err
664 if idx > 0 {
665 if _, err := f.Write([]byte(",")); err != nil {
666 return err
669 seg, err := capn.ReadFromPackedStream(src, &capnbuf)
670 if err != nil {
671 return err
673 z := proto.ReadRootZ(seg)
674 if z.Which() != proto.Z_MATCH {
675 return fmt.Errorf("Expected to find a proto.Z_MATCH, instead got %d", z.Which())
677 result := z.Match()
678 // We need to fix the ranking here because we persist raw results from
679 // the dcs-source-backend in queryBackend(), but then modify the
680 // ranking in storeResult().
681 result.SetRanking(result.Pathrank() + ((firstPathRank * 0.1) * result.Ranking()))
682 if err := result.WriteJSON(f); err != nil {
683 return err
686 if _, err := f.Write([]byte("]\n")); err != nil {
687 return err
689 return nil
692 func writeToDisk(queryid string) error {
693 // Get the slice with results and unset it on the state so that processing can continue.
694 stateMu.Lock()
695 s := state[queryid]
696 pointers := make([]resultPointer, 0, s.numResults())
697 for _, bstate := range s.perBackend {
698 pointers = append(pointers, bstate.resultPointers...)
699 bstate.tempFileWriter.Flush()
701 if len(pointers) == 0 {
702 log.Printf("[%s] not writing, no results.\n", queryid)
703 stateMu.Unlock()
704 return nil
706 idx := 0
708 // For each full package (i3-wm_4.8-1), store only the newest version.
709 packageVersions := make(map[string]dpkgversion.Version)
710 for _, bstate := range s.perBackend {
711 for pkg, _ := range bstate.allPackages {
712 underscore := strings.Index(pkg, "_")
713 name := pkg[:underscore]
714 version, err := dpkgversion.Parse(pkg[underscore+1:])
715 if err != nil {
716 log.Printf("[%s] parsing version %q failed: %v\n", queryid, pkg[underscore+1:], err)
717 continue
720 if bestversion, ok := packageVersions[name]; ok {
721 if dpkgversion.Compare(version, bestversion) > 0 {
722 packageVersions[name] = version
724 } else {
725 packageVersions[name] = version
730 packages := make([]string, len(packageVersions))
731 for pkg, _ := range packageVersions {
732 packages[idx] = pkg
733 idx++
735 // TODO: sort by ranking as soon as we store the best ranking with each package. (at the moment it’s first result, first stored)
736 s.allPackagesSorted = packages
737 state[queryid] = s
738 stateMu.Unlock()
740 log.Printf("[%s] sorting, %d results, %d packages.\n", queryid, len(pointers), len(packages))
741 pointerSortingStarted := time.Now()
742 sort.Sort(pointerByRanking(pointers))
743 log.Printf("[%s] pointer sorting done (%v).\n", queryid, time.Since(pointerSortingStarted))
745 // TODO: it’d be so much better if we would correctly handle ESPACE errors
746 // in the code below (and above), but for that we need to carefully test it.
747 ensureEnoughSpaceAvailable()
749 pages := int(math.Ceil(float64(len(pointers)) / float64(resultsPerPage)))
751 // Now save the results into their package-specific files.
752 byPkgSortingStarted := time.Now()
753 bypkg := make(map[string][]resultPointer)
754 for _, pointer := range pointers {
755 pkg := *pointer.packageName
756 underscore := strings.Index(pkg, "_")
757 name := pkg[:underscore]
758 // Skip this result if it’s not in the newest version of the package.
759 if packageVersions[name].String() != pkg[underscore+1:] {
760 continue
762 pkgresults := bypkg[name]
763 if len(pkgresults) >= resultsPerPackage {
764 continue
766 pkgresults = append(pkgresults, pointer)
767 bypkg[name] = pkgresults
769 log.Printf("[%s] by-pkg sorting done (%v).\n", queryid, time.Since(byPkgSortingStarted))
771 stateMu.Lock()
772 s = state[queryid]
773 s.resultPointers = pointers
774 s.resultPointersByPkg = bypkg
775 s.resultPages = pages
776 state[queryid] = s
777 stateMu.Unlock()
779 sendPaginationUpdate(queryid, s)
780 return nil
783 func storeProgress(queryid string, backendidx int, progress proto.ProgressUpdate) {
784 backends := strings.Split(*common.SourceBackends, ",")
785 s := state[queryid]
786 s.filesMu.Lock()
787 s.filesTotal[backendidx] = int(progress.Filestotal())
788 s.filesProcessed[backendidx] = int(progress.Filesprocessed())
789 s.filesMu.Unlock()
790 allSet := true
791 for i := 0; i < len(backends); i++ {
792 if s.filesTotal[i] == -1 {
793 log.Printf("total number for backend %d missing\n", i)
794 allSet = false
795 break
799 filesProcessed := 0
800 for _, processed := range s.filesProcessed {
801 filesProcessed += processed
803 filesTotal := 0
804 for _, total := range s.filesTotal {
805 filesTotal += total
808 if allSet && filesProcessed == filesTotal {
809 log.Printf("[%s] [src:%d] query done on all backends, writing to disk.\n", queryid, backendidx)
810 if err := writeToDisk(queryid); err != nil {
811 log.Printf("[%s] writeToDisk() failed: %v\n", queryid)
812 failQuery(queryid)
816 if allSet {
817 log.Printf("[%s] [src:%d] (sending) progress: %d of %d\n", queryid, backendidx, progress.Filesprocessed(), progress.Filestotal())
818 addEventMarshal(queryid, &ProgressUpdate{
819 Type: "progress",
820 QueryId: queryid,
821 FilesProcessed: filesProcessed,
822 FilesTotal: filesTotal,
823 Results: s.numResults(),
825 if filesProcessed == filesTotal {
826 finishQuery(queryid)
828 } else {
829 log.Printf("[%s] [src:%d] progress: %d of %d\n", queryid, backendidx, progress.Filesprocessed(), progress.Filestotal())
833 func PerPackageResultsHandler(w http.ResponseWriter, r *http.Request) {
834 matches := perPackagePathRe.FindStringSubmatch(r.URL.Path)
835 if matches == nil || len(matches) != 3 {
836 // TODO: what about non-js clients?
837 // While this just serves index.html, the javascript part of index.html
838 // realizes the path starts with /perpackage-results/ and starts the
839 // search, then requests the specified page on search completion.
840 http.ServeFile(w, r, filepath.Join(*staticPath, "index.html"))
841 return
844 queryid := matches[1]
845 pagenr, err := strconv.Atoi(matches[2])
846 if err != nil {
847 log.Fatal("Could not convert %q into a number: %v\n", matches[2], err)
849 s, ok := state[queryid]
850 if !ok {
851 http.Error(w, "No such query.", http.StatusNotFound)
852 return
854 if !s.done {
855 started := time.Now()
856 for time.Since(started) < 60*time.Second {
857 if state[queryid].done {
858 s = state[queryid]
859 break
861 time.Sleep(100 * time.Millisecond)
863 if !s.done {
864 log.Printf("[%s] query not yet finished, cannot produce per-package results\n", queryid)
865 http.Error(w, "Query not finished yet.", http.StatusInternalServerError)
866 return
870 // For compatibility with old versions, we serve the files that are
871 // directly served by nginx as well by now.
872 // This can be removed after 2015-06-01, when all old clients should be
873 // long expired from any caches.
874 name := filepath.Join(*queryResultsPath, queryid, fmt.Sprintf("perpackage_2_page_%d.json", pagenr))
875 http.ServeFile(w, r, name)