1 // vim:ts=4:sw=4:noexpandtab
21 "github.com/Debian/dcs/grpcutil"
22 "github.com/Debian/dcs/proto"
23 "github.com/Debian/dcs/ranking"
24 "github.com/Debian/dcs/regexp"
25 _
"github.com/Debian/dcs/varz"
26 opentracing
"github.com/opentracing/opentracing-go"
27 olog
"github.com/opentracing/opentracing-go/log"
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/uber/jaeger-client-go"
30 jaegercfg
"github.com/uber/jaeger-client-go/config"
31 "golang.org/x/net/context"
32 "google.golang.org/grpc"
// Package-level configuration flags for the source-backend server.
// NOTE(review): this listing is truncated — the surrounding `var (` / `)` and
// some default values (e.g. for -unpacked_path, original line 38, and
// -jaeger_agent, original line 46) are not visible here; confirm against the
// full file.
36 listenAddress
= flag
// Address the gRPC server binds to; see ListenAndServeTLS in main.
.String("listen_address", ":28082", "listen address ([host]:port)")
37 unpackedPath
= flag
// Root directory of the unpacked Debian source packages; File() and the
// grep workers join request paths onto this prefix.
.String("unpacked_path",
39 "Path to the unpacked sources")
40 rankingDataPath
= flag
.String("ranking_data_path",
41 "/var/dcs/ranking.json",
42 "Path to the JSON containing ranking data")
43 tlsCertPath
= flag
.String("tls_cert_path", "", "Path to a .pem file containing the TLS certificate.")
44 tlsKeyPath
= flag
.String("tls_key_path", "", "Path to a .pem file containing the TLS private key.")
45 jaegerAgent
= flag
// Destination for trace spans; wired into jaegercfg.ReporterConfig in main.
.String("jaeger_agent",
47 "host:port of a github.com/uber/jaeger agent")
// Client stub for the co-located index backend; assigned in main after
// dialing localhost:28081, used by Search() to enumerate candidate files.
49 indexBackend proto
.IndexBackendClient
// SourceReply is the result payload produced by a source-backend search.
// NOTE(review): the struct is truncated in this view — fields on original
// lines 54–55 and 57–61 (presumably the pagination cursor described by the
// comment below) are missing; confirm against the full file.
52 type SourceReply
struct {
53 // The number of the last used filename, needed for pagination
// AllMatches collects every regexp match found across the searched files.
56 AllMatches
[]regexp
.Match
62 // Serves a single file for displaying it in /show
//
// File is the gRPC handler for proto.FileRequest: it resolves in.Path under
// *unpackedPath and returns the file contents in a proto.FileReply.
// NOTE(review): lines 70–71, 73–75 and 77–79 of the original are missing
// here — the error return after ReadFile and the FileReply field assignments
// are not visible; confirm against the full file.
63 func (s
*server
) File(ctx context
.Context
, in
*proto
.FileRequest
) (*proto
.FileReply
, error
) {
64 log
.Printf("requested filename *%s*\n", in
.Path
)
65 // path.Join calls path.Clean so we get the shortest path without any "..".
66 absPath
:= path
.Join(*unpackedPath
, in
.Path
)
67 log
.Printf("clean, absolute path is *%s*\n", absPath
)
// Path-traversal guard: after Clean, the result must still live under the
// unpacked root (main() guarantees *unpackedPath ends with "/").
68 if !strings
.HasPrefix(absPath
, *unpackedPath
) {
69 return nil, fmt
.Errorf("Path traversal is bad, mhkay?")
72 contents
, err
:= ioutil
.ReadFile(absPath
)
76 return &proto
.FileReply
{
// filterByKeywords narrows the ranked candidate files according to the
// package:/-package:/path:/-path: keywords encoded in the rewritten query URL.
// Each keyword value is compiled as a regular expression and matched against
// either the source-package portion of the path (via SourcePkgIdx) or the
// whole path.
// NOTE(review): many original lines are missing in this view (e.g. 90–92,
// 95–97, 100–109, the error branches after each regexp.Compile, and the
// final return). The guard that skips the package filter when pkg is empty
// is presumably on a missing line — confirm against the full file.
81 func filterByKeywords(rewritten
*url
.URL
, files
[]ranking
.ResultPath
) []ranking
.ResultPath
{
82 // The "package:" keyword, if specified.
83 pkg
:= rewritten
.Query().Get("package")
84 // The "-package:" keywords, if specified.
85 npkgs
:= rewritten
.Query()["npackage"]
86 // The "path:" keywords, if specified.
87 paths
:= rewritten
.Query()["path"]
88 // The "-path" keywords, if specified.
89 npaths
:= rewritten
.Query()["npath"]
91 // Filter the filenames if the "package:" keyword was specified.
93 fmt
.Printf("Filtering for package %q\n", pkg
)
94 pkgRegexp
, err
:= regexp
.Compile(pkg
)
// Keep only files whose source-package name matches pkgRegexp.
// NOTE(review): this MatchString takes (s, bool, bool) and is compared
// against -1, so it is the dcs/regexp fork's API, not stdlib regexp.
98 filtered
:= make(ranking
.ResultPaths
, 0, len(files
))
99 for _
, file
:= range files
{
100 if pkgRegexp
.MatchString(file
.Path
[file
.SourcePkgIdx
[0]:file
.SourcePkgIdx
[1]], true, true) == -1 {
104 filtered
= append(filtered
, file
)
110 // Filter the filenames if the "-package:" keyword was specified.
// Inverse of the block above: drop files whose source package DOES match.
111 for _
, npkg
:= range npkgs
{
112 fmt
.Printf("Excluding matches for package %q\n", npkg
)
113 npkgRegexp
, err
:= regexp
.Compile(npkg
)
117 filtered
:= make(ranking
.ResultPaths
, 0, len(files
))
118 for _
, file
:= range files
{
119 if npkgRegexp
.MatchString(file
.Path
[file
.SourcePkgIdx
[0]:file
.SourcePkgIdx
[1]], true, true) != -1 {
123 filtered
= append(filtered
, file
)
// "path:" keywords: keep only files whose full path matches each pattern.
129 for _
, path
:= range paths
{
130 fmt
.Printf("Filtering for path %q\n", path
)
131 pathRegexp
, err
:= regexp
.Compile(path
)
134 // TODO: perform this validation before accepting the query, i.e. in dcs-web
135 //err := common.Templates.ExecuteTemplate(w, "error.html", map[string]interface{}{
136 // "q": r.URL.Query().Get("q"),
137 // "errormsg": fmt.Sprintf(`%v`, err),
138 // "suggestion": template.HTML(`See <a href="http://codesearch.debian.net/faq#regexp">http://codesearch.debian.net/faq#regexp</a> for help on regular expressions.`),
141 // http.Error(w, err.Error(), http.StatusInternalServerError)
145 filtered
:= make(ranking
.ResultPaths
, 0, len(files
))
146 for _
, file
:= range files
{
147 if pathRegexp
.MatchString(file
.Path
, true, true) == -1 {
151 filtered
= append(filtered
, file
)
// "-path:" keywords: drop files whose full path matches each pattern.
// (The Printf below says "Filtering" although this is the exclusion pass.)
157 for _
, path
:= range npaths
{
158 fmt
.Printf("Filtering for path %q\n", path
)
159 pathRegexp
, err
:= regexp
.Compile(path
)
162 // TODO: perform this validation before accepting the query, i.e. in dcs-web
163 //err := common.Templates.ExecuteTemplate(w, "error.html", map[string]interface{}{
164 // "q": r.URL.Query().Get("q"),
165 // "errormsg": fmt.Sprintf(`%v`, err),
166 // "suggestion": template.HTML(`See <a href="http://codesearch.debian.net/faq#regexp">http://codesearch.debian.net/faq#regexp</a> for help on regular expressions.`),
169 // http.Error(w, err.Error(), http.StatusInternalServerError)
173 filtered
:= make(ranking
.ResultPaths
, 0, len(files
))
174 for _
, file
:= range files
{
175 if pathRegexp
.MatchString(file
.Path
, true, true) != -1 {
179 filtered
= append(filtered
, file
)
// sendProgressUpdate sends a PROGRESS_UPDATE SearchReply on the stream,
// serialized by connMu so that concurrent worker goroutines do not interleave
// writes on the same gRPC stream.
// NOTE(review): original line 189 is missing here — given the deferred
// Unlock below it presumably acquires connMu.Lock(); confirm against the
// full file. The closing braces (lines 196+) are also not visible.
188 func sendProgressUpdate(stream proto
.SourceBackend_SearchServer
, connMu
*sync
.Mutex
, filesProcessed
, filesTotal
int) error
{
190 defer connMu
.Unlock()
191 return stream
.Send(&proto
.SearchReply
{
192 Type
: proto
.SearchReply_PROGRESS_UPDATE
,
193 ProgressUpdate
: &proto
.ProgressUpdate
{
// Counts are int at the call sites; widened to uint64 for the wire format.
194 FilesProcessed
: uint64(filesProcessed
),
195 FilesTotal
: uint64(filesTotal
),
200 // Reads a single JSON request from the TCP connection, performs the search and
201 // sends results back over the TCP connection as they appear.
//
// Search is the server-streaming gRPC handler: it asks the index backend for
// candidate filenames, ranks and filters them, then greps each file with a
// bounded worker pool, streaming MATCH and PROGRESS_UPDATE replies as it goes.
// NOTE(review): this view is heavily truncated (e.g. original lines 207,
// 210–225 partially, 244–247, 291–301, 305–321, 324–412 partially are
// missing) — goroutine bodies, channel sends on 'progress', and the final
// return are not all visible; confirm any behavioral claim against the full
// file.
202 func (s
*server
) Search(in
*proto
.SearchRequest
, stream proto
.SourceBackend_SearchServer
) error
{
203 ctx
:= stream
.Context()
// connMu serializes all writes to 'stream' across worker goroutines.
204 connMu
:= new(sync
.Mutex
)
205 logprefix
:= fmt
.Sprintf("[%q]", in
.Query
)
206 span
:= opentracing
.SpanFromContext(ctx
)
208 // Ask the local index backend for all the filenames.
209 fstream
, err
:= indexBackend
.Files(ctx
, &proto
.FilesRequest
{Query
: in
.Query
})
211 return fmt
.Errorf("%s Error querying index backend for query %q: %v\n", logprefix
, in
.Query
, err
)
// Drain the index-backend stream into 'possible'.
// NOTE(review): the surrounding loop and its EOF/error handling (original
// lines 215–225) are only partially visible here.
214 var possible
[]string
216 resp
, err
:= fstream
.Recv()
223 possible
= append(possible
, resp
.Path
)
226 span
.LogFields(olog
.Int("files.possible", len(possible
)))
228 // Parse the (rewritten) URL to extract all ranking options/keywords.
229 rewritten
, err
:= url
.Parse(in
.RewrittenUrl
)
233 rankingopts
:= ranking
.RankingOptsFromQuery(rewritten
.Query())
234 span
.LogFields(olog
.String("rankingopts", fmt
.Sprintf("%+v", rankingopts
)))
236 // Rank all the paths.
// rankspan presumably gets Finish()ed on a missing line — TODO confirm.
237 rankspan
, _
:= opentracing
.StartSpanFromContext(ctx
, "Rank")
238 files
:= make(ranking
.ResultPaths
, 0, len(possible
))
239 for _
, filename
:= range possible
{
240 result
:= ranking
.ResultPath
{Path
: filename
}
241 result
.Rank(&rankingopts
)
// Paths ranked at or below -1 are discarded entirely.
242 if result
.Ranking
> -1 {
243 files
= append(files
, result
)
248 // Filter all files that should be excluded.
249 filterspan
, _
:= opentracing
.StartSpanFromContext(ctx
, "Filter")
250 files
= filterByKeywords(rewritten
, files
)
253 span
.LogFields(olog
.Int("files.filtered", len(files
)))
255 // While not strictly necessary, this will lead to better results being
256 // discovered (and returned!) earlier, so let’s spend a few cycles on
257 // sorting the list of potential files first.
// Compile the user's query once up-front so an invalid regexp fails fast.
260 re
, err
:= regexp
.Compile(in
.Query
)
262 return fmt
.Errorf("%s Could not compile regexp: %v\n", logprefix
, err
)
265 span
.LogFields(olog
.String("regexp", re
.String()))
267 log
.Printf("%s regexp = %q, %d possible files\n", logprefix
, re
, len(files
))
269 // Send the first progress update so that clients know how many files are
270 // going to be searched.
271 if err
:= sendProgressUpdate(stream
, connMu
, 0, len(files
)); err
!= nil {
272 return fmt
.Errorf("%s %v\n", logprefix
, err
)
275 // The tricky part here is “flow control”: if we just start grepping like
276 // crazy, we will eventually run out of memory because all our writes are
277 // blocked on the connection (and the goroutines need to keep the write
278 // buffer in memory until the write is done).
280 // So instead, we start 1000 worker goroutines and feed them work through a
281 // single channel. Due to these goroutines being blocked on writing,
282 // the grepping will naturally become slower.
283 work
:= make(chan ranking
.ResultPath
)
// Workers report per-file completion here; the updater goroutine below
// aggregates it into periodic progress messages.
284 progress
:= make(chan int)
286 var wg sync
.WaitGroup
287 // We add the additional 1 for the progress updater goroutine. It also
288 // needs to be done before we can return, otherwise it will try to use the
289 // (already closed) network connection, which is a fatal error.
290 wg
.Add(len(files
) + 1)
// Feeder loop: pushes every candidate file into 'work'.
// NOTE(review): the goroutine wrapper and the close(work) presumably live on
// missing lines 294–301 — confirm against the full file.
293 for _
, file
:= range files
{
// Progress-updater: rate-limited by a randomized interval (2s + up to
// 500ms jitter) so many concurrent queries don't send updates in lockstep.
302 var lastProgressUpdate time
.Time
303 progressInterval
:= 2*time
.Second
+ time
.Duration(rand
.Int63n(int64(500*time
.Millisecond
)))
304 for cnt
< len(files
) {
308 if time
.Since(lastProgressUpdate
) > progressInterval
{
309 if err
:= sendProgressUpdate(stream
, connMu
, cnt
, len(files
)); err
!= nil {
311 log
.Printf("%s %v\n", logprefix
, err
)
312 // We need to read the 'progress' channel, so we cannot
313 // just exit the loop here. Instead, we suppress all
314 // error messages after the first one.
318 lastProgressUpdate
= time
.Now()
// Final 100% progress update once every file has been processed.
322 if err
:= sendProgressUpdate(stream
, connMu
, len(files
), len(files
)); err
!= nil {
323 log
.Printf("%s %v\n", logprefix
, err
)
330 querystr
:= ranking
.NewQueryStr(in
.Query
)
// Cap the worker count at the number of files when fewer than 1000.
// NOTE(review): numWorkers' declaration (presumably = 1000) is on a
// missing line — confirm.
333 if len(files
) < 1000 {
334 numWorkers
= len(files
)
336 for i
:= 0; i
< numWorkers
; i
++ {
337 // Each worker compiles its own copy of the regexp because the
// dcs/regexp matcher is stateful (see the TODO about cloning below).
338 re
, err
:= regexp
.Compile(in
.Query
)
340 log
.Printf("%s\n", err
)
// Worker body: grep each file from 'work', re-rank, stream matches.
350 for file
:= range work
{
351 sourcePkgName
:= file
.Path
[file
.SourcePkgIdx
[0]:file
.SourcePkgIdx
[1]]
352 if rankingopts
.Pathmatch
{
353 file
.Ranking
+= querystr
.Match(&file
.Path
)
355 if rankingopts
.Sourcepkgmatch
{
356 file
.Ranking
+= querystr
.Match(&sourcePkgName
)
358 if rankingopts
.Weighted
{
// Empirically-derived weights; meaning of the constants is not
// documented in this view.
359 file
.Ranking
+= 0.1460 * querystr
.Match(&file
.Path
)
360 file
.Ranking
+= 0.0008 * querystr
.Match(&sourcePkgName
)
363 // TODO: figure out how to safely clone a dcs/regexp
364 matches
:= grep
.File(path
.Join(*unpackedPath
, file
.Path
))
365 for _
, match
:= range matches
{
366 match
.Ranking
= ranking
.PostRank(rankingopts
, &match
, &querystr
)
367 match
.PathRank
= file
.Ranking
368 //match.Path = match.Path[len(*unpackedPath):]
369 // NB: populating match.Ranking happens in
370 // cmd/dcs-web/querymanager because it depends on at least
373 // TODO: ideally, we’d get proto.Match structs from grep.File(), let’s do that after profiling the decoding performance
// Strip the unpacked-path prefix; the first path component is the
// Debian source package name.
375 path
:= match
.Path
[len(*unpackedPath
):]
377 if err
:= stream
.Send(&proto
.SearchReply
{
378 Type
: proto
.SearchReply_MATCH
,
381 Line
: uint32(match
.Line
),
382 Package
: path
[:strings
.Index(path
, "/")],
385 Context
: match
.Context
,
388 Pathrank
: match
.PathRank
,
389 Ranking
: match
.Ranking
,
393 log
.Printf("%s %v\n", logprefix
, err
)
394 // Drain the work channel, but without doing any work.
395 // This effectively exits the worker goroutine(s)
413 log
.Printf("%s Sent all results.\n", logprefix
)
// Entry point wiring (interior of main — the `func main() {` line itself,
// original line 417, is missing from this view, as are flag.Parse(), the
// error branches after InitGlobalTracer/ReadRankingData, and the sampler
// fields; confirm against the full file).
418 log
.SetFlags(log
.LstdFlags | log
.Lshortfile
)
// Jaeger tracing configuration; spans are flushed every second to the
// agent named by the -jaeger_agent flag.
421 cfg
:= jaegercfg
.Configuration
{
422 Sampler
: &jaegercfg
.SamplerConfig
{
426 Reporter
: &jaegercfg
.ReporterConfig
{
427 BufferFlushInterval
: 1 * time
.Second
,
428 LocalAgentHostPort
: *jaegerAgent
,
// closer presumably gets deferred-Closed on a missing line — TODO confirm.
431 closer
, err
:= cfg
.InitGlobalTracer(
432 "dcs-source-backend",
433 jaegercfg
.Logger(jaeger
.StdLogger
),
// Seed math/rand for the progress-update jitter used in Search().
440 rand
.Seed(time
.Now().UnixNano())
// Normalize *unpackedPath to end in "/" — File()'s HasPrefix traversal
// check and Search()'s prefix-stripping both rely on this invariant.
441 if !strings
.HasSuffix(*unpackedPath
, "/") {
442 *unpackedPath
= *unpackedPath
+ "/"
444 fmt
.Println("Debian Code Search source-backend")
446 if err
:= ranking
.ReadRankingData(*rankingDataPath
); err
!= nil {
// Connect to the local index backend; Search() uses this client.
450 conn
, err
:= grpcutil
.DialTLS("localhost:28081", *tlsCertPath
, *tlsKeyPath
)
452 log
.Fatalf("could not connect to %q: %v", "localhost:28081", err
)
455 indexBackend
= proto
.NewIndexBackendClient(conn
)
// Expose Prometheus metrics alongside the gRPC server.
457 http
.Handle("/metrics", prometheus
.Handler())
// Blocks serving gRPC+TLS; the closure registers this server type.
458 log
.Fatal(grpcutil
.ListenAndServeTLS(*listenAddress
,
461 func(s
*grpc
.Server
) {
462 proto
.RegisterSourceBackendServer(s
, &server
{})