SRE-copilot构建全过程
由多个agent组成的SRE专家组,代替人工进行准确的故障分析!👽
流程:从用户输入取时间范围(支持上传图片)、路由--》查询日志中心--》获取网关中cmdb信息--》查询CMDB获取更多细节(owner、变更、jenkins、gitlab信息)--》分析并给出建议:
执行效果:
图中是判断某接口504故障,从依据到结论过程
步骤
使用flowise,配置langchain中AgentFlow
由多个agent组成的SRE专家组,代替人工进行准确的故障分析!👽
流程:从用户输入取时间范围(支持上传图片)、路由--》查询日志中心--》获取网关中cmdb信息--》查询CMDB获取更多细节(owner、变更、jenkins、gitlab信息)--》分析并给出建议:
执行效果:
图中是判断某接口504故障,从依据到结论过程
使用flowise,配置langchain中AgentFlow
创建一个具备内部运维知识,识别自然语义,准确调用各种工具执行任务,严格控制幻觉的智能运维工程师。
使用 OpenAI 的 Assistant 功能,上传知识库,设置提示词。
Prompt Engineering:
Function Call:通过 Flowise 的自定义工具
先看效果
Streamlit 界面:
LLM(大型语言模型)配置设置:
gpt-4o-2024-08-06
)。ollama/llama3:latest
)的配置。助手代理(AssistantAgent)和用户代理(UserProxyAgent):
AssistantAgent
,负责与用户输入进行处理和响应,同时集成在 Streamlit 中以显示聊天消息。UserProxyAgent
,用于接收用户输入,处理用户命令,并在助手代理与用户之间进行代理交互。实用工具函数:
get_url_info_from_kong
:从 Kong API 网关中查询 URL 的路由、服务和上游配置的信息,并返回格式化结果。dns_record_status
:检查给定 URL 的 DNS 记录状态。query_from_cmdb
:从 CMDB(配置管理数据库)中检索特定云服务提供商(如阿里云、AWS 等)的服务器、数据库和中间件实例的数量。异步聊天系统:
asyncio
),用户代理(User Proxy Agent)可以异步与助手代理(Assistant Agent)进行对话,提供更高效的交互体验。+---------------------------------------------------------------+
| Streamlit Interface |
|---------------------------------------------------------------|
| +-----------------------------------------------------------+ |
| | Sidebar (Azure Endpoint Config, etc.) | |
| +-----------------------------------------------------------+ |
| |
| +-----------------------------------------------------------+ |
| | Chat Input / Output Area | |
| | | |
| | User Input --> UserProxyAgent --> AssistantAgent | |
| | | |
| | AssistantAgent --> UserProxyAgent --> Output Display | |
| +-----------------------------------------------------------+ |
+---------------------------------------------------------------+
+--------------------+ +--------------------+
| LLM Configurations | | Utility Functions|
|--------------------| |--------------------|
| - OpenAI (GPT-4) | | - get_url_info_from|
| - Local LLM (LLaMA)| | _kong() |
+--------------------+ | (Interacts with |
| Kong API Gateway)|
| - dns_record_status|
| (Checks DNS) |
| - query_from_cmdb |
| (Interacts with |
| CMDB Database) |
+--------------------+
+-----------------+ +-------------------+
| AssistantAgent | <--- asyncio ->| UserProxyAgent |
| (Handles LLM | | (Manages User |
| Requests) | | Input/Commands) |
+-----------------+ +-------------------+
^ | ^ |
| | | |
| v | v
+----------------+ +-------------------+
| LLM Config | | Utility Functions|
| Setup (OpenAI)| | (Kong, DNS, CMDB) |
+----------------+ +-------------------+
+----------------+
| Data Flow |
|----------------|
| - User Input |
| - Assistant |
| - ProxyAgent |
| - Utility Func|
+----------------+
超越简单的 RAG 和提示词工程:
使用 Function Call 完成真实世界的任务:
集成异步交互和高效任务处理:
asyncio
)实现用户代理和助手代理之间的异步通信,大幅提升了任务处理的效率和响应速度。这样的设计确保了系统能够并发处理多个任务,而不阻塞用户输入和系统响应。技术含量高,解决复杂场景问题:
get_url_info_from_kong
函数能够通过调用 Kong API,获取详细的路由、服务和插件信息,并对这些数据进行格式化处理和展示;query_from_cmdb
函数能够从 CMDB 中动态检索并整合不同云服务商的资源信息。这样的功能大大提升了系统的实际应用价值。提升企业运营效率与智能化水平:
最近在整理CMDB信息,以Jenkins为中枢,统计、梳理代码仓库位置、发布位置。形成以应用为中心,串连资源、管理者。
首先需要统计代码中涉及的配置文件信息,比如Mysql/Redis/Elasticsearch/MongoDB/Kafka/RocketMQ/MQTT/Doris/HBase/InfLuxDB/http等
意义:
相较传统的正则匹配,大模型加持下有如下优点
环境介绍
#!/bin/bash
# 设置环境变量
export CUDA_VISIBLE_DEVICES=0,1
# 启动 vllm 服务器并将其转移到后台运行
nohup python3 -m vllm.entrypoints.openai.api_server \
--model /data/vllm/Meta-Llama-3.1-8B-Instruct \
--served-model-name llama \
--tensor-parallel-size 2 \
--trust-remote-code > llama.log 2>&1 &
import os
import re
import requests
import time
# 定义中间件关键字的正则表达式,忽略大小写
KEYWORDS = ["mysql", "redis", "elasticsearch", "mongodb", "kafka", "rocketmq",
"rabbitmq", "emq", "mqtt", "nacos", "postgresql", "doris",
"hbase", "influxdb", "azkaban", "sls", "clickhouse",
"mse", "dataworks", "neo4j", "http", "gitlab", "jenkins"]
PATTERN = re.compile(r'\b(?:' + '|'.join(KEYWORDS) + r')\b', re.IGNORECASE)
def read_files(directory):
for root, _, files in os.walk(directory):
# 忽略 .git 文件夹
if '.git' in root:
continue
for file in files:
file_path = os.path.join(root, file)
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.readlines()
yield file_path, content
def extract_context(content, file_path):
results = []
for i, line in enumerate(content):
if PATTERN.search(line):
start = i # 从匹配到的行开始
end = min(i + 11, len(content)) # 包含匹配行及其下方10行
snippet = "".join(content[start:end]).strip()
results.append(f"文件路径: {file_path}\n{snippet}")
return results
def write_to_file(directory, contexts):
output_file = os.path.join(directory, 'matched_content.txt')
with open(output_file, 'w', encoding='utf-8') as f:
for context in contexts:
f.write(context + '\n' + '=' * 50 + '\n')
return output_file
def send_to_model(url, model_name, prompt, content):
headers = {"Content-Type": "application/json"}
data = {
"model": model_name,
"temperature": 0.2,
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": f"{prompt}\n\n{content}"}
]
}
try:
response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
except requests.RequestException as e:
print(f"Request to model failed: {e}")
return None
response_json = response.json()
if 'choices' not in response_json:
print(f"Model response does not contain 'choices': {response_json}")
return None
return response_json['choices'][0]['message']['content']
def write_individual_results(directory, results, model_name):
output_file = os.path.join(directory, f'{model_name}_results.txt')
with open(output_file, 'w', encoding='utf-8') as f:
for result in results:
f.write(result + '\n' + '=' * 50 + '\n')
return output_file
def combine_and_summarize(directory, llama_file, qwen_file, qwen_url):
combined_content = ""
# 读取llama和qwen的结果文件
with open(llama_file, 'r', encoding='utf-8') as f:
combined_content += f.read()
with open(qwen_file, 'r', encoding='utf-8') as f:
combined_content += f.read()
# 使用qwen模型进行汇总处理
summary_prompt = """
1. 删除包含“配置信息未提供”等无用信息的部分。
"""
result_summary = send_to_model(qwen_url, "qwen", summary_prompt, combined_content)
if result_summary:
summary_file = os.path.join(directory, 'final_summary_combined.txt')
with open(summary_file, 'w', encoding='utf-8') as f:
f.write(result_summary)
print(f"汇总结果保存至: {summary_file}")
else:
print("汇总处理失败")
def main(directory):
start_time = time.time()
all_contexts = []
for file_path, content in read_files(directory):
contexts = extract_context(content, file_path)
all_contexts.extend(contexts)
# 将匹配到的内容写入文件
matched_file = write_to_file(directory, all_contexts)
results_llama = []
results_qwen = []
with open(matched_file, 'r', encoding='utf-8') as f:
content = f.read()
analysis_prompt = """
1. Ignore lines starting with #, //, /**, or <!--.
2. Exclude commented lines.
3. Extract configuration info for: MySQL, Redis, Elasticsearch, MongoDB, Kafka, RocketMQ, RabbitMQ, EMQ, MQTT, Nacos, PostgreSQL, Doris, HBase, InfluxDB, Azkaban, SLS, ClickHouse, MSE, DataWorks, Neo4j, HTTP, HTTPS, GitLab, Jenkins.
4. Focus on URLs, usernames, passwords, hosts, ports, and database names.
5. Extract the following attributes:
- Username
- Password
- Host
- Port
- Database Name
- URL or Connection String
6. Look for configuration patterns like key-value pairs and environment variables.
7. Ensure extracted values are not in commented sections.
8. Extract all distinct configurations.
9. Handle different configuration formats (JSON, YAML, dictionaries, env variables).
10. Delete sections containing “**配置信息未直接提供**” or similar useless content.
"""
# 分别调用llama和qwen模型
result_llama = send_to_model("http://1.1.1.1:8000/v1/chat/completions", "llama", analysis_prompt, content)
result_qwen = send_to_model("http://1.1.1.1:8001/v1/chat/completions", "qwen", analysis_prompt, content)
if result_llama:
results_llama.append(result_llama)
if result_qwen:
results_qwen.append(result_qwen)
# 分别保存llama和qwen的结果到不同文件
llama_file = write_individual_results(directory, results_llama, "llama")
qwen_file = write_individual_results(directory, results_qwen, "qwen")
# 汇总llama和qwen的结果
combine_and_summarize(directory, llama_file, qwen_file, "http://1.1.1.1:8001/v1/chat/completions")
end_time = time.time()
total_duration = end_time - start_time
print(f"总耗时: {total_duration:.2f} 秒")
if __name__ == "__main__":
main("/Users/jixing/PycharmProjects/AIOps-utils/Athena_Legacy")
在上一篇文档中实现了检查单台服务器故障的典型排错场景。此次我们加大难度
一、排查链路中故障,识别南北向流量走向并给出排查结果
难点
思路
二、与真实用户交流,给出域名申请建议并检测是否可用
难点
思路
整体难点,多agent执行顺序,“技能绑定”,来看效果。图1为用户与gatekeeper探讨需求
图2为agent建议用户使用的解析记录
图3为正确路由南北向流量问题,并使用对应function判断
关键代码片段
探索大模型在运维工作中的方向,此篇主要讲故障排查。是“Autogen 运维排错实践-复杂案例”的进一步整合,改进如下
用户在资产中选择目标机器
描述故障,选择策略(自动执行、逐步询问),点击执行
输出结果
利用堡垒机与所有目标机器互通,将aiagent部署在此。通过提示词确认专精方向、连接方式。后端使用Django开启websocket,前端使用xterm.js模拟终端
模型仅具备各领域的通用知识,对于垂类仍有进步空间,这也是医疗、政务类模型出现的原因。我们在尝试AIagent时发现模型并不够聪明,对于安装性能分析工具,vim前后台等问题无法进展到下一步,详见 Autogen 运维排错实践-复杂案例。此次尝试使用偏运维领域的ServerFault,爬取经过人工审核的有效答案来微调模型,观察效果。简言之,教模型所不擅长
先看效果,根据采集到的数据,统计出ServerFault热门词云
筛选逻辑,根据Active状态&前500页&作者vote过的问题,分别记录问题链接、标题、内容、发布时间、更新时间、被查看总数、投票总数;答案内容、得分9个字段,两张表通过外键关联
CREATETABLE Posts (
PostID INTEGERPRIMARYKEY,
PostLink TEXTNOTNULL,
Title TEXTNOTNULL,
PostContent TEXTNOTNULL,
PostTime TEXTNOTNULL,-- ISO8601 strings ("YYYY-MM-DD HH:MM:SS.SSS")
ModifyTime TEXTNOTNULL,
ViewCount INTEGERNOTNULL,
VoteCount INTEGERNOTNULL
);
CREATETABLE Answers (
AnswerID INTEGERPRIMARYKEY,
PostID INTEGER,
AnswerContent TEXTNOTNULL,
VoteCount INTEGERNOTNULL,
FOREIGNKEY(PostID)REFERENCES Posts(PostID)
);NO;
经过控制爬虫速率,切换代理地址,共采集问题、答案数
数量 | |
---|---|
Posts | 6681 |
Answers | 16253 |
VoteCount分布
0-100 | 101-200 | 201-300 | 301-400 | 401-500 | >500 | |
---|---|---|---|---|---|---|
Posts | 6278 | 85 | 32 | 13 | 1 | 5 |
Answers | 15643 | 150 | 31 | 16 | 7 | 8 |