// cmd/dcs-web/querymanager.go (from debiancodesearch.git)
package main

import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"github.com/Debian/dcs/cmd/dcs-web/common"
	"github.com/Debian/dcs/cmd/dcs-web/search"
	dcsregexp "github.com/Debian/dcs/regexp"
	"github.com/Debian/dcs/stringpool"
	"github.com/Debian/dcs/varz"
	"github.com/influxdb/influxdb-go"
	"hash/fnv"
	"io"
	"log"
	"math"
	"net"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)

var (
	queryResultsPath = flag.String("query_results_path",
		"/tmp/qr/",
		"Path where query results files (page_0.json etc.) are stored")
	influxDBHost = flag.String("influx_db_host",
		"",
		"host:port of the InfluxDB to store time series in")
	influxDBDatabase = flag.String("influx_db_database",
		"dcs",
		"InfluxDB database name")
	influxDBUsername = flag.String("influx_db_username",
		"root",
		"InfluxDB username")
	influxDBPassword = flag.String("influx_db_password",
		"root",
		"InfluxDB password")

	perPackagePathRe = regexp.MustCompile(`^/perpackage-results/([^/]+)/` +
		strconv.Itoa(resultsPerPackage) + `/page_([0-9]+).json$`)
)

const (
	// NB: All of these constants need to match those in static/instant.js.
	packagesPerPage   = 5
	resultsPerPackage = 2
)

// TODO: make this type satisfy obsoletableEvent
// TODO: get rid of this type — replace all occurrences with a more specific
// version, e.g. Error, ProgressUpdate. Then, strip all fields except “Type”
// and make those use Result as an anonymous struct.
type Result struct {
	// This is set to “result” to distinguish the message type on the client.
	// Additionally, it serves as an indicator for whether the result is
	// initialized or whether this is the nil value.
	Type string

	dcsregexp.Match

	Package string

	FilesProcessed int
	FilesTotal     int
}

type Error struct {
	// This is set to “error” to distinguish the message type on the client.
	Type string

	// Currently only “backendunavailable”
	ErrorType string
}

type ProgressUpdate struct {
	Type           string
	QueryId        string
	FilesProcessed int
	FilesTotal     int
	Results        int
}

func (p *ProgressUpdate) EventType() string {
	return p.Type
}

func (p *ProgressUpdate) ObsoletedBy(newEvent *obsoletableEvent) bool {
	return (*newEvent).EventType() == p.Type
}

type ByRanking []Result

func (s ByRanking) Len() int {
	return len(s)
}

func (s ByRanking) Less(i, j int) bool {
	if s[i].Ranking == s[j].Ranking {
		// On a tie, we use the path to make the order of results stable over
		// multiple queries (which can have different results depending on
		// which index backend reacts quicker).
		return s[i].Path > s[j].Path
	}
	return s[i].Ranking > s[j].Ranking
}

func (s ByRanking) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

type resultPointer struct {
	backendidx int
	ranking    float32
	offset     int64
	length     int64

	// Used as a tie-breaker when sorting by ranking to guarantee stable
	// results, independent of the order in which the results are returned from
	// source backends.
	pathHash uint64

	// Used for per-package results. Points into a stringpool.StringPool
	packageName *string
}

type pointerByRanking []resultPointer

func (s pointerByRanking) Len() int {
	return len(s)
}

func (s pointerByRanking) Less(i, j int) bool {
	if s[i].ranking == s[j].ranking {
		return s[i].pathHash > s[j].pathHash
	}
	return s[i].ranking > s[j].ranking
}

func (s pointerByRanking) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

type queryState struct {
	started  time.Time
	events   []event
	newEvent *sync.Cond
	done     bool
	query    string

	results  [10]Result
	resultMu *sync.Mutex

	filesTotal     []int
	filesProcessed []int
	filesMu        *sync.Mutex

	resultPages int
	numResults  int

	// One file per backend, containing JSON-serialized results. When writing,
	// we keep the offsets, so that we can later sort the pointers and write
	// the resulting files.
	tempFiles      []*os.File
	packagePool    *stringpool.StringPool
	resultPointers []resultPointer

	allPackages       map[string]bool
	allPackagesSorted []string
	allPackagesMu     *sync.Mutex

	FirstPathRank float32
}

var (
	state   = make(map[string]queryState)
	stateMu sync.Mutex
)
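
// queryBackend connects to a single source backend, sends it the marshalled
// source query and decodes the stream of results and progress updates coming
// back, handing them to storeResult and storeProgress respectively.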
func queryBackend(queryid string, backend string, backendidx int, sourceQuery []byte) {
	// When exiting this function, check that all results were processed. If
	// not, the backend query must have failed for some reason. Send a progress
	// update to prevent the query from running forever.
	defer func() {
		filesTotal := state[queryid].filesTotal[backendidx]

		if state[queryid].filesProcessed[backendidx] == filesTotal {
			return
		}

		if filesTotal == -1 {
			filesTotal = 0
		}

		// TODO: use a more specific type (progressupdate)
		storeProgress(queryid, backendidx, Result{
			Type:           "progress",
			FilesProcessed: filesTotal,
			FilesTotal:     filesTotal})

		addEventMarshal(queryid, &Error{
			Type:      "error",
			ErrorType: "backendunavailable",
		})
	}()

	// TODO: switch in the config
	log.Printf("[%s] [src:%s] connecting...\n", queryid, backend)
	conn, err := net.DialTimeout("tcp", strings.Replace(backend, "28082", "26082", -1), 5*time.Second)
	if err != nil {
		log.Printf("[%s] [src:%s] Connection failed: %v\n", queryid, backend, err)
		return
	}
	defer conn.Close()
	if _, err := conn.Write(sourceQuery); err != nil {
		log.Printf("[%s] [src:%s] could not send query: %v\n", queryid, backend, err)
		return
	}
	decoder := json.NewDecoder(conn)
	r := Result{Type: "result"}
	for !state[queryid].done {
		conn.SetReadDeadline(time.Now().Add(10 * time.Second))
		if err := decoder.Decode(&r); err != nil {
			if err == io.EOF {
				return
			} else {
				log.Printf("[%s] [src:%s] Error decoding result stream: %v\n", queryid, backend, err)
				return
			}
		}
		if r.Type == "result" {
			storeResult(queryid, backendidx, r)
		} else if r.Type == "progress" {
			storeProgress(queryid, backendidx, r)
		}
		// The source backend sends back results without type, so the default is “result”.
		r.Type = "result"
	}
}
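
// maybeStartQuery starts a query for the given queryid unless the same query
// was already started within the last 30 minutes. It reports whether the
// query was already running.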
func maybeStartQuery(queryid, src, query string) bool {
	stateMu.Lock()
	defer stateMu.Unlock()
	querystate, running := state[queryid]
	// XXX: Starting a new query while there may still be clients reading that
	// query is not a great idea. Best fix may be to make getEvent() use a
	// querystate instead of the string identifier.
	if !running || time.Since(querystate.started) > 30*time.Minute {
		// See if we can garbage-collect old queries.
		if !running && len(state) >= 10 {
			log.Printf("Trying to garbage collect queries (currently %d)\n", len(state))
			for queryid, s := range state {
				if len(state) < 10 {
					break
				}
				if !s.done {
					continue
				}
				delete(state, queryid)
			}
			log.Printf("Garbage collection done. %d queries remaining", len(state))
		}

		backends := strings.Split(*common.SourceBackends, ",")
		state[queryid] = queryState{
			started:        time.Now(),
			query:          query,
			newEvent:       sync.NewCond(&sync.Mutex{}),
			resultMu:       &sync.Mutex{},
			filesTotal:     make([]int, len(backends)),
			filesProcessed: make([]int, len(backends)),
			filesMu:        &sync.Mutex{},
			tempFiles:      make([]*os.File, len(backends)),
			allPackages:    make(map[string]bool),
			allPackagesMu:  &sync.Mutex{},
			packagePool:    stringpool.NewStringPool(),
		}

		var err error
		dir := filepath.Join(*queryResultsPath, queryid)
		if err := os.MkdirAll(dir, os.FileMode(0755)); err != nil {
			log.Printf("[%s] could not create %q: %v\n", queryid, dir, err)
			failQuery(queryid)
			return false
		}

		// TODO: it’d be so much better if we correctly handled ENOSPC errors
		// in the code below (and above), but for that we need to carefully test it.
		ensureEnoughSpaceAvailable()

		for i := 0; i < len(backends); i++ {
			state[queryid].filesTotal[i] = -1
			path := filepath.Join(dir, fmt.Sprintf("unsorted_%d.json", i))
			state[queryid].tempFiles[i], err = os.Create(path)
			if err != nil {
				log.Printf("[%s] could not create %q: %v\n", queryid, path, err)
				failQuery(queryid)
				return false
			}
		}
		log.Printf("initial results = %v\n", state[queryid])

		// Rewrite the query into a query for source backends.
		fakeUrl, err := url.Parse("?" + query)
		if err != nil {
			log.Fatal(err)
		}
		rewritten := search.RewriteQuery(*fakeUrl)
		type streamingRequest struct {
			Query string
			URL   string
		}
		request := streamingRequest{
			Query: rewritten.Query().Get("q"),
			URL:   rewritten.String(),
		}
		log.Printf("[%s] querying for %q\n", queryid, request.Query)
		sourceQuery, err := json.Marshal(&request)
		if err != nil {
			log.Fatal(err)
		}

		for idx, backend := range backends {
			go queryBackend(queryid, backend, idx, sourceQuery)
		}
		return false
	}

	return true
}
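
// QueryzHandler renders the /queryz status page, which lists all queries that
// are currently held in memory. POSTing a “cancel” parameter cancels the
// corresponding query.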
func QueryzHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	if cancel := r.PostFormValue("cancel"); cancel != "" {
		addEventMarshal(cancel, &Error{
			Type:      "error",
			ErrorType: "cancelled",
		})
		finishQuery(cancel)
		http.Redirect(w, r, "/queryz", http.StatusFound)
		return
	}

	type queryStats struct {
		Searchterm     string
		QueryId        string
		NumEvents      int
		NumResults     int
		NumResultPages int
		NumPackages    int
		Done           bool
		Started        time.Time
		Duration       time.Duration
		FilesTotal     []int
		FilesProcessed []int
	}
	stateMu.Lock()
	stats := make([]queryStats, len(state))
	idx := 0
	for queryid, s := range state {
		stats[idx] = queryStats{
			Searchterm:     s.query,
			QueryId:        queryid,
			NumEvents:      len(s.events),
			Done:           s.done,
			Started:        s.started,
			Duration:       time.Since(s.started),
			NumResults:     len(s.resultPointers),
			NumPackages:    len(s.allPackages),
			NumResultPages: s.resultPages,
			FilesTotal:     s.filesTotal,
			FilesProcessed: s.filesProcessed,
		}
		if stats[idx].NumResults == 0 && stats[idx].Done {
			stats[idx].NumResults = s.numResults
		}
		idx++
	}
	stateMu.Unlock()
	if err := common.Templates.ExecuteTemplate(w, "queryz.html", map[string]interface{}{
		"queries": stats,
	}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}

// Caller needs to hold s.clientsMu
func sendPaginationUpdate(queryid string, s queryState) {
	type Pagination struct {
		// Set to “pagination”.
		Type        string
		QueryId     string
		ResultPages int
	}

	if s.resultPages > 0 {
		addEventMarshal(queryid, &Pagination{
			Type:        "pagination",
			QueryId:     queryid,
			ResultPages: s.resultPages,
		})
	}
}
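
// storeResult appends a result to the per-backend temporary file on disk and
// remembers a pointer to it for later sorting. If the result ranks within the
// current top 10, it is additionally sent to the client(s) right away.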
func storeResult(queryid string, backendidx int, result Result) {
	result.Type = "result"

	result.Package = result.Path[:strings.Index(result.Path, "_")]

	// Without acquiring a lock, just check if we need to consider this result
	// for the top 10 at all.
	s := state[queryid]

	if s.FirstPathRank > 0 {
		// Now store the combined ranking of PathRanking (pre) and Ranking (post).
		// We add the values because they are both percentages.
		// To make the Ranking (post) less significant, we multiply it with
		// 1/10 * FirstPathRank. We used to use maxPathRanking here, but
		// requiring that means delaying the search until all results are
		// there. Instead, FirstPathRank is a good enough approximation (but
		// different enough for each query that we can’t hardcode it).
		result.Ranking = result.PathRank + ((s.FirstPathRank * 0.1) * result.Ranking)
	} else {
		s.FirstPathRank = result.PathRank
	}

	worst := s.results[9]
	if result.Ranking > worst.Ranking {
		s.resultMu.Lock()

		// TODO: find the first s.result[] for the same package. then check again if the result is worthy of replacing that per-package result
		// TODO: probably change the data structure so that we can do this more easily and also keep N results per package.

		combined := append(s.results[:], result)
		sort.Sort(ByRanking(combined))
		copy(s.results[:], combined[:10])
		state[queryid] = s
		s.resultMu.Unlock()

		// The result entered the top 10, so send it to the client(s) for
		// immediate display.
		addEventMarshal(queryid, &result)
	}

	tmpOffset, err := state[queryid].tempFiles[backendidx].Seek(0, os.SEEK_CUR)
	if err != nil {
		log.Printf("[%s] could not seek: %v\n", queryid, err)
		failQuery(queryid)
		return
	}

	if err := json.NewEncoder(s.tempFiles[backendidx]).Encode(result); err != nil {
		log.Printf("[%s] could not write %v: %v\n", queryid, result, err)
		failQuery(queryid)
		return
	}

	offsetAfterWriting, err := state[queryid].tempFiles[backendidx].Seek(0, os.SEEK_CUR)
	if err != nil {
		log.Printf("[%s] could not seek: %v\n", queryid, err)
		failQuery(queryid)
		return
	}

	h := fnv.New64()
	io.WriteString(h, result.Path)

	stateMu.Lock()
	s = state[queryid]
	s.resultPointers = append(s.resultPointers, resultPointer{
		backendidx:  backendidx,
		ranking:     result.Ranking,
		offset:      tmpOffset,
		length:      offsetAfterWriting - tmpOffset,
		pathHash:    h.Sum64(),
		packageName: s.packagePool.Get(result.Package)})
	s.allPackages[result.Package] = true
	s.numResults++
	state[queryid] = s
	stateMu.Unlock()
}
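
// failQuery increments the failed-queries varz, announces the failure to all
// clients and finishes the query.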
func failQuery(queryid string) {
	varz.Increment("failed-queries")
	addEventMarshal(queryid, &Error{
		Type:      "error",
		ErrorType: "failed",
	})
	finishQuery(queryid)
}
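
// finishQuery closes the per-backend temporary files, sends the final (empty)
// event to all clients and, if an InfluxDB host is configured, logs a
// query-finished data point in the background.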
func finishQuery(queryid string) {
	log.Printf("[%s] done, closing all client channels.\n", queryid)
	stateMu.Lock()
	s := state[queryid]
	for _, f := range s.tempFiles {
		f.Close()
	}
	state[queryid] = s
	stateMu.Unlock()
	addEvent(queryid, []byte{}, nil)

	if *influxDBHost != "" {
		go func() {
			db, err := influxdb.NewClient(&influxdb.ClientConfig{
				Host:     *influxDBHost,
				Database: *influxDBDatabase,
				Username: *influxDBUsername,
				Password: *influxDBPassword,
			})
			if err != nil {
				log.Printf("Cannot log query-finished timeseries: %v\n", err)
				return
			}

			var seriesBatch []*influxdb.Series
			series := influxdb.Series{
				Name:    "query-finished.int-dcsi-web",
				Columns: []string{"queryid", "searchterm", "milliseconds", "results"},
				Points: [][]interface{}{
					[]interface{}{
						queryid,
						state[queryid].query,
						time.Since(state[queryid].started) / time.Millisecond,
						state[queryid].numResults,
					},
				},
			}
			seriesBatch = append(seriesBatch, &series)

			if err := db.WriteSeries(seriesBatch); err != nil {
				log.Printf("Cannot log query-finished timeseries: %v\n", err)
				return
			}
		}()
	}
}

type ByModTime []os.FileInfo

func (s ByModTime) Len() int {
	return len(s)
}

func (s ByModTime) Less(i, j int) bool {
	return s[i].ModTime().Before(s[j].ModTime())
}

func (s ByModTime) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
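
// availableBytes returns the number of bytes that are available to
// unprivileged users on the file system containing path.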
func availableBytes(path string) uint64 {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(path, &stat); err != nil {
		log.Fatalf("Could not stat filesystem for %q: %v\n", path, err)
	}
	log.Printf("Available bytes on %q: %d\n", path, stat.Bavail*uint64(stat.Bsize))
	return stat.Bavail * uint64(stat.Bsize)
}
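
// ensureEnoughSpaceAvailable deletes the oldest query result directories
// underneath *queryResultsPath until at least 2 GiB of headroom is available.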
func ensureEnoughSpaceAvailable() {
	headroom := uint64(2 * 1024 * 1024 * 1024)
	if availableBytes(*queryResultsPath) >= headroom {
		return
	}

	log.Printf("Deleting an old query...\n")
	dir, err := os.Open(*queryResultsPath)
	if err != nil {
		log.Fatal(err)
	}
	defer dir.Close()
	infos, err := dir.Readdir(-1)
	if err != nil {
		log.Fatal(err)
	}
	sort.Sort(ByModTime(infos))
	for _, info := range infos {
		if !info.IsDir() {
			continue
		}
		log.Printf("Removing query results for %q to make enough space\n", info.Name())
		if err := os.RemoveAll(filepath.Join(*queryResultsPath, info.Name())); err != nil {
			log.Fatal(err)
		}
		if availableBytes(*queryResultsPath) >= headroom {
			break
		}
	}
}
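
// createFromPointers writes the results identified by pointers as a JSON
// array into the file called name, copying each serialized result verbatim
// from the corresponding per-backend temporary file.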
func createFromPointers(queryid string, name string, pointers []resultPointer) error {
	log.Printf("[%s] writing %q\n", queryid, name)
	f, err := os.Create(name)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.Write([]byte("[")); err != nil {
		return err
	}
	for idx, pointer := range pointers {
		src := state[queryid].tempFiles[pointer.backendidx]
		if _, err := src.Seek(pointer.offset, os.SEEK_SET); err != nil {
			return err
		}
		if idx > 0 {
			if _, err := f.Write([]byte(",")); err != nil {
				return err
			}
		}
		if _, err := io.CopyN(f, src, pointer.length); err != nil {
			return err
		}
	}
	if _, err := f.Write([]byte("]\n")); err != nil {
		return err
	}
	return nil
}
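
// writeToDisk sorts all collected result pointers by ranking and writes the
// paginated result files (page_0.json etc.), the package list (packages.json)
// and the per-package result files (pkg_<package>.json) for the given query.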
func writeToDisk(queryid string) error {
	// Get the slice with results and unset it on the state so that processing can continue.
	stateMu.Lock()
	s := state[queryid]
	pointers := s.resultPointers
	if len(pointers) == 0 {
		log.Printf("[%s] not writing, no results.\n", queryid)
		stateMu.Unlock()
		return nil
	}
	s.resultPointers = nil
	idx := 0
	packages := make([]string, len(s.allPackages))
	// TODO: sort by ranking as soon as we store the best ranking with each package. (at the moment it’s first result, first stored)
	for pkg := range s.allPackages {
		packages[idx] = pkg
		idx++
	}
	s.allPackagesSorted = packages
	state[queryid] = s
	stateMu.Unlock()

	log.Printf("[%s] writing, %d results.\n", queryid, len(pointers))
	log.Printf("[%s] packages: %v\n", queryid, packages)

	sort.Sort(pointerByRanking(pointers))

	resultsPerPage := 10
	dir := filepath.Join(*queryResultsPath, queryid)
	if err := os.MkdirAll(dir, os.FileMode(0755)); err != nil {
		return err
	}

	// TODO: it’d be so much better if we correctly handled ENOSPC errors
	// in the code below (and above), but for that we need to carefully test it.
	ensureEnoughSpaceAvailable()

	f, err := os.Create(filepath.Join(dir, "packages.json"))
	if err != nil {
		return err
	}
	if err := json.NewEncoder(f).Encode(struct{ Packages []string }{packages}); err != nil {
		return err
	}
	f.Close()

	pages := int(math.Ceil(float64(len(pointers)) / float64(resultsPerPage)))
	for page := 0; page < pages; page++ {
		start := page * resultsPerPage
		end := (page + 1) * resultsPerPage
		if end > len(pointers) {
			end = len(pointers)
		}

		name := filepath.Join(dir, fmt.Sprintf("page_%d.json", page))
		if err := createFromPointers(queryid, name, pointers[start:end]); err != nil {
			return err
		}
	}

	// Now save the results into their package-specific files.
	bypkg := make(map[string][]resultPointer)
	for _, pointer := range pointers {
		pkgresults := bypkg[*pointer.packageName]
		if len(pkgresults) >= resultsPerPackage {
			continue
		}
		pkgresults = append(pkgresults, pointer)
		bypkg[*pointer.packageName] = pkgresults
	}

	for pkg, pkgresults := range bypkg {
		name := filepath.Join(dir, fmt.Sprintf("pkg_%s.json", pkg))
		if err := createFromPointers(queryid, name, pkgresults); err != nil {
			return err
		}
	}

	stateMu.Lock()
	s = state[queryid]
	s.resultPages = pages
	state[queryid] = s
	stateMu.Unlock()

	sendPaginationUpdate(queryid, s)
	return nil
}
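
// storeProgress records a progress update from one source backend. Once all
// backends have reported their total file count, aggregated progress updates
// are sent to the clients; once every file has been processed, the results
// are written to disk and the query is finished.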
func storeProgress(queryid string, backendidx int, progress Result) {
	backends := strings.Split(*common.SourceBackends, ",")
	s := state[queryid]
	s.filesMu.Lock()
	s.filesTotal[backendidx] = progress.FilesTotal
	s.filesMu.Unlock()
	allSet := true
	for i := 0; i < len(backends); i++ {
		if s.filesTotal[i] == -1 {
			log.Printf("total number for backend %d missing\n", i)
			allSet = false
			break
		}
	}

	s.filesMu.Lock()
	s.filesProcessed[backendidx] = progress.FilesProcessed
	s.filesMu.Unlock()

	filesProcessed := 0
	for _, processed := range s.filesProcessed {
		filesProcessed += processed
	}
	filesTotal := 0
	for _, total := range s.filesTotal {
		filesTotal += total
	}

	if allSet && filesProcessed == filesTotal {
		log.Printf("[%s] [src:%d] query done on all backends, writing to disk.\n", queryid, backendidx)
		if err := writeToDisk(queryid); err != nil {
			log.Printf("[%s] writeToDisk() failed: %v\n", queryid, err)
			failQuery(queryid)
		}
	}

	if allSet {
		log.Printf("[%s] [src:%d] (sending) progress: %d of %d\n", queryid, backendidx, progress.FilesProcessed, progress.FilesTotal)
		addEventMarshal(queryid, &ProgressUpdate{
			Type:           progress.Type,
			QueryId:        queryid,
			FilesProcessed: filesProcessed,
			FilesTotal:     filesTotal,
			Results:        s.numResults,
		})
		if filesProcessed == filesTotal {
			finishQuery(queryid)
		}
	} else {
		log.Printf("[%s] [src:%d] progress: %d of %d\n", queryid, backendidx, progress.FilesProcessed, progress.FilesTotal)
	}
}
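
// PerPackageResultsHandler serves URLs matching perPackagePathRe
// (/perpackage-results/<queryid>/<resultsPerPackage>/page_<n>.json) by
// concatenating the pkg_<package>.json files of the packages on the requested
// page into a single JSON reply.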
func PerPackageResultsHandler(w http.ResponseWriter, r *http.Request) {
	matches := perPackagePathRe.FindStringSubmatch(r.URL.Path)
	if matches == nil || len(matches) != 3 {
		// TODO: what about non-js clients?
		// While this just serves index.html, the javascript part of index.html
		// realizes the path starts with /perpackage-results/ and starts the
		// search, then requests the specified page on search completion.
		http.ServeFile(w, r, filepath.Join(*staticPath, "index.html"))
		return
	}
	queryid := matches[1]
	pagenr, err := strconv.Atoi(matches[2])
	if err != nil {
		log.Fatalf("Could not convert %q into a number: %v\n", matches[2], err)
	}
	s, ok := state[queryid]
	if !ok {
		http.Error(w, "No such query.", http.StatusNotFound)
		return
	}
	if !s.done {
		started := time.Now()
		for time.Since(started) < 60*time.Second {
			if state[queryid].done {
				s = state[queryid]
				break
			}
			time.Sleep(100 * time.Millisecond)
		}
		if !s.done {
			log.Printf("[%s] query not yet finished, cannot produce per-package results\n", queryid)
			http.Error(w, "Query not finished yet.", http.StatusInternalServerError)
			return
		}
	}

	pages := int(math.Ceil(float64(len(s.allPackagesSorted)) / float64(packagesPerPage)))
	if pagenr >= pages {
		log.Printf("[%s] page %d not found (total %d pages)\n", queryid, pagenr, pages)
		http.Error(w, "No such page.", http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	// Advise the client to cache the results for one hour. This needs to match
	// the nginx configuration for serving static files (the not-per-package
	// results are served directly by nginx).
	utc := time.Now().UTC()
	cacheSince := utc.Format(http.TimeFormat)
	cacheUntil := utc.Add(1 * time.Hour).Format(http.TimeFormat)
	w.Header().Set("Cache-Control", "max-age=3600, public")
	w.Header().Set("Last-Modified", cacheSince)
	w.Header().Set("Expires", cacheUntil)

	log.Printf("[%s] Computing per-package results for page %d\n", queryid, pagenr)
	dir := filepath.Join(*queryResultsPath, queryid)

	start := pagenr * packagesPerPage
	end := (pagenr + 1) * packagesPerPage
	if end > len(s.allPackagesSorted) {
		end = len(s.allPackagesSorted)
	}

	// We concatenate a JSON reply that essentially contains multiple JSON
	// files by directly writing to a buffer in order to avoid
	// decoding/encoding the same data. We cannot write directly to the
	// ResponseWriter because we may still need to use http.Error(), which must
	// be called before sending any content.
	//
	// Perhaps a better way would be to use HTTP2 and send multiple files to
	// the client.
	var buffer bytes.Buffer
	buffer.Write([]byte("["))

	for _, pkg := range s.allPackagesSorted[start:end] {
		if buffer.Len() == 1 {
			fmt.Fprintf(&buffer, `{"Package": "%s", "Results":`, pkg)
		} else {
			fmt.Fprintf(&buffer, `,{"Package": "%s", "Results":`, pkg)
		}
		f, err := os.Open(filepath.Join(dir, "pkg_"+pkg+".json"))
		if err != nil {
			http.Error(w, fmt.Sprintf("Could not open %q: %v", "pkg_"+pkg+".json", err), http.StatusInternalServerError)
			return
		}
		if _, err := io.Copy(&buffer, f); err != nil {
			http.Error(w, fmt.Sprintf("Could not read %q: %v", "pkg_"+pkg+".json", err), http.StatusInternalServerError)
			return
		}
		f.Close()
		fmt.Fprintf(&buffer, `}`)
	}

	buffer.Write([]byte("]"))
	if _, err := io.Copy(w, &buffer); err != nil {
		log.Printf("[%s] Could not send response: %v\n", queryid, err)
	}
}