Turn Files into a streaming RPC, reduce maximum message size limit
[debiancodesearch.git] / cmd / dcs-index-backend / index-backend.go
blob790ce48a7cbbb2d96597e8864376411753880d8e
1 // vim:ts=4:sw=4:noexpandtab
2 package main
4 import (
5 "flag"
6 "fmt"
7 "log"
8 "net/http"
9 _ "net/http/pprof"
10 "os"
11 "path/filepath"
12 "runtime/pprof"
13 "sync"
14 "time"
16 "github.com/Debian/dcs/grpcutil"
17 "github.com/Debian/dcs/index"
18 "github.com/Debian/dcs/proto"
19 _ "github.com/Debian/dcs/varz"
20 "github.com/google/codesearch/regexp"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/uber/jaeger-client-go"
23 jaegercfg "github.com/uber/jaeger-client-go/config"
24 "golang.org/x/net/context"
25 "google.golang.org/grpc"
// Command-line flags of the index backend.
var (
	// listenAddress is the [host]:port the server listens on.
	listenAddress = flag.String(
		"listen_address",
		":28081",
		"listen address ([host]:port)")

	// indexPath names the index shard file served by this process.
	indexPath = flag.String(
		"index_path",
		"",
		"path to the index shard to serve, e.g. /dcs-ssd/index.0.idx")

	// cpuProfile, when non-empty, is the file CPU profiles are written to.
	cpuProfile = flag.String(
		"cpuprofile",
		"",
		"write cpu profile to this file")

	// tlsCertPath and tlsKeyPath locate the TLS certificate and private key.
	tlsCertPath = flag.String(
		"tls_cert_path",
		"",
		"Path to a .pem file containing the TLS certificate.")
	tlsKeyPath = flag.String(
		"tls_key_path",
		"",
		"Path to a .pem file containing the TLS private key.")

	// jaegerAgent is the host:port of the jaeger agent used for tracing.
	jaegerAgent = flag.String(
		"jaeger_agent",
		"localhost:5775",
		"host:port of a github.com/uber/jaeger agent")
)
39 type server struct {
40 id string
41 ix *index.Index
42 ixMutex sync.Mutex
45 // doPostingQuery runs the actual query. This code is in a separate function so
46 // that we can use defer (to be safe against panics in the index querying code)
47 // and still don’t hold the mutex for longer than we need to.
48 func (s *server) doPostingQuery(query *index.Query, stream proto.IndexBackend_FilesServer) error {
49 s.ixMutex.Lock()
50 defer s.ixMutex.Unlock()
51 t0 := time.Now()
52 post := s.ix.PostingQuery(query)
53 t1 := time.Now()
54 fmt.Printf("[%s] postingquery done in %v, %d results\n", s.id, t1.Sub(t0), len(post))
55 var reply proto.FilesReply
56 for _, fileid := range post {
57 reply.Path = s.ix.Name(fileid)
58 if err := stream.Send(&reply); err != nil {
59 return err
62 t2 := time.Now()
63 fmt.Printf("[%s] filenames collected in %v\n", s.id, t2.Sub(t1))
64 return nil
67 // Handles requests to /index by compiling the q= parameter into a regular
68 // expression (codesearch/regexp), searching the index for it and returning the
69 // list of matching filenames in a JSON array.
70 // TODO: This doesn’t handle file name regular expressions at all yet.
71 // TODO: errors aren’t properly signaled to the requester
72 func (s *server) Files(in *proto.FilesRequest, stream proto.IndexBackend_FilesServer) error {
73 if *cpuProfile != "" {
74 f, err := os.Create(*cpuProfile)
75 if err != nil {
76 log.Fatal(err)
78 defer f.Close()
79 pprof.StartCPUProfile(f)
80 defer pprof.StopCPUProfile()
83 re, err := regexp.Compile(in.Query)
84 if err != nil {
85 return fmt.Errorf("regexp.Compile: %s\n", err)
87 query := index.RegexpQuery(re.Syntax)
88 log.Printf("[%s] query: text = %s, regexp = %s\n", s.id, in.Query, query)
89 return s.doPostingQuery(query, stream)
92 func (s *server) ReplaceIndex(ctx context.Context, in *proto.ReplaceIndexRequest) (*proto.ReplaceIndexReply, error) {
93 newShard := in.ReplacementPath
95 file, err := os.Open(filepath.Dir(*indexPath))
96 if err != nil {
97 log.Fatal(err)
99 defer file.Close()
100 names, err := file.Readdirnames(-1)
101 if err != nil {
102 log.Fatal(err)
105 for _, name := range names {
106 if name == newShard {
107 newShard = filepath.Join(filepath.Dir(*indexPath), name)
108 // We verified the given argument refers to an index shard within
109 // this directory, so let’s load this shard.
110 oldIndex := s.ix
111 log.Printf("Trying to load %q\n", newShard)
112 s.ixMutex.Lock()
113 s.ix = index.Open(newShard)
114 s.ixMutex.Unlock()
115 // Overwrite the old full shard with the new one. This is necessary
116 // so that the state is persistent across restarts and has the nice
117 // side-effect of cleaning up the old full shard.
118 if err := os.Rename(newShard, *indexPath); err != nil {
119 log.Fatal(err)
121 oldIndex.Close()
122 return &proto.ReplaceIndexReply{}, nil
126 return nil, fmt.Errorf("No such shard.")
129 func main() {
130 log.SetFlags(log.LstdFlags | log.Lshortfile)
131 flag.Parse()
132 if *indexPath == "" {
133 log.Fatal("You need to specify a non-empty -index_path")
135 fmt.Println("Debian Code Search index-backend")
137 cfg := jaegercfg.Configuration{
138 Sampler: &jaegercfg.SamplerConfig{
139 Type: "const",
140 Param: 1,
142 Reporter: &jaegercfg.ReporterConfig{
143 BufferFlushInterval: 1 * time.Second,
144 LocalAgentHostPort: *jaegerAgent,
147 closer, err := cfg.InitGlobalTracer(
148 "dcs-index-backend",
149 jaegercfg.Logger(jaeger.StdLogger),
151 if err != nil {
152 log.Fatal(err)
154 defer closer.Close()
156 http.Handle("/metrics", prometheus.Handler())
158 log.Fatal(grpcutil.ListenAndServeTLS(*listenAddress,
159 *tlsCertPath,
160 *tlsKeyPath,
161 func(s *grpc.Server) {
162 proto.RegisterIndexBackendServer(s, &server{
163 id: filepath.Base(*indexPath),
164 ix: index.Open(*indexPath),