main.go

package main

import (
	"time"
)

// main builds the report for the previous day and pushes it to the webhook.
//
// It appends the Elasticsearch statistics for the "registerAllInfo" and
// "assRegister" interfaces and the database-sourced consumer error list to the
// daily report file, then uploads and announces that file via sendWebhook.
func main() {
	// The report always covers the previous calendar day (local time).
	yesterday := time.Now().AddDate(0, 0, -1).Format("2006-01-02")

	RegisterAllInfoTotalCount("registerAllInfo", yesterday)
	RegisterAllInfoTotalCount("assRegister", yesterday)
	selectSqlGetCustomersCount(yesterday)

	//currentdir := "/go-code/workspace/src/es_count"
	currentdir := "/home/script/member_center_es_stat"
	// Plain concatenation is used because this file imports only "time";
	// the name must match the one the report writers use.
	// NOTE(review): the writers use a relative path, so the process presumably
	// runs with currentdir as its working directory — confirm.
	excelfile := "注册接口统计-" + yesterday + ".txt"
	sendWebhook(currentdir, excelfile)
}

type.go

package main

// LogEntry holds the fields of an access-log document that are decoded from an
// Elasticsearch hit and written to the report for exceptional requests.
type LogEntry struct {
	RequestBody string `json:"requestBody"`
	Status      int    `json:"status"`
	TimeLocal   string `json:"timeLocal"`
}

// CustomersCount is one consumer-error row read from the database: the
// req_param and error_msg columns of the queried table.
type CustomersCount struct {
	ReqParam string
	ErrorMsg string
}

// uploadWebhookMessage is the JSON reply decoded from the media-upload webhook.
// A non-zero ErrCode is treated as a failed upload; MediaID identifies the
// uploaded file in the follow-up "file" message.
type uploadWebhookMessage struct {
	ErrCode   int    `json:"errcode"`
	ErrMsg    string `json:"errmsg"`
	Type      string `json:"type"`
	MediaID   string `json:"media_id"`
	CreatedAt string `json:"created_at"`
}

// Webhook endpoints used by this tool. sendWebhookURL receives the file and
// text messages; uploadWebhookURL receives the multipart file upload.
// NOTE(review): the robot key is embedded in the URLs — consider loading it
// from configuration instead of source.
const (
	sendWebhookURL   = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxxxxxxxxxx"
	uploadWebhookURL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key=xxxxxxxxxxxxxxxx&type=file"
)

file_writer.go

package main

import (
	"log"
	"os"
)

// WriteToFile appends content to the file named filename, creating the file
// with mode 0644 when it does not exist.
//
// Any failure to open, write or close the file terminates the process via
// log.Fatalf. The file is closed explicitly rather than with defer because
// log.Fatalf exits without running deferred calls, and because a Close error
// on a file that was just written can mean the data did not reach disk.
func WriteToFile(filename string, content string) {
	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatalf("Error opening file: %s", err)
	}

	if _, err := file.WriteString(content); err != nil {
		// Best-effort close; the write error is the one worth reporting.
		file.Close()
		log.Fatalf("Error writing to file: %s", err)
	}

	if err := file.Close(); err != nil {
		log.Fatalf("Error closing file: %s", err)
	}
}

register_interface.go

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic/v7"
	"log"
)

// generateKQL renders a KQL query string for display.
//
// The result always constrains path and requestBody. A non-nil flagSuccess
// and a non-nil flagFail each add a required responseBody clause (success
// first), and every entry of flagExcept adds an excluded responseBody clause.
// Values are inserted verbatim between double quotes; nothing is escaped.
func generateKQL(path string, requestBody string, flagSuccess *string, flagFail *string, flagExcept []string) string {
	query := fmt.Sprintf("path:\"%s\" AND requestBody:\"%s\"", path, requestBody)

	// Required clauses keep their fixed order: success, then fail.
	for _, required := range []*string{flagSuccess, flagFail} {
		if required == nil {
			continue
		}
		query += fmt.Sprintf(" AND responseBody:\"%s\"", *required)
	}

	// Ranging over a nil or empty slice is a no-op, so no length guard is needed.
	for _, excluded := range flagExcept {
		query += fmt.Sprintf(" AND NOT responseBody:\"%s\"", excluded)
	}

	return query
}

