7 "github.com/garyburd/redigo/redis"
15 // {"Id":"File-Locate-Iterator-21","CalcNext":1,"CalcTimestamp":0,"CalcNextETA":0,"DebugRating":500}
16 type StateType
struct {
18 CalcNext
uint8 // not_scheduled(0) | scheduled(1) | in_progress(2)
24 DebugRunningStart
uint32
25 DebugRunningSetId
uint32
28 // {"distv":"File-Locate-Iterator-21","start":1397696507}
29 type JobqueueJobsXDescr
struct {
34 // {"Id":"nonsense-0.01","VoteAccepted":true,"VoteRefusedBecause":nil,...
35 type VoteType
struct {
38 VoteRefusedReason
string
41 // {"Id":"nonsense-0.01","LcPass":123,"LcFail":13,...
42 type LastCalcType
struct {
48 func voteHandler(w http
.ResponseWriter
, r
*http
.Request
, title
string) {
49 c
, err
:= redis
.Dial("tcp", ":6379")
60 l
:= new(LastCalcType
)
68 if s
.CalcNext
== 1 { // already scheduled unsure what the
69 // right thing to do is here; thinking about it hurts
70 // and costs time, let's discuss this later
71 v
.VoteAccepted
= true // but silently doing nothing
72 } else if s
.CalcNext
== 2 {
73 v
.VoteAccepted
= false
74 v
.VoteRefusedReason
= "Calculation already running, voting closed"
75 } else if l
.LcPass
== 0 && l
.LcFail
== 0 {
77 } else if l
.LcPass
== s
.Pass
&& l
.LcFail
== s
.Fail
{
78 v
.VoteAccepted
= false
79 v
.VoteRefusedReason
= "No new results available that would justify a new calc"
84 _
, err
:= redis
.Int(c
.Do("ZADD", "analysis:jobqueue:q", 10000, title
))
86 v
.VoteAccepted
= false
87 v
.VoteRefusedReason
= "-ERR"
93 b
, err
:= json
.Marshal(v
)
94 fmt
.Fprintf(w
, "%s\n", b
)
97 func stateHandler(w http
.ResponseWriter
, r
*http
.Request
, title
string) {
98 c
, err
:= redis
.Dial("tcp", ":6379")
109 set_calctimestamp(c
, s
)
112 b
, err
:= json
.Marshal(s
)
113 fmt
.Fprintf(w
, "%s\n", b
)
116 func set_calctimestamp(c redis
.Conn
, s
*StateType
) {
117 calctimestamp
:= -1 // initialize to illegal value to detect nil.
118 calctimestamp
, err
:= redis
.Int(c
.Do("ZSCORE", "analysis:distv:calctimestamp", s
.Id
))
122 if calctimestamp
> -1 {
123 s
.CalcTimestamp
= uint64(calctimestamp
)
127 func set_eta(c redis
.Conn
, s
*StateType
) {
128 eta
:= -1 // initialize to illegal value to detect nil.
129 eta
, err
:= redis
.Int(c
.Do("ZSCORE", "analysis:distv:eta", s
.Id
))
134 s
.CalcNextETA
= uint64(eta
)
138 func set_passfail(c redis
.Conn
, s
*StateType
) {
140 fail
, err
:= redis
.Int(c
.Do("ZSCORE", "analysis:distv:fail", s
.Id
))
145 s
.Fail
= uint32(fail
)
148 pass
, err
= redis
.Int(c
.Do("ZSCORE", "analysis:distv:pass", s
.Id
))
153 s
.Pass
= uint32(pass
)
157 func set_lcpassfail(c redis
.Conn
, l
*LastCalcType
) {
159 fail
, err
:= redis
.Int(c
.Do("ZSCORE", "analysis:distv:lcfail", l
.Id
))
164 l
.LcFail
= uint32(fail
)
167 pass
, err
= redis
.Int(c
.Do("ZSCORE", "analysis:distv:lcpass", l
.Id
))
172 l
.LcPass
= uint32(pass
)
176 func set_calcnext(c redis
.Conn
, s
*StateType
) {
177 rating
:= -1 // initialize to illegal value to detect nil.
178 rating
, err
:= redis
.Int(c
.Do("ZSCORE", "analysis:jobqueue:q", s
.Id
))
183 s
.DebugRating
= uint32(rating
)
189 v
, err
:= redis
.Values(c
.Do("SMEMBERS", "analysis:jobqueue:runningjobs"))
196 v
, err
= redis
.Scan(v
, &id
)
200 // my($json) = $redis->get("analysis:jobqueue:jobs:$id\:descr");
202 key
:= fmt
.Sprintf("analysis:jobqueue:jobs:%d:descr", id
)
203 jsonstr
, err
= redis
.String(c
.Do("GET", key
))
205 fmt
.Println("missing key: ", key
)
208 // my($j) = $jsonxs->decode($json);
210 var j JobqueueJobsXDescr
211 err
= json
.Unmarshal(b
, &j
)
215 // $h{distv} = $j->{distv};
217 if thisdistv
== s
.Id
{
226 s
.DebugRunningStart
= tstart
227 s
.DebugRunningSetId
= tid
232 var validPath
= regexp
.MustCompile("^/scheduler/(state|vote)/([a-zA-Z0-9_.-]+)$")
234 func makeHandler(fn
func(http
.ResponseWriter
, *http
.Request
, string)) http
.HandlerFunc
{
235 return func(w http
.ResponseWriter
, r
*http
.Request
) {
236 w
.Header().Set("Content-Type", "application/json; charset=utf-8")
237 m
:= validPath
.FindStringSubmatch(r
.URL
.Path
)
247 addr
= flag
.Bool("addr", false, "find open address and print to final-port.txt")
252 http
.HandleFunc("/scheduler/state/", makeHandler(stateHandler
))
253 http
.HandleFunc("/scheduler/vote/", makeHandler(voteHandler
))
256 l
, err
:= net
.Listen("tcp", "127.0.0.1:0")
260 err
= ioutil
.WriteFile("final-port.txt", []byte(l
.Addr().String()), 0644)
269 http
.ListenAndServe(":3001", nil)