Turn package and -package keywords into a regexp
[debiancodesearch.git] / cmd / dcs-source-backend / source-backend.go
blobcc3ff359d6281585b99b0ecbc42a2889abeb4745
1 // vim:ts=4:sw=4:noexpandtab
2 package main
4 import (
5 "flag"
6 "fmt"
7 "io"
8 "io/ioutil"
9 "log"
10 "math/rand"
11 "net/http"
12 _ "net/http/pprof"
13 "net/url"
14 "os"
15 "path"
16 "sort"
17 "strings"
18 "sync"
19 "time"
21 "github.com/Debian/dcs/grpcutil"
22 "github.com/Debian/dcs/proto"
23 "github.com/Debian/dcs/ranking"
24 "github.com/Debian/dcs/regexp"
25 _ "github.com/Debian/dcs/varz"
26 opentracing "github.com/opentracing/opentracing-go"
27 olog "github.com/opentracing/opentracing-go/log"
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/uber/jaeger-client-go"
30 jaegercfg "github.com/uber/jaeger-client-go/config"
31 "golang.org/x/net/context"
32 "google.golang.org/grpc"
var (
	// Command-line flags configuring the source backend.
	listenAddress = flag.String("listen_address", ":28082", "listen address ([host]:port)")
	unpackedPath  = flag.String("unpacked_path",
		"/dcs-ssd/unpacked/",
		"Path to the unpacked sources")
	rankingDataPath = flag.String("ranking_data_path",
		"/var/dcs/ranking.json",
		"Path to the JSON containing ranking data")
	tlsCertPath = flag.String("tls_cert_path", "", "Path to a .pem file containing the TLS certificate.")
	tlsKeyPath  = flag.String("tls_key_path", "", "Path to a .pem file containing the TLS private key.")
	jaegerAgent = flag.String("jaeger_agent",
		"localhost:5775",
		"host:port of a github.com/uber/jaeger agent")

	// indexBackend is the gRPC client for the local index backend;
	// it is initialized in main() before the server starts listening.
	indexBackend proto.IndexBackendClient
)
// SourceReply describes the result of a source search: all matches plus a
// pagination cursor.
//
// NOTE(review): this type is not referenced anywhere in this file —
// presumably a leftover from the pre-gRPC JSON protocol; verify whether it
// can be removed.
type SourceReply struct {
	// The number of the last used filename, needed for pagination
	LastUsedFilename int

	// All matches produced by the search.
	AllMatches []regexp.Match
}
// server implements the SourceBackend gRPC service (see the File and
// Search methods below). It carries no state; all configuration comes
// from package-level flags.
type server struct {
}
62 // Serves a single file for displaying it in /show
63 func (s *server) File(ctx context.Context, in *proto.FileRequest) (*proto.FileReply, error) {
64 log.Printf("requested filename *%s*\n", in.Path)
65 // path.Join calls path.Clean so we get the shortest path without any "..".
66 absPath := path.Join(*unpackedPath, in.Path)
67 log.Printf("clean, absolute path is *%s*\n", absPath)
68 if !strings.HasPrefix(absPath, *unpackedPath) {
69 return nil, fmt.Errorf("Path traversal is bad, mhkay?")
72 contents, err := ioutil.ReadFile(absPath)
73 if err != nil {
74 return nil, err
76 return &proto.FileReply{
77 Contents: contents,
78 }, nil
81 func filterByKeywords(rewritten *url.URL, files []ranking.ResultPath) []ranking.ResultPath {
82 // The "package:" keyword, if specified.
83 pkg := rewritten.Query().Get("package")
84 // The "-package:" keywords, if specified.
85 npkgs := rewritten.Query()["npackage"]
86 // The "path:" keywords, if specified.
87 paths := rewritten.Query()["path"]
88 // The "-path" keywords, if specified.
89 npaths := rewritten.Query()["npath"]
91 // Filter the filenames if the "package:" keyword was specified.
92 if pkg != "" {
93 fmt.Printf("Filtering for package %q\n", pkg)
94 pkgRegexp, err := regexp.Compile(pkg)
95 if err != nil {
96 return files
98 filtered := make(ranking.ResultPaths, 0, len(files))
99 for _, file := range files {
100 if pkgRegexp.MatchString(file.Path[file.SourcePkgIdx[0]:file.SourcePkgIdx[1]], true, true) == -1 {
101 continue
104 filtered = append(filtered, file)
107 files = filtered
110 // Filter the filenames if the "-package:" keyword was specified.
111 for _, npkg := range npkgs {
112 fmt.Printf("Excluding matches for package %q\n", npkg)
113 npkgRegexp, err := regexp.Compile(npkg)
114 if err != nil {
115 return files
117 filtered := make(ranking.ResultPaths, 0, len(files))
118 for _, file := range files {
119 if npkgRegexp.MatchString(file.Path[file.SourcePkgIdx[0]:file.SourcePkgIdx[1]], true, true) != -1 {
120 continue
123 filtered = append(filtered, file)
126 files = filtered
129 for _, path := range paths {
130 fmt.Printf("Filtering for path %q\n", path)
131 pathRegexp, err := regexp.Compile(path)
132 if err != nil {
133 return files
134 // TODO: perform this validation before accepting the query, i.e. in dcs-web
135 //err := common.Templates.ExecuteTemplate(w, "error.html", map[string]interface{}{
136 // "q": r.URL.Query().Get("q"),
137 // "errormsg": fmt.Sprintf(`%v`, err),
138 // "suggestion": template.HTML(`See <a href="http://codesearch.debian.net/faq#regexp">http://codesearch.debian.net/faq#regexp</a> for help on regular expressions.`),
139 //})
140 //if err != nil {
141 // http.Error(w, err.Error(), http.StatusInternalServerError)
145 filtered := make(ranking.ResultPaths, 0, len(files))
146 for _, file := range files {
147 if pathRegexp.MatchString(file.Path, true, true) == -1 {
148 continue
151 filtered = append(filtered, file)
154 files = filtered
157 for _, path := range npaths {
158 fmt.Printf("Filtering for path %q\n", path)
159 pathRegexp, err := regexp.Compile(path)
160 if err != nil {
161 return files
162 // TODO: perform this validation before accepting the query, i.e. in dcs-web
163 //err := common.Templates.ExecuteTemplate(w, "error.html", map[string]interface{}{
164 // "q": r.URL.Query().Get("q"),
165 // "errormsg": fmt.Sprintf(`%v`, err),
166 // "suggestion": template.HTML(`See <a href="http://codesearch.debian.net/faq#regexp">http://codesearch.debian.net/faq#regexp</a> for help on regular expressions.`),
167 //})
168 //if err != nil {
169 // http.Error(w, err.Error(), http.StatusInternalServerError)
173 filtered := make(ranking.ResultPaths, 0, len(files))
174 for _, file := range files {
175 if pathRegexp.MatchString(file.Path, true, true) != -1 {
176 continue
179 filtered = append(filtered, file)
182 files = filtered
185 return files
188 func sendProgressUpdate(stream proto.SourceBackend_SearchServer, connMu *sync.Mutex, filesProcessed, filesTotal int) error {
189 connMu.Lock()
190 defer connMu.Unlock()
191 return stream.Send(&proto.SearchReply{
192 Type: proto.SearchReply_PROGRESS_UPDATE,
193 ProgressUpdate: &proto.ProgressUpdate{
194 FilesProcessed: uint64(filesProcessed),
195 FilesTotal: uint64(filesTotal),
// Search performs the search described by in across all candidate files
// (obtained from the index backend) and streams matches and progress
// updates back over the gRPC stream as they appear.
// (The former comment about "a single JSON request from the TCP
// connection" was stale; this is a gRPC streaming method.)
func (s *server) Search(in *proto.SearchRequest, stream proto.SourceBackend_SearchServer) error {
	ctx := stream.Context()
	// connMu serializes all Send calls on stream: many worker goroutines
	// write to it concurrently below, which gRPC streams do not permit.
	connMu := new(sync.Mutex)
	logprefix := fmt.Sprintf("[%q]", in.Query)
	span := opentracing.SpanFromContext(ctx)

	// Ask the local index backend for all the filenames.
	fstream, err := indexBackend.Files(ctx, &proto.FilesRequest{Query: in.Query})
	if err != nil {
		return fmt.Errorf("%s Error querying index backend for query %q: %v\n", logprefix, in.Query, err)
	}

	// Collect the candidate file paths from the index backend stream.
	var possible []string
	for {
		resp, err := fstream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		possible = append(possible, resp.Path)
	}

	span.LogFields(olog.Int("files.possible", len(possible)))

	// Parse the (rewritten) URL to extract all ranking options/keywords.
	rewritten, err := url.Parse(in.RewrittenUrl)
	if err != nil {
		return err
	}
	rankingopts := ranking.RankingOptsFromQuery(rewritten.Query())
	span.LogFields(olog.String("rankingopts", fmt.Sprintf("%+v", rankingopts)))

	// Rank all the paths. Files ranked -1 or lower are discarded.
	rankspan, _ := opentracing.StartSpanFromContext(ctx, "Rank")
	files := make(ranking.ResultPaths, 0, len(possible))
	for _, filename := range possible {
		result := ranking.ResultPath{Path: filename}
		result.Rank(&rankingopts)
		if result.Ranking > -1 {
			files = append(files, result)
		}
	}
	rankspan.Finish()

	// Filter all files that should be excluded.
	filterspan, _ := opentracing.StartSpanFromContext(ctx, "Filter")
	files = filterByKeywords(rewritten, files)
	filterspan.Finish()

	span.LogFields(olog.Int("files.filtered", len(files)))

	// While not strictly necessary, this will lead to better results being
	// discovered (and returned!) earlier, so let’s spend a few cycles on
	// sorting the list of potential files first.
	sort.Sort(files)

	re, err := regexp.Compile(in.Query)
	if err != nil {
		return fmt.Errorf("%s Could not compile regexp: %v\n", logprefix, err)
	}

	span.LogFields(olog.String("regexp", re.String()))

	log.Printf("%s regexp = %q, %d possible files\n", logprefix, re, len(files))

	// Send the first progress update so that clients know how many files are
	// going to be searched.
	if err := sendProgressUpdate(stream, connMu, 0, len(files)); err != nil {
		return fmt.Errorf("%s %v\n", logprefix, err)
	}

	// The tricky part here is “flow control”: if we just start grepping like
	// crazy, we will eventually run out of memory because all our writes are
	// blocked on the connection (and the goroutines need to keep the write
	// buffer in memory until the write is done).
	//
	// So instead, we start 1000 worker goroutines and feed them work through a
	// single channel. Due to these goroutines being blocked on writing,
	// the grepping will naturally become slower.
	work := make(chan ranking.ResultPath)
	progress := make(chan int)

	var wg sync.WaitGroup
	// We add the additional 1 for the progress updater goroutine. It also
	// needs to be done before we can return, otherwise it will try to use the
	// (already closed) network connection, which is a fatal error.
	wg.Add(len(files) + 1)

	// Feeder: hands every candidate file to the workers, then closes the
	// channel so the worker loops terminate.
	go func() {
		for _, file := range files {
			work <- file
		}
		close(work)
	}()

	// Progress updater: counts processed files from the progress channel and
	// sends a rate-limited progress update every ~2–2.5s (jittered so that
	// concurrent queries do not all report at the same instant).
	go func() {
		cnt := 0
		errorShown := false
		var lastProgressUpdate time.Time
		progressInterval := 2*time.Second + time.Duration(rand.Int63n(int64(500*time.Millisecond)))
		for cnt < len(files) {
			add := <-progress
			cnt += add

			if time.Since(lastProgressUpdate) > progressInterval {
				if err := sendProgressUpdate(stream, connMu, cnt, len(files)); err != nil {
					if !errorShown {
						log.Printf("%s %v\n", logprefix, err)
						// We need to read the 'progress' channel, so we cannot
						// just exit the loop here. Instead, we suppress all
						// error messages after the first one.
						errorShown = true
					}
				}
				lastProgressUpdate = time.Now()
			}
		}

		// Final 100% update, then release the channel and the WaitGroup slot.
		if err := sendProgressUpdate(stream, connMu, len(files), len(files)); err != nil {
			log.Printf("%s %v\n", logprefix, err)
		}
		close(progress)

		wg.Done()
	}()

	querystr := ranking.NewQueryStr(in.Query)

	// Cap the worker count at the number of files so we do not spawn idle
	// goroutines for small result sets.
	numWorkers := 1000
	if len(files) < 1000 {
		numWorkers = len(files)
	}
	for i := 0; i < numWorkers; i++ {
		go func() {
			// Each worker compiles its own regexp — presumably because a
			// compiled dcs/regexp is not safe to share between goroutines
			// (see the clone TODO below); confirm before changing.
			re, err := regexp.Compile(in.Query)
			if err != nil {
				log.Printf("%s\n", err)
				return
			}

			grep := regexp.Grep{
				Regexp: re,
				Stdout: os.Stdout,
				Stderr: os.Stderr,
			}

			for file := range work {
				sourcePkgName := file.Path[file.SourcePkgIdx[0]:file.SourcePkgIdx[1]]
				// Adjust the per-file ranking by query-string similarity,
				// depending on which ranking options are enabled.
				if rankingopts.Pathmatch {
					file.Ranking += querystr.Match(&file.Path)
				}
				if rankingopts.Sourcepkgmatch {
					file.Ranking += querystr.Match(&sourcePkgName)
				}
				if rankingopts.Weighted {
					file.Ranking += 0.1460 * querystr.Match(&file.Path)
					file.Ranking += 0.0008 * querystr.Match(&sourcePkgName)
				}

				// TODO: figure out how to safely clone a dcs/regexp
				matches := grep.File(path.Join(*unpackedPath, file.Path))
				for _, match := range matches {
					match.Ranking = ranking.PostRank(rankingopts, &match, &querystr)
					match.PathRank = file.Ranking
					//match.Path = match.Path[len(*unpackedPath):]
					// NB: populating match.Ranking happens in
					// cmd/dcs-web/querymanager because it depends on at least
					// one other result.

					// TODO: ideally, we’d get proto.Match structs from grep.File(), let’s do that after profiling the decoding performance

					// Strip the unpacked prefix; the leading path component
					// of the remainder is the package name.
					path := match.Path[len(*unpackedPath):]
					connMu.Lock()
					if err := stream.Send(&proto.SearchReply{
						Type: proto.SearchReply_MATCH,
						Match: &proto.Match{
							Path:     path,
							Line:     uint32(match.Line),
							Package:  path[:strings.Index(path, "/")],
							Ctxp2:    match.Ctxp2,
							Ctxp1:    match.Ctxp1,
							Context:  match.Context,
							Ctxn1:    match.Ctxn1,
							Ctxn2:    match.Ctxn2,
							Pathrank: match.PathRank,
							Ranking:  match.Ranking,
						},
					}); err != nil {
						connMu.Unlock()
						log.Printf("%s %v\n", logprefix, err)
						// Drain the work channel, but without doing any work.
						// This effectively exits the worker goroutine(s)
						// cleanly.
						// NOTE(review): the drained items never reach
						// `progress <- 1` / `wg.Done()` below, so on a send
						// error the progress goroutine and wg.Wait() appear
						// to block forever — verify whether stream/context
						// teardown unblocks this, and fix the accounting if
						// not.
						for _ = range work {
						}
						break
					}
					connMu.Unlock()
				}

				progress <- 1

				wg.Done()
			}
		}()
	}

	wg.Wait()

	log.Printf("%s Sent all results.\n", logprefix)
	return nil
}
func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
	flag.Parse()

	// Set up Jaeger tracing: sample every request (const sampler, param 1)
	// and flush buffered spans to the local agent once per second.
	cfg := jaegercfg.Configuration{
		Sampler: &jaegercfg.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
		Reporter: &jaegercfg.ReporterConfig{
			BufferFlushInterval: 1 * time.Second,
			LocalAgentHostPort:  *jaegerAgent,
		},
	}
	closer, err := cfg.InitGlobalTracer(
		"dcs-source-backend",
		jaegercfg.Logger(jaeger.StdLogger),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close()

	rand.Seed(time.Now().UnixNano())
	// File()'s path-traversal prefix check relies on *unpackedPath ending
	// in a slash, so enforce that here.
	if !strings.HasSuffix(*unpackedPath, "/") {
		*unpackedPath = *unpackedPath + "/"
	}
	fmt.Println("Debian Code Search source-backend")

	if err := ranking.ReadRankingData(*rankingDataPath); err != nil {
		log.Fatal(err)
	}

	// Connect to the index backend, which runs on the same machine.
	conn, err := grpcutil.DialTLS("localhost:28081", *tlsCertPath, *tlsKeyPath)
	if err != nil {
		log.Fatalf("could not connect to %q: %v", "localhost:28081", err)
	}
	defer conn.Close()
	indexBackend = proto.NewIndexBackendClient(conn)

	// Expose Prometheus metrics and serve the SourceBackend gRPC service
	// over TLS on the configured listen address.
	http.Handle("/metrics", prometheus.Handler())
	log.Fatal(grpcutil.ListenAndServeTLS(*listenAddress,
		*tlsCertPath,
		*tlsKeyPath,
		func(s *grpc.Server) {
			proto.RegisterSourceBackendServer(s, &server{})