From fc4fd5d64f375f3ecb5c0fb97a01e1e8573711a1 Mon Sep 17 00:00:00 2001 From: Bill Burdick Date: Tue, 9 Nov 2010 12:30:44 +0200 Subject: [PATCH] Added CDo, fixing and getting dice example to work concurrently --- seq/examples/dice.go | 20 +++++++---- seq/seq.go | 96 +++++++++++++++++++++++++--------------------------- 2 files changed, 60 insertions(+), 56 deletions(-) diff --git a/seq/examples/dice.go b/seq/examples/dice.go index 71bbe7c..d2874dd 100644 --- a/seq/examples/dice.go +++ b/seq/examples/dice.go @@ -39,7 +39,16 @@ func main() { rank := map[Seq]int{d4:0, d6:1, d8:2, d10:3} sets := map[string]int{} //attempts is [[label, [score, ...]]...] - attempts := Concurrent(Map(Filter(Product(From(dice, dice, dice)), func(d El)bool{ +println("dice...") +CDo(dice, func(el El){Prettyln(el, names)}) +println("done") +//Prettyln(FlatMap(From(1,2,3,4), func(el El)Seq{ +// return From("a", el) +// })) +//Names = names +//Prettyln(From(dice,dice,dice), names) +//Prettyln(Product(From(dice,dice,dice)), names) + attempts := Map(Filter(Product(From(dice, dice, dice)), func(d El)bool{ oldRank := -1 result := true Do(d.(Seq), func(set El){ @@ -57,7 +66,7 @@ func main() { return Fold(el.(Seq), 0, func(acc, el El)El{return max(acc.(int), el.(int))}) })) - })) + }) println("#sets:", len(sets)) fmt.Println("#Attempts:", Len(attempts)) println("results...") @@ -65,15 +74,14 @@ func main() { label, rolls := First2(el.(Seq)) fmt.Printf("%s: %d\n", label, Len(rolls.(Seq))) }) - cattempts := Concurrent(attempts) - Do(cattempts, func(el El) { + CDo(attempts, func(el El) { label, sc := First2(el.(Seq)) - Do(cattempts, func(del El){ + CDo(attempts, func(del El) { rolls := 0 wins := 0 margins := map[int]int{} dlabel, dsc := First2(del.(Seq)) - Do(Product(From(sc,dsc)), func(rel El){ + CDo(Product(From(sc,dsc)), func(rel El){ rolls++ attack, defense := First2(rel.(Seq)) margin := attack.(int) - defense.(int) diff --git a/seq/seq.go b/seq/seq.go index e578449..3506e96 100644 --- a/seq/seq.go +++ b/seq/seq.go @@ -32,9 +32,7 @@ func Concurrent(s Seq) ConcurrentSeq { //convert a sequence to a sequential sequence func Sequential(s Seq) *SequentialSeq { switch seq := s.(type) {case *SequentialSeq: return seq} - vec := make(vector.Vector, 0, 8) - Do(s, func(v El){vec.Push(v)}) - return (*SequentialSeq)(&vec) + return SMap(s, func(el El)El{return el}) } func FirstN(s Seq, n int) []interface{} { @@ -105,6 +103,9 @@ func Do(s Seq, f func(el El)) { }) } +// CDo -- do f concurrently on each element of s, in any order +func CDo(s Seq, f func(el El)) {Do(CMap(s, func(el El)El{f(el); return nil}), func(el El){})} + func Len(s Seq) int {return s.Len()} func Output(s Seq, c SeqChan) { @@ -124,7 +125,7 @@ func AppendToVector(s Seq, vec *vector.Vector) { } } -func SAppend(s Seq, s2 Seq) Seq { +func SAppend(s Seq, s2 Seq) *SequentialSeq { vec := make(vector.Vector, 0, quickLen(s, 8) + quickLen(s2, 8)) AppendToVector(s, &vec) AppendToVector(s2, &vec) @@ -132,7 +133,7 @@ func SAppend(s Seq, s2 Seq) Seq { return (*SequentialSeq)(&vec) } -func CAppend(s Seq, s2 Seq) Seq { +func CAppend(s Seq, s2 Seq) ConcurrentSeq { return Gen(func(c SeqChan){ Output(s, c) Output(s2, c) @@ -148,26 +149,24 @@ func quickLen(s Seq, d int) int { func Filter(s Seq, filter func(e El)bool) Seq {return s.Filter(filter)} -func SFilter(s Seq, filter func(e El)bool) Seq { +func ifFunc(condition func(e El)bool, op func(e El)) func(el El){return func(el El){if condition(el) {op(el)}}} + +func SFilter(s Seq, filter func(e El)bool) *SequentialSeq { //continue shrinking vec := make(vector.Vector, 0, quickLen(s, 8)) - Do(s, func(el El){ - if filter(el) {vec.Push(el)} - }) + Do(s, ifFunc(filter, func(el El){vec.Push(el)})) return (*SequentialSeq)(&vec) } -func CFilter(s Seq, filter func(e El)bool) Seq { +func CFilter(s Seq, filter func(e El)bool) ConcurrentSeq { return Gen(func(c SeqChan){ - Do(s, func(el El){ - if filter(el) {c <- el} - }) + Do(s, ifFunc(filter, func(el El){c <- el})) }) } func Map(s Seq, f func(el El) El) Seq {return s.Map(f)} -func SMap(s Seq, f func(i El)El) Seq { +func SMap(s Seq, f func(i El)El) *SequentialSeq { vec := make(vector.Vector, 0, quickLen(s, 8)) Do(s, func(el El){vec.Push(f(el))}) return (*SequentialSeq)(&vec) @@ -179,12 +178,12 @@ type reply struct { } type swEntry struct { - present boolean + present bool value El } // SlidingWindow is a vector with limited capacity and a base -type SlidingWindow { +type SlidingWindow struct { start, base, count int values []swEntry } @@ -196,35 +195,35 @@ func (r *SlidingWindow) normalize(index int) int {return (index + r.Capacity()) func (r *SlidingWindow) IsEmpty() bool {return r.count == 0} func (r *SlidingWindow) IsFull() bool {return r.count == r.Capacity()} func (r *SlidingWindow) GetFirst() (interface{}, bool) {return r.values[r.start].value, r.values[r.start].present} -func (r *SlidingWindow) RemoveFirst() (interface{}, boolean) { - if count == 0 {return nil, false} +func (r *SlidingWindow) RemoveFirst() (interface{}, bool) { + if r.count == 0 {return nil, false} result := r.values[r.start] r.values[r.start] = swEntry{false, nil} if result.present {r.count--} - r.start = normalize(r.start++) + r.start = r.normalize(r.start + 1) r.base++ return result.value, result.present } -func (r *SlidingWindow) RemoveLast() (interface{}, boolean) { - if count == 0 {return nil, false} +func (r *SlidingWindow) RemoveLast() (interface{}, bool) { + if r.count == 0 {return nil, false} end := r.normalize(r.start + r.Capacity() - 1) result := r.values[end] r.values[end] = swEntry{false, nil} if result.present {r.count--} if r.start > 0 { - r.start = normalize(r.start--) + r.start = r.normalize(r.start - 1) r.base-- } return result.value, result.present } -func (r *SlidingWindow) Get(index int) (value interface{}, boolean) { - index = r.normalize(r.index - r.base + r.start) +func (r *SlidingWindow) Get(index int) (interface{}, bool) { + index = r.normalize(index - r.base + r.start) if index < 0 || index >= r.Capacity() {return nil, false} - value = r.values[index] + value := r.values[index] return value.value, value.present } -func (r *SlidingWindow) Set(index int, value interface{}) boolean { - index = r.normalize(r.index - r.base + r.start) +func (r *SlidingWindow) Set(index int, value interface{}) bool { + index = r.normalize(index - r.base + r.start) if index < 0 || index >= r.Capacity() {return false} r.values[index].value = value if !r.values[index].present { @@ -241,6 +240,7 @@ func CMap(s Seq, f func(el El) El, sizeOpt... int) Seq { size := 64 if len(sizeOpt) > 0 {size = sizeOpt[0]} return Gen(func(output SeqChan){ +println("spawn") //punt and convert sequence to concurrent //maybe someday we'll handle SequentialSequences separately input := Concurrent(s)() @@ -259,18 +259,18 @@ func CMap(s Seq, f func(el El) El, sizeOpt... int) Seq { case oc <- first: window.RemoveFirst() pendingOutput-- - case inputMsg := <- ic: + case inputElement := <- ic: if closed(ic) { inputClosed = true } else { go func(index int, value interface{}) { replyChannel <- reply{index, f(value)} - }(inputCount, v) + }(inputCount, inputElement) inputCount++ pendingInput++ } - case replyMsg := <- rc: - window.addLast(replyMsg) + case replyElement := <- rc: + window.Set(replyElement.index, replyElement.result) pendingInput-- pendingOutput++ } @@ -283,13 +283,15 @@ func FlatMap(s Seq, f func(el El) Seq) Seq {return s.FlatMap(f)} func SFlatMap(s Seq, f func(i El) Seq) Seq { vec := make(vector.Vector, 0, quickLen(s, 8)) - Do(SMap(s, f), func(sub El){vec.Push(sub)}) + Do(s, func(e El){Do(f(e).(Seq), func(sub El){vec.Push(sub)})}) return (*SequentialSeq)(&vec) } func CFlatMap(s Seq, f func(i El) Seq, sizeOpt... int) Seq { return Gen(func(c SeqChan){ - Do(CMap(s, f, sizeOpt), func(sub El){c <- sub}) + Do(CMap(s, func(e El)El{return f(e)}, sizeOpt...), func(sub El){ + Do(sub.(Seq), func(el El){c <- el}) + }) }) } @@ -306,14 +308,16 @@ func Combinations(s Seq, number int) Seq { })) } +//var Names map[interface{}]string + //returns the product of the Seqs contained in sequences func Product(sequences Seq) Seq { return Fold(sequences, From(From()), func(result, each El)El{ -//fmt.Print("folding: ");Pretty(each);fmt.Print(" into ");Prettyln(result) +//fmt.Print("folding: ");Pretty(each, Names);fmt.Print(" into ");Prettyln(result, Names) return result.(Seq).FlatMap(func(seq El)Seq{ -//fmt.Print("flat map with: ");Prettyln(seq) +//fmt.Print("flat map with: ");Prettyln(seq, Names) return each.(Seq).Map(func(i El) El { -//fmt.Print("map with: ");Prettyln(i) +//fmt.Print("map with: ");Prettyln(i, Names) return seq.(Seq).Append(From(i)) }) }) @@ -437,21 +441,13 @@ func (s ConcurrentSeq) Map(f func(i El)El) Seq {return CMap(s, f)} func (s ConcurrentSeq) FlatMap(f func(i El) Seq) Seq {return CFlatMap(s, f)} -func toSequentialSeq(el interface{}) interface{} { - switch seq := el.(type) { - case ConcurrentSeq: return seq.ToSequentialSeq() - case []interface{}: - cpy := make([]interface{}, len(seq)) - copy(cpy, seq) - return cpy - } - return el -} - +// recursively convert nested concurrent sequences to sequential +// does not descend into nested sequential sequences func (s ConcurrentSeq) ToSequentialSeq() *SequentialSeq { - vec := make(vector.Vector, 0, 8) - Do(s, func(v El){vec.Push(toSequentialSeq(v))}) - return (*SequentialSeq)(&vec) + return SMap(s, func(el El)El{ + switch seq := el.(type) {case ConcurrentSeq: return seq.ToSequentialSeq()} + return el + }) } -- 2.11.4.GIT