main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"time"
)

func main() {
// 获取前一天的日期
yesterday := time.Now().AddDate(0, 0, -1).Format("2006-01-02")

RegisterAllInfoTotalCount("registerAllInfo", yesterday)
RegisterAllInfoTotalCount("assRegister", yesterday)
selectSqlGetCustomersCount(yesterday)

//currentdir := "/go-code/workspace/src/es_count"
currentdir := "/home/script/member_center_es_stat"
excelfile := fmt.Sprintf("注册接口统计-%s.txt", yesterday)
sendWebhook(currentdir, excelfile)
}

type.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

// LogEntry 定义异常日志条目的结构体
type LogEntry struct {
RequestBody string `json:"requestBody"`
Status int `json:"status"`
TimeLocal string `json:"timeLocal"`
}

// 消费异常统计
type CustomersCount struct {
ReqParam string
ErrorMsg string
}

// 上传文件
type uploadWebhookMessage struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
Type string `json:"type"`
MediaID string `json:"media_id"`
CreatedAt string `json:"created_at"`
}

const (
sendWebhookURL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxxxxxxxxxx"
uploadWebhookURL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key=xxxxxxxxxxxxxxxx&type=file"
)

file_writer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"log"
"os"
)

// WriteToFile 将内容写入指定文件,采用追加模式
func WriteToFile(filename string, content string) {
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatalf("Error opening file: %s", err)
}
defer file.Close()

_, err = file.WriteString(content)
if err != nil {
log.Fatalf("Error writing to file: %s", err)
}
}

register_interface.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"log"
)

// 生成 KQL 查询字符串
func generateKQL(path string, requestBody string, flagSuccess *string, flagFail *string, flagExcept []string) string {
kql := fmt.Sprintf("path:\"%s\" AND requestBody:\"%s\"", path, requestBody)

if flagSuccess != nil {
kql += fmt.Sprintf(" AND responseBody:\"%s\"", *flagSuccess)
}

if flagFail != nil {
kql += fmt.Sprintf(" AND responseBody:\"%s\"", *flagFail)
}

if len(flagExcept) > 0 {
for _, flag := range flagExcept {
kql += fmt.Sprintf(" AND NOT responseBody:\"%s\"", flag)
}
}

return kql
}

// 集成注册接口统计
func RegisterAllInfoTotalCount(interFaceName string, yesterday string) {
// 创建上下文
ctx := context.Background()

// 设置 Elasticsearch 地址
esURL := "http://192.168.64.xx:30092" // 根据实际情况修改 Elasticsearch 地址

// 创建 Elastic 客户端
client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(esURL))
if err != nil {
log.Fatalf("Error creating Elastic client: %s", err)
}

reqTimePattern := fmt.Sprintf("reqTime:*%s*", yesterday)
flagPatternSuccess := fmt.Sprintf("flag:%d", 1)
flagPatternFail := fmt.Sprintf("flag:%d", 0)
// 定义查询函数
queries := []struct {
desc string
query elastic.Query
}{
{
desc: fmt.Sprintf("%s-接口总计", interFaceName),
query: elastic.NewBoolQuery().
Must(
elastic.NewMatchQuery("path", interFaceName),
elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
),
},
{
desc: fmt.Sprintf("%s-接口成功数", interFaceName),
query: elastic.NewBoolQuery().
Must(
elastic.NewMatchQuery("path", interFaceName),
elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
elastic.NewMatchPhraseQuery("responseBody", flagPatternSuccess),
),
},
{
desc: fmt.Sprintf("%s-接口失败数", interFaceName),
query: elastic.NewBoolQuery().
Must(
elastic.NewMatchQuery("path", interFaceName),
elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
elastic.NewMatchPhraseQuery("responseBody", flagPatternFail),
),
},
{
desc: fmt.Sprintf("%s-接口异常数", interFaceName),
query: elastic.NewBoolQuery().
Must(
elastic.NewMatchQuery("path", interFaceName),
elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
).
MustNot(
elastic.NewMatchPhraseQuery("responseBody", flagPatternFail),
elastic.NewMatchPhraseQuery("responseBody", flagPatternSuccess),
),
},
}

