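This series explores where large language models fit into operations work; this post focuses on troubleshooting. It is a further integration of the earlier post "Autogen Ops Troubleshooting in Practice: A Complex Case", with the following improvements: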

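  • Everything goes through a jump host (bastion host), so no agent has to be installed on the target machines: zero intrusion
  • A single entry point, integrated into the ops system
  • Models can be switched freely: GPT-4, Claude, tongyi, and so on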

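Results

The user picks the target machine from the asset list
host_management.png
The user describes the fault, chooses a strategy (automatic execution or step-by-step confirmation), and clicks Execute
start_tr.png
The result is printed in the terminal
end_tr.png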

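Design

The bastion host can reach every target machine, so the AI agent is deployed there. The prompt pins down the agent's area of expertise and how it connects to the machines. The backend uses Django to serve a WebSocket, and the frontend uses xterm.js to emulate a terminal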
topology.png
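In this design, clicking Execute ends up sending a shell command through the WebSocket that starts twoagent.py on the bastion host. Because the fault description is free text, it may be worth building that command on the server side rather than in the browser. The following is a minimal sketch, assuming the two strategies map to autogen's human_input_mode values NEVER and ALWAYS; build_agent_command and ALLOWED_INPUT_MODES are hypothetical names, not part of the original project.

```python
import ipaddress
import shlex

# human_input_mode values accepted by autogen's UserProxyAgent
ALLOWED_INPUT_MODES = {"ALWAYS", "NEVER", "TERMINATE"}


def build_agent_command(host_ip: str, clue: str, input_mode: str) -> str:
    """Return the shell command line that starts twoagent.py for one host."""
    # Raises ValueError unless host_ip is a literal IPv4/IPv6 address.
    ipaddress.ip_address(host_ip)
    if input_mode not in ALLOWED_INPUT_MODES:
        raise ValueError(f"unsupported input mode: {input_mode!r}")
    # shlex.quote keeps spaces, quotes and ';' inside a single shell argument.
    message = shlex.quote(f"{host_ip},{clue}")
    return (
        "cd /data/autogen && source venv/bin/activate && "
        f"python twoagent.py {message} {input_mode}\n"
    )


if __name__ == "__main__":
    print(build_agent_command("172.16.1.53", "nginx returns 502", "NEVER"), end="")
```

The directory and virtualenv path are the ones used by the page script; only the quoting and the validation are new here.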

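Key points

  • Xterm.js has a steep learning curve: capturing Chinese and English input, spaces, backspace, shortcut keys and so on all needs custom handling. Near the end of the work I came across webssh, a project that wraps this more simply
  • Autogen talks to OpenAI through api.openai.com; change the domain in the corresponding library to the proxy domain
  • WebSocket mode has to run under ASGI, and static files are loaded differently. consumers.py and routing.py have to be written by hand
  • The AI agent's prompt must be explicit. Mind the model's context limit and use the prompt to truncate long results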
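For the ASGI point above, the routing side might look roughly like the sketch below. It assumes Django Channels 3 or later, a consumer class named SSHConsumer in consumers.py, and the /ws/ssh/<host_ip> URL that the page script connects to. The project name mysite and the app name ops are hypothetical placeholders.

```python
# ops/routing.py (hypothetical app name)
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    # Captures the target IP into scope['url_route']['kwargs']['host_ip']
    re_path(r"ws/ssh/(?P<host_ip>[0-9.]+)$", consumers.SSHConsumer.as_asgi()),
]


# mysite/asgi.py (hypothetical project name)
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
# Initialise Django before importing code that may touch models
django_asgi_app = get_asgi_application()

from ops import routing  # noqa: E402

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
})
```

With this layout, settings.py would also need ASGI_APPLICATION = "mysite.asgi.application" and "channels" in INSTALLED_APPS.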
    <script>
        var term = new Terminal({
            cursorBlink: true
        });
        term.open(document.getElementById('terminal'));
        term.write('Attempting to connect to the server...\r\n');
        
        var hostIp = "172.16.1.53"; 
        var ws = new WebSocket('ws://' + window.location.host + '/ws/ssh/' + hostIp);
        ws.onopen = function () {
            console.log('WebSocket connection opened.');
            ws.send(JSON.stringify({message: '\n'})); // send a newline to initialize the session
        };

        ws.onmessage = function (event) {
            console.log('Received message:', event.data);
            try {
                var data = JSON.parse(event.data);
                console.log('Parsed data:', data);
                if (data.message !== undefined) {
                    var cleanMessage = data.message.replace(/(\[.*?@.*?\s.*?\])\$/g, "");
                    term.write(cleanMessage);
                } else {
                    console.error('No message field in received data');
                }
            } catch (error) {
                console.error("Error parsing JSON:", error);
                term.write("\r\nReceived non-JSON message: " + event.data);
            }
        };

        term.onData(data => {
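            console.log('Data received:', data); // log the typed data to help with debugging
            ws.send(JSON.stringify({message: data})); // forward the data to the server unchanged
            if (data.charCodeAt(0) === 13) { // carriage return has char code 13
                // Start a new line locally, in case the server does not echo one
                term.write('\r\n');
            } else {
                // Echo any other input back to the terminal (remote echo is turned off)
                term.write(data);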
            }
        });

        term.onKey(e => {
            const {key, domEvent} = e;
            console.log('Key pressed:', key); // log which key was pressed
            if (domEvent.keyCode === 8) {
                // Backspace: erase the last character shown on the terminal
                term.write('\b \b');
            }
        });

        ws.onclose = function () {
            console.log('WebSocket connection closed.');
            term.write('\r\nConnection Closed\r\n');
        };

        document.getElementById('executeButton').addEventListener('click', function () {
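            var commandOption = document.getElementById('exampleFormControlSelect1').value; // selected strategy from the drop-down
            var clueInput = document.getElementById('exampleFormControlInput1').value; // fault description from the input box

            // Build the commands: enter the autogen directory, activate the venv, then start the agent.
            // initCommand ends with '&&', so the shell keeps reading and chains fullCommand onto it.
            var initCommand = `cd /data/autogen && source venv/bin/activate && \n`
            // Single-quote the message so that a description containing spaces stays one argument
            var message = `{{ host_ip }},${clueInput}`.replace(/'/g, `'\\''`);
            var fullCommand = `python twoagent.py '${message}' ${commandOption} \n`;

            // Send the setup command
            ws.send(JSON.stringify({message: initCommand}));
            
            // Send the agent command over the WebSocket
            ws.send(JSON.stringify({message: fullCommand}));
            console.log('Command sent:', fullCommand);

            // Clear the input box once the command has been sent
            document.getElementById('exampleFormControlInput1').value = '';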
        });

    </script>
# consumers.py
import json
import threading
import time
from channels.generic.websocket import WebsocketConsumer
import paramiko


class SSHConsumer(WebsocketConsumer):
    def connect(self):
        # Read host_ip from the WebSocket URL
        self.host_ip = self.scope['url_route']['kwargs']['host_ip']
        self.accept()  # Accept the WebSocket connection

        # Set up the SSH connection
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(self.host_ip, username='root')
            self.channel = self.ssh.invoke_shell(term='xterm')
            self.channel.send('stty -echo\r')  # Turn off remote echo
            self.channel.send('clear\r')  # Clear the screen

            # Start a daemon thread that relays SSH output to the browser
            thread = threading.Thread(target=self.listen_to_ssh, daemon=True)
            thread.start()
        except Exception as e:
            # The page only renders the 'message' field, so report the error there
            self.send(text_data=json.dumps({'message': f'\r\nSSH error: {e}\r\n'}))
            self.close()

    def listen_to_ssh(self):
        # Stop once the channel is closed instead of looping forever
        while not self.channel.closed:
            if self.channel.recv_ready():
                # errors='replace' avoids a crash when a multi-byte character is split across reads
                data = self.channel.recv(10240).decode('utf-8', errors='replace')
                self.send(text_data=json.dumps({'message': data}))
            else:
                time.sleep(0.05)  # Avoid a busy loop while the channel is idle

    def receive(self, text_data):
        # Forward keystrokes and commands from the browser to the SSH channel
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        self.channel.send(message)

    def disconnect(self, close_code):
        # Close the channel first, then the SSH client that owns it
        if hasattr(self, 'channel'):
            self.channel.close()
        if hasattr(self, 'ssh'):
            self.ssh.close()
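One detail behind the "capturing Chinese" point: the SSH channel delivers raw bytes, and a multi-byte UTF-8 character can straddle two recv() calls, in which case decoding each chunk on its own either raises or garbles the character. A possible approach, shown here as a standalone sketch and not as the consumer's actual code, is the standard library's incremental decoder; decode_stream is a hypothetical helper name.

```python
import codecs


def decode_stream(chunks):
    """Yield text for each raw chunk, holding back incomplete UTF-8 sequences."""
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    for chunk in chunks:
        # Bytes of an unfinished character stay buffered inside the decoder.
        text = decoder.decode(chunk)
        if text:
            yield text
    # Flush whatever is left once the stream ends.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail


if __name__ == "__main__":
    raw = "df -h: 100%".encode("utf-8")
    print("".join(decode_stream([raw[:4], raw[4:]])))
```

Inside listen_to_ssh, the same idea would mean creating one decoder per connection and passing each recv() result through decoder.decode() before sending it to the browser.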
# twoagent.py
# -*- coding: utf-8 -*-
import autogen
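import argparse

# Create the argument parser
parser = argparse.ArgumentParser(description='Start a chat with the assistant')
# Positional argument 'message': the text passed to the assistant
parser.add_argument('message', type=str, help='Message the user wants to send to the assistant')
# Required positional argument that selects the human_input_mode
parser.add_argument('input_mode', type=str, choices=['ALWAYS', 'NEVER', 'TERMINATE'],
                    help='Input mode for the user proxy agent')

# Parse the command-line arguments
args = parser.parse_args()

# args.message now holds the message passed on the command line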

config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
    filter_dict={
        "model": ["gpt-4"],
    },
)

llm_config = {
    "seed": 60,
    "config_list": config_list,
    "temperature": 0.1,
}

assistant = autogen.AssistantAgent(
    name="assistant",
    llm_config=llm_config,
    system_message="""
                You are a professional DevOps engineer named Small Wei, with expertise in Linux, Kubernetes, and other areas.
                1. The program runs on a bastion host, and passwordless login has been set up with all machines using the username 'root' and the private key located at /root/.ssh/id_rsa;
                2. Must use Paramiko to connect to servers;
                3. If the execution result is too long, please truncate the first 1000 characters;
                4. Please respond in Chinese.
    """
)

user_proxy = autogen.UserProxyAgent(
    name="小维",
    human_input_mode=args.input_mode,  # input mode passed on the command line
    code_execution_config={"work_dir": ".", "use_docker": False},
    max_consecutive_auto_reply=10,
    is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
    llm_config=llm_config,
    system_message="""
                If the task has been completely and satisfactorily resolved, then reply "TERMINATE". Otherwise, reply with CONTINUE or the reason why the task has not been resolved.
    """
)

user_proxy.initiate_chat(
    assistant,
    message=args.message  # the message supplied by the user
)
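The system message above asks the model to keep only the first 1000 characters of long output, which leaves the truncation up to the model. If a deterministic cut is preferred, the generated scripts (or a wrapper around them) could apply something like the sketch below; truncate_output is a hypothetical helper, and the 1000-character default simply mirrors the prompt.

```python
def truncate_output(text: str, limit: int = 1000) -> str:
    """Keep the first `limit` characters and note how much was dropped."""
    if len(text) <= limit:
        return text
    omitted = len(text) - limit
    return f"{text[:limit]}\n...[truncated {omitted} characters]"


if __name__ == "__main__":
    long_log = "error: disk full\n" * 200
    print(truncate_output(long_log, limit=60))
```

Stating the number of omitted characters tells the model that the output was cut, so it can request a narrower command (for example a grep or tail) instead of assuming it saw everything.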

Tags: LLM, autogen, aiops, django, xterm.js, websocket