// RegisterAllInfoTotalCount appends the statistics of one registration
// interface for the given day to the daily report file.
//
// interFaceName is matched against the "path" field of the access log and is
// used as the label prefix in the report. yesterday is the report date in
// "2006-01-02" form; it selects log entries whose requestBody contains a
// matching reqTime and names the report file.
//
// For the interface it writes the total, success (flag:1), failure (flag:0)
// and exception (neither flag) counts, followed by timeLocal, status and
// requestBody of every exception entry returned by the detail query. The
// equivalent KQL strings are printed to stdout for manual verification.
//
// Any Elasticsearch failure terminates the process via log.Fatalf.
func RegisterAllInfoTotalCount(interFaceName string, yesterday string) {
	const index = "shenyu-access-logging"

	ctx := context.Background()

	// Elasticsearch address; adjust for the target environment.
	esURL := "http://192.168.64.xx:30092"

	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(esURL))
	if err != nil {
		log.Fatalf("Error creating Elastic client: %s", err)
	}

	reqTimePattern := fmt.Sprintf("reqTime:*%s*", yesterday)
	flagPatternSuccess := fmt.Sprintf("flag:%d", 1)
	flagPatternFail := fmt.Sprintf("flag:%d", 0)

	// Exception entries are those whose response carries neither flag value.
	// The query is named because it is used both for counting and for the
	// detail listing below.
	exceptionQuery := elastic.NewBoolQuery().
		Must(
			elastic.NewMatchQuery("path", interFaceName),
			elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
		).
		MustNot(
			elastic.NewMatchPhraseQuery("responseBody", flagPatternFail),
			elastic.NewMatchPhraseQuery("responseBody", flagPatternSuccess),
		)

	// Count queries, in the order they appear in the report.
	queries := []struct {
		desc  string
		query elastic.Query
	}{
		{
			desc: fmt.Sprintf("%s-接口总计", interFaceName),
			query: elastic.NewBoolQuery().
				Must(
					elastic.NewMatchQuery("path", interFaceName),
					elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
				),
		},
		{
			desc: fmt.Sprintf("%s-接口成功数", interFaceName),
			query: elastic.NewBoolQuery().
				Must(
					elastic.NewMatchQuery("path", interFaceName),
					elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
					elastic.NewMatchPhraseQuery("responseBody", flagPatternSuccess),
				),
		},
		{
			desc: fmt.Sprintf("%s-接口失败数", interFaceName),
			query: elastic.NewBoolQuery().
				Must(
					elastic.NewMatchQuery("path", interFaceName),
					elastic.NewMatchPhraseQuery("requestBody", reqTimePattern),
					elastic.NewMatchPhraseQuery("responseBody", flagPatternFail),
				),
		},
		{
			desc:  fmt.Sprintf("%s-接口异常数", interFaceName),
			query: exceptionQuery,
		},
	}

	// Print the equivalent KQL so the numbers can be cross-checked by hand.
	kqlTotal := generateKQL(interFaceName, reqTimePattern, nil, nil, nil)
	kqlSuccess := generateKQL(interFaceName, reqTimePattern, &flagPatternSuccess, nil, nil)
	kqlFail := generateKQL(interFaceName, reqTimePattern, nil, &flagPatternFail, nil)
	kqlExcept := generateKQL(interFaceName, reqTimePattern, nil, nil, []string{flagPatternSuccess, flagPatternFail})
	fmt.Printf("%s-接口总计 KQL 查询:%s\n", interFaceName, kqlTotal)
	fmt.Printf("%s-接口成功数 KQL 查询:%s\n", interFaceName, kqlSuccess)
	fmt.Printf("%s-接口失败数 KQL 查询:%s\n", interFaceName, kqlFail)
	fmt.Printf("%s-接口异常数 KQL 查询:%s\n", interFaceName, kqlExcept)

	fileName := fmt.Sprintf("注册接口统计-%s.txt", yesterday)
	WriteToFile(fileName, "====================================================================================================\n")

	// Run each count query and append its result to the report.
	for _, q := range queries {
		res, err := client.Search().
			Index(index).
			Query(q.query).
			Size(0). // only the count is needed, not the documents
			// Without this, Elasticsearch 7 stops counting at 10,000 hits by
			// default and the report would silently under-count.
			TrackTotalHits(true).
			Do(ctx)
		if err != nil {
			log.Fatalf("Error executing query for %s: %s", q.desc, err)
		}

		WriteToFile(fileName, fmt.Sprintf("%s为:%d\n", q.desc, res.TotalHits()))
	}

	// Fetch the exception documents themselves. At most 10000 are requested,
	// so the listing can be shorter than the exception count written above.
	exceptionRes, err := client.Search().
		Index(index).
		Query(exceptionQuery).
		Size(10000).
		Do(ctx)
	if err != nil {
		log.Fatalf("Error executing exception query: %s", err)
	}

	WriteToFile(fileName, fmt.Sprintf("%s-接口异常数据:\n", interFaceName))
	for _, hit := range exceptionRes.Hits.Hits {
		var logEntry LogEntry
		if err := json.Unmarshal(hit.Source, &logEntry); err != nil {
			// A malformed document is logged and skipped, not fatal.
			log.Printf("Error unmarshalling log entry: %s", err)
			continue
		}
		WriteToFile(fileName, "##################################################\n")
		WriteToFile(fileName, fmt.Sprintf("timeLocal: %s; status: %d; requestBody:\n %s\n", logEntry.TimeLocal, logEntry.Status, logEntry.RequestBody))
	}
}