// 打印 KQL 查询
kql_total := generateKQL(interFaceName, reqTimePattern, nil, nil, nil)
kql_success := generateKQL(interFaceName, reqTimePattern, &flagPatternSuccess, nil, nil)
kql_fail := generateKQL(interFaceName, reqTimePattern, nil, &flagPatternFail, nil)
kql_except := generateKQL(interFaceName, reqTimePattern, nil, nil, []string{flagPatternSuccess, flagPatternFail})
fmt.Printf("%s-接口总计 KQL 查询:%s\n", interFaceName, kql_total)
fmt.Printf("%s-接口成功数 KQL 查询:%s\n", interFaceName, kql_success)
fmt.Printf("%s-接口失败数 KQL 查询:%s\n", interFaceName, kql_fail)
fmt.Printf("%s-接口异常数 KQL 查询:%s\n", interFaceName, kql_except)

//fileName := fmt.Sprintf("./es_count/注册接口统计-%s.txt", yesterday)
fileName := fmt.Sprintf("注册接口统计-%s.txt", yesterday)
WriteToFile(fileName, "====================================================================================================\n")
//WriteToFile(fileName, "---\n")
// 执行查询并打印结果
for _, q := range queries {
res, err := client.Search().
Index("shenyu-access-logging").
//Index("shenyu-access-logging-202709122109.old").
Query(q.query).
Size(0). // 只需要计数,不需要返回源文档
Do(ctx)

if err != nil {
log.Fatalf("Error executing query for %s: %s", q.desc, err)
}

//fmt.Printf("%s为:%d\n", q.desc, res.TotalHits())
output := fmt.Sprintf("%s为:%d\n", q.desc, res.TotalHits())
WriteToFile(fileName, output) // 写入文件
}

// 执行异常数据查询,获取 requestBody 和 status 字段
exceptionQuery := queries[3].query
exceptionRes, err := client.Search().
Index("shenyu-access-logging").
//Index("shenyu-access-logging-202709122109.old").
Query(exceptionQuery).
Size(10000). // 根据需要调整返回的数量
Do(ctx)

if err != nil {
log.Fatalf("Error executing exception query: %s", err)
}

// 打印异常数据中的 requestBody 和 status 字段
exceptDataName := fmt.Sprintf("%s-接口异常数据:\n", interFaceName)
WriteToFile(fileName, exceptDataName)
for _, hit := range exceptionRes.Hits.Hits {
var logEntry LogEntry
if err := json.Unmarshal(hit.Source, &logEntry); err != nil {
log.Printf("Error unmarshalling log entry: %s", err)
} else {
WriteToFile(fileName, "##################################################\n")
output := fmt.Sprintf("timeLocal: %s; status: %d; requestBody:\n %s\n", logEntry.TimeLocal, logEntry.Status, logEntry.RequestBody)
WriteToFile(fileName, output) // 写入文件
}
}
}

selet_sql.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import "fmt"
import "log"

func selectSqlGetCustomersCount(yesterday string) {
// 生产消费异常统计
datecustomerscountsql := (fmt.Sprintf(`SELECT req_param,error_msg FROM xxxx
WHERE error_msg like '%%会员信息不存在,%%' and req_param like '%%reqTime%%%s%%'`, yesterday))

datecustomerscountquery := datecustomerscountsql
datecustomerscounts, err := getCustomersCount(datecustomerscountquery)
if err != nil {
log.Fatalf("Error getting customer counts: %s", err)
}
fileName := fmt.Sprintf("注册接口统计-%s.txt", yesterday)
WriteToFile(fileName, "====================================================================================================\n")
WriteToFile(fileName, fmt.Sprintf("消费异常统计:\n"))
for _, result := range datecustomerscounts {
WriteToFile(fileName, "##################################################\n")
WriteToFile(fileName, fmt.Sprintf("error_msg: %s ---> req_param: \n%s\n", result.ErrorMsg, result.ReqParam))
//fmt.Printf("error_msg: %s ---> req_param: %s\n", result.ErrorMsg, result.ReqParam))
}
WriteToFile(fileName, "====================================================================================================\n")
}

