1 // Copyright 2010 Bill Burdick. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
10 import "container/vector"
15 Find(f
func(i El
)bool) El
18 // wrappers that keep the current type
21 Filter(filter
func(e El
) bool) Seq
22 Map(f
func(i El
) El
) Seq
23 FlatMap(f
func(i El
) Seq
) Seq
26 //convert a sequence to a concurrent sequence
27 func Concurrent(s Seq
) ConcurrentSeq
{
28 switch seq
:= s
.(type) {case ConcurrentSeq
: return seq
}
29 return Gen(func(c SeqChan
){Output(s
, c
)})
32 //convert a sequence to a sequential sequence
33 func Sequential(s Seq
) *SequentialSeq
{
34 switch seq
:= s
.(type) {case *SequentialSeq
: return seq
}
35 return SMap(s
, func(el El
)El
{return el
})
38 func FirstN(s Seq
, n
int) []interface{} {
39 r
:= make([]interface{}, n
)
41 Find(s
, func(el El
)bool{
49 func First2(s Seq
) (a
, b
interface{}) {
54 func First3(s Seq
) (a
, b
, c
interface{}) {
56 return r
[0], r
[1], r
[2]
59 func First4(s Seq
) (a
, b
, c
, d
interface{}) {
61 return r
[0], r
[1], r
[2], r
[3]
64 func First5(s Seq
) (a
, b
, c
, d
, e
interface{}) {
66 return r
[0], r
[1], r
[2], r
[3], r
[4]
69 func First6(s Seq
) (a
, b
, c
, d
, e
, f
interface{}) {
71 return r
[0], r
[1], r
[2], r
[3], r
[4], r
[5]
74 func IsSeq(s
interface{}) bool {
79 func First(s Seq
) interface{} {
80 var result
interface{}
81 s
.Find(func(el El
)bool{
88 func IsEmpty(s Seq
) bool {
90 s
.Find(func(el El
)bool{
97 func Find(s Seq
, f
func(el El
) bool) El
{return s
.Find(f
)}
99 func While(s Seq
, f
func(el El
) bool) {s
.Find(func(el El
)bool{return !f(el
)})}
101 func Do(s Seq
, f
func(el El
)) {
102 s
.Find(func(el El
)bool{
108 // CDo -- do f concurrently on each element of s, in any order
109 func CDo(s Seq
, f
func(el El
)) {
110 c
:= CMap(s
, func(el El
)El
{f(el
); return nil})()
111 for <- c
; !closed(c
); <- c
{}
114 func Len(s Seq
) int {return s
.Len()}
116 func Output(s Seq
, c SeqChan
) {
117 Do(s
, func(el El
){c
<- el
})
120 func Rest(s Seq
) Seq
{return s
.Rest()}
122 func Append(s1 Seq
, s2 Seq
) Seq
{return s1
.Append(s2
)}
124 func AppendToVector(s Seq
, vec
*vector
.Vector
) {
125 switch arg
:= s
.(type) {
126 case *SequentialSeq
: vec
.AppendVector((*vector
.Vector
)(arg
))
127 default: Do(s
, func(el El
){vec
.Push(el
)})
131 func SAppend(s Seq
, s2 Seq
) *SequentialSeq
{
132 vec
:= make(vector
.Vector
, 0, quickLen(s
, 8) + quickLen(s2
, 8))
133 AppendToVector(s
, &vec
)
134 AppendToVector(s2
, &vec
)
135 return (*SequentialSeq
)(&vec
)
138 func CAppend(s Seq
, s2 Seq
) ConcurrentSeq
{
139 return Gen(func(c SeqChan
){
145 func Prepend(s1 Seq
, s2 Seq
) Seq
{return s1
.Prepend(s2
)}
147 func quickLen(s Seq
, d
int) int {
148 switch seq
:= s
.(type) {case *SequentialSeq
: return s
.Len()}
152 func Filter(s Seq
, filter
func(e El
)bool) Seq
{return s
.Filter(filter
)}
154 func ifFunc(condition
func(e El
)bool, op
func(e El
)) func(el El
){return func(el El
){if condition(el
) {op(el
)}}}
156 func SFilter(s Seq
, filter
func(e El
)bool) *SequentialSeq
{
158 vec
:= make(vector
.Vector
, 0, quickLen(s
, 8))
159 Do(s
, ifFunc(filter
, func(el El
){vec
.Push(el
)}))
160 return (*SequentialSeq
)(&vec
)
163 func CFilter(s Seq
, filter
func(e El
)bool) ConcurrentSeq
{
164 return Gen(func(c SeqChan
){
165 Do(s
, ifFunc(filter
, func(el El
){c
<- el
}))
169 func Map(s Seq
, f
func(el El
) El
) Seq
{return s
.Map(f
)}
171 func SMap(s Seq
, f
func(i El
)El
) *SequentialSeq
{
172 vec
:= make(vector
.Vector
, 0, quickLen(s
, 8))
173 Do(s
, func(el El
){vec
.Push(f(el
))})
174 return (*SequentialSeq
)(&vec
)
182 type swEntry
struct {
187 // SlidingWindow is a vector with limited capacity (power of 2) and a base
188 type SlidingWindow
struct {
189 start
, base
, count
, mask
int
192 // NewSlidingWindow creates a new SlidingWindow with capacity size
193 func NewSlidingWindow(sz
uint) *SlidingWindow
{return &SlidingWindow
{0, 0, 0, (1 << sz
) - 1, make([]swEntry
, 1 << sz
)}}
194 func (r
*SlidingWindow
) Max() int {return r
.base
+ r
.Capacity()}
195 func (r
*SlidingWindow
) Capacity() int {return len(r
.values
)}
196 func (r
*SlidingWindow
) normalize(index
int) int {return (index
+ r
.Capacity()) & r
.mask
}
197 func (r
*SlidingWindow
) IsEmpty() bool {return r
.count
== 0}
198 func (r
*SlidingWindow
) IsFull() bool {return r
.count
== r
.Capacity()}
199 func (r
*SlidingWindow
) GetFirst() (interface{}, bool) {return r
.values
[r
.start
].value
, r
.values
[r
.start
].present
}
200 func (r
*SlidingWindow
) RemoveFirst() (interface{}, bool) {
201 result
:= r
.values
[r
.start
]
202 if !result
.present
{return nil, false}
203 r
.values
[r
.start
] = swEntry
{nil, false}
205 r
.start
= r
.normalize(r
.start
+ 1)
207 return result
.value
, true
209 func (r
*SlidingWindow
) Get(index
int) (interface{}, bool) {
211 if index
< 0 || index
>= r
.Capacity() {return nil, false}
212 index
= r
.normalize(index
+ r
.start
)
213 value
:= r
.values
[index
]
214 return value
.value
, value
.present
216 func (r
*SlidingWindow
) Set(index
int, value
interface{}) bool {
218 if index
< 0 || index
>= r
.Capacity() {return false}
219 index
= r
.normalize(index
+ r
.start
)
220 r
.values
[index
].value
= value
221 if !r
.values
[index
].present
{
222 r
.values
[index
].present
= true
228 // spawn a goroutine that does the following for each value, with up to size pending at a time:
229 // spawn a goroutine to apply f to the value and send the result back in a channel
230 // send the results in order to the ouput channel as they are completed
231 func CMap(s Seq
, f
func(el El
) El
, sizePowerOpt
... uint) ConcurrentSeq
{
233 if len(sizePowerOpt
) > 0 {sizePower
= sizePowerOpt
[0]}
234 size
:= 1 << sizePower
235 return Gen(func(output SeqChan
){
236 //punt and convert sequence to concurrent
237 //maybe someday we'll handle SequentialSequences separately
238 input
:= Concurrent(s
)()
239 window
:= NewSlidingWindow(sizePower
)
240 replyChannel
:= make(chan reply
)
241 inputCount
, pendingInput
, pendingOutput
:= 0, 0, 0
243 defer close(replyChannel
)
244 for !inputClosed || pendingInput
> 0 || pendingOutput
> 0 {
245 first
, hasFirst
:= window
.GetFirst()
246 ic
, oc
, rc
:= input
, output
, replyChannel
247 if !hasFirst
{oc
= nil}
248 if inputClosed || pendingInput
>= size
{ic
= nil}
249 if pendingOutput
>= size
{rc
= nil}
254 case inputElement
:= <- ic
:
258 go func(index
int, value
interface{}) {
260 replyChannel
<- reply
{index
, f(value
)}
261 }(inputCount
, inputElement
)
265 case replyElement
:= <- rc
:
266 window
.Set(replyElement
.index
, replyElement
.result
)
274 func FlatMap(s Seq
, f
func(el El
) Seq
) Seq
{return s
.FlatMap(f
)}
276 func SFlatMap(s Seq
, f
func(i El
) Seq
) *SequentialSeq
{
277 vec
:= make(vector
.Vector
, 0, quickLen(s
, 8))
278 Do(s
, func(e El
){Do(f(e
).(Seq
), func(sub El
){vec
.Push(sub
)})})
279 return (*SequentialSeq
)(&vec
)
282 func CFlatMap(s Seq
, f
func(i El
) Seq
, sizeOpt
... uint) ConcurrentSeq
{
283 return Gen(func(c SeqChan
){
284 Do(CMap(s
, func(e El
)El
{return f(e
)}, sizeOpt
...), func(sub El
){
290 func Fold(s Seq
, init
interface{}, f
func(acc
, el El
)El
) interface{} {
291 Do(s
, func(el El
){init
= f(init
, el
)})
295 //maybe convert this to use an accumulator instead of append?
296 func Combinations(s Seq
, number
int) Seq
{
297 if number
== 0 ||
IsEmpty(s
) {return From(From())}
298 return Combinations(s
.Rest(), number
).Prepend(Combinations(s
.Rest(), number
- 1).Map(func(el El
)El
{
299 return el
.(Seq
).Prepend(From(First(s
)))
303 //var Names map[interface{}]string
305 //returns the product of the Seqs contained in sequences
306 func Product(sequences Seq
) Seq
{
307 return Fold(sequences
, From(From()), func(result
, each El
)El
{
308 return result
.(Seq
).FlatMap(func(seq El
)Seq
{
309 return each
.(Seq
).Map(func(i El
) El
{
310 return seq
.(Seq
).Append(From(i
))
316 func Prettyln(s
interface{}, rest
... interface{}) {
317 writer
:= Pretty(s
, rest
...)
320 func Pretty(s
interface{}, args
... interface{}) io
.Writer
{
321 var writer io
.Writer
= os
.Stdout
322 var names
map[interface{}]string
323 for i
:= 0; i
< len(args
); i
++ {
324 switch arg
:= args
[i
].(type) {
325 case map[interface{}]string: names
= arg
326 case io
.Writer
: writer
= arg
329 if names
== nil {names
= map[interface{}]string{}}
330 prettyLevel(s
, 0, names
, writer
)
334 //This pretty is ugly :)
335 func prettyLevel(s
interface{}, level
int, names
map[interface{}]string, w io
.Writer
) {
336 name
, hasName
:= names
[s
]
339 } else switch arg
:= s
.(type) {
341 fmt
.Fprintf(w
, "%*s%s", level
, "", "[")
350 if !named
&& innerSeq
{
353 } else if !named
&& innerSeq
{
359 prettyLevel(v
.(Seq
), level
+ 4, names
, w
)
361 fmt
.Fprintf(w
, "%v", v
)
366 fmt
.Fprintf(w
, "\n%*s", level
, "")
377 type SeqChan
chan interface{}
379 type ConcurrentSeq
func()SeqChan
381 // f must behave properly when the channel is closed, so that IsEmpty and First work properly
382 func Gen(f
func(c SeqChan
)) ConcurrentSeq
{
383 return func() SeqChan
{
393 func CUpto(limit
int) ConcurrentSeq
{
394 return Gen(func(c SeqChan
) {
395 for i
:= 0; i
< limit
; i
++ {
401 func (s ConcurrentSeq
) Find(f
func(el El
)bool) El
{
404 for el
:= <- c
; !closed(c
) ; el
= <- c
{
410 func (s ConcurrentSeq
) Rest() Seq
{
411 return ConcurrentSeq(func()SeqChan
{
418 func (s ConcurrentSeq
) Len() int {
426 func (s ConcurrentSeq
) Append(s2 Seq
) Seq
{return CAppend(s
, s2
)}
428 func (s ConcurrentSeq
) Prepend(s2 Seq
) Seq
{return CAppend(s2
, s
)}
430 func (s ConcurrentSeq
) Filter(f
func(e El
)bool) Seq
{return CFilter(s
, f
)}
432 func (s ConcurrentSeq
) Map(f
func(i El
)El
) Seq
{return CMap(s
, f
)}
434 func (s ConcurrentSeq
) FlatMap(f
func(i El
) Seq
) Seq
{return CFlatMap(s
, f
)}
436 // recursively convert nested concurrent sequences to sequential
437 // does not descend into nested sequential sequences
438 func (s ConcurrentSeq
) ToSequentialSeq() *SequentialSeq
{
439 return SMap(s
, func(el El
)El
{
440 switch seq
:= el
.(type) {case ConcurrentSeq
: return seq
.ToSequentialSeq()}
448 type SequentialSeq
[]interface{}
450 func From(els
... interface{}) *SequentialSeq
{return (*SequentialSeq
)(&els
)}
452 func AUpto(limit
int) *SequentialSeq
{
453 a
:= make([]interface{}, limit
)
454 for i
:= 0; i
< limit
; i
++ {
457 return (*SequentialSeq
)(&a
)
460 func (s
*SequentialSeq
) Find(f
func(el El
)bool) El
{
461 for i
:= 0; i
< len(*s
); i
++ {
462 if f((*s
)[i
]) {return (*s
)[i
]}
467 func (s
*SequentialSeq
) Rest() Seq
{
469 return (*SequentialSeq
)(&s2
)
472 func (s
*SequentialSeq
) Len() int {return len(*s
)}
474 func (s
*SequentialSeq
) Append(s2 Seq
) Seq
{return SAppend(s
, s2
)}
476 func (s
*SequentialSeq
) Prepend(s2 Seq
) Seq
{return SAppend(s2
, s
)}
478 func (s
*SequentialSeq
) Filter(f
func(e El
)bool) Seq
{return SFilter(s
, f
)}
480 func (s
*SequentialSeq
) Map(f
func(i El
)El
) Seq
{return SMap(s
, f
)}
482 func (s
*SequentialSeq
) FlatMap(f
func(i El
) Seq
) Seq
{return SFlatMap(s
, f
)}