数据存储
[goroutine-traffic-statistics.git] / 讲师示例代码.go.example
blob46cfe9486587a4fa4850fc1f7733bc4cda9305ea
1 package main
3 import (
4         "bufio"
5         "crypto/md5"
6         "encoding/hex"
7         "flag"
8         "io"
9         "net/url"
10         "os"
11         "strconv"
12         "strings"
13         "time"
15         "github.com/mediocregopher/radix.v2/pool"
16         "github.com/mgutz/str"
17         "github.com/sirupsen/logrus"
// Markers used to cut tracking data out of access-log lines and to classify
// page URLs.
const (
	HANDLE_DIG   = " /dig?"  // precedes the query string of a tracking request in a log line
	HANDLE_MOVIE = "/movie/" // path prefix of detail pages; the resource ID follows it
	HANDLE_LIST  = "/list/"  // path prefix of list pages; the resource ID follows it
	HANDLE_HTML  = ".html"   // suffix that ends the resource ID in detail/list paths
)
// cmdParams carries the parsed command-line options.
type cmdParams struct {
	// logFilePath is the access log that readFileLinebyLine tails.
	logFilePath string
	// routineNum is how many logConsumer goroutines main starts; main also
	// scales the channel buffers and the Redis pool size with it.
	routineNum int
}
// digData holds the query parameters of one tracking ("/dig") request as
// filled in by cutLogFetchData: the visit time, the page URL, the referrer
// and (presumably) the user-agent string. Field order matters: positional
// composite literals rely on it.
type digData struct {
	time, url, refer, ua string
}
35 type urlData struct {
36         data  digData
37         uid   string
38         unode urlNode
// urlNode classifies one visited page.
type urlNode struct {
	// unType is the page kind: detail page, list page or home page
	// (formatUrl uses "movie", "list" and "home").
	unType string
	// unRid is the resource ID of the page.
	unRid int
	// unUrl is the URL of the current page.
	unUrl string
	// unTime is the time this page was visited.
	unTime string
}
46 type storageBlock struct {
47         counterType  string
48         storageModel string
49         unode        urlNode
52 var log = logrus.New()
54 func init() {
55         log.Out = os.Stdout
56         log.SetLevel(logrus.DebugLevel)
59 func main() {
60         // 获取参数
61         logFilePath := flag.String("logFilePath", "/Users/pangee/Public/nginx/logs/dig.log", "log file path")
62         routineNum := flag.Int("routineNum", 5, "consumer numble by goroutine")
63         l := flag.String("l", "/tmp/log", "this programe runtime log target file path")
64         flag.Parse()
66         params := cmdParams{*logFilePath, *routineNum}
68         // 打日志
69         logFd, err := os.OpenFile(*l, os.O_CREATE|os.O_WRONLY, 0644)
70         if err == nil {
71                 log.Out = logFd
72                 defer logFd.Close()
73         }
74         log.Infof("Exec start.")
75         log.Infof("Params: logFilePath=%s, routineNum=%d", params.logFilePath, params.routineNum)
77         // 初始化一些channel,用于数据传递
78         var logChannel = make(chan string, 3*params.routineNum)
79         var pvChannel = make(chan urlData, params.routineNum)
80         var uvChannel = make(chan urlData, params.routineNum)
81         var storageChannel = make(chan storageBlock, params.routineNum)
83         // Redis Pool
84         redisPool, err := pool.New("tcp", "localhost:6379", 2*params.routineNum)
85         if err != nil {
86                 log.Fatalln("Redis pool created failed.")
87                 panic(err)
88         } else {
89                 go func() {
90                         for {
91                                 redisPool.Cmd("PING")
92                                 time.Sleep(3 * time.Second)
93                         }
94                 }()
95         }
97         // 日志消费者
98         go readFileLinebyLine(params, logChannel)
100         // 创建一组日志处理
101         for i := 0; i < params.routineNum; i++ {
102                 go logConsumer(logChannel, pvChannel, uvChannel)
103         }
105         // 创建PV UV 统计器
106         go pvCounter(pvChannel, storageChannel)
107         go uvCounter(uvChannel, storageChannel, redisPool)
108         // 可扩展的 xxxCounter
110         // 创建 存储器
111         go dataStorage(storageChannel, redisPool)
113         time.Sleep(1000 * time.Second)
116 // HBase 劣势:列簇需要声明清楚
117 func dataStorage(storageChannel chan storageBlock, redisPool *pool.Pool) {
118         for block := range storageChannel {
119                 prefix := block.counterType + "_"
121                 // 逐层添加,加洋葱皮的过程
122                 // 维度: 天-小时-分钟
123                 // 层级: 定级-大分类-小分类-终极页面
124                 // 存储模型: Redis  SortedSet
125                 setKeys := []string{
126                         prefix + "day_" + getTime(block.unode.unTime, "day"),
127                         prefix + "hour_" + getTime(block.unode.unTime, "hour"),
128                         prefix + "min_" + getTime(block.unode.unTime, "min"),
129                         prefix + block.unode.unType + "_day_" + getTime(block.unode.unTime, "day"),
130                         prefix + block.unode.unType + "_hour_" + getTime(block.unode.unTime, "hour"),
131                         prefix + block.unode.unType + "_min_" + getTime(block.unode.unTime, "min"),
132                 }
134                 rowId := block.unode.unRid
136                 for _, key := range setKeys {
137                         ret, err := redisPool.Cmd(block.storageModel, key, 1, rowId).Int()
138                         if ret <= 0 || err != nil {
139                                 log.Errorln("DataStorage redis storage error.", block.storageModel, key, rowId)
140                         }
141                 }
142         }
145 func pvCounter(pvChannel chan urlData, storageChannel chan storageBlock) {
146         for data := range pvChannel {
147                 sItem := storageBlock{"pv", "ZINCRBY", data.unode}
148                 storageChannel <- sItem
149         }
151 func uvCounter(uvChannel chan urlData, storageChannel chan storageBlock, redisPool *pool.Pool) {
152         for data := range uvChannel {
153                 //HyperLoglog redis
154                 hyperLogLogKey := "uv_hpll_" + getTime(data.data.time, "day")
155                 ret, err := redisPool.Cmd("PFADD", hyperLogLogKey, data.uid, "EX", 86400).Int()
156                 if err != nil {
157                         log.Warningln("UvCounter check redis hyperloglog failed, ", err)
158                 }
159                 if ret != 1 {
160                         continue
161                 }
163                 sItem := storageBlock{"uv", "ZINCRBY", data.unode}
164                 storageChannel <- sItem
165         }
168 func logConsumer(logChannel chan string, pvChannel, uvChannel chan urlData) error {
169         for logStr := range logChannel {
170                 // 切割日志字符串,扣出打点上报的数据
171                 data := cutLogFetchData(logStr)
173                 // uid
174                 // 说明: 课程中模拟生成uid, md5(refer+ua)
175                 hasher := md5.New()
176                 hasher.Write([]byte(data.refer + data.ua))
177                 uid := hex.EncodeToString(hasher.Sum(nil))
179                 // 很多解析的工作都可以放到这里完成
180                 // ...
181                 // ...
183                 uData := urlData{data, uid, formatUrl(data.url, data.time)}
185                 pvChannel <- uData
186                 uvChannel <- uData
187         }
188         return nil
190 func cutLogFetchData(logStr string) digData {
191         logStr = strings.TrimSpace(logStr)
192         pos1 := str.IndexOf(logStr, HANDLE_DIG, 0)
193         if pos1 == -1 {
194                 return digData{}
195         }
196         pos1 += len(HANDLE_DIG)
197         pos2 := str.IndexOf(logStr, " HTTP/", pos1)
198         d := str.Substr(logStr, pos1, pos2-pos1)
200         urlInfo, err := url.Parse("http://localhost/?" + d)
201         if err != nil {
202                 return digData{}
203         }
204         data := urlInfo.Query()
205         return digData{
206                 data.Get("time"),
207                 data.Get("refer"),
208                 data.Get("url"),
209                 data.Get("ua"),
210         }
212 func readFileLinebyLine(params cmdParams, logChannel chan string) error {
213         fd, err := os.Open(params.logFilePath)
214         if err != nil {
215                 log.Warningf("ReadFileLinebyLine can't open file:%s", params.logFilePath)
216                 return err
217         }
218         defer fd.Close()
220         count := 0
221         bufferRead := bufio.NewReader(fd)
222         for {
223                 line, err := bufferRead.ReadString('\n')
224                 logChannel <- line
225                 count++
227                 if count%(1000*params.routineNum) == 0 {
228                         log.Infof("ReadFileLinebyLine line: %d", count)
229                 }
230                 if err != nil {
231                         if err == io.EOF {
232                                 time.Sleep(3 * time.Second)
233                                 log.Infof("ReadFileLinebyLine wait, raedline:%d", count)
234                         } else {
235                                 log.Warningf("ReadFileLinebyLine read log error")
236                         }
237                 }
238         }
239         return nil
242 func formatUrl(url, t string) urlNode {
243         // 一定从量大的着手,  详情页>列表页≥首页
244         pos1 := str.IndexOf(url, HANDLE_MOVIE, 0)
245         if pos1 != -1 {
246                 pos1 += len(HANDLE_MOVIE)
247                 pos2 := str.IndexOf(url, HANDLE_HTML, 0)
248                 idStr := str.Substr(url, pos1, pos2-pos1)
249                 id, _ := strconv.Atoi(idStr)
250                 return urlNode{"movie", id, url, t}
251         } else {
252                 pos1 = str.IndexOf(url, HANDLE_LIST, 0)
253                 if pos1 != -1 {
254                         pos1 += len(HANDLE_LIST)
255                         pos2 := str.IndexOf(url, HANDLE_HTML, 0)
256                         idStr := str.Substr(url, pos1, pos2-pos1)
257                         id, _ := strconv.Atoi(idStr)
258                         return urlNode{"list", id, url, t}
259                 } else {
260                         return urlNode{"home", 1, url, t}
261                 } // 如果页面url有很多种,就不断在这里扩展
262         }
// getTime returns the start of the current "day", "hour" or "min" slot as a
// decimal Unix-seconds string; the counters use it as the time part of their
// Redis keys.
//
// NOTE(review): logTime is never read. The slot always comes from
// time.Now(), not from the log record's own time; confirm that is intended.
// The local wall-clock text is reparsed with time.Parse, which assumes UTC,
// so the number is the local calendar slot taken as if it were UTC. An
// unknown timeType uses an empty layout, which parses to 0000-01-01 00:00 UTC.
func getTime(logTime, timeType string) string {
	layout := timeBucketLayout(timeType)
	// The error is deliberately ignored: the text was just produced by
	// Format with this same layout.
	slot, _ := time.Parse(layout, time.Now().Format(layout))
	return strconv.FormatInt(slot.Unix(), 10)
}

// timeBucketLayout maps a slot name to the time layout that truncates a
// timestamp to that slot; unknown names map to the empty layout.
func timeBucketLayout(timeType string) string {
	switch timeType {
	case "day":
		return "2006-01-02"
	case "hour":
		return "2006-01-02 15"
	case "min":
		return "2006-01-02 15:04"
	default:
		return ""
	}
}