Concurrency seems to work well; cleaned up seq a bit
[bills-tools.git] / seq / seq.go
blob4a0429c8626854070d56a8b0002e168d44f38053
1 // Copyright 2010 Bill Burdick. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 //
5 package seq
7 import "fmt"
8 import "io"
9 import "os"
10 import "container/vector"
type (
	// El is the element type carried by every sequence; any value may be stored.
	El interface{}

	// Seq is the interface shared by all sequence implementations in this package.
	Seq interface {
		// core methods

		// Find calls f on successive elements and returns the first one for
		// which f reports true.
		Find(f func(i El) bool) El
		// Rest returns the sequence without its first element.
		Rest() Seq
		// Len returns the number of elements.
		Len() int

		// wrappers that keep the current type

		// Append returns this sequence followed by s2.
		Append(s2 Seq) Seq
		// Prepend returns s2 followed by this sequence.
		Prepend(s2 Seq) Seq
		// Filter returns the elements for which filter reports true.
		Filter(filter func(e El) bool) Seq
		// Map returns the results of applying f to each element.
		Map(f func(i El) El) Seq
		// FlatMap applies f to each element and concatenates the resulting sequences.
		FlatMap(f func(i El) Seq) Seq
	}
)
26 //convert a sequence to a concurrent sequence
27 func Concurrent(s Seq) ConcurrentSeq {
28 switch seq := s.(type) {case ConcurrentSeq: return seq}
29 return Gen(func(c SeqChan){Output(s, c)})
32 //convert a sequence to a sequential sequence
33 func Sequential(s Seq) *SequentialSeq {
34 switch seq := s.(type) {case *SequentialSeq: return seq}
35 return SMap(s, func(el El)El{return el})
38 func FirstN(s Seq, n int) []interface{} {
39 r := make([]interface{}, n)
40 x := 0
41 Find(s, func(el El)bool{
42 r[x] = el
43 x++
44 return x == n
46 return r
49 func First2(s Seq) (a, b interface{}) {
50 r := FirstN(s, 2)
51 return r[0], r[1]
54 func First3(s Seq) (a, b, c interface{}) {
55 r := FirstN(s, 3)
56 return r[0], r[1], r[2]
59 func First4(s Seq) (a, b, c, d interface{}) {
60 r := FirstN(s, 4)
61 return r[0], r[1], r[2], r[3]
64 func First5(s Seq) (a, b, c, d, e interface{}) {
65 r := FirstN(s, 5)
66 return r[0], r[1], r[2], r[3], r[4]
69 func First6(s Seq) (a, b, c, d, e, f interface{}) {
70 r := FirstN(s, 6)
71 return r[0], r[1], r[2], r[3], r[4], r[5]
74 func IsSeq(s interface{}) bool {
75 _, test := s.(Seq)
76 return test
79 func First(s Seq) interface{} {
80 var result interface{}
81 s.Find(func(el El)bool{
82 result = el
83 return true
85 return result
88 func IsEmpty(s Seq) bool {
89 empty := true
90 s.Find(func(el El)bool{
91 empty = false
92 return true
94 return empty
97 func Find(s Seq, f func(el El) bool) El {return s.Find(f)}
99 func While(s Seq, f func(el El) bool) {s.Find(func(el El)bool{return !f(el)})}
101 func Do(s Seq, f func(el El)) {
102 s.Find(func(el El)bool{
103 f(el)
104 return false
108 // CDo -- do f concurrently on each element of s, in any order
109 func CDo(s Seq, f func(el El)) {
110 c := CMap(s, func(el El)El{f(el); return nil})()
111 for <- c; !closed(c); <- c {}
114 func Len(s Seq) int {return s.Len()}
116 func Output(s Seq, c SeqChan) {
117 Do(s, func(el El){c <- el})
120 func Rest(s Seq) Seq {return s.Rest()}
122 func Append(s1 Seq, s2 Seq) Seq {return s1.Append(s2)}
124 func AppendToVector(s Seq, vec *vector.Vector) {
125 switch arg := s.(type) {
126 case *SequentialSeq: vec.AppendVector((*vector.Vector)(arg))
127 default: Do(s, func(el El){vec.Push(el)})
131 func SAppend(s Seq, s2 Seq) *SequentialSeq {
132 vec := make(vector.Vector, 0, quickLen(s, 8) + quickLen(s2, 8))
133 AppendToVector(s, &vec)
134 AppendToVector(s2, &vec)
135 return (*SequentialSeq)(&vec)
138 func CAppend(s Seq, s2 Seq) ConcurrentSeq {
139 return Gen(func(c SeqChan){
140 Output(s, c)
141 Output(s2, c)
145 func Prepend(s1 Seq, s2 Seq) Seq {return s1.Prepend(s2)}
147 func quickLen(s Seq, d int) int {
148 switch seq := s.(type) {case *SequentialSeq: return s.Len()}
149 return d
152 func Filter(s Seq, filter func(e El)bool) Seq {return s.Filter(filter)}
154 func ifFunc(condition func(e El)bool, op func(e El)) func(el El){return func(el El){if condition(el) {op(el)}}}
156 func SFilter(s Seq, filter func(e El)bool) *SequentialSeq {
157 //continue shrinking
158 vec := make(vector.Vector, 0, quickLen(s, 8))
159 Do(s, ifFunc(filter, func(el El){vec.Push(el)}))
160 return (*SequentialSeq)(&vec)
163 func CFilter(s Seq, filter func(e El)bool) ConcurrentSeq {
164 return Gen(func(c SeqChan){
165 Do(s, ifFunc(filter, func(el El){c <- el}))
169 func Map(s Seq, f func(el El) El) Seq {return s.Map(f)}
171 func SMap(s Seq, f func(i El)El) *SequentialSeq {
172 vec := make(vector.Vector, 0, quickLen(s, 8))
173 Do(s, func(el El){vec.Push(f(el))})
174 return (*SequentialSeq)(&vec)
177 type reply struct {
178 index int;
179 result El
// swEntry is one slot of a SlidingWindow: the stored value and a flag
// telling whether the slot is currently occupied.
type swEntry struct {
	value   interface{}
	present bool
}

// SlidingWindow is a vector with limited capacity (a power of 2) and a base.
// Slots are addressed by absolute index; only indexes in [base, base+capacity)
// are accepted. Removing the first slot advances base by one.
type SlidingWindow struct {
	start, base, count, mask int
	values                   []swEntry
}

// NewSlidingWindow creates an empty SlidingWindow with 1<<sz slots.
func NewSlidingWindow(sz uint) *SlidingWindow {
	capacity := 1 << sz
	return &SlidingWindow{
		mask:   capacity - 1,
		values: make([]swEntry, capacity),
	}
}

// Max returns the first absolute index past the end of the window.
func (w *SlidingWindow) Max() int {
	return w.base + w.Capacity()
}

// Capacity returns the number of slots.
func (w *SlidingWindow) Capacity() int {
	return len(w.values)
}

// normalize maps a slot position onto the backing slice, wrapping around
// using the power-of-2 mask.
func (w *SlidingWindow) normalize(index int) int {
	return (index + w.Capacity()) & w.mask
}

// IsEmpty reports whether no slot is occupied.
func (w *SlidingWindow) IsEmpty() bool {
	return w.count == 0
}

// IsFull reports whether every slot is occupied.
func (w *SlidingWindow) IsFull() bool {
	return w.count == w.Capacity()
}

// GetFirst returns the value in the slot at base and whether it is occupied.
func (w *SlidingWindow) GetFirst() (interface{}, bool) {
	head := w.values[w.start]
	return head.value, head.present
}

// RemoveFirst removes and returns the value in the slot at base, sliding
// the window forward by one. It returns (nil, false) and changes nothing
// when that slot is not occupied.
func (w *SlidingWindow) RemoveFirst() (interface{}, bool) {
	head := w.values[w.start]
	if !head.present {
		return nil, false
	}
	w.values[w.start] = swEntry{}
	w.count--
	w.start = w.normalize(w.start + 1)
	w.base++
	return head.value, true
}

// Get returns the value stored at absolute index and whether the slot is
// occupied; an index outside the window yields (nil, false).
func (w *SlidingWindow) Get(index int) (interface{}, bool) {
	offset := index - w.base
	if offset < 0 || offset >= w.Capacity() {
		return nil, false
	}
	slot := w.values[w.normalize(offset+w.start)]
	return slot.value, slot.present
}

// Set stores value at absolute index and reports whether the index was
// inside the window. Overwriting an occupied slot does not change the count.
func (w *SlidingWindow) Set(index int, value interface{}) bool {
	offset := index - w.base
	if offset < 0 || offset >= w.Capacity() {
		return false
	}
	slot := &w.values[w.normalize(offset+w.start)]
	slot.value = value
	if !slot.present {
		slot.present = true
		w.count++
	}
	return true
}
228 // spawn a goroutine that does the following for each value, with up to size pending at a time:
229 // spawn a goroutine to apply f to the value and send the result back in a channel
230 // send the results in order to the ouput channel as they are completed
231 func CMap(s Seq, f func(el El) El, sizePowerOpt... uint) ConcurrentSeq {
232 sizePower := uint(6)
233 if len(sizePowerOpt) > 0 {sizePower = sizePowerOpt[0]}
234 size := 1 << sizePower
235 return Gen(func(output SeqChan){
236 //punt and convert sequence to concurrent
237 //maybe someday we'll handle SequentialSequences separately
238 input := Concurrent(s)()
239 window := NewSlidingWindow(sizePower)
240 replyChannel := make(chan reply)
241 inputCount, pendingInput, pendingOutput := 0, 0, 0
242 inputClosed := false
243 defer close(replyChannel)
244 for !inputClosed || pendingInput > 0 || pendingOutput > 0 {
245 first, hasFirst := window.GetFirst()
246 ic, oc, rc := input, output, replyChannel
247 if !hasFirst {oc = nil}
248 if inputClosed || pendingInput >= size {ic = nil}
249 if pendingOutput >= size {rc = nil}
250 select {
251 case oc <- first:
252 window.RemoveFirst()
253 pendingOutput--
254 case inputElement := <- ic:
255 if closed(ic) {
256 inputClosed = true
257 } else {
258 go func(index int, value interface{}) {
259 SpawnCount++
260 replyChannel <- reply{index, f(value)}
261 }(inputCount, inputElement)
262 inputCount++
263 pendingInput++
265 case replyElement := <- rc:
266 window.Set(replyElement.index, replyElement.result)
267 pendingInput--
268 pendingOutput++
274 func FlatMap(s Seq, f func(el El) Seq) Seq {return s.FlatMap(f)}
276 func SFlatMap(s Seq, f func(i El) Seq) *SequentialSeq {
277 vec := make(vector.Vector, 0, quickLen(s, 8))
278 Do(s, func(e El){Do(f(e).(Seq), func(sub El){vec.Push(sub)})})
279 return (*SequentialSeq)(&vec)
282 func CFlatMap(s Seq, f func(i El) Seq, sizeOpt... uint) ConcurrentSeq {
283 return Gen(func(c SeqChan){
284 Do(CMap(s, func(e El)El{return f(e)}, sizeOpt...), func(sub El){
285 Output(sub.(Seq), c)
290 func Fold(s Seq, init interface{}, f func(acc, el El)El) interface{} {
291 Do(s, func(el El){init = f(init, el)})
292 return init
295 //maybe convert this to use an accumulator instead of append?
296 func Combinations(s Seq, number int) Seq {
297 if number == 0 || IsEmpty(s) {return From(From())}
298 return Combinations(s.Rest(), number).Prepend(Combinations(s.Rest(), number - 1).Map(func(el El)El{
299 return el.(Seq).Prepend(From(First(s)))
303 //var Names map[interface{}]string
305 //returns the product of the Seqs contained in sequences
306 func Product(sequences Seq) Seq {
307 return Fold(sequences, From(From()), func(result, each El)El{
308 return result.(Seq).FlatMap(func(seq El)Seq{
309 return each.(Seq).Map(func(i El) El {
310 return seq.(Seq).Append(From(i))
313 }).(Seq)
316 func Prettyln(s interface{}, rest... interface{}) {
317 writer := Pretty(s, rest...)
318 fmt.Fprintln(writer)
320 func Pretty(s interface{}, args... interface{}) io.Writer {
321 var writer io.Writer = os.Stdout
322 var names map[interface{}]string
323 for i := 0; i < len(args); i++ {
324 switch arg := args[i].(type) {
325 case map[interface{}]string: names = arg
326 case io.Writer: writer = arg
329 if names == nil {names = map[interface{}]string{}}
330 prettyLevel(s, 0, names, writer)
331 return writer
334 //This pretty is ugly :)
335 func prettyLevel(s interface{}, level int, names map[interface{}]string, w io.Writer) {
336 name, hasName := names[s]
337 if hasName {
338 fmt.Fprint(w, name)
339 } else switch arg := s.(type) {
340 case Seq:
341 fmt.Fprintf(w, "%*s%s", level, "", "[")
342 first := true
343 innerSeq := false
344 named := false
345 Do(arg, func(v El) {
346 _, named = names[v]
347 _,innerSeq = v.(Seq)
348 if first {
349 first = false
350 if !named && innerSeq {
351 fmt.Fprintln(w)
353 } else if !named && innerSeq {
354 fmt.Fprintln(w, ",")
355 } else {
356 fmt.Fprint(w, ", ")
358 if innerSeq {
359 prettyLevel(v.(Seq), level + 4, names, w)
360 } else {
361 fmt.Fprintf(w, "%v", v)
364 if innerSeq {
365 if !named {
366 fmt.Fprintf(w, "\n%*s", level, "")
369 fmt.Fprintf(w, "]")
370 default:
371 fmt.Print(arg)
375 //ConcurrentSeq
type (
	// SeqChan is the channel over which a concurrent sequence delivers its
	// elements.
	SeqChan chan interface{}

	// ConcurrentSeq is a sequence represented as a function; each call
	// returns a channel that delivers the elements.
	ConcurrentSeq func() SeqChan
)
381 // f must behave properly when the channel is closed, so that IsEmpty and First work properly
382 func Gen(f func(c SeqChan)) ConcurrentSeq {
383 return func() SeqChan {
384 c := make(SeqChan)
385 go func() {
386 defer close(c)
387 f(c)
389 return c
393 func CUpto(limit int) ConcurrentSeq {
394 return Gen(func(c SeqChan) {
395 for i := 0; i < limit; i++ {
396 c <- i
401 func (s ConcurrentSeq) Find(f func(el El)bool) El {
402 c := s()
403 defer close(c)
404 for el := <- c; !closed(c) ; el = <- c {
405 if f(el) {return el}
407 return nil
410 func (s ConcurrentSeq) Rest() Seq {
411 return ConcurrentSeq(func()SeqChan{
412 c := s()
413 <- c
414 return c
418 func (s ConcurrentSeq) Len() int {
419 len := 0
420 Do(s, func(el El){
421 len++
423 return len
426 func (s ConcurrentSeq) Append(s2 Seq) Seq {return CAppend(s, s2)}
428 func (s ConcurrentSeq) Prepend(s2 Seq) Seq {return CAppend(s2, s)}
430 func (s ConcurrentSeq) Filter(f func(e El)bool) Seq {return CFilter(s, f)}
432 func (s ConcurrentSeq) Map(f func(i El)El) Seq {return CMap(s, f)}
434 func (s ConcurrentSeq) FlatMap(f func(i El) Seq) Seq {return CFlatMap(s, f)}
436 // recursively convert nested concurrent sequences to sequential
437 // does not descend into nested sequential sequences
438 func (s ConcurrentSeq) ToSequentialSeq() *SequentialSeq {
439 return SMap(s, func(el El)El{
440 switch seq := el.(type) {case ConcurrentSeq: return seq.ToSequentialSeq()}
441 return el
446 // SequentialSeq
// SequentialSeq is a sequence backed by a slice of elements; its methods
// are defined on *SequentialSeq.
type SequentialSeq []interface{}
450 func From(els... interface{}) *SequentialSeq {return (*SequentialSeq)(&els)}
452 func AUpto(limit int) *SequentialSeq {
453 a := make([]interface{}, limit)
454 for i := 0; i < limit; i++ {
455 a[i] = i
457 return (*SequentialSeq)(&a)
460 func (s *SequentialSeq) Find(f func(el El)bool) El {
461 for i := 0; i < len(*s); i++ {
462 if f((*s)[i]) {return (*s)[i]}
464 return nil
467 func (s *SequentialSeq) Rest() Seq {
468 s2 := (*s)[1:]
469 return (*SequentialSeq)(&s2)
472 func (s *SequentialSeq) Len() int {return len(*s)}
474 func (s *SequentialSeq) Append(s2 Seq) Seq {return SAppend(s, s2)}
476 func (s *SequentialSeq) Prepend(s2 Seq) Seq {return SAppend(s2, s)}
478 func (s *SequentialSeq) Filter(f func(e El)bool) Seq {return SFilter(s, f)}
480 func (s *SequentialSeq) Map(f func(i El)El) Seq {return SMap(s, f)}
482 func (s *SequentialSeq) FlatMap(f func(i El) Seq) Seq {return SFlatMap(s, f)}