selet_sql.go

package main

import "fmt"
import "log"

// selectSqlGetCustomersCount appends the consumer error list for the given
// day to the daily report file.
//
// It selects req_param and error_msg for rows whose error_msg contains
// "会员信息不存在," and whose req_param contains a reqTime matching yesterday,
// then writes one delimited entry per row between two separator lines.
// A query failure terminates the process via log.Fatalf.
func selectSqlGetCustomersCount(yesterday string) {
	const (
		sectionRule = "====================================================================================================\n"
		entryRule   = "##################################################\n"
	)

	// %% is a literal percent sign for the SQL LIKE patterns; %s is the date.
	query := fmt.Sprintf(`SELECT req_param,error_msg FROM xxxx
	WHERE error_msg like '%%会员信息不存在,%%' and req_param  like '%%reqTime%%%s%%'`, yesterday)

	records, err := getCustomersCount(query)
	if err != nil {
		log.Fatalf("Error getting customer counts: %s", err)
	}

	reportPath := fmt.Sprintf("注册接口统计-%s.txt", yesterday)
	WriteToFile(reportPath, sectionRule)
	WriteToFile(reportPath, "消费异常统计:\n")
	for i := range records {
		entry := fmt.Sprintf("error_msg: %s ---> req_param: \n%s\n", records[i].ErrorMsg, records[i].ReqParam)
		WriteToFile(reportPath, entryRule)
		WriteToFile(reportPath, entry)
	}
	WriteToFile(reportPath, sectionRule)
}

sql.go

package main

import (
	"database/sql"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"log"
)

// connectToDB opens a handle to the MySQL database used for the consumer
// error statistics.
//
// It returns the handle, or a wrapped error when sql.Open rejects the driver
// or data source name. sql.Open may only validate its arguments without
// connecting, so an unreachable server can surface on the first query instead.
// The caller owns the returned handle and must Close it.
func connectToDB() (*sql.DB, error) {
	db, err := sql.Open("mysql", "xxxxx:xxxxxx@tcp(192.168.64.xx:6001)/testr_center")
	if err != nil {
		// %w keeps the cause available to errors.Is / errors.As.
		return nil, fmt.Errorf("连接数据库失败: %w", err)
	}
	return db, nil
}

// getCustomersCount runs query and returns one CustomersCount per result row.
//
// query must select exactly two columns, which are read in order as req_param
// and error_msg; SQL NULLs become empty strings. The query text is echoed to
// stdout before it runs. When no rows match, the result is an empty, non-nil
// slice.
//
// It returns an error when the database cannot be opened, when the query
// fails, when a row cannot be scanned, or when iteration ends abnormally.
func getCustomersCount(query string) ([]CustomersCount, error) {
	fmt.Println(query, ";")
	fmt.Println("--")

	db, err := connectToDB()
	if err != nil {
		// Continuing with a nil handle would panic on db.Query.
		return nil, err
	}
	// Deferred right after a successful open so the handle is also released
	// when the query itself fails.
	defer func() {
		if cerr := db.Close(); cerr != nil {
			log.Printf("closing database: %s", cerr)
		}
	}()

	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	counts := []CustomersCount{}
	for rows.Next() {
		// NullString tolerates NULL columns; a NULL scans to "".
		var reqParam, errorMsg sql.NullString
		if err := rows.Scan(&reqParam, &errorMsg); err != nil {
			return nil, fmt.Errorf("scanning customer count row: %w", err)
		}
		counts = append(counts, CustomersCount{
			ReqParam: reqParam.String,
			ErrorMsg: errorMsg.String,
		})
	}
	// Next returns false on both end-of-data and failure; Err tells them apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating customer count rows: %w", err)
	}
	return counts, nil
}

