使用 Go 通过 Elasticsearch 查询和 SQL 查询统计数据,将结果写入文本文件并发送到企业微信
main.go
package main
import (
"time"
)
// main builds the daily report for the previous calendar day and sends it
// to the WeCom webhook.
//
// It appends the Elasticsearch statistics for the "registerAllInfo" and
// "assRegister" interfaces and the SQL consumption-exception section to the
// report file, then uploads that file.
func main() {
	// The report always covers the previous day, formatted as YYYY-MM-DD.
	yesterday := time.Now().AddDate(0, 0, -1).Format("2006-01-02")

	RegisterAllInfoTotalCount("registerAllInfo", yesterday)
	RegisterAllInfoTotalCount("assRegister", yesterday)
	selectSqlGetCustomersCount(yesterday)

	//currentdir := "/go-code/workspace/src/es_count"
	currentdir := "/home/script/member_center_es_stat"

	// Built by concatenation rather than fmt.Sprintf: this file imports only
	// "time", so referencing fmt here does not compile.
	//
	// NOTE(review): the writers above create the report relative to the
	// working directory, while the upload reads it from currentdir;
	// presumably the job is started from that directory - confirm.
	excelfile := "注册接口统计-" + yesterday + ".txt"
	sendWebhook(currentdir, excelfile)
}
type.go
package main
// LogEntry is the structure of one exception log entry. It is filled by
// JSON-decoding the source of a search hit; only the three keys below are read.
type LogEntry struct {
// RequestBody is decoded from the "requestBody" key.
RequestBody string `json:"requestBody"`
// Status is decoded from the "status" key.
Status int `json:"status"`
// TimeLocal is decoded from the "timeLocal" key and kept as a string.
TimeLocal string `json:"timeLocal"`
}
// CustomersCount is one row of the consumption-exception statistics.
type CustomersCount struct {
// ReqParam holds the req_param column of the row.
ReqParam string
// ErrorMsg holds the error_msg column of the row.
ErrorMsg string
}
// uploadWebhookMessage is the JSON response decoded after uploading a file
// to the webhook upload endpoint.
type uploadWebhookMessage struct {
// ErrCode is decoded from "errcode"; the uploader treats any non-zero value as failure.
ErrCode int `json:"errcode"`
// ErrMsg is decoded from "errmsg".
ErrMsg string `json:"errmsg"`
// Type is decoded from "type".
Type string `json:"type"`
// MediaID is decoded from "media_id"; it is the value later sent in the file message.
MediaID string `json:"media_id"`
// CreatedAt is decoded from "created_at" and kept as a string.
CreatedAt string `json:"created_at"`
}
// Webhook endpoints on qyapi.weixin.qq.com.
// NOTE(review): the key values look like placeholders - replace with the real
// robot key before running.
const (
// sendWebhookURL is the endpoint that file and text messages are POSTed to.
sendWebhookURL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxxxxxxxxxx"
// uploadWebhookURL is the endpoint the report file is uploaded to (type=file).
uploadWebhookURL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key=xxxxxxxxxxxxxxxx&type=file"
)
file_writer.go
package main
import (
"log"
"os"
)
// WriteToFile appends content to the file named filename, creating the file
// with mode 0644 when it does not exist yet.
//
// A failure to open, write or close the file terminates the process through
// log.Fatalf.
func WriteToFile(filename string, content string) {
	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatalf("Error opening file: %s", err)
	}

	_, writeErr := file.WriteString(content)

	// Close explicitly instead of deferring: log.Fatalf exits without running
	// deferred calls, and a failed Close can be the only sign that the data
	// did not reach the disk.
	closeErr := file.Close()

	if writeErr != nil {
		log.Fatalf("Error writing to file: %s", writeErr)
	}
	if closeErr != nil {
		log.Fatalf("Error closing file: %s", closeErr)
	}
}
register_interface.go
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"log"
)
// generateKQL builds a KQL query string.
//
// The result always starts with the path and requestBody clauses. A non-nil
// flagSuccess and a non-nil flagFail each add a positive responseBody clause
// (success first), and every entry of flagExcept adds a negated responseBody
// clause. Values are inserted verbatim between double quotes.
func generateKQL(path string, requestBody string, flagSuccess *string, flagFail *string, flagExcept []string) string {
	clauses := fmt.Sprintf("path:\"%s\" AND requestBody:\"%s\"", path, requestBody)

	// Both optional flags render the same positive clause.
	for _, required := range []*string{flagSuccess, flagFail} {
		if required == nil {
			continue
		}
		clauses += fmt.Sprintf(" AND responseBody:\"%s\"", *required)
	}

	// Ranging over a nil or empty slice adds nothing.
	for _, excluded := range flagExcept {
		clauses += fmt.Sprintf(" AND NOT responseBody:\"%s\"", excluded)
	}

	return clauses
}
// RegisterAllInfoTotalCount appends the statistics of one interface for the
// given day to the report file "注册接口统计-<yesterday>.txt".
//
// It runs four count queries against the "shenyu-access-logging" index:
// total, success (responseBody contains "flag:1"), failure (responseBody
// contains "flag:0") and exception (neither flag). Each count is written to
// the report, followed by timeLocal, status and requestBody of up to 10000
// exception documents. The KQL equivalent of every query is printed to
// stdout.
//
// interFaceName is matched against the "path" field. yesterday is the date
// string placed in the reqTime pattern and in the report file name.
// Any Elasticsearch error terminates the process through log.Fatalf.
func RegisterAllInfoTotalCount(interFaceName string, yesterday string) {
	const (
		// Adjust to the actual Elasticsearch address.
		esURL = "http://192.168.64.xx:30092"
		//indexName = "shenyu-access-logging-202709122109.old"
		indexName = "shenyu-access-logging"
		// Number of exception documents fetched; adjust as needed.
		maxExceptionDocs = 10000

		sectionRule = "====================================================================================================\n"
		recordRule  = "##################################################\n"
	)

	ctx := context.Background()

	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(esURL))
	if err != nil {
		log.Fatalf("Error creating Elastic client: %s", err)
	}

	reqTimePattern := fmt.Sprintf("reqTime:*%s*", yesterday)
	flagPatternSuccess := fmt.Sprintf("flag:%d", 1)
	flagPatternFail := fmt.Sprintf("flag:%d", 0)

	// newBaseQuery returns a fresh bool query holding the two clauses shared
	// by all four statistics: the interface path and the request date.
	newBaseQuery := func() *elastic.BoolQuery {
		return elastic.NewBoolQuery().Must(
			elastic.NewMatchQuery("path", interFaceName),
			elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
		)
	}

	// Kept in its own variable because it is reused below to fetch the
	// exception documents themselves.
	exceptionQuery := newBaseQuery().MustNot(
		elastic.NewMatchPhraseQuery("responseBody", flagPatternFail),
		elastic.NewMatchPhraseQuery("responseBody", flagPatternSuccess),
	)

	queries := []struct {
		desc  string
		query elastic.Query
	}{
		{
			desc:  fmt.Sprintf("%s-接口总计", interFaceName),
			query: newBaseQuery(),
		},
		{
			desc:  fmt.Sprintf("%s-接口成功数", interFaceName),
			query: newBaseQuery().Must(elastic.NewMatchPhraseQuery("responseBody", flagPatternSuccess)),
		},
		{
			desc:  fmt.Sprintf("%s-接口失败数", interFaceName),
			query: newBaseQuery().Must(elastic.NewMatchPhraseQuery("responseBody", flagPatternFail)),
		},
		{
			desc:  fmt.Sprintf("%s-接口异常数", interFaceName),
			query: exceptionQuery,
		},
	}

	// Print the KQL equivalent of each query.
	kqlTotal := generateKQL(interFaceName, reqTimePattern, nil, nil, nil)
	kqlSuccess := generateKQL(interFaceName, reqTimePattern, &flagPatternSuccess, nil, nil)
	kqlFail := generateKQL(interFaceName, reqTimePattern, nil, &flagPatternFail, nil)
	kqlExcept := generateKQL(interFaceName, reqTimePattern, nil, nil, []string{flagPatternSuccess, flagPatternFail})
	fmt.Printf("%s-接口总计 KQL 查询:%s\n", interFaceName, kqlTotal)
	fmt.Printf("%s-接口成功数 KQL 查询:%s\n", interFaceName, kqlSuccess)
	fmt.Printf("%s-接口失败数 KQL 查询:%s\n", interFaceName, kqlFail)
	fmt.Printf("%s-接口异常数 KQL 查询:%s\n", interFaceName, kqlExcept)

	//fileName := fmt.Sprintf("./es_count/注册接口统计-%s.txt", yesterday)
	fileName := fmt.Sprintf("注册接口统计-%s.txt", yesterday)
	WriteToFile(fileName, sectionRule)

	// Run the count queries and write each result to the report.
	for _, q := range queries {
		res, err := client.Search().
			Index(indexName).
			Query(q.query).
			Size(0). // only the count is needed, no documents
			// Without this, Elasticsearch 7 stops counting at 10000 hits and
			// the reported totals would silently be capped.
			TrackTotalHits(true).
			Do(ctx)
		if err != nil {
			log.Fatalf("Error executing query for %s: %s", q.desc, err)
		}
		WriteToFile(fileName, fmt.Sprintf("%s为:%d\n", q.desc, res.TotalHits()))
	}

	// Fetch the exception documents to list their timeLocal, status and
	// requestBody fields.
	exceptionRes, err := client.Search().
		Index(indexName).
		Query(exceptionQuery).
		Size(maxExceptionDocs).
		Do(ctx)
	if err != nil {
		log.Fatalf("Error executing exception query: %s", err)
	}

	WriteToFile(fileName, fmt.Sprintf("%s-接口异常数据:\n", interFaceName))
	for _, hit := range exceptionRes.Hits.Hits {
		var logEntry LogEntry
		if err := json.Unmarshal(hit.Source, &logEntry); err != nil {
			// A single undecodable document is logged and skipped.
			log.Printf("Error unmarshalling log entry: %s", err)
			continue
		}
		WriteToFile(fileName, recordRule)
		WriteToFile(fileName, fmt.Sprintf("timeLocal: %s; status: %d; requestBody:\n %s\n", logEntry.TimeLocal, logEntry.Status, logEntry.RequestBody))
	}
}
select_sql.go
package main
import "fmt"
import "log"
// selectSqlGetCustomersCount appends the consumption-exception section for
// the given day to the report file "注册接口统计-<yesterday>.txt".
//
// It selects req_param and error_msg for rows whose error_msg matches the
// "member information does not exist" pattern and whose req_param contains a
// reqTime with the given date, then writes one record per row between two
// separator lines. A query error terminates the process through log.Fatalf.
func selectSqlGetCustomersCount(yesterday string) {
	const (
		sectionRule = "====================================================================================================\n"
		recordRule  = "##################################################\n"
	)

	// Production consumption-exception statistics. The second line of the raw
	// string starts at column 0 so that the query text stays unchanged.
	query := fmt.Sprintf(`SELECT req_param,error_msg FROM xxxx
WHERE error_msg like '%%会员信息不存在,%%' and req_param like '%%reqTime%%%s%%'`, yesterday)

	records, err := getCustomersCount(query)
	if err != nil {
		log.Fatalf("Error getting customer counts: %s", err)
	}

	report := fmt.Sprintf("注册接口统计-%s.txt", yesterday)
	WriteToFile(report, sectionRule)
	WriteToFile(report, "消费异常统计:\n")
	for i := range records {
		WriteToFile(report, recordRule)
		WriteToFile(report, fmt.Sprintf("error_msg: %s ---> req_param: \n%s\n", records[i].ErrorMsg, records[i].ReqParam))
	}
	WriteToFile(report, sectionRule)
}
sql.go
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"log"
)
// connectToDB returns a database/sql handle for the MySQL DSN below.
//
// sql.Open only validates its arguments and does not dial the server, so a
// nil error does not prove the database is reachable; connection problems
// surface on the first query. The caller owns the handle and must Close it.
//
// On failure the handle is nil and the error wraps the cause from sql.Open.
func connectToDB() (*sql.DB, error) {
	db, err := sql.Open("mysql", "xxxxx:xxxxxx@tcp(192.168.64.xx:6001)/testr_center")
	if err != nil {
		// %w keeps the cause reachable through errors.Is / errors.As; the
		// rendered message is the same as with %s.
		return nil, fmt.Errorf("连接数据库失败: %w", err)
	}
	return db, nil
}
// getCustomersCount runs query and returns one CustomersCount per result row.
//
// query must select exactly two columns, in the order req_param, error_msg.
// NULL column values are returned as empty strings. The query text is echoed
// to stdout before it is executed.
//
// An error is returned when the connection cannot be set up, the query
// fails, a row cannot be scanned, or iteration ends with an error; the
// returned slice is nil in all of those cases.
func getCustomersCount(query string) ([]CustomersCount, error) {
	fmt.Println(query, ";")
	fmt.Println("--")

	db, err := connectToDB()
	if err != nil {
		// Continuing would call Query on a nil *sql.DB and panic.
		return nil, err
	}
	// Deferred right after a successful open so the handle is released on
	// every return path, including a failed Query.
	defer func() {
		if closeErr := db.Close(); closeErr != nil {
			log.Printf("closing database: %s", closeErr)
		}
	}()

	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// Result list, kept non-nil even when the query matches no rows.
	counts := []CustomersCount{}
	for rows.Next() {
		var reqParam, errorMsg sql.NullString
		if err := rows.Scan(&reqParam, &errorMsg); err != nil {
			return nil, fmt.Errorf("scanning row: %w", err)
		}
		counts = append(counts, CustomersCount{
			ReqParam: reqParam.String,
			ErrorMsg: errorMsg.String,
		})
	}
	// Next returning false can mean "no more rows" or "iteration failed";
	// Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating rows: %w", err)
	}
	return counts, nil
}
webhook.go
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
)
// sendWebhook uploads the report file and posts it to the webhook as a file
// message, then sends the follow-up text reminder.
//
// currentDir is the directory that contains the file and excelFile is its
// base name; the name is also used as the upload file name.
//
// Nothing is sent when the upload yields no media_id. The text reminder is
// sent only after the file message was accepted, meaning HTTP 200 and an
// errcode of 0 in the response body. Marshalling and transport errors panic.
func sendWebhook(currentDir string, excelFile string) {
	excelPathFile := currentDir + "/" + excelFile
	mediaID := uploadWebhoook(excelPathFile, excelFile)
	if mediaID == "" {
		// The upload failed; a file message without media_id cannot succeed.
		fmt.Println("未获取到 media_id,跳过发送文件消息")
		return
	}

	// Build the request body.
	requestBody := map[string]interface{}{
		"msgtype": "file",
		"file": map[string]string{
			"media_id": mediaID,
		},
	}
	jsonBody, err := json.Marshal(requestBody)
	if err != nil {
		panic(err)
	}

	// Send the request.
	resp, err := http.Post(sendWebhookURL, "application/json", bytes.NewBuffer(jsonBody))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		fmt.Printf("发送文件请求失败,状态码:%d\n", resp.StatusCode)
		return
	}

	// The webhook reports API-level failures inside a 200 response, so the
	// status code alone is not enough to claim success.
	var result struct {
		ErrCode int    `json:"errcode"`
		ErrMsg  string `json:"errmsg"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		fmt.Printf("发送文件请求失败,无法解析响应:%s\n", err)
		return
	}
	if result.ErrCode != 0 {
		fmt.Printf("发送文件请求失败,errcode:%d,errmsg:%s\n", result.ErrCode, result.ErrMsg)
		return
	}

	fmt.Println("发送文件请求成功")
	sendTextMessage()
}
// sendTextMessage posts the text reminder that mentions "@all" to the
// webhook and prints whether the HTTP status was 200.
// Marshalling and transport errors panic.
func sendTextMessage() {
	text := map[string]interface{}{
		"content":               "昨日注册接口统计及消费异常数据统计文件请查收",
		"mentioned_mobile_list": []string{"@all"},
	}
	payload, err := json.Marshal(map[string]interface{}{
		"msgtype": "text",
		"text":    text,
	})
	if err != nil {
		panic(err)
	}

	resp, err := http.Post(sendWebhookURL, "application/json", bytes.NewBuffer(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		fmt.Println("文本消息发送成功")
		return
	}
	fmt.Printf("文本消息发送失败,状态码:%d\n", resp.StatusCode)
}
// uploadWebhoook uploads the file at excelPathFile to the webhook upload
// endpoint as multipart form field "media" and returns the media_id from the
// response.
//
// excelFile is the file name announced in the multipart part.
//
// It returns "" and prints the reason when the endpoint answers with a
// non-200 status or a non-zero errcode. File, encoding and transport errors
// panic.
func uploadWebhoook(excelPathFile string, excelFile string) (mediaId string) {
	file, err := os.Open(excelPathFile)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Assemble the multipart body in memory.
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)

	fileField, err := writer.CreateFormFile("media", excelFile)
	if err != nil {
		panic(err)
	}
	if _, err = io.Copy(fileField, file); err != nil {
		panic(err)
	}
	// Close writes the trailing boundary; the body is incomplete without it.
	if err = writer.Close(); err != nil {
		panic(err)
	}

	req, err := http.NewRequest("POST", uploadWebhookURL, body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", writer.FormDataContentType())

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// A non-200 answer may not carry a JSON body, so report it before decoding.
	if resp.StatusCode != http.StatusOK {
		fmt.Printf("上传文件失败,状态码:%d\n", resp.StatusCode)
		return ""
	}

	var uploadResp uploadWebhookMessage
	if err = json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil {
		panic(err)
	}

	if uploadResp.ErrCode != 0 {
		// Include the code and message so the failure can be diagnosed.
		fmt.Printf("上传文件失败: errcode=%d, errmsg=%s\n", uploadResp.ErrCode, uploadResp.ErrMsg)
		return ""
	}

	fmt.Println("上传文件成功")
	return uploadResp.MediaID
}
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 悩姜!



