An SRE expert group made up of multiple agents, performing accurate fault analysis in place of a human! 👽

Workflow: take the time range (image upload supported) and route from the user input → query the log center → obtain the CMDB info from the gateway → query the CMDB for more details (owner, changes, Jenkins, GitLab info) → analyze and give recommendations:

111.jpg

222.jpg

Execution results:

The screenshots show a 504 fault on an API being diagnosed, from evidence to conclusion.
333.jpg

444.jpg

555.jpg

Steps

Use Flowise and configure LangChain's AgentFlow.

Create the supervisor

  • Prompt; limit the number of conversation rounds

666.jpg

You are a supervisor tasked with managing a conversation between the following workers: {team_members}.
Given the following user request, respond with the worker to act next.
Each worker will perform a task and respond with their results and status.
When finished, respond with FINISH.
Select strategically to minimize the number of steps taken.
Task Instructions:

1. When a user inputs alarm details or uploads an alarm screenshot, it is handed over to log_agent for querying. 
2. Log_agent extracts the app_name and passes it to cmdb_agent for querying. 
3. Fault_analyst_agent summarizes the information from log_agent and cmdb_agent.
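
Flowise and LangChain's AgentFlow implement the actual routing graph; purely for reference, the supervisor pattern this prompt describes boils down to a loop like the sketch below (plain Python, with call_llm() as a placeholder for any chat-model call and the worker names taken from this post):

# Illustrative sketch of the supervisor routing loop -- not the Flowise internals.
TEAM_MEMBERS = ["log_agent", "cmdb_agent", "fault_analyst_agent"]
MAX_TURNS = 10  # cap the number of conversation rounds, as the prompt intends


def call_llm(prompt: str) -> str:
    """Placeholder: send the prompt to your chat model and return its reply."""
    raise NotImplementedError


def run_supervisor(user_request: str) -> list:
    history = [f"user: {user_request}"]
    for _ in range(MAX_TURNS):
        routing_prompt = (
            f"You are a supervisor managing these workers: {TEAM_MEMBERS}.\n"
            + "\n".join(history)
            + "\nRespond with the next worker to act, or FINISH."
        )
        choice = call_llm(routing_prompt).strip()
        if choice == "FINISH" or choice not in TEAM_MEMBERS:
            break
        # The chosen worker handles the latest message and reports back.
        result = call_llm(f"You are {choice}. Handle: {history[-1]}")
        history.append(f"{choice}: {result}")
    return history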

Create the log agent

  • The backend service is started as a container
# Deployment: run the container
docker run --restart always -d -p 5001:5001 query-sls-log:v0.1
  • Query SLS with Flask
# app.py
from flask import Flask, request, jsonify
import time
from aliyun.log import *
from datetime import datetime

app = Flask(__name__)


# Time parsing helper
def parse_time(time_str):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a Unix timestamp; return None on failure."""
    try:
        return int(time.mktime(datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S").timetuple()))
    except ValueError:
        return None


# Query endpoint exposed by the Flask app
@app.route('/query_logs', methods=['GET'])
def query_logs():
    # Read r_q_path and the time range from the query parameters
    r_q_path = request.args.get('r_q_path', None)
    from_time_str = request.args.get('from_time', None)
    to_time_str = request.args.get('to_time', None)

    if not r_q_path:
        return jsonify({"error": "r_q_path parameter is required"}), 400

    if not from_time_str or not to_time_str:
        return jsonify({"error": "Both from_time and to_time parameters are required"}), 400

    # Parse the time strings
    from_time = parse_time(from_time_str)
    to_time = parse_time(to_time_str)

    if from_time is None or to_time is None:
        return jsonify({"error": "Invalid time format. Expected format: YYYY-MM-DD HH:MM:SS"}), 400

    if from_time >= to_time:
        return jsonify({"error": "from_time must be earlier than to_time"}), 400

    # SLS endpoint and credentials
    endpoint = 'cn-hangzhou.log.aliyuncs.com'
    access_key_id = 'xxx'
    access_key = 'yyy'

    # Project and Logstore names
    project = 'pubresource-release-kong-log'
    logstore = 'kong-release-logstore'

    # Create the SLS client
    client = LogClient(endpoint, access_key_id, access_key)

    # List of query statements
    queries = [
        f'* and content.r_q_path: "{r_q_path}" |select count(*) as count ',
        f'* and content.r_q_path: "{r_q_path}" and content.cost > 1000 |select "content.@timestamp" as time, "content.r_header.cdn-src-ip" as ip, "content.c_f_ip" as c_f_ip, "content.cost" as cost, "content.r_body" as r_body, "content.r_d_body" as r_d_body, "content.s_body" as s_body order by "content.cost" desc limit 10 ',
        f'"content.r_q_path" = "{r_q_path}" |select distinct "content.r_host" as host, "content.k_route" as route, "content.k_svr" as service ,"content.k_up_uri" as origin_url, "content.r_q_path" as full_path',
        f'"content.r_q_path" = "{r_q_path}" AND content.s_status > 500 | select distinct "content.s_body" as s_body',
        f'* and content.r_q_path: "{r_q_path}" |select "content.c_f_ip" as ip,count(*) as count group by ip order by count desc limit 10 ',
        f'content.r_q_path: "{r_q_path}" | select "content.s_status" as status,count(*) as count group by status order by count desc '
    ]

    descriptions = [
        "Total number of requests",
        "Requests slower than 1000 ms (top 10 only)",
        "Kong routing details",
        "Error response bodies (status > 500)",
        "Top 10 client IPs by request count",
        "Status code distribution"
    ]

    results = []
    for query, description in zip(queries, descriptions):
        query_result = query_logs_service(client, project, logstore, from_time, to_time, query)
        formatted_result = format_results(query_result)
        results.append({
            'description': description,  # keep the description before the data
            'data': formatted_result
        })

    return jsonify(results)


def query_logs_service(client, project, logstore, from_time, to_time, query):
    """Run a single query against SLS and return the raw log contents."""
    req = GetLogsRequest(project, logstore, from_time, to_time, '', query=query, line=3, offset=0, reverse=False)
    response = client.get_logs(req)
    results = []
    for log in response.get_logs():
        results.append(log.contents.items())
    return results


def format_results(results):
    """格式化查询结果"""
    formatted_results = []
    for result in results:
        formatted_result = {key: value for key, value in result}
        formatted_results.append(formatted_result)
    return formatted_results


if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0', port=5001)
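
Before containerizing, the /query_logs endpoint can be smoke-tested directly; a minimal sketch with requests, assuming the service is reachable on localhost:5001 and using example parameter values:

# smoke_test.py -- quick check of the /query_logs endpoint (example host and parameters)
import requests

resp = requests.get(
    "http://127.0.0.1:5001/query_logs",          # replace with your deployment address
    params={
        "r_q_path": "/getfiles",                 # example route
        "from_time": "2024-12-23 12:00:00",      # YYYY-MM-DD HH:MM:SS
        "to_time": "2024-12-23 13:00:00",
    },
    timeout=30,
)
resp.raise_for_status()
for item in resp.json():
    print(item["description"], "->", len(item["data"]), "rows")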
  • Dockerfile
# Use the official Python 3.11 slim image as the base image
FROM python:3.11-slim

# Set the working directory
WORKDIR /app

# Copy everything in the current directory into /app inside the container
COPY . /app

# Use the Tsinghua mirror, then upgrade pip
RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip install --upgrade pip

# Install Flask and Gunicorn
RUN pip install Flask gunicorn

# The app uses the Alibaba Cloud Log Service SDK, so install it as well
RUN pip install aliyun-log-python-sdk

# Tell Flask to run in production mode
ENV FLASK_ENV=production

# Expose port 5001 (matches the app configuration)
EXPOSE 5001

# Start the Flask app with Gunicorn on port 5001
CMD ["gunicorn", "--workers", "2", "--bind", "0.0.0.0:5001", "app:app"]
  • Build the image
docker build -t query-sls-log:v0.1 .
  • Create the log agent; prompt:
As the log agent, your task is to execute the 'query_sls' method based on the inputs provided. When you receive the route (r_q_path), start time (from_time), and fault time (to_time), perform the following steps:

1. Route: Extract the field value immediately following 'route:' in the user's uploaded screenshot.
2. Start Time: Calculated as 30 minutes before the fault time.
3. Fault Time: Extracted from the user's uploaded screenshot.
If the user does not specify a year, or the year is not visible in the image, use the current year, 2024. Use these details to perform a query with the 'query_sls' method.
4. Pass 'api-hardware' as the app_name to cmdb_agent for querying.

The available tools include:
- query_sls
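
The time handling the prompt asks for (from_time = fault time minus 30 minutes, defaulting to the year 2024 when the screenshot omits it) is plain date arithmetic; a small sketch with an example input:

# What the log agent is expected to compute (example input, year missing from the screenshot)
from datetime import datetime, timedelta

fault_time_str = "12-23 13:05:00"                               # example value
fault_time = datetime.strptime(f"2024-{fault_time_str}", "%Y-%m-%d %H:%M:%S")
from_time = fault_time - timedelta(minutes=30)

print(from_time.strftime("%Y-%m-%d %H:%M:%S"))                  # 2024-12-23 12:35:00
print(fault_time.strftime("%Y-%m-%d %H:%M:%S"))                 # 2024-12-23 13:05:00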
  • Custom tool call. Mind the Flowise parameter configuration: date recognition can be unreliable, so string is recommended as the parameter type

777.jpg

const fetch = require('node-fetch');
const { URLSearchParams } = require('url'); // make sure URLSearchParams is imported
const moment = require('moment'); // moment handles the time formatting

// Query parameters
const r_q_path = $r_q_path;   // e.g. "service=/getfiles"
let from_time = $from_time;   // e.g. "12-23 12:00:00"
let to_time = $to_time;       // e.g. "12-23 13:00:00"


// Normalize the time format to YYYY-MM-DD HH:mm:ss with moment.js
from_time = moment(from_time, ['YYYY-MM-DD HH:mm:ss', 'YYYY-MM-DDTHH:mm:ss', 'YYYY-MM-DD HH:mm', 'YYYY-MM-DD']).format('YYYY-MM-DD HH:mm:ss');
to_time = moment(to_time, ['YYYY-MM-DD HH:mm:ss', 'YYYY-MM-DDTHH:mm:ss', 'YYYY-MM-DD HH:mm', 'YYYY-MM-DD']).format('YYYY-MM-DD HH:mm:ss');

// Build the query string with URLSearchParams
const queryParams = new URLSearchParams({
    r_q_path: r_q_path,
    from_time: from_time,
    to_time: to_time
});

// Build the full URL; replace the host with your own server address
const url = `http://11.22.33.44:5001/query_logs?${queryParams.toString()}`;

const options = {
    method: 'GET',
    headers: {
        'Content-Type': 'application/json'
    }
};

try {
    const response = await fetch(url, options);
    const text = await response.text();
    return text;
} catch (error) {
    console.error(error);
    return '';
}

Create the cmdb agent

  • Create the cmdb agent; prompt:
As the CMDB query agent, your responsibility is to use the 'get_cmdb_data' method to query the Kong information provided by log_agent.
app_name = api-hardware

The tools available are:
- get_cmdb_data
  • Custom tool configuration; pay attention to the variable configuration

888.jpg


const fetch = require('node-fetch');
const crypto = require('crypto');
const { URL } = require('url');

// Replace with the key and secret from your own CMDB
const API_URL = "https://yourcmdb.xx.com/api/v0.1/ci/s";
const KEY = "xxx";
const SECRET = "yyy";

// app_name from the user input (make sure appName is set correctly)
const appName = $app_name;

// Build the signed API parameters
function buildApiKey(path, params) {
    // Get and sort the parameter keys, excluding _key, _secret and any object/array values
    const sortedKeys = Object.keys(params).sort();
    const values = sortedKeys
        .filter(k => k !== '_key' && k !== '_secret' && typeof params[k] !== 'object' && !Array.isArray(params[k]))
        .map(k => params[k])
        .join('');
    
    // Build the string to hash
    const secretString = path + SECRET + values;
    
    // Compute the SHA1 hash
    const secretHash = crypto.createHash('sha1').update(secretString, 'utf8').digest('hex');
    
    // Add _secret and _key to the parameters
    params['_secret'] = secretHash;
    params['_key'] = KEY;
    
    return params;
}

// Initial payload
let payload = {
    "q": appName,
    "_type": "repository"
};

// Parse the URL to get the path
const parsedUrl = new URL(API_URL);
const path = parsedUrl.pathname;

// Build the signed payload
payload = buildApiKey(path, payload);

// Build the full URL with query parameters
const urlWithParams = new URL(API_URL);
Object.keys(payload).forEach(key => urlWithParams.searchParams.append(key, payload[key]));

// Request options
const options = {
    method: 'GET',
    headers: {
        'Content-Type': 'application/json'
    }
};

try {
    // 发送 GET 请求并等待响应
    const response = await fetch(urlWithParams.toString(), options);
    
    // Check the response status
    if (!response.ok) {
        throw new Error(`HTTP error! Status code: ${response.status}`);
    }
    
    // Parse the response as JSON
    const json = await response.json();
    
    // Return the JSON as a string
    return JSON.stringify(json);
} catch (error) {
    console.error('Error fetching CI data:', error);
    return '';
}
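
The signing logic in buildApiKey() (SHA1 over path + secret + the concatenated values of the sorted parameter keys) can also be expressed in Python if you want to test it outside Flowise; a sketch using the same placeholder URL, key and secret as above:

# Python equivalent of buildApiKey() above (placeholder URL, key and secret)
import hashlib
from urllib.parse import urlparse

import requests

API_URL = "https://yourcmdb.xx.com/api/v0.1/ci/s"
KEY = "xxx"
SECRET = "yyy"


def build_api_key(path, params):
    """Sign the request: SHA1(path + SECRET + concatenated values of the sorted keys)."""
    values = "".join(
        str(params[k])
        for k in sorted(params)
        if k not in ("_key", "_secret") and not isinstance(params[k], (dict, list))
    )
    params["_secret"] = hashlib.sha1((path + SECRET + values).encode("utf-8")).hexdigest()
    params["_key"] = KEY
    return params


payload = build_api_key(urlparse(API_URL).path, {"q": "api-hardware", "_type": "repository"})
resp = requests.get(API_URL, params=payload, timeout=30)
resp.raise_for_status()
print(resp.json())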

Create the fault analyst agent

  • Prompt:
As a fault analyst, please summarize the information from log_agent and cmdb_agent. Based on your judgment, provide a summary that includes, but is not limited to, the following: log information before the failure time, routing information, container information, pipeline information (address, last deployment status), code repository information, owner information, and fault cause analysis. Please respond in Chinese.

To do

More agents could be added to deliberate together and produce a more trustworthy conclusion, e.g. a monitor agent responsible for Prometheus and Alibaba Cloud monitoring, a code agent, and so on.

Tags: aiops, sre, AIagent