wehook.go

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

// sendWebhook uploads the report file and posts it to the group webhook.
//
// currentDir is the directory holding the report and excelFile is its file
// name. The file is first uploaded to obtain a media id, then a "file"
// message referencing that id is posted; when that request returns HTTP 200
// the text notification is sent as well.
//
// Nothing is posted when the upload yields no media id. JSON encoding and
// transport failures panic.
func sendWebhook(currentDir string, excelFile string) {
	excelPathFile := currentDir + "/" + excelFile
	mediaId := uploadWebhoook(excelPathFile, excelFile)
	if mediaId == "" {
		// A failed upload returns an empty id; posting it would send a broken
		// file message followed by a "please check the file" notification.
		fmt.Println("未获取到 media_id,跳过发送文件")
		return
	}

	// Build the request body.
	requestBody := map[string]interface{}{
		"msgtype": "file",
		"file": map[string]string{
			"media_id": mediaId,
		},
	}

	jsonBody, err := json.Marshal(requestBody)
	if err != nil {
		panic(err)
	}

	// Send the request.
	resp, err := http.Post(sendWebhookURL, "application/json", bytes.NewBuffer(jsonBody))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Only the HTTP status is inspected; the response body is not decoded.
	if resp.StatusCode != http.StatusOK {
		fmt.Printf("发送文件请求失败,状态码:%d\n", resp.StatusCode)
		return
	}
	fmt.Println("发送文件请求成功")
	sendTextMessage()
}

// sendTextMessage posts the follow-up text notification to the group webhook,
// with "@all" in its mentioned_mobile_list.
//
// The outcome is reported on stdout based on the HTTP status alone. JSON
// encoding and transport failures panic.
func sendTextMessage() {
	text := map[string]interface{}{
		"content":               "昨日注册接口统计及消费异常数据统计文件请查收",
		"mentioned_mobile_list": []string{"@all"},
	}
	payload, err := json.Marshal(map[string]interface{}{
		"msgtype": "text",
		"text":    text,
	})
	if err != nil {
		panic(err)
	}

	resp, err := http.Post(sendWebhookURL, "application/json", bytes.NewBuffer(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		fmt.Println("文本消息发送成功")
		return
	}
	fmt.Printf("文本消息发送失败,状态码:%d\n", resp.StatusCode)
}

// uploadWebhoook uploads the file at excelPathFile to the media-upload webhook
// as a multipart form field named "media", using excelFile as the file name.
//
// It returns the media id from the reply, or "" when the reply carries a
// non-zero errcode. The whole file is buffered in memory before sending.
// File, encoding, transport and decoding failures panic.
func uploadWebhoook(excelPathFile string, excelFile string) (mediaId string) {
	src, err := os.Open(excelPathFile)
	if err != nil {
		panic(err)
	}
	defer src.Close()

	// Assemble the multipart body in memory.
	payload := new(bytes.Buffer)
	form := multipart.NewWriter(payload)

	part, err := form.CreateFormFile("media", excelFile)
	if err != nil {
		panic(err)
	}
	if _, err = io.Copy(part, src); err != nil {
		panic(err)
	}
	// Closing the writer emits the terminating boundary; it must happen
	// before the body is sent.
	if err = form.Close(); err != nil {
		panic(err)
	}

	req, err := http.NewRequest("POST", uploadWebhookURL, payload)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", form.FormDataContentType())

	httpClient := &http.Client{}
	resp, err := httpClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var reply uploadWebhookMessage
	if err = json.NewDecoder(resp.Body).Decode(&reply); err != nil {
		panic(err)
	}

	// Success is signalled by errcode 0 in the reply body.
	if reply.ErrCode == 0 {
		fmt.Printf("上传文件成功")
		return reply.MediaID
	}
	fmt.Printf("上传文件失败")
	return ""
}