changed CMap to use a goroutine for each func
[bills-tools.git] / seq / seq.go
blobe5dbd53311314bc4b2761bbe23986135e3ab87a3
1 // Copyright 2010 Bill Burdick. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 //
5 package seq
7 import "fmt"
8 import "io"
9 import "os"
10 import "container/vector"
// El is the element type carried by every sequence in this package; being the
// empty interface, it accepts any value.
type El interface{}
// Seq is the interface shared by the sequence implementations in this file
// (ConcurrentSeq and *SequentialSeq).
type Seq interface {
	// core methods

	// While feeds successive elements to f until f returns false or the
	// sequence runs out.
	While(f func(i El)bool)
	// Rest returns the sequence without its first element.
	Rest() Seq
	// Len returns the number of elements.
	Len() int

	// wrappers that keep the current type

	// Append returns this sequence followed by s2.
	Append(s2 Seq) Seq
	// Prepend returns s2 followed by this sequence.
	Prepend(s2 Seq) Seq
	// Filter returns the elements for which filter reports true.
	Filter(filter func(e El) bool) Seq
	// Map returns the results of applying f to each element.
	Map(f func(i El) El) Seq
	// FlatMap applies f, which yields a sequence, to each element.
	FlatMap(f func(i El) Seq) Seq
}
26 //convert a sequence to a concurrent sequence
27 func Concurrent(s Seq) ConcurrentSeq {
28 switch seq := s.(type) {case ConcurrentSeq: return seq}
29 return Gen(func(c SeqChan){Output(s, c)})
32 //convert a sequence to a sequential sequence
33 func Sequential(s Seq) *SequentialSeq {
34 switch seq := s.(type) {case *SequentialSeq: return seq}
35 vec := make(vector.Vector, 0, 8)
36 Do(s, func(v El){vec.Push(v)})
37 return (*SequentialSeq)(&vec)
40 func FirstN(s Seq, n int) []interface{} {
41 r := make([]interface{}, n)
42 x := 0
43 While(s, func(el El)bool{
44 r[x] = el
45 x++
46 return x < n
48 return r
51 func First2(s Seq) (a, b interface{}) {
52 r := FirstN(s, 2)
53 return r[0], r[1]
56 func First3(s Seq) (a, b, c interface{}) {
57 r := FirstN(s, 3)
58 return r[0], r[1], r[2]
61 func First4(s Seq) (a, b, c, d interface{}) {
62 r := FirstN(s, 4)
63 return r[0], r[1], r[2], r[3]
66 func First5(s Seq) (a, b, c, d, e interface{}) {
67 r := FirstN(s, 5)
68 return r[0], r[1], r[2], r[3], r[4]
71 func First6(s Seq) (a, b, c, d, e, f interface{}) {
72 r := FirstN(s, 6)
73 return r[0], r[1], r[2], r[3], r[4], r[5]
76 func IsSeq(s interface{}) bool {
77 _, test := s.(Seq)
78 return test
81 func First(s Seq) interface{} {
82 var result interface{}
83 s.While(func(el El)bool{
84 result = el
85 return false
87 return result
90 func IsEmpty(s Seq) bool {
91 empty := true
92 s.While(func(el El)bool{
93 empty = false
94 return false
96 return empty
99 func While(s Seq, f func(el El) bool) {s.While(f)}
101 func Do(s Seq, f func(el El)) {
102 s.While(func(el El)bool{
103 f(el)
104 return true
108 func Len(s Seq) int {return s.Len()}
110 func Output(s Seq, c SeqChan) {
111 Do(s, func(el El){
112 c <- el
116 func Rest(s Seq) Seq {return s.Rest()}
118 func Append(s1 Seq, s2 Seq) Seq {return s1.Append(s2)}
120 func AppendToVector(s Seq, vec *vector.Vector) {
121 switch arg := s.(type) {
122 case *SequentialSeq: vec.AppendVector((*vector.Vector)(arg))
123 default: Do(s, func(el El){vec.Push(el)})
127 func SAppend(s Seq, s2 Seq) Seq {
128 vec := make(vector.Vector, 0, quickLen(s, 8) + quickLen(s2, 8))
129 AppendToVector(s, &vec)
130 AppendToVector(s2, &vec)
131 //print("SAppend ");Prettyln(s);print(" + ");Prettyln(s2);println(" = ");Prettyln((*SequentialSeq)(&vec))
132 return (*SequentialSeq)(&vec)
135 func CAppend(s Seq, s2 Seq) Seq {
136 return Gen(func(c SeqChan){
137 Output(s, c)
138 Output(s2, c)
142 func Prepend(s1 Seq, s2 Seq) Seq {return s1.Prepend(s2)}
144 func quickLen(s Seq, d int) int {
145 switch seq := s.(type) {case *SequentialSeq: return s.Len()}
146 return d
149 func Filter(s Seq, filter func(e El)bool) Seq {return s.Filter(filter)}
151 func SFilter(s Seq, filter func(e El)bool) Seq {
152 //continue shrinking
153 vec := make(vector.Vector, 0, quickLen(s, 8))
154 Do(s, func(el El){
155 if filter(el) {vec.Push(el)}
157 return (*SequentialSeq)(&vec)
160 func CFilter(s Seq, filter func(e El)bool) Seq {
161 return Gen(func(c SeqChan){
162 Do(s, func(el El){
163 if filter(el) {c <- el}
168 func Map(s Seq, f func(el El) El) Seq {return s.Map(f)}
170 func SMap(s Seq, f func(i El)El) Seq {
171 vec := make(vector.Vector, 0, quickLen(s, 8))
172 Do(s, func(el El){vec.Push(f(el))})
173 return (*SequentialSeq)(&vec)
// reply is the message a CMap worker goroutine sends back: the position of
// the input element and the value produced for it.
type reply struct {
	index int;   // position of the element in the input sequence
	result El    // value computed for that element
}
181 type swEntry struct {
182 present boolean
183 value El
186 // SlidingWindow is a vector with limited capacity and a base
187 type SlidingWindow {
188 start, base, count int
189 values []swEntry
191 // NewSlidingWindow creates a new SlidingWindow with capacity size
192 func NewSlidingWindow(size int) *SlidingWindow {return &SlidingWindow{0, 0, 0, make([]swEntry, size)}}
193 func (r *SlidingWindow) Max() int {return r.base + r.Capacity()}
194 func (r *SlidingWindow) Capacity() int {return len(r.values)}
195 func (r *SlidingWindow) normalize(index int) int {return (index + r.Capacity()) % r.Capacity()}
196 func (r *SlidingWindow) IsEmpty() bool {return r.count == 0}
197 func (r *SlidingWindow) IsFull() bool {return r.count == r.Capacity()}
198 func (r *SlidingWindow) GetFirst() (interface{}, bool) {return r.values[r.start].value, r.values[r.start].present}
199 func (r *SlidingWindow) RemoveFirst() (interface{}, boolean) {
200 if count == 0 {return nil, false}
201 result := r.values[r.start]
202 r.values[r.start] = swEntry{false, nil}
203 if result.present {r.count--}
204 r.start = normalize(r.start++)
205 r.base++
206 return result.value, result.present
208 func (r *SlidingWindow) RemoveLast() (interface{}, boolean) {
209 if count == 0 {return nil, false}
210 end := r.normalize(r.start + r.Capacity() - 1)
211 result := r.values[end]
212 r.values[end] = swEntry{false, nil}
213 if result.present {r.count--}
214 if r.start > 0 {
215 r.start = normalize(r.start--)
216 r.base--
218 return result.value, result.present
220 func (r *SlidingWindow) Get(index int) (value interface{}, boolean) {
221 index = r.normalize(r.index - r.base + r.start)
222 if index < 0 || index >= r.Capacity() {return nil, false}
223 value = r.values[index]
224 return value.value, value.present
226 func (r *SlidingWindow) Set(index int, value interface{}) boolean {
227 index = r.normalize(r.index - r.base + r.start)
228 if index < 0 || index >= r.Capacity() {return false}
229 r.values[index].value = value
230 if !r.values[index].present {
231 r.values[index].present = true
232 r.count++
234 return true
237 // spawn a goroutine that does the following for each value, with up to size pending at a time:
238 // spawn a goroutine to apply f to the value and send the result back in a channel
239 // send the results in order to the ouput channel as they are completed
240 func CMap(s Seq, f func(el El) El, sizeOpt... int) Seq {
241 size := 64
242 if len(sizeOpt) > 0 {size = sizeOpt[0]}
243 return Gen(func(output SeqChan){
244 //punt and convert sequence to concurrent
245 //maybe someday we'll handle SequentialSequences separately
246 input := Concurrent(s)()
247 go func(){
248 window := NewSlidingWindow(size)
249 replyChannel := make(chan reply)
250 inputCount := 0
251 pendingInput := 0
252 pendingOutput := 0
253 inputClosed := false
254 for {
255 first, hasFirst := window.GetFirst()
256 oc := output
257 ic := input
258 rc :- replyChannel
259 if !hasFirst {oc = nil}
260 if inputClosed || pendingInput >= size {ic = nil}
261 if pendingOutput >= size {rc = nil}
262 select {
263 case oc <- first:
264 window.RemoveFirst()
265 pendingOutput--
266 case inputMsg := <- ic:
267 if closed(ic) {
268 inputClosed = true
269 } else {
270 go func(index int, value interface{}) {
271 replyChannel <- reply{index, f(value)}
272 }(inputCount, v)
273 inputCount++
274 pendingInput++
276 case replyMsg := <- rc:
277 window.addLast(replyMsg)
278 pendingInput--
279 pendingOutput++
286 func FlatMap(s Seq, f func(el El) Seq) Seq {return s.FlatMap(f)}
288 func SFlatMap(s Seq, f func(i El) Seq) Seq {
289 vec := make(vector.Vector, 0, quickLen(s, 8))
290 Do(SMap(s, f), func(sub El){vec.Push(sub)})
291 return (*SequentialSeq)(&vec)
294 func CFlatMap(s Seq, f func(i El) Seq, sizeOpt... int) Seq {
295 return Gen(func(c SeqChan){
296 Do(CMap(s, f, sizeOpt), func(sub El){c <- sub})
300 func Fold(s Seq, init interface{}, f func(acc, el El)El) interface{} {
301 Do(s, func(el El){init = f(init, el)})
302 return init
305 //maybe convert this to use an accumulator instead of append?
306 func Combinations(s Seq, number int) Seq {
307 if number == 0 || IsEmpty(s) {return From(From())}
308 return Combinations(s.Rest(), number).Prepend(Combinations(s.Rest(), number - 1).Map(func(el El)El{
309 return el.(Seq).Prepend(From(First(s)))
313 //returns the product of the Seqs contained in sequences
314 func Product(sequences Seq) Seq {
315 return Fold(sequences, From(From()), func(result, each El)El{
316 //fmt.Print("folding: ");Pretty(each);fmt.Print(" into ");Prettyln(result)
317 return result.(Seq).FlatMap(func(seq El)Seq{
318 //fmt.Print("flat map with: ");Prettyln(seq)
319 return each.(Seq).Map(func(i El) El {
320 //fmt.Print("map with: ");Prettyln(i)
321 return seq.(Seq).Append(From(i))
324 }).(Seq)
327 func Prettyln(s interface{}, rest... interface{}) {
328 writer := Pretty(s, rest...)
329 fmt.Fprintln(writer)
331 func Pretty(s interface{}, args... interface{}) io.Writer {
332 var writer io.Writer = os.Stdout
333 var names map[interface{}]string
334 for i := 0; i < len(args); i++ {
335 switch arg := args[i].(type) {
336 case map[interface{}]string: names = arg
337 case io.Writer: writer = arg
340 if names == nil {names = map[interface{}]string{}}
341 prettyLevel(s, 0, names, writer)
342 return writer
345 //This is pretty ugly :)
346 func prettyLevel(s interface{}, level int, names map[interface{}]string, w io.Writer) {
347 name, hasName := names[s]
348 if hasName {
349 fmt.Fprint(w, name)
350 } else switch arg := s.(type) {
351 case Seq:
352 fmt.Fprintf(w, "%*s%s", level, "", "[")
353 first := true
354 innerSeq := false
355 named := false
356 Do(arg, func(v El) {
357 _, named = names[v]
358 _,innerSeq = v.(Seq)
359 if first {
360 first = false
361 if !named && innerSeq {
362 fmt.Fprintln(w)
364 } else if !named && innerSeq {
365 fmt.Fprintln(w, ",")
366 } else {
367 fmt.Fprint(w, ", ")
369 if innerSeq {
370 prettyLevel(v.(Seq), level + 4, names, w)
371 } else {
372 fmt.Fprintf(w, "%v", v)
375 if innerSeq {
376 if !named {
377 fmt.Fprintf(w, "\n%*s", level, "")
380 fmt.Fprintf(w, "]")
381 default:
382 fmt.Print(arg)
//ConcurrentSeq

// SeqChan is the channel type on which a concurrent sequence delivers its
// elements.
type SeqChan chan interface{}
// ConcurrentSeq is a sequence represented as a function; each call returns a
// channel from which the elements are received.
type ConcurrentSeq func()SeqChan
392 // f must behave properly when the channel is closed, so that IsEmpty and First work properly
393 func Gen(f func(c SeqChan)) ConcurrentSeq {
394 return func() SeqChan {
395 c := make(SeqChan)
396 go func() {
397 defer close(c)
398 f(c)
400 return c
404 func CUpto(limit int) ConcurrentSeq {
405 return Gen(func(c SeqChan) {
406 for i := 0; i < limit; i++ {
407 c <- i
412 func (s ConcurrentSeq) While(f func(el El)bool) {
413 c := s()
414 defer close(c)
415 for el := <- c; !closed(c) && f(el); el = <- c {}
418 func (s ConcurrentSeq) Rest() Seq {
419 return ConcurrentSeq(func()SeqChan{
420 c := s()
421 <- c
422 return c
426 func (s ConcurrentSeq) Len() int {
427 len := 0
428 Do(s, func(el El){
429 len++
431 return len
434 func (s ConcurrentSeq) Append(s2 Seq) Seq {return CAppend(s, s2)}
436 func (s ConcurrentSeq) Prepend(s2 Seq) Seq {return CAppend(s2, s)}
438 func (s ConcurrentSeq) Filter(f func(e El)bool) Seq {return CFilter(s, f)}
440 func (s ConcurrentSeq) Map(f func(i El)El) Seq {return CMap(s, f)}
442 func (s ConcurrentSeq) FlatMap(f func(i El) Seq) Seq {return CFlatMap(s, f)}
444 func toSequentialSeq(el interface{}) interface{} {
445 switch seq := el.(type) {
446 case ConcurrentSeq: return seq.ToSequentialSeq()
447 case []interface{}:
448 cpy := make([]interface{}, len(seq))
449 copy(cpy, seq)
450 return cpy
452 return el
455 func (s ConcurrentSeq) ToSequentialSeq() *SequentialSeq {
456 vec := make(vector.Vector, 0, 8)
457 Do(s, func(v El){vec.Push(toSequentialSeq(v))})
458 return (*SequentialSeq)(&vec)
// SequentialSeq

// SequentialSeq is a sequence held in memory as a slice; its methods are
// declared on *SequentialSeq.
type SequentialSeq []interface{}
466 func From(els... interface{}) *SequentialSeq {return (*SequentialSeq)(&els)}
468 func AUpto(limit int) *SequentialSeq {
469 a := make([]interface{}, limit)
470 for i := 0; i < limit; i++ {
471 a[i] = i
473 return (*SequentialSeq)(&a)
476 func (s *SequentialSeq) While(f func(el El)bool) {
477 for i := 0; i < len(*s) && f((*s)[i]); i++ {}
480 func (s *SequentialSeq) Rest() Seq {
481 s2 := (*s)[1:]
482 return (*SequentialSeq)(&s2)
485 func (s *SequentialSeq) Len() int {return len(*s)}
487 func (s *SequentialSeq) Append(s2 Seq) Seq {return SAppend(s, s2)}
489 func (s *SequentialSeq) Prepend(s2 Seq) Seq {return SAppend(s2, s)}
491 func (s *SequentialSeq) Filter(f func(e El)bool) Seq {return SFilter(s, f)}
493 func (s *SequentialSeq) Map(f func(i El)El) Seq {return SMap(s, f)}
495 func (s *SequentialSeq) FlatMap(f func(i El) Seq) Seq {return SFlatMap(s, f)}