sql.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"log"
)

func connectToDB() (*sql.DB, error) {
// 连接到数据库
db, err := sql.Open("mysql", "xxxxx:xxxxxx@tcp(192.168.64.xx:6001)/testr_center")

if err != nil {
return nil, fmt.Errorf("连接数据库失败: %s", err)
}
return db, nil
}

// 获取消费统计
func getCustomersCount(query string) ([]CustomersCount, error) {
fmt.Println(query, ";")
fmt.Println("--")
db, err := connectToDB()
if err != nil {
fmt.Println(err)
}

rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer db.Close()

// 创建消费记录的结果列表
CustomersCounts := []CustomersCount{}

// 遍历查询结果,将数据存入结构体
for rows.Next() {
var result CustomersCount
var ReqParam sql.NullString
var ErrorMsg sql.NullString
err := rows.Scan(&ReqParam, &ErrorMsg)
if err != nil {
log.Fatal(err)
}
result.ReqParam = ReqParam.String
result.ErrorMsg = ErrorMsg.String
CustomersCounts = append(CustomersCounts, result)
}
return CustomersCounts, nil
}

wehook.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
)

// 发送文件
func sendWebhook(currentDir string, excelFile string) {
excelPathFile := currentDir + "/" + excelFile
mediaId := uploadWebhoook(excelPathFile, excelFile)

// 构建请求体
requestBody := map[string]interface{}{
"msgtype": "file",
"file": map[string]string{
"media_id": mediaId,
},
}

jsonBody, err := json.Marshal(requestBody)
if err != nil {
panic(err)
}

// 发送请求
resp, err := http.Post(sendWebhookURL, "application/json", bytes.NewBuffer(jsonBody))
if err != nil {
panic(err)
}
defer resp.Body.Close()

// 检查响应状态码
if resp.StatusCode != http.StatusOK {
fmt.Printf("发送文件请求失败,状态码:%d\n", resp.StatusCode)
return
}
fmt.Println("发送文件请求成功")
sendTextMessage()

}

// 发送@提醒的文本消息
func sendTextMessage() {
requestBody := map[string]interface{}{
"msgtype": "text",
"text": map[string]interface{}{
"content": "昨日注册接口统计及消费异常数据统计文件请查收",
"mentioned_mobile_list": []string{"@all"},
},
}
jsonBody, err := json.Marshal(requestBody)
if err != nil {
panic(err)
}

resp, err := http.Post(sendWebhookURL, "application/json", bytes.NewBuffer(jsonBody))
if err != nil {
panic(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
fmt.Printf("文本消息发送失败,状态码:%d\n", resp.StatusCode)
} else {
fmt.Println("文本消息发送成功")
}
}

// 上传文件
func uploadWebhoook(excelPathFile string, excelFile string) (mediaId string) {
// 打开文件
file, err := os.Open(excelPathFile)
if err != nil {
panic(err)
}
defer file.Close()

// 创建一个multipart.Writer
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)

// 创建一个文件字段
fileField, err := writer.CreateFormFile("media", excelFile)
if err != nil {
panic(err)
}

// 将文件内容拷贝到文件字段
_, err = io.Copy(fileField, file)
if err != nil {
panic(err)
}

// 关闭multipart.Writer
err = writer.Close()
if err != nil {
panic(err)
}

// 创建POST请求
req, err := http.NewRequest("POST", uploadWebhookURL, body)
if err != nil {
panic(err)
}
req.Header.Set("Content-Type", writer.FormDataContentType())

// 发送请求
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()

// 解析响应
var uploadResp uploadWebhookMessage
err = json.NewDecoder(resp.Body).Decode(&uploadResp)
if err != nil {
panic(err)
}

// 检查错误
if uploadResp.ErrCode != 0 {
//fmt.Printf("上传文件失败: %s\n", uploadResp.ErrMsg)
fmt.Printf("上传文件失败")
return
} else {
fmt.Printf("上传文件成功")
//fmt.Printf("上传文件成功,media_id: %s\n", uploadResp.MediaID)
}
return uploadResp.MediaID
}