@clawhub-freepengyang-2bdf85f5b0
Collect server monitoring data (Zabbix / Prometheus / Alibaba / Tencent / Huawei Cloud), generate CSV/XLSX reports and send via email or Feishu.
---
name: server-monitor-collector
description: Collect server monitoring data (Zabbix / Prometheus / Alibaba / Tencent / Huawei Cloud), generate CSV/XLSX reports and send via email or Feishu.
triggers:
- collect server monitoring data
- server health report
- host monitoring collection
- zabbix prometheus monitoring
- cloud CVM monitoring
- server daily report cron
- TC3-HMAC-SHA256 signature
homepage: https://clawhub.ai/skills
metadata:
  {
    "openclaw":
      {
        "emoji": "🖥️",
        "requires": { "bins": ["python3"] },
        "install":
          [
            {
              "id": "scripts",
              "kind": "file",
              "src": "scripts/zabbix_cron.py",
              "label": "Main cron entry point (Zabbix + Cloud + Feishu + Email)"
            },
            {
              "id": "scripts-cloud",
              "kind": "file",
              "src": "scripts/cloud_monitor.py",
              "label": "Multi-cloud collector: Alibaba / Tencent / Huawei"
            },
            {
              "id": "scripts-standalone",
              "kind": "file",
              "src": "scripts/zabbix_monitor.py",
              "label": "Zabbix standalone collector + Excel report generator"
            },
            {
              "id": "scripts-mail",
              "kind": "file",
              "src": "scripts/send_zabbix_report.py",
              "label": "Standalone email sender"
            },
            {
              "id": "hermes-skill",
              "kind": "file",
              "src": "references/zabbix-config.md",
              "label": "Configure data sources in ~/.hermes/.env"
            },
            {
              "id": "cloud-config",
              "kind": "file",
              "src": "references/cloud-config.md",
              "label": "Cloud API credentials: Alibaba / Tencent / Huawei"
            },
            {
              "id": "notification-config",
              "kind": "file",
              "src": "references/notification-config.md",
              "label": "Feishu and email notification setup"
            }
          ]
      }
  }
---
# Server Monitor Collector
Collect server or cloud VM monitoring data, generate formatted Excel reports, and optionally send summaries via email or Feishu/Lark.
## Supported Data Sources
| Source | Auth | Notes |
|--------|------|-------|
| Zabbix | User/Pass or API Token | Host groups, memory, CPU, disk |
| Prometheus | URL only | PromQL queries |
| Alibaba Cloud CMS | AccessKey/SecretKey | ECS, RDS, SLB, EIP metrics |
| Tencent Cloud CAM | SecretID/Key | TC3-HMAC-SHA256 signature |
| Huawei Cloud IAM | AccessKey/SecretKey | IAM Token auth |
Data sources are **auto-detected** from `.env` — configure credentials for any combination and they will all be collected.
## Setup
### 1. Configure Environment
Create/edit `~/.hermes/.env`. Only configure the sources you need:
```bash
# --- Zabbix (pick one auth method) ---
ZABBIX_URL=https://zabbix.example.com/api_jsonrpc.php
ZABBIX_USER=Admin
ZABBIX_PASSWORD=your_password
# ZABBIX_TOKEN=your_api_token # optional, takes priority over password
# --- Alibaba Cloud ---
ALIBABA_ACCESS_KEY_ID=your_key_id
ALIBABA_ACCESS_KEY_SECRET=your_secret
ALIBABA_REGION=cn-hangzhou
# ALIBABA_METRICS=CPUUtilization,MemoryUtilization,InternetInRate # optional
# --- Tencent Cloud ---
TENCENT_SECRET_ID=your_secret_id
TENCENT_SECRET_KEY=your_secret_key
TENCENT_REGION=ap-shanghai
# --- Huawei Cloud ---
HUAWEI_ACCESS_KEY=your_access_key
HUAWEI_SECRET_KEY=your_secret_key
HUAWEI_REGION=cn-east-3
# --- Notifications ---
FEISHU_CHAT_ID=oc_xxxx # optional
SMTP_HOST=smtp.example.com # optional, omit to skip email
SMTP_PORT=465
SMTP_FROM=sender@example.com
SMTP_TOKEN=your_token
TARGET_EMAIL=recipient@example.com
# --- Report options ---
# TOPN: show top N hosts by memory+CPU score, 0=off (default: 50)
TOPN=50
```
### 2. Install Dependencies
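**Zabbix / Prometheus** need no SDK, but the XLSX report is written with `openpyxl`, so install it or run through `uv`:
```bash
uv run --with openpyxl python3 zabbix_cron.py
```
**Alibaba Cloud** needs the SDK; run it through `uv` if the virtualenv has no `pip` (`cloud_monitor.py` also imports `python-dotenv`):
```bash
uv run --with aliyun-python-sdk-core --with aliyun-python-sdk-cms \
  --with python-dotenv python3 cloud_monitor.py
```
**Tencent / Huawei** need no vendor SDK, only `httpx` and `python-dotenv`:
```bash
uv run --with httpx --with python-dotenv python3 cloud_monitor.py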
```
### 3. Run Once (Manual Test)
```bash
python3 zabbix_cron.py
```
Expected output:
- `~/.hermes/cron/output/zabbix_monitor.csv`
- `~/.hermes/cron/output/zabbix_monitor.xlsx` (one sheet per host group + overview + TOP sheet)
### 4. Schedule Daily Report
```bash
hermes cron create \
--name "Daily Server Health Report" \
--script zabbix_cron.py \
--schedule "30 9 * * *"
```
## Output Format
### CSV
- UTF-8-BOM encoding — opens correctly in Windows Excel without garbled characters
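- Columns, in file order: `主机组`, `主机名`, `IP`, `内存总量(GB)`, `内存可用(GB)`, `内存占用率(%)`, `CPU占用率(%)` (host group, host name, IP, total memory, available memory, memory usage, CPU usage)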
### XLSX
- **总览** sheet: summary table with host group stats and alarm counts
- **Group sheets**: one per host group, sorted by memory usage descending
- **TOP50(内存+CPU)** sheet: top 50 hosts across all groups by combined memory+CPU score
- Cell coloring: `🔴 ≥80%` red, `🟠 ≥60%` orange, `🟡 ≥40%` yellow
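The README does not spell out how the combined memory+CPU score is computed. A minimal sketch, assuming the score is simply the sum of the two percentages and that missing values count as 0 (both are assumptions, as is the helper name `top_n_hosts`):

```python
def top_n_hosts(rows, n=50):
    """Return the n rows with the highest memory+CPU score; n <= 0 turns the list off."""
    if n <= 0:
        return []

    def score(row):
        # Assumed scoring rule: sum of both percentages, None treated as 0.
        return (row.get("mem_used_pct") or 0) + (row.get("cpu_pct") or 0)

    return sorted(rows, key=score, reverse=True)[:n]


sample = [
    {"name": "web-1", "mem_used_pct": 72.0, "cpu_pct": 15.0},
    {"name": "db-1", "mem_used_pct": 91.5, "cpu_pct": 40.0},
    {"name": "no-agent", "mem_used_pct": None, "cpu_pct": 55.0},
]
top2 = [r["name"] for r in top_n_hosts(sample, n=2)]
print(top2)
```

Setting `TOPN=0` would map to `n=0` here, which returns an empty list so the sheet can be skipped.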
## Auto-Detection Logic
Scripts detect which sources to use based on which env vars are set:
| Env var present | Data source used |
|----------------|-----------------|
| `ZABBIX_URL` | Zabbix API |
| `ALIBABA_ACCESS_KEY_ID` | Alibaba Cloud CMS (SDK) |
| `TENCENT_SECRET_ID` | Tencent Cloud CAM (TC3 signature) |
| `HUAWEI_ACCESS_KEY` | Huawei Cloud IAM (Token) |
| `PROMETHEUS_URL` | Prometheus PromQL |
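The table above can be read as a simple lookup. The following sketch shows one way such detection might be written; the names `SOURCE_MARKERS` and `detect_sources` are illustrative and do not come from the bundled scripts:

```python
import os

# Marker variable -> data source, mirroring the table above.
SOURCE_MARKERS = {
    "ZABBIX_URL": "zabbix",
    "ALIBABA_ACCESS_KEY_ID": "alibaba",
    "TENCENT_SECRET_ID": "tencent",
    "HUAWEI_ACCESS_KEY": "huawei",
    "PROMETHEUS_URL": "prometheus",
}


def detect_sources(env=None):
    """Return the sources whose marker variable is set to a non-empty value."""
    env = os.environ if env is None else env
    return [src for var, src in SOURCE_MARKERS.items() if env.get(var, "").strip()]


enabled = detect_sources({
    "ZABBIX_URL": "https://zabbix.example.com/api_jsonrpc.php",
    "TENCENT_SECRET_ID": "id",
    "HUAWEI_ACCESS_KEY": "",  # present but empty, so treated as not configured
})
print(enabled)
```

Treating an empty value as "not configured" is a design choice of this sketch; it avoids enabling a source from a placeholder line left blank in `.env`.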
## Zabbix Host Group Exclusion
These groups are excluded by default (set in `EXCLUDE_GROUPS` in script):
- `Templates*` — template groups
- `Discovered hosts` — Zabbix auto-discovery
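A minimal sketch of pattern-based exclusion, assuming shell-style wildcards as written in the list above. The names `EXCLUDE_PATTERNS` and `is_excluded` are hypothetical; the bundled script keeps its own `EXCLUDE_GROUPS` set:

```python
from fnmatch import fnmatchcase

# Patterns taken from the list above; "*" matches any suffix, including "/".
EXCLUDE_PATTERNS = ["Templates*", "Discovered hosts"]


def is_excluded(group_name, patterns=EXCLUDE_PATTERNS):
    """True when the group name matches any exclusion pattern (case-sensitive)."""
    return any(fnmatchcase(group_name, p) for p in patterns)


groups = ["Templates/Databases", "Discovered hosts", "Linux servers", "Templates"]
kept = [g for g in groups if not is_excluded(g)]
print(kept)
```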
## Key Zabbix Item Keys
| Key | Description |
|-----|-------------|
| `vm.memory.size[available]` | Memory available (bytes) |
| `vm.memory.size[total]` | Memory total (bytes) |
| `system.cpu.util` | CPU utilization (%) |
| `vfs.fs.size[/,pused]` | Root disk usage (%) |
## Alarm Thresholds
| Metric | Warning | Alarm |
|--------|---------|-------|
| Memory usage | ≥40% yellow | ≥60% orange, ≥80% red |
| CPU usage | ≥40% yellow | ≥60% orange, ≥80% red |
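The thresholds above reduce to a small ordered comparison. A sketch, with the function name `severity` and the returned labels chosen here for illustration:

```python
def severity(pct):
    """Map a usage percentage to the colour band used in the reports."""
    if pct is None:
        return "n/a"      # no data, e.g. host without an agent
    if pct >= 80:
        return "red"
    if pct >= 60:
        return "orange"
    if pct >= 40:
        return "yellow"
    return "ok"


levels = [severity(v) for v in (None, 12.5, 40, 79.9, 80)]
print(levels)
```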
## Feishu Message Format
Markdown card sent to `FEISHU_CHAT_ID` containing:
- Report timestamp, total hosts, group count
- Top 20 hosts with memory ≥60% or CPU ≥60%
- Color-coded: 🔴≥80%, 🟠≥60%, 🟡≥40%
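The "top 20" selection described above could be sketched as follows. This is an illustration rather than the shipped implementation; `build_warning_table` and its English column titles are assumptions, and missing readings are rendered as `N/A` instead of being formatted as numbers:

```python
def build_warning_table(rows, threshold=60, limit=20):
    """Markdown table of hosts whose memory or CPU usage is at or above threshold."""
    def fmt(value):
        return "N/A" if value is None else f"{value:.1f}"

    warn = [r for r in rows
            if (r["mem_used_pct"] or 0) >= threshold or (r["cpu_pct"] or 0) >= threshold]
    # Worst memory first, CPU as tie-breaker.
    warn.sort(key=lambda r: (-(r["mem_used_pct"] or 0), -(r["cpu_pct"] or 0)))
    lines = ["| Host | Memory (%) | CPU (%) |", "|---|---|---|"]
    lines += [f"| {r['name']} | {fmt(r['mem_used_pct'])} | {fmt(r['cpu_pct'])} |"
              for r in warn[:limit]]
    return "\n".join(lines)


table = build_warning_table([
    {"name": "web-1", "mem_used_pct": 35.0, "cpu_pct": 10.0},
    {"name": "db-1", "mem_used_pct": 88.25, "cpu_pct": 20.0},
    {"name": "no-agent", "mem_used_pct": None, "cpu_pct": 65.0},
])
print(table)
```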
## Email Format
- Subject: `服务器监控报告 YYYY-MM-DD HH:MM`
- Body: HTML summary matching the Feishu card
- Attachment: `zabbix_monitor.xlsx`
## References
- `references/zabbix-config.md` — Zabbix API details, item keys, auth options
- `references/notification-config.md` — Feishu and email SMTP setup, common providers
- `references/cloud-config.md` — Alibaba / Tencent / Huawei API endpoints, namespaces, SDK usage
## Guardrails
- **Never hardcode credentials** — always use `~/.hermes/.env`
- **Never print full credentials** in logs or chat
- **Never place scripts in web-accessible directories**
- If Zabbix host has no Agent — memory metrics show `N/A`, CPU still works
- Alibaba Cloud `MemoryUtilization` requires Cloud Monitor Agent installed on ECS instance
FILE:references/cloud-config.md
# Cloud Provider Monitoring Configuration
## General Notes
No cloud provider is enabled by default; each one takes effect automatically once its credentials are set in `.env`.
## Alibaba Cloud CMS
### Environment Variables
```bash
ALIBABA_ACCESS_KEY_ID=your_key_id
ALIBABA_ACCESS_KEY_SECRET=your_secret
ALIBABA_REGION=cn-hangzhou # your region, e.g. cn-qingdao, cn-shanghai
# Optional: fetch only the listed metrics (comma-separated)
# Available metrics: CPUUtilization, MemoryUtilization, InternetInRate, InternetOutRate,
# DiskReadBPS, DiskWriteBPS, SysOM_memMonInfo_util (requires the Agent)
ALIBABA_METRICS=CPUUtilization,MemoryUtilization,InternetInRate,DiskReadBPS
```
### SDK Installation (uv)
```bash
uv run --with aliyun-python-sdk-core --with aliyun-python-sdk-cms python3 script.py
```
### Namespaces and Metrics
| Service | Namespace | Available metrics |
|------|----------|---------|
| ECS | `acs_ecs_dashboard` | CPUUtilization, InternetInRate, InternetOutRate, DiskReadBPS, DiskWriteBPS |
| RDS | `acs_rds_dashboard` | CpuUsage, MemoryUsage, DiskUsage, IOPSUsage, ConnectionUsage |
| SLB | `acs_slb_dashboard` | InstanceTrafficRX, InstanceTrafficTX, InstanceQps, InstanceRt |
| EIP | `acs_vpc_eip` | net_rx.rate, net_tx.rate, net_in.rate_percentage, net_out.rate_percentage |
> Note: basic ECS metrics such as `CPUUtilization` and `InternetInRate` do not need the Cloud Monitor Agent, but `MemoryUtilization` and `MemoryUsed` require the Agent to be installed on the ECS instance.
### API Call Notes
```python
# The return value is bytes; call .decode() before json.loads()
data = json.loads(client.do_action_with_exception(req).decode("utf-8"))
# Datapoints is itself a JSON string and needs a second json.loads()
pts = json.loads(data["Datapoints"])
# Pagination uses NextToken + Length (not Page/PageSize)
# Time parameters must be millisecond timestamps
```
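The two-stage decoding noted above can be exercised offline. A sketch using a fabricated response body; the payload and the helper name `parse_metric_response` are made up for illustration, and only the `Datapoints`/`NextToken` field names come from the notes above:

```python
import json


def parse_metric_response(raw: bytes):
    """Decode a DescribeMetricList-style body into (datapoints, next_token)."""
    data = json.loads(raw.decode("utf-8"))            # first pass: response envelope
    points = json.loads(data.get("Datapoints") or "[]")  # second pass: embedded JSON string
    return points, data.get("NextToken")


# Hypothetical response body, shaped like the fields described above.
fake = json.dumps({
    "Code": "200",
    "Datapoints": json.dumps(
        [{"instanceId": "i-abc", "timestamp": 1700000000000, "Average": 12.5}]
    ),
    "NextToken": None,
}).encode("utf-8")

points, token = parse_metric_response(fake)
print(points[0]["Average"], token)
```

A caller would keep requesting pages while `token` is not `None`, passing it back as `NextToken`.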
### Metadata Query (List Available Metrics)
```python
from aliyunsdkcms.request.v20190101 import DescribeMetricMetaListRequest
req = DescribeMetricMetaListRequest.DescribeMetricMetaListRequest()
req.set_Namespace("acs_ecs_dashboard")
req.set_PageSize(200)
```
---
## Tencent Cloud CAM
### Environment Variables
```bash
TENCENT_SECRET_ID=your_secret_id
TENCENT_SECRET_KEY=your_secret_key
TENCENT_REGION=ap-shanghai # your region, e.g. ap-beijing, ap-guangzhou
```
### Signature Method
TC3-HMAC-SHA256, implemented by hand in Python; the Tencent Cloud SDK is not required.
### CVM Monitoring
- **Namespace**: `QCE/CVM`
- **Monitoring endpoint**: `monitor.tencentcloudapi.com`
- **Instance endpoint**: `cvm.tencentcloudapi.com`
### Signing Flow
```
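1. CanonicalRequest = HTTP_METHOD + "\n" + CanonicalURI + "\n" + CanonicalQueryString + "\n" + CanonicalHeaders + "\n" + SignedHeaders + "\n" + HashedPayload
2. StringToSign = "TC3-HMAC-SHA256\n" + timestamp + "\n" + date + "/" + service + "/tc3_request" + "\n" + hashed_canonical_request
3. SecretDate = HMAC_SHA256("TC3" + secret_key, date); SecretService = HMAC_SHA256(SecretDate, service); SecretSigning = HMAC_SHA256(SecretService, "tc3_request")
4. Signature = HexEncode(HMAC_SHA256(SecretSigning, StringToSign))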
```
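The signing flow can be sketched with the standard library alone. This is a minimal illustration for a JSON `POST` with only `content-type` and `host` signed; the function name `tc3_authorization` and the sample credentials are placeholders, not values from the scripts:

```python
import hashlib
import hmac
from datetime import datetime, timezone


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def tc3_authorization(secret_id, secret_key, service, host, payload, timestamp):
    """Build the Authorization header value for a JSON POST to `host`."""
    content_type = "application/json; charset=utf-8"
    date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%d")

    # Step 1: canonical request (empty query string for POST).
    canonical_headers = f"content-type:{content_type}\nhost:{host}\n"
    signed_headers = "content-type;host"
    hashed_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = "\n".join(
        ["POST", "/", "", canonical_headers, signed_headers, hashed_payload]
    )

    # Step 2: string to sign.
    scope = f"{date}/{service}/tc3_request"
    hashed_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = "\n".join(["TC3-HMAC-SHA256", str(timestamp), scope, hashed_request])

    # Steps 3-4: derive the signing key from raw digests, then sign.
    secret_date = _hmac_sha256(("TC3" + secret_key).encode("utf-8"), date)
    secret_service = _hmac_sha256(secret_date, service)
    secret_signing = _hmac_sha256(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()

    return (f"TC3-HMAC-SHA256 Credential={secret_id}/{scope}, "
            f"SignedHeaders={signed_headers}, Signature={signature}")


auth = tc3_authorization("AKIDexample", "example-secret", "cvm",
                         "cvm.tencentcloudapi.com", "{}", 1700000000)
print(auth)
```

The request must then send the same `Content-Type` value that was signed, along with `X-TC-Action`, `X-TC-Version`, `X-TC-Timestamp` and, where applicable, `X-TC-Region` headers.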
---
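## Huawei Cloud IAM
### Environment Variables
```bash
HUAWEI_ACCESS_KEY=your_access_key
HUAWEI_SECRET_KEY=your_secret_key
HUAWEI_REGION=cn-east-3 # your region, e.g. cn-north-4, cn-south-1
```
### Authentication
IAM Token: POST to `https://iam.{region}.myhuaweicloud.com/v3.0/OS-CREDENTIAL/credentials`

Check this path against the current IAM API reference before relying on it: Huawei documents `POST /v3/auth/tokens` as the user-token endpoint (the token is returned in the `X-Subject-Token` response header), while `/v3.0/OS-CREDENTIAL/credentials` is listed there for managing permanent access keys.
### Key Endpoints
| Purpose | Endpoint |
|------|------|
| IAM Token | `https://iam.{region}.myhuaweicloud.com/v3.0/OS-CREDENTIAL/credentials` |
| ECS list | `https://ecs.{region}.myhuaweicloud.com/v1/{project_id}/cloudservers` |
| Monitoring data | `https://ces.{region}.myhuaweicloud.com/V1.0/{project_id}/metric_analytics` |
### Namespaces
- ECS: `SYS.ECS`
- RDS: `SYS.RDS`
- ELB: `SYS.ELB`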
FILE:references/notification-config.md
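# Feishu and Email Delivery Configuration
## Feishu/Lark
### Environment Variables
```bash
FEISHU_CHAT_ID=oc_xxxx # Feishu group chat ID or a user's open_id
```
### Getting the Chat ID
- **Group chat**: in the Feishu group, open Settings → Group Info → Basic Info → Group ID
- **Direct chat**: use the user's `open_id` directly (it starts with `ou_`)
### Message Card Format
The summary message is Markdown and contains:
- Collection time, total host count, host group count
- A watch list (hosts with memory usage ≥60% or CPU ≥60%, at most 20 entries)
- Alarm colouring (red = ≥80%, orange = ≥60%, yellow = ≥40%)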
---
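## Email Delivery
### Environment Variables
```bash
SMTP_HOST=smtp.example.com
SMTP_PORT=465 # SSL port, usually 465
SMTP_FROM=sender@example.com
SMTP_TOKEN=your_smtp_token # 163 Mail uses an authorization code; other providers use the password
TARGET_EMAIL=recipient@example.com
```
### Common SMTP Settings
| Provider | SMTP_HOST | PORT | Notes |
|------|-----------|------|------|
| 163 | `smtp.163.com` | 465 | Use an authorization code (not the login password) |
| QQ | `smtp.qq.com` | 465 | Use an authorization code |
| Gmail | `smtp.gmail.com` | 587 | Use an app-specific password |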
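Port 465 expects TLS from the first byte, while 587 starts in plain text and upgrades with STARTTLS. A sketch of how a sender might pick between the two; `smtp_mode` and `open_smtp` are illustrative names, and treating every port other than 465 as STARTTLS is an assumption of this sketch:

```python
import smtplib


def smtp_mode(port) -> str:
    """Return "ssl" for implicit TLS (465), otherwise "starttls"."""
    return "ssl" if int(port) == 465 else "starttls"


def open_smtp(host: str, port, timeout: float = 30):
    """Open a connection in the mode matching the port; the caller logs in and closes it."""
    if smtp_mode(port) == "ssl":
        return smtplib.SMTP_SSL(host, int(port), timeout=timeout)
    conn = smtplib.SMTP(host, int(port), timeout=timeout)
    conn.starttls()  # upgrade the plain connection before sending credentials
    return conn


print(smtp_mode(465), smtp_mode("587"))
```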
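### Message Content
- **Subject**: 服务器监控报告 `YYYY-MM-DD HH:MM`
- **Body**: an HTML summary (same content as the Feishu card)
- **Attachment**: `zabbix_monitor.xlsx` (the Excel report)
### Skipping Email
To skip email, set only `FEISHU_CHAT_ID` and leave the `SMTP_*` variables unset; the report is then sent to Feishu only.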
FILE:references/zabbix-config.md
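# Zabbix Configuration
## Environment Variables
```bash
ZABBIX_URL=http://zabbix.example.com/api_jsonrpc.php
ZABBIX_USER=Admin
ZABBIX_PASSWORD=your_password
# Optional: API token (takes priority over user name and password)
ZABBIX_TOKEN=optional_api_token
# TOPN: rank all hosts by combined memory+CPU, descending, and keep the top N; 0 = off (default 50)
TOPN=50
```
## Zabbix API Authentication
### Option 1: User Name + Password (default)
```python
# Newer Zabbix releases name the first field "username" instead of "user".
auth = api_call("user.login", {
    "user": ZABBIX_USER,
    "password": ZABBIX_PASSWORD,
})
```
### Option 2: API Token (more secure)
Generate the token in the Zabbix web UI and put it in `.env`; the script then uses it in preference to the password:
```python
auth = os.environ.get("ZABBIX_TOKEN") # if set, login is skipped
```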
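The priority rule between the two options can be sketched as below. `resolve_auth` and the injected `login` callable are hypothetical; injecting the login function keeps the example runnable without a Zabbix server:

```python
def resolve_auth(env, login):
    """Return the API token if one is configured, otherwise log in with user/password."""
    token = env.get("ZABBIX_TOKEN", "").strip()
    if token:
        return token  # token wins; user.login is never called
    return login(env["ZABBIX_USER"], env["ZABBIX_PASSWORD"])


calls = []


def fake_login(user, password):
    # Stand-in for api_call("user.login", ...); records that it was used.
    calls.append(user)
    return "session-id-from-login"


with_token = resolve_auth({"ZABBIX_TOKEN": "tok123", "ZABBIX_USER": "Admin",
                           "ZABBIX_PASSWORD": "x"}, fake_login)
without_token = resolve_auth({"ZABBIX_USER": "Admin", "ZABBIX_PASSWORD": "x"}, fake_login)
print(with_token, without_token, calls)
```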
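## Core Collected Items
| Item Key | Description |
|----------|------|
| `vm.memory.size[available]` | Available memory in bytes |
| `vm.memory.size[total]` | Total memory in bytes |
| `vm.memory.size[pavailable]` | Available memory as a percentage |
| `system.cpu.util` | CPU utilization (average over all cores) |
| `vfs.fs.size[/,pused]` | Root partition disk usage |
## Host Group Exclusion Rules
Host groups with the following names are excluded by default (change `EXCLUDE_GROUPS` in the script to adjust):
- `Templates*` (every host group whose name starts with Templates)
- `Discovered hosts` (hosts found by Zabbix auto-discovery)
## Field Notes
- **Memory usage (%)**: `(mem_total - mem_avail) / mem_total * 100`
- **Output paths**: `~/.hermes/cron/output/zabbix_monitor.csv` and `.xlsx`
- **Encoding**: the CSV is UTF-8-BOM, so Windows Excel opens it without garbled characters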
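As a worked instance of the memory usage formula, the following sketch computes the percentage from the raw `lastvalue` strings that Zabbix returns. The function name `mem_used_pct` is chosen here for illustration:

```python
def mem_used_pct(mem_total, mem_avail):
    """(mem_total - mem_avail) / mem_total * 100, or None when either value is missing."""
    if mem_total in (None, "") or mem_avail in (None, ""):
        return None
    total = float(mem_total)
    if total <= 0:
        return None  # avoid dividing by zero on a bogus reading
    return (total - float(mem_avail)) / total * 100


# 8 GiB total, 2 GiB available -> 75 % used
pct = mem_used_pct("8589934592", "2147483648")
print(pct, mem_used_pct("", ""))
```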
## Hosts Without an Agent
Memory items depend on the Zabbix Agent. If a host has no Agent:
- `mem_total` and `mem_avail` both come back empty
- Memory usage is shown as `N/A`
FILE:references/zabbix_cron.py
#!/usr/bin/env python3
"""
Zabbix monitoring report: collect data -> XLSX/CSV -> Feishu message -> email
"""
import os, sys, csv, json, smtplib
from datetime import datetime
from urllib.request import urlopen, Request
from urllib.error import URLError
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from collections import defaultdict

# Connection settings come from the environment only; no real URL or password
# is kept in the source as a fallback.
ZABBIX_URL = os.environ.get("ZABBIX_URL", "")
ZABBIX_USER = os.environ.get("ZABBIX_USER", "")
ZABBIX_PASSWORD = os.environ.get("ZABBIX_PASSWORD", "")
ZABBIX_TOKEN = os.environ.get("ZABBIX_TOKEN", "")
EXCLUDE_GROUPS = {"Templates","Templates/Applications","Templates/Databases",
                  "Templates/Modules","Templates/Network devices",
                  "Templates/Operating systems","Templates/Server hardware",
                  "Templates/Virtualization","Discovered hosts"}
ITEMS_KEY = {
    "memory_avail": "vm.memory.size[available]",
    "memory_total": "vm.memory.size[total]",
    "cpu": "system.cpu.util",
}
# Resolve the output directory under the current user's home instead of /root.
OUTPUT_DIR = os.path.expanduser("~/.hermes/cron/output")
CSV_PATH = os.path.join(OUTPUT_DIR, "zabbix_monitor.csv")
XLSX_PATH = os.path.join(OUTPUT_DIR, "zabbix_monitor.xlsx")
def api_call(method, params, auth=None):
    payload = {"jsonrpc":"2.0","method":method,"params":params,"id":1}
    if auth:
        payload["auth"] = auth
    data = json.dumps(payload).encode("utf-8")
    req = Request(ZABBIX_URL, data=data, headers={"Content-Type":"application/json"})
    try:
        with urlopen(req, timeout=60) as resp:
            result = json.loads(resp.read().decode("utf-8"))
    except URLError as e:
        print(f"API 请求失败: {e}"); sys.exit(1)
    if "error" in result:
        print(f"API 错误: {result['error']}"); sys.exit(1)
    return result.get("result",[])
def is_excluded(name):
    """True for names in EXCLUDE_GROUPS and for any group starting with "Templates"."""
    return name in EXCLUDE_GROUPS or name.startswith("Templates")

def fetch_all(auth):
    groups = api_call("hostgroup.get",{"output":["groupid","name"]}, auth=auth)
    groups = [g for g in groups if not is_excluded(g["name"])]
    hosts = api_call("host.get",{
        "output":["hostid","name","host"],
        "groupids":[g["groupid"] for g in groups],
        "selectGroups":["groupid","name"],
        "selectInterfaces":["ip"],
    }, auth=auth)
    # Fetch items in batches of 100 hosts to keep each request small.
    all_items = []
    for i in range(0, len(hosts), 100):
        batch = [h["hostid"] for h in hosts[i:i+100]]
        items = api_call("item.get",{
            "output":["itemid","hostid","key_","lastvalue"],
            "hostids":batch,
            "filter":{"key_":list(ITEMS_KEY.values())},
        }, auth=auth)
        all_items.extend(items)
    item_map = {(it["hostid"], it["key_"]): it.get("lastvalue","") for it in all_items}
    rows = []
    for host in hosts:
        hid = host["hostid"]
        gnames = [g["name"] for g in host.get("groups",[])]
        valid = [n for n in gnames if not is_excluded(n)]
        if not valid:
            continue
        gname = valid[0]
        mem_total = item_map.get((hid, ITEMS_KEY["memory_total"]),"")
        mem_avail = item_map.get((hid, ITEMS_KEY["memory_avail"]),"")
        cpu = item_map.get((hid, ITEMS_KEY["cpu"]),"")
        mt = float(mem_total)/(1024**3) if mem_total else None
        ma = float(mem_avail)/(1024**3) if mem_avail else None
        cp = float(cpu) if cpu else None
        mp = (1 - float(mem_avail)/float(mem_total))*100 if mem_avail and mem_total and float(mem_total) else None
        # Prefer the first interface IP; fall back to the technical host name.
        interfaces = host.get("interfaces") or []
        ip = (interfaces[0].get("ip") if interfaces else "") or host["host"]
        rows.append({"group":gname,"name":host["name"],"ip":ip,
                     "mem_total_gb":mt,"mem_avail_gb":ma,"mem_used_pct":mp,"cpu_pct":cp})
    return rows
def generate_xlsx(rows):
    import re
    import openpyxl
    from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
    from openpyxl.utils import get_column_letter

    def tb():
        s=Side(style="thin",color="CCCCCC")
        return Border(left=s,right=s,top=s,bottom=s)

    def hdr(cell, text):
        cell.value=text
        cell.font=Font(name="微软雅黑",bold=True,size=10,color="FFFFFF")
        cell.fill=PatternFill("solid",fgColor="4472C4")
        cell.alignment=Alignment(horizontal="center",vertical="center")
        cell.border=tb()

    def pct_color(p, bg):
        # Returns (fill colour, font colour) for the alarm band of p.
        if p is None:
            return bg,"000000"
        return ("FF4444","FFFFFF") if p>=80 else ("FFAA44","000000") if p>=60 else ("FFEE88","000000") if p>=40 else (bg,"000000")

    gr = defaultdict(list)
    for r in rows:
        gr[r["group"]].append(r)
    os.makedirs(os.path.dirname(XLSX_PATH),exist_ok=True)
    wb = openpyxl.Workbook(); wb.remove(wb.active)

    # Overview sheet
    ws_ov = wb.create_sheet(title="总览")
    ws_ov.cell(row=1,column=1,value="服务器监控总览").font=Font(name="微软雅黑",bold=True,size=14)
    ws_ov.cell(row=1,column=1).alignment=Alignment(horizontal="left")
    ws_ov.row_dimensions[1].height=24
    ws_ov.cell(row=2,column=1,value=f"采集时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}")
    ws_ov.cell(row=2,column=1).font=Font(name="微软雅黑",size=10,color="666666")
    ws_ov.cell(row=3,column=1,value=f"共 {len(rows)} 台主机,{len(gr)} 个主机组")
    ws_ov.cell(row=3,column=1).font=Font(name="微软雅黑",size=10,color="666666")
    for ci,h in enumerate(["主机组","主机数","内存告警(≥80%)","CPU告警(≥80%)"],1):
        hdr(ws_ov.cell(row=5,column=ci),h)
    ws_ov.row_dimensions[5].height=20
    for ri,(gn,gd) in enumerate(sorted(gr.items()),start=6):
        ma=sum(1 for r in gd if r["mem_used_pct"] is not None and r["mem_used_pct"]>=80)
        ca=sum(1 for r in gd if r["cpu_pct"] is not None and r["cpu_pct"]>=80)
        for ci,val in enumerate([gn,len(gd),ma,ca],1):
            c=ws_ov.cell(row=ri,column=ci,value=val)
            c.font=Font(name="微软雅黑",size=10)
            c.alignment=Alignment(horizontal="center",vertical="center")
            c.border=tb()
            if (ci==3 and ma>0) or (ci==4 and ca>0):
                c.fill=PatternFill("solid",fgColor="FF4444")
                c.font=Font(name="微软雅黑",size=10,bold=True,color="FFFFFF")
    for ci,w in enumerate([24,10,16,16],1):
        ws_ov.column_dimensions[get_column_letter(ci)].width=w

    # One sheet per host group
    cols=[("主机名",32),("IP",18),("内存总量(GB)",14),("内存可用(GB)",14),("内存占用率(%)",14),("CPU占用率(%)",13)]
    for gn,gd in sorted(gr.items()):
        # Sheet titles may not contain \ / * ? : [ ] and are limited to 31 characters.
        ws=wb.create_sheet(title=re.sub(r"[\\/*?:\[\]]","_",gn)[:31])
        ws.row_dimensions[1].height=20
        for ci,(ht,_) in enumerate(cols,1):
            hdr(ws.cell(row=1,column=ci),ht)
        gd.sort(key=lambda x:(-(x["mem_used_pct"] or 0),-(x["cpu_pct"] or 0)))
        for ri,r in enumerate(gd,start=2):
            bg="EEF2FF" if ri%2==0 else "FFFFFF"
            mb,mc=pct_color(r.get("mem_used_pct"),bg)
            cb,cc=pct_color(r.get("cpu_pct"),bg)
            for ci,(val,cbg,cfc,fmt) in enumerate([
                (r["name"],bg,"000000",None),(r["ip"],bg,"000000",None),
                (r["mem_total_gb"],bg,"000000","0.0"),(r["mem_avail_gb"],bg,"000000","0.0"),
                (r["mem_used_pct"],mb,mc,"0.0"),(r["cpu_pct"],cb,cc,"0.0"),
            ],1):
                c=ws.cell(row=ri,column=ci)
                if val is None:
                    c.value="N/A"
                else:
                    c.value=val
                    if fmt:
                        c.number_format=fmt
                c.font=Font(name="微软雅黑",size=10,color=cfc)
                c.fill=PatternFill("solid",fgColor=cbg)
                c.alignment=Alignment(horizontal="center",vertical="center")
                c.border=tb()
        for ci,(_,w) in enumerate(cols,1):
            ws.column_dimensions[get_column_letter(ci)].width=w
        ws.freeze_panes="A2"
    wb.save(XLSX_PATH); print(f"XLSX: {XLSX_PATH}")
def generate_csv(rows):
    os.makedirs(os.path.dirname(CSV_PATH),exist_ok=True)
    # utf-8-sig writes a BOM so Windows Excel detects the encoding.
    with open(CSV_PATH,"w",newline="",encoding="utf-8-sig") as f:
        w=csv.writer(f)
        w.writerow(["主机组","主机名","IP","内存总量(GB)","内存可用(GB)","内存占用率(%)","CPU占用率(%)"])
        for r in rows:
            w.writerow([r["group"],r["name"],r["ip"],
                f"{r['mem_total_gb']:.1f}" if r['mem_total_gb'] is not None else "N/A",
                f"{r['mem_avail_gb']:.1f}" if r['mem_avail_gb'] is not None else "N/A",
                f"{r['mem_used_pct']:.1f}" if r['mem_used_pct'] is not None else "N/A",
                f"{r['cpu_pct']:.1f}" if r['cpu_pct'] is not None else "N/A",
            ])
    print(f"CSV: {CSV_PATH}")
def build_feishu_summary(rows):
    def fmt(v):
        # A host can qualify on CPU alone while its memory reading is missing.
        return "N/A" if v is None else f"{v:.1f}"
    gr=defaultdict(list)
    for r in rows:
        gr[r["group"]].append(r)
    warn=[r for r in rows if (r["mem_used_pct"] or 0)>=60 or (r["cpu_pct"] or 0)>=60]
    warn.sort(key=lambda x:(-(x["mem_used_pct"] or 0),-(x["cpu_pct"] or 0)))
    lines=["## 服务器监控报告","",
           f"**采集时间**:{datetime.now().strftime('%Y-%m-%d %H:%M')}",
           f"共 **{len(rows)}** 台主机,覆盖 **{len(gr)}** 个主机组",""]
    if warn:
        lines+=["### ⚠ 重点关注(内存占用≥60% 或 CPU≥60%)",""]
        lines+=["| 主机名 | 主机组 | 内存占用率(%) | CPU占用率(%) |","|---|---|---|---|"]
        for r in warn[:20]:
            lines.append(f"| {r['name']} | {r['group']} | {fmt(r['mem_used_pct'])} | {fmt(r['cpu_pct'])} |")
        if len(warn)>20:
            lines.append(f"...(共 {len(warn)} 台,详见附件)")
    else:
        lines+=["### ✅ 全部正常(无告警主机)",""]
    lines+=["",f"完整数据:`{CSV_PATH}`"]
    return "\n".join(lines)
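def load_env():
    p=os.path.expanduser("~/.hermes/.env")
    if os.path.exists(p):
        with open(p,encoding="utf-8") as f:
            for line in f:
                line=line.strip()
                if "=" in line and not line.startswith("#"):
                    k,v=line.split("=",1)
                    # Drop a trailing " # comment" so values match the documented .env layout.
                    os.environ[k.strip()]=v.split(" #",1)[0].strip()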
def send_email(subject, html_body, attachments=None):
    load_env()
    host=os.environ.get("SMTP_HOST",""); port=os.environ.get("SMTP_PORT","465")
    sender=os.environ.get("SMTP_FROM",""); token=os.environ.get("SMTP_TOKEN","")
    target=os.environ.get("TARGET_EMAIL","")
    if not all([host,sender,token,target]):
        print("邮件配置不完整,跳过"); return
    msg=MIMEMultipart(); msg["From"]=sender; msg["To"]=target; msg["Subject"]=subject
    msg.attach(MIMEText(html_body,"html","utf-8"))
    for fpath in (attachments or []):
        if os.path.exists(fpath):
            with open(fpath,"rb") as f:
                part=MIMEBase("application","octet-stream"); part.set_payload(f.read())
            encoders.encode_base64(part)
            part["Content-Disposition"]=f"attachment; filename={os.path.basename(fpath)}"
            msg.attach(part)
    try:
        if port=="465":
            # Implicit TLS
            with smtplib.SMTP_SSL(host,int(port)) as s:
                s.login(sender,token); s.sendmail(sender,target,msg.as_string())
        else:
            # Plain connection upgraded with STARTTLS
            with smtplib.SMTP(host,int(port)) as s:
                s.starttls(); s.login(sender,token); s.sendmail(sender,target,msg.as_string())
        print(f"邮件已发送: {target}")
    except Exception as e:
        print(f"邮件发送失败: {e}")
def build_html_body(rows):
    def td(val, style=""):
        # None means "no data"; 0.0 is a real reading and is still shown.
        if val is None:
            return "<td>N/A</td>"
        attr=f" style='{style}'" if style else ""
        return f"<td{attr}>{val:.1f}</td>"
    gr=defaultdict(list)
    for r in rows:
        gr[r["group"]].append(r)
    html=f"<html><body><h2>服务器监控报告</h2><p><b>采集时间:</b>{datetime.now().strftime('%Y-%m-%d %H:%M')}</p><p><b>共 {len(rows)} 台,{len(gr)} 组</b></p>"
    for gn,gd in sorted(gr.items()):
        html+=f"<h3>{gn} ({len(gd)} 台)</h3>"
        html+="<table border='1' cellpadding='4' cellspacing='0' style='border-collapse:collapse;font-size:12px;'>"
        html+="<tr bgcolor='#4472C4' style='color:white;'><th>主机名</th><th>IP</th><th>内存总量(GB)</th><th>内存可用(GB)</th><th>内存占用率(%)</th><th>CPU占用率(%)</th></tr>"
        for i,r in enumerate(gd):
            bg="#EEF2FF" if i%2==0 else "#FFFFFF"
            mp=r["mem_used_pct"] or 0; cp=r["cpu_pct"] or 0
            ms=("background:#FF4444;color:white;" if mp>=80 else "background:#FFAA44;" if mp>=60 else "background:#FFEE88;" if mp>=40 else "")
            cs=("background:#FF4444;color:white;" if cp>=80 else "background:#FFAA44;" if cp>=60 else "background:#FFEE88;" if cp>=40 else "")
            html+=f"<tr bgcolor='{bg}'><td>{r['name']}</td><td>{r['ip']}</td>"
            html+=td(r['mem_total_gb'])+td(r['mem_avail_gb'])
            # The row is always closed, whether or not a CPU reading exists.
            html+=td(r['mem_used_pct'],ms)+td(r['cpu_pct'],cs)+"</tr>"
        html+="</table><br/>"
    html+="</body></html>"
    return html
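def main():
    global ZABBIX_URL, ZABBIX_USER, ZABBIX_PASSWORD, ZABBIX_TOKEN
    # Read ~/.hermes/.env first, then resolve the Zabbix settings from the environment.
    load_env()
    ZABBIX_URL = os.environ.get("ZABBIX_URL", ZABBIX_URL)
    ZABBIX_USER = os.environ.get("ZABBIX_USER", ZABBIX_USER)
    ZABBIX_PASSWORD = os.environ.get("ZABBIX_PASSWORD", ZABBIX_PASSWORD)
    ZABBIX_TOKEN = os.environ.get("ZABBIX_TOKEN", ZABBIX_TOKEN)
    if not ZABBIX_URL:
        print("ZABBIX_URL is not set"); sys.exit(1)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始巡检...")
    if ZABBIX_TOKEN:
        # An API token takes priority and skips user.login.
        auth=ZABBIX_TOKEN
    else:
        auth=api_call("user.login",{"user":ZABBIX_USER,"password":ZABBIX_PASSWORD})
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 登录成功")
    rows=fetch_all(auth)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 采集完成: {len(rows)} 台")
    generate_csv(rows); generate_xlsx(rows)
    summary=build_feishu_summary(rows)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 飞书摘要:\n{summary[:500]}")
    subject=f"【监控报告】服务器巡检 {datetime.now().strftime('%Y-%m-%d %H:%M')}"
    atts=[f for f in [XLSX_PATH,CSV_PATH] if os.path.exists(f)]
    send_email(subject, build_html_body(rows), atts)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 全部完成!")

if __name__=="__main__":
    main()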
FILE:scripts/aliyun_monitor.py
#!/usr/bin/env python3
"""
Alibaba Cloud CMS monitoring data collector
Supports ECS / RDS / SLB / EIP
- ECS: acs_ecs_dashboard
- RDS: acs_rds_dashboard
- SLB: acs_slb_dashboard
- EIP: acs_vpc_eip
"""
import json, time, os, sys
import pandas as pd
from aliyunsdkcore.client import AcsClient
from aliyunsdkcms.request.v20190101 import DescribeMetricListRequest
# === Configuration ===
# Credentials are read from the environment only; no key is kept in the source.
LTAI = os.environ.get("ALIBABA_ACCESS_KEY_ID", "")
SK = os.environ.get("ALIBABA_ACCESS_KEY_SECRET", "")
REGION = os.environ.get("ALIBABA_REGION", "cn-qingdao")
if not LTAI or not SK:
    sys.exit("ALIBABA_ACCESS_KEY_ID / ALIBABA_ACCESS_KEY_SECRET not set")
# Metric definitions: service -> (namespace, [(metric_name, value_field)])
METRICS = {
    "ECS": ("acs_ecs_dashboard", [
        ("CPUUtilization", "Average"),
        ("MemoryUsed", "Average"),
        ("MemoryUtilization", "Average"),
        ("DiskReadBPS", "Average"),
        ("DiskWriteBPS", "Average"),
        ("InternetInRate", "Average"),
        ("InternetOutRate", "Average"),
    ]),
    "RDS": ("acs_rds_dashboard", [
        ("CpuUsage", "Average"),
        ("MemoryUsage", "Average"),
        ("DiskUsage", "Average"),
        ("IOPSUsage", "Average"),
        ("ConnectionUsage", "Average"),
        ("QPS", "Average"),
    ]),
    "SLB": ("acs_slb_dashboard", [
        ("InstanceTrafficRX", "Average"),
        ("InstanceTrafficTX", "Average"),
        ("InstanceQps", "Average"),
        ("InstanceRt", "Average"),
        ("InstanceMaxConnection", "Average"),
    ]),
    "EIP": ("acs_vpc_eip", [
        ("net_rx.rate", "Average"),
        ("net_tx.rate", "Average"),
        ("net_in.rate_percentage", "Average"),
        ("net_out.rate_percentage","Average"),
    ]),
}
# Query window: the last 4 days, as millisecond timestamps.
now_ms = int(time.time() * 1000)
start_ms = now_ms - 4 * 86400 * 1000
client = AcsClient(LTAI, SK, REGION)
def fetch_metric(namespace, metric, value_field="Average"):
    """Fetch the latest datapoint of one metric for every instance."""
    all_instances = {}
    next_token = None
    # Hard cap on pages so a misbehaving NextToken cannot loop forever.
    for _ in range(1, 500):
        req = DescribeMetricListRequest.DescribeMetricListRequest()
        req.set_MetricName(metric)
        req.set_Namespace(namespace)
        req.set_Period(60)
        req.set_StartTime(start_ms)
        req.set_EndTime(now_ms)
        req.set_Length(100)
        if next_token:
            req.set_NextToken(next_token)
        try:
            resp = client.do_action_with_exception(req)
            data = json.loads(resp.decode("utf-8"))
        except Exception as e:
            print(f" [{metric}] 请求异常: {e}", file=sys.stderr)
            break
        if data.get("Code") != "200":
            break
        # Datapoints is a JSON string inside the JSON response.
        pts = json.loads(data.get("Datapoints") or "[]")
        for p in pts:
            # The ID field depends on the service; fall back to the dimensions.
            iid = (p.get("instanceId") or p.get("eipId") or
                   p.get("loadBalancerId") or str(p.get("dimensions", {})))
            ts = p.get("timestamp", 0)
            # Keep only the newest datapoint per instance.
            if iid not in all_instances or ts > all_instances[iid].get("timestamp", 0):
                all_instances[iid] = p
        next_token = data.get("NextToken")
        if not next_token:
            break
    return {iid: p.get(value_field, 0) for iid, p in all_instances.items()}
def collect():
    """Collect every service and build a DataFrame."""
    rows = []
    for svc, (ns, metrics) in METRICS.items():
        print(f"\n=== {svc} ({ns}) ===")
        svc_rows = {}
        for metric, vf in metrics:
            print(f" {metric}...", end=" ", flush=True)
            data = fetch_metric(ns, metric, vf)
            print(f"{len(data)} 实例")
            for iid, val in data.items():
                if iid not in svc_rows:
                    svc_rows[iid] = {"instanceId": iid, "service": svc}
                svc_rows[iid][f"{metric}_{vf}"] = round(val, 2)
        rows.extend(svc_rows.values())
    if not rows:
        return pd.DataFrame()
    df = pd.DataFrame(rows)
    df = df.set_index("instanceId")
    return df

if __name__ == "__main__":
    print(f"阿里云监控采集 | Region: {REGION} | 近4天数据")
    df = collect()
    print(f"\n结果: {len(df)} 条, {len(df.columns)} 列")
    if not df.empty:
        print(df.head(10).to_string())
        # Write under the current user's home instead of a hard-coded /root path.
        out = os.path.expanduser("~/.hermes/cron/output/aliyun_monitor.xlsx")
        os.makedirs(os.path.dirname(out), exist_ok=True)
        df.reset_index().to_excel(out, index=False)
        print(f"\n已保存: {out}")
FILE:scripts/cloud_monitor.py
#!/usr/bin/env python3
"""
Cloud provider monitoring collector: unified entry point
Supports: Alibaba Cloud / Tencent Cloud / Huawei Cloud
Configuration (environment variables):
  Alibaba Cloud: ALIBABA_ACCESS_KEY_ID, ALIBABA_ACCESS_KEY_SECRET, ALIBABA_REGION
  Tencent Cloud: TENCENT_SECRET_ID, TENCENT_SECRET_KEY, TENCENT_REGION
  Huawei Cloud: HUAWEI_ACCESS_KEY, HUAWEI_SECRET_KEY, HUAWEI_REGION
Output: ~/.hermes/cron/output/cloud_monitor_{provider}.xlsx
"""
import os, sys, json, time, hashlib, hmac, struct, base64
from datetime import datetime, timezone
from dotenv import load_dotenv
# Pass the path explicitly; a bare load_dotenv() only searches from the working directory.
load_dotenv(os.path.expanduser("~/.hermes/.env"))
# ─── Shared helpers ──────────────────────────────────────────────────────────
def md5_hex(data: str) -> str:
    return hashlib.md5(data.encode()).hexdigest()

def sha256_hex(data: str) -> str:
    return hashlib.sha256(data.encode()).hexdigest()

def hmac_sha256(key: str, msg: str) -> str:
    return hmac.new(key.encode(), msg.encode(), hashlib.sha256).hexdigest()
# ═══════════════════════════════════════════════════════════════════════════════
# Tencent Cloud: TC3-HMAC-SHA256 signing
# ═══════════════════════════════════════════════════════════════════════════════
class TencentCloudSigner:
    """TC3-HMAC-SHA256 signature implementation"""
    ALGORITHM = "TC3-HMAC-SHA256"
    CONTENT_TYPE = "application/json; charset=utf-8"

    def __init__(self, secret_id: str, secret_key: str, region: str):
        self.secret_id = secret_id
        self.secret_key = secret_key
        self.region = region

    @staticmethod
    def _hmac_sha256(key: bytes, msg: str) -> bytes:
        """One step of the key derivation; keys are raw digest bytes, not hex strings."""
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

    def sign(self, method: str, host: str, uri: str,
             params: dict, payload: str, timestamp: int) -> dict:
        """
        Build the signed HTTP headers for one request.
        Returns {"Authorization": "...", "Content-Type": "...", "X-TC-Timestamp": "..."}
        """
        service = host.split(".")[0]  # e.g. "cvm" from cvm.tencentcloudapi.com
        timestamp_str = str(timestamp)
        date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%d")
        # 1. CanonicalRequest: header names lowercase and sorted, each line ending in "\n"
        canonical_uri = uri or "/"
        canonical_query = "&".join(f"{k}={params[k]}" for k in sorted(params))
        canonical_headers = f"content-type:{self.CONTENT_TYPE}\nhost:{host}\n"
        signed_headers = "content-type;host"
        canonical_request = (
            f"{method}\n"
            f"{canonical_uri}\n"
            f"{canonical_query}\n"
            f"{canonical_headers}\n"
            f"{signed_headers}\n"
            f"{sha256_hex(payload)}"
        )
        # 2. StringToSign
        credential_scope = f"{date_str}/{service}/tc3_request"
        string_to_sign = (
            f"{self.ALGORITHM}\n"
            f"{timestamp_str}\n"
            f"{credential_scope}\n"
            f"{sha256_hex(canonical_request)}"
        )
        # 3. Signature: only the first key carries the "TC3" prefix
        secret_date = self._hmac_sha256(("TC3" + self.secret_key).encode("utf-8"), date_str)
        secret_service = self._hmac_sha256(secret_date, service)
        secret_signing = self._hmac_sha256(secret_service, "tc3_request")
        signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"),
                             hashlib.sha256).hexdigest()
        # 4. Authorization
        authorization = (
            f"{self.ALGORITHM} "
            f"Credential={self.secret_id}/{credential_scope}, "
            f"SignedHeaders={signed_headers}, "
            f"Signature={signature}"
        )
        return {
            "Authorization": authorization,
            "Content-Type": self.CONTENT_TYPE,
            "X-TC-Timestamp": timestamp_str,
        }

def tencent_api(action: str, payload: dict,
                secret_id: str, secret_key: str,
                region: str, service: str = "monitor",
                version: str = "2018-07-24") -> dict:
    """
    Call a Tencent Cloud API action (signature computed in Python, no SDK dependency)
    service: cam / monitor / cvm
    """
    import httpx
    host = f"{service}.tencentcloudapi.com"
    timestamp = int(time.time())
    body = json.dumps(payload)
    signer = TencentCloudSigner(secret_id, secret_key, region)
    # POST requests carry their parameters in the JSON body, so the query string is empty;
    # action, version and region travel as X-TC-* headers.
    headers = signer.sign("POST", host, "/", {}, body, timestamp)
    headers.update({
        "X-TC-Action": action,
        "X-TC-Version": version,
        "X-TC-Region": region,
    })
    with httpx.Client(timeout=30) as client:
        resp = client.post(f"https://{host}/", headers=headers,
                           content=body.encode("utf-8"))
        resp.raise_for_status()
        result = resp.json()
    # API-level failures come back with HTTP 200 and an Error object.
    error = result.get("Response", {}).get("Error")
    if error:
        raise RuntimeError(f"{error.get('Code')}: {error.get('Message')}")
    return result
def collect_tencent_cvm() -> dict:
    """
    Collect basic monitoring data for Tencent Cloud CVM instances
    InstanceId, CPU, Memory, InternetIn, InternetOut
    """
    secret_id = os.environ.get("TENCENT_SECRET_ID")
    secret_key = os.environ.get("TENCENT_SECRET_KEY")
    region = os.environ.get("TENCENT_REGION", "ap-shanghai")
    if not secret_id or not secret_key:
        print("[腾讯云] 未配置 TENCENT_SECRET_ID / TENCENT_SECRET_KEY,跳过")
        return {}
    print(f"\n=== 腾讯云 CVM (region={region}) ===")
    # 1. Fetch the instance list
    try:
        res = tencent_api("DescribeInstances", {},
                          secret_id, secret_key, region, service="cvm",
                          version="2017-03-12")
        instances = res.get("Response", {}).get("InstanceSet", [])
    except Exception as e:
        print(f" [腾讯云] 拉取实例列表失败: {e}")
        return {}
    if not instances:
        print(f" [腾讯云] 无 CVM 实例")
        return {}
    print(f" 找到 {len(instances)} 台 CVM")
    rows = {}
    for inst in instances:
        iid = inst.get("InstanceId", "?")
        # Basic instance information; metric fields start at 0
        rows[iid] = {
            "instanceId": iid,
            "service": "腾讯云_CVM",
            "InstanceType": inst.get("InstanceType", ""),
            "Status": inst.get("InstanceState", ""),
            "CPU_Average": 0,
            "Memory_Used_G": 0,
            "Memory_Utilization": 0,
            "InternetInRate": 0,
            "InternetOutRate": 0,
        }
    # 2. Fetch monitoring data (last hour)
    end_time = int(time.time())
    start_time = end_time - 3600
    # Output column -> metric names reported by the API
    metrics_map = {
        "CPU_Average": ["CPUUtilization"],
        "Memory_Utilization": ["MemUtilization"],
        "InternetInRate": ["InternetIn"],
        "InternetOutRate": ["InternetOut"],
    }
    for iid in rows:
        try:
            m_res = tencent_api("DescribeMonitorData", {
                "Namespace": "QCE/CVM",
                "Instances": [
                    {"Dimensions": {"InstanceId": iid}}
                ],
                "StartTime": start_time,
                "EndTime": end_time,
                "Period": 60,
            }, secret_id, secret_key, region, service="monitor")
            datapoints = m_res.get("Response", {}).get("DataPoints", [])
            for dp in datapoints:
                metric = dp.get("MetricName", "")
                vals = dp.get("Values", [])
                # Average over the window; 0 when no samples came back
                avg = round(sum(vals) / len(vals), 2) if vals else 0
                for k, v in metrics_map.items():
                    if metric in v and k in rows[iid]:
                        rows[iid][k] = avg
        except Exception as e:
            print(f" [{iid}] 监控数据拉取失败: {e}")
    return rows
# ═══════════════════════════════════════════════════════════════════════════════
# Huawei Cloud: IAM token + Cloud Eye monitoring
# ═══════════════════════════════════════════════════════════════════════════════
def huawei_token(access_key: str, secret_key: str, region: str) -> tuple:
"""Obtain a Huawei Cloud IAM token; returns (token, Cloud Eye endpoint host)."""
import httpx
# IAM (Identity and Access Management) endpoints per region
iam_endpoints = {
"cn-east-3": "iam.cn-east-3.myhuaweicloud.com",
"cn-north-4": "iam.cn-north-4.myhuaweicloud.com",
"cn-south-1": "iam.cn-south-1.myhuaweicloud.com",
}
iam_host = iam_endpoints.get(region, f"iam.{region}.myhuaweicloud.com")
body = {
"auth": {
"identity": {
"methods": ["hw-access-key"],
"hw-access-key": {"access_key": access_key}
},
"scope": {"project": {"name": region}}
}
}
url = f"https://{iam_host}/v3.0/OS-CREDENTIAL/credentials"
headers = {"Content-Type": "application/json"}
with httpx.Client(timeout=30) as client:
resp = client.post(url, headers=headers, json=body)
resp.raise_for_status()
data = resp.json()
token = data["credential"]["token"]
return token, f"ces.{region}.myhuaweicloud.com"
def collect_huawei_ecs() -> dict:
"""
Collect Huawei Cloud ECS monitoring data.
"""
access_key = os.environ.get("HUAWEI_ACCESS_KEY")
secret_key = os.environ.get("HUAWEI_SECRET_KEY")
region = os.environ.get("HUAWEI_REGION", "cn-east-3")
if not access_key or not secret_key:
print("[Huawei Cloud] HUAWEI_ACCESS_KEY / HUAWEI_SECRET_KEY not configured, skipping")
return {}
print(f"\n=== Huawei Cloud ECS (region={region}) ===")
try:
token, ces_host = huawei_token(access_key, secret_key, region)
except Exception as e:
print(f" [Huawei Cloud] Failed to obtain token: {e}")
return {}
# 1. Fetch the ECS instance list
import httpx
headers = {"X-Auth-Token": token, "Content-Type": "application/json"}
list_url = f"https://ecs.{region}.myhuaweicloud.com/v1/{access_key}/cloudservers"
try:
with httpx.Client(timeout=30) as client:
resp = client.get(list_url, headers=headers,
params={"availability_zone": f"{region}-az1"})
resp.raise_for_status()
servers = resp.json().get("servers", [])
except Exception as e:
print(f" [Huawei Cloud] Failed to fetch instance list: {e}")
return {}
if not servers:
print(" [Huawei Cloud] No ECS instances found")
return {}
print(f" Found {len(servers)} ECS instance(s)")
# 2. Fetch monitoring data
end_time = int(time.time()) * 1000
start_time = (int(time.time()) - 3600) * 1000
rows = {}
metrics_to_fetch = [
("cpu_core", "cpu_core"),
("mem_used", "mem_used"),
("mem_util", "mem_utilization"),
("net_in", "net_in"),
("net_out", "net_out"),
]
for srv in servers:
iid = srv.get("id", "?")
rows[iid] = {
"instanceId": iid,
"service": "华为云_ECS",
"name": srv.get("name", ""),
"status": srv.get("status", ""),
"cpu_core": 0,
"mem_util": 0,
"net_in": 0,
"net_out": 0,
}
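# NOTE: metric_name, start_time and end_time are not sent in the request below,
# so every metric currently reads the same response for a given instance.
for metric_key, metric_name in metrics_to_fetch: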
monitor_url = (
f"https://{ces_host}/V1.0/{access_key}/metric_analytics"
f"?search_object_id={iid}&namespace=SYS.ECS"
)
try:
with httpx.Client(timeout=30) as client:
m_resp = client.get(monitor_url, headers=headers)
m_resp.raise_for_status()
m_data = m_resp.json()
datapoints = m_data.get("datapoints", [])
if datapoints:
vals = [dp.get("average", 0) for dp in datapoints]
rows[iid][metric_key] = round(sum(vals) / len(vals), 2)
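except Exception as e:
# Log the failure instead of swallowing it silently
print(f" [{iid}] Failed to fetch {metric_name}: {e}")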
return rows
# ═══════════════════════════════════════════════════════════════════════════════
# Alibaba Cloud: SDK-based collection (see aliyun_monitor.py)
# ═══════════════════════════════════════════════════════════════════════════════
def collect_aliyun() -> dict:
"""Collect Alibaba Cloud ECS monitoring data."""
try:
import json as _json
from aliyunsdkcore.client import AcsClient
from aliyunsdkcms.request.v20190101 import DescribeMetricListRequest
except ImportError:
print("[Alibaba Cloud] SDK not installed, skipping (uv run --with aliyun-python-sdk-core --with aliyun-python-sdk-cms)")
return {}
LTAI = os.environ.get("ALIBABA_ACCESS_KEY_ID")
SK = os.environ.get("ALIBABA_ACCESS_KEY_SECRET")
REGION = os.environ.get("ALIBABA_REGION", "cn-qingdao")
if not LTAI or not SK:
print("[Alibaba Cloud] ALIBABA_ACCESS_KEY_ID / ALIBABA_ACCESS_KEY_SECRET not configured, skipping")
return {}
# Metrics are configurable: ALIBABA_METRICS=CPUUtilization,MemoryUtilization,InternetInRate,...
# If unset, the default metrics below are used
default_metrics = [
("CPUUtilization", "CPU_Average"),
("InternetInRate", "InternetInRate"),
("InternetOutRate", "InternetOutRate"),
("DiskReadBPS", "DiskReadBPS"),
("DiskWriteBPS", "DiskWriteBPS"),
]
metrics_str = os.environ.get("ALIBABA_METRICS", "").strip()
if metrics_str:
# Format: CPUUtilization,InternetInRate,DiskReadBPS
# The metric name doubles as the column name
METRICS = [(m.strip(), m.strip()) for m in metrics_str.split(",") if m.strip()]
print(f"[Alibaba Cloud] Using custom metrics: {[m[0] for m in METRICS]}")
else:
METRICS = default_metrics
client = AcsClient(LTAI, SK, REGION)
now_ms = int(time.time() * 1000)
start_ms = now_ms - 4 * 86400 * 1000
rows = {}
for metric, col_name in METRICS:
next_token = None
for _ in range(1, 500):
req = DescribeMetricListRequest.DescribeMetricListRequest()
req.set_MetricName(metric)
req.set_Namespace("acs_ecs_dashboard")
req.set_Period(60)
req.set_StartTime(start_ms)
req.set_EndTime(now_ms)
req.set_Length(100)
if next_token:
req.set_NextToken(next_token)
try:
resp = client.do_action_with_exception(req)
data = _json.loads(resp.decode("utf-8"))
except Exception as e:
print(f" [{metric}] Request failed: {e}")
break
if data.get("Code") != "200":
print(f" [{metric}] API error: {data.get('Code')}")
break
pts = _json.loads(data["Datapoints"])
for p in pts:
iid = p.get("instanceId", "?")
val = p.get("Average", 0)
if iid not in rows:
rows[iid] = {"instanceId": iid, "service": "阿里云_ECS"}
rows[iid][col_name] = round(val, 2)
next_token = data.get("NextToken")
if not next_token:
break
print(f"\n=== Alibaba Cloud ECS (region={REGION}) ===")
print(f" {len(rows)} ECS instance(s) have monitoring data")
return rows
# ═══════════════════════════════════════════════════════════════════════════════
# Unified entry point
# ═══════════════════════════════════════════════════════════════════════════════
def main():
import pandas as pd
all_rows = {}
# Alibaba Cloud
aliyun_rows = collect_aliyun()
all_rows.update(aliyun_rows)
# Tencent Cloud
tencent_rows = collect_tencent_cvm()
all_rows.update(tencent_rows)
# Huawei Cloud
huawei_rows = collect_huawei_ecs()
all_rows.update(huawei_rows)
if not all_rows:
print("\nNo cloud data collected; check the environment variable configuration")
return
df = pd.DataFrame(list(all_rows.values()))
df = df.set_index("instanceId")
print(f"\nTotal: {len(df)} instance(s):")
print(df.to_string())
out_dir = "/root/.hermes/cron/output"
os.makedirs(out_dir, exist_ok=True)
out = os.path.join(out_dir, "cloud_monitor.xlsx")
df.reset_index().to_excel(out, index=False)
print(f"\nSaved: {out}")
if __name__ == "__main__":
main()
FILE:scripts/send_zabbix_report.py
#!/usr/bin/env python3
"""
Send the Zabbix monitoring report by email (the Feishu message is sent by the main script)
"""
import os
import smtplib
import sys
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
CSV_PATH = "/root/.hermes/cron/output/zabbix_monitor.csv"
XLSX_PATH = "/root/.hermes/cron/output/zabbix_monitor.xlsx"
def load_env():
env_path = "/root/.hermes/.env"
if not os.path.exists(env_path):
return
with open(env_path) as f:
for line in f:
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
# Strip the key as well, so "KEY = value" does not create a variable named "KEY "
os.environ[k.strip()] = v.strip()
def send_email(subject, html_body, attachments=None):
load_env()
smtp_host = os.environ.get("SMTP_HOST", "")
smtp_port = os.environ.get("SMTP_PORT", "465")
smtp_from = os.environ.get("SMTP_FROM", "")
smtp_token = os.environ.get("SMTP_TOKEN", "")
target = os.environ.get("TARGET_EMAIL", "")
if not all([smtp_host, smtp_from, smtp_token, target]):
print("Email configuration incomplete, skipping send")
return False
msg = MIMEMultipart()
msg["From"] = smtp_from
msg["To"] = target
msg["Subject"] = subject
msg.attach(MIMEText(html_body, "html", "utf-8"))
# Attachments
for fpath in (attachments or []):
if os.path.exists(fpath):
with open(fpath, "rb") as f:
part = MIMEBase("application", "octet-stream")
part.set_payload(f.read())
encoders.encode_base64(part)
fname = os.path.basename(fpath)
# Passing filename as a keyword lets the email package quote/encode it correctly
part.add_header("Content-Disposition", "attachment", filename=fname)
msg.attach(part)
try:
if smtp_port == "465":
with smtplib.SMTP_SSL(smtp_host, int(smtp_port)) as server:
server.login(smtp_from, smtp_token)
server.sendmail(smtp_from, target, msg.as_string())
else:
with smtplib.SMTP(smtp_host, int(smtp_port)) as server:
server.starttls()
server.login(smtp_from, smtp_token)
server.sendmail(smtp_from, target, msg.as_string())
print(f"Email sent to {target}")
return True
except Exception as e:
print(f"Failed to send email: {e}")
return False
def build_html_body():
"""Read the CSV data and build an HTML table."""
if not os.path.exists(CSV_PATH):
return "<p>CSV 文件不存在</p>"
import csv
from collections import defaultdict
groups = defaultdict(list)
with open(CSV_PATH, encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for row in reader:
groups[row["主机组"]].append(row)
html = f"""
<h2>服务器监控报告</h2>
<p><b>采集时间:</b>{datetime.now().strftime('%Y-%m-%d %H:%M')}</p>
"""
for gname, rows in sorted(groups.items()):
html += f"<h3>{gname} ({len(rows)} 台)</h3>"
html += "<table border='1' cellpadding='4' cellspacing='0' style='border-collapse:collapse;font-size:13px;'>"
html += "<tr bgcolor='#4472C4' style='color:white;'>"
for h in ["主机名", "IP", "内存总量(GB)", "内存可用(GB)", "内存占用率(%)", "CPU占用率(%)"]:
html += f"<th>{h}</th>"
html += "</tr>"
for i, r in enumerate(rows):
bg = "#EEF2FF" if i % 2 == 0 else "#FFFFFF"
mem_pct = float(r["内存占用率(%)"]) if r["内存占用率(%)"] != "N/A" else 0
cpu_pct = float(r["CPU占用率(%)"]) if r["CPU占用率(%)"] != "N/A" else 0
mem_style = ""
if mem_pct >= 80:
mem_style = "background:#FF4444;color:white;"
elif mem_pct >= 60:
mem_style = "background:#FFAA44;"
elif mem_pct >= 40:
mem_style = "background:#FFEE88;"
cpu_style = ""
if cpu_pct >= 80:
cpu_style = "background:#FF4444;color:white;"
elif cpu_pct >= 60:
cpu_style = "background:#FFAA44;"
elif cpu_pct >= 40:
cpu_style = "background:#FFEE88;"
html += f"<tr bgcolor='{bg}'>"
html += f"<td>{r['主机名']}</td>"
html += f"<td>{r['IP']}</td>"
html += f"<td>{r['内存总量(GB)']}</td>"
html += f"<td>{r['内存可用(GB)']}</td>"
html += f"<td style='{mem_style}'>{r['内存占用率(%)']}</td>"
html += f"<td style='{cpu_style}'>{r['CPU占用率(%)']}</td>"
html += "</tr>"
html += "</table><br/>"
return html
def main():
print("Sending report...")
# 1. Feishu message (sent automatically by the Hermes cron; only a note is printed here)
print("Feishu message already sent by the main script")
# 2. Email
subject = f"【监控报告】服务器巡检 {datetime.now().strftime('%Y-%m-%d %H:%M')}"
html_body = build_html_body()
attachments = []
if os.path.exists(XLSX_PATH):
attachments.append(XLSX_PATH)
if os.path.exists(CSV_PATH):
attachments.append(CSV_PATH)
send_email(subject, html_body, attachments)
if __name__ == "__main__":
main()
FILE:scripts/zabbix_cron.py
#!/usr/bin/env python3
"""
Zabbix monitoring report: collect data → XLSX/CSV → Feishu message → email
The scheduled job only needs to run this script
"""
import os
import sys
import csv
import json
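from dotenv import load_dotenv
# load_dotenv() with no argument searches for a nearby .env; pass the path explicitly
load_dotenv(os.path.expanduser("~/.hermes/.env"))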
import smtplib
from datetime import datetime
from urllib.request import urlopen, Request
from urllib.error import URLError
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from collections import defaultdict
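# ========== Zabbix configuration ==========
# Credentials are read from the environment (e.g. ~/.hermes/.env) rather than hardcoded.
# The variable names ZABBIX_URL / ZABBIX_USER / ZABBIX_PASSWORD are assumed here.
ZABBIX_URL = os.environ.get("ZABBIX_URL", "http://zabbix.ops.qiyujoy.com/api_jsonrpc.php")
ZABBIX_USER = os.environ.get("ZABBIX_USER", "Admin")
ZABBIX_PASSWORD = os.environ.get("ZABBIX_PASSWORD", "")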
ITEMS_KEY = {
"memory_avail": "vm.memory.size[available]",
"memory_total": "vm.memory.size[total]",
"cpu": "system.cpu.util",
}
EXCLUDE_GROUPS = {"Templates", "Templates/Applications", "Templates/Databases",
"Templates/Modules", "Templates/Network devices",
"Templates/Operating systems", "Templates/Server hardware",
"Templates/Virtualization", "Discovered hosts"}
CSV_PATH = "/root/.hermes/cron/output/zabbix_monitor.csv"
XLSX_PATH = "/root/.hermes/cron/output/zabbix_monitor.xlsx"
FEISHU_CHAT_ID = "oc_26aa4b60c17dc842e987777295396955"
# ========== Zabbix API ==========
def api_call(method, params, auth=None):
payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
if auth:
payload["auth"] = auth
data = json.dumps(payload).encode("utf-8")
req = Request(ZABBIX_URL, data=data, headers={"Content-Type": "application/json"})
try:
with urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode("utf-8"))
except URLError as e:
print(f"API request failed: {e}")
sys.exit(1)
if "error" in result:
print(f"API error: {result['error']}")
sys.exit(1)
return result.get("result", [])
# ========== Data collection ==========
def fetch_all(auth):
groups = api_call("hostgroup.get", {"output": ["groupid", "name"]}, auth=auth)
groups = [g for g in groups if g["name"] not in EXCLUDE_GROUPS]
group_ids = [g["groupid"] for g in groups]
hosts = api_call("host.get", {
"output": ["hostid", "name", "host"],
"groupids": group_ids,
"selectGroups": ["groupid", "name"],
}, auth=auth)
all_items = []
for i in range(0, len(hosts), 100):
batch_ids = [h["hostid"] for h in hosts[i:i+100]]
items = api_call("item.get", {
"output": ["itemid", "hostid", "key_", "lastvalue"],
"hostids": batch_ids,
"filter": {"key_": list(ITEMS_KEY.values())},
}, auth=auth)
all_items.extend(items)
item_map = {(it["hostid"], it["key_"]): it.get("lastvalue", "")
for it in all_items}
rows = []
for host in hosts:
hid = host["hostid"]
host_groups = host.get("groups", [])
gnames = [g["name"] for g in host_groups]
valid_gnames = [n for n in gnames if n not in EXCLUDE_GROUPS]
if not valid_gnames:
continue
gname = valid_gnames[0]
mem_total = item_map.get((hid, ITEMS_KEY["memory_total"]), "")
mem_avail = item_map.get((hid, ITEMS_KEY["memory_avail"]), "")
cpu = item_map.get((hid, ITEMS_KEY["cpu"]), "")
mem_total_gb = float(mem_total) / (1024**3) if mem_total else None
mem_avail_gb = float(mem_avail) / (1024**3) if mem_avail else None
cpu_pct = float(cpu) if cpu else None
mem_used_pct = (1 - float(mem_avail) / float(mem_total)) * 100 \
if mem_avail and mem_total else None
rows.append({
"group": gname,
"name": host["name"],
"ip": host["host"],
"mem_total_gb": mem_total_gb,
"mem_avail_gb": mem_avail_gb,
"mem_used_pct": mem_used_pct,
"cpu_pct": cpu_pct,
})
return rows
# ========== XLSX generation ==========
def generate_xlsx(rows):
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
def thin_border():
s = Side(style="thin", color="CCCCCC")
return Border(left=s, right=s, top=s, bottom=s)
def hdr(cell, text):
cell.value = text
cell.font = Font(name="微软雅黑", bold=True, size=10, color="FFFFFF")
cell.fill = PatternFill("solid", fgColor="4472C4")
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.border = thin_border()
def pct_color(pct, bg_base):
if pct is None: return bg_base, "000000"
if pct >= 80: return "FF4444", "FFFFFF"
if pct >= 60: return "FFAA44", "000000"
if pct >= 40: return "FFEE88", "000000"
return bg_base, "000000"
group_rows = defaultdict(list)
for r in rows:
group_rows[r["group"]].append(r)
wb = openpyxl.Workbook()
wb.remove(wb.active)
# Overview sheet
ws_ov = wb.create_sheet(title="总览")
ws_ov.cell(row=1, column=1, value="服务器监控总览").font = Font(name="微软雅黑", bold=True, size=14)
ws_ov.cell(row=1, column=1).alignment = Alignment(horizontal="left")
ws_ov.row_dimensions[1].height = 24
ws_ov.cell(row=2, column=1, value=f"采集时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}")
ws_ov.cell(row=2, column=1).font = Font(name="微软雅黑", size=10, color="666666")
ws_ov.cell(row=3, column=1, value=f"共 {len(rows)} 台主机,{len(group_rows)} 个主机组")
ws_ov.cell(row=3, column=1).font = Font(name="微软雅黑", size=10, color="666666")
for col_idx, h in enumerate(["主机组", "主机数", "内存告警(≥80%)", "CPU告警(≥80%)"], 1):
hdr(ws_ov.cell(row=5, column=col_idx), h)
ws_ov.row_dimensions[5].height = 20
for row_idx, (gname, gdata) in enumerate(sorted(group_rows.items()), start=6):
mem_alarm = sum(1 for r in gdata
if r["mem_used_pct"] is not None and r["mem_used_pct"] >= 80)
cpu_alarm = sum(1 for r in gdata
if r["cpu_pct"] is not None and r["cpu_pct"] >= 80)
for col_idx, val in enumerate([gname, len(gdata), mem_alarm, cpu_alarm], 1):
cell = ws_ov.cell(row=row_idx, column=col_idx, value=val)
cell.font = Font(name="微软雅黑", size=10)
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.border = thin_border()
if col_idx == 3 and mem_alarm > 0:
cell.fill = PatternFill("solid", fgColor="FF4444")
cell.font = Font(name="微软雅黑", size=10, bold=True, color="FFFFFF")
elif col_idx == 4 and cpu_alarm > 0:
cell.fill = PatternFill("solid", fgColor="FF4444")
cell.font = Font(name="微软雅黑", size=10, bold=True, color="FFFFFF")
for col_idx, w in enumerate([24, 10, 16, 16], 1):
ws_ov.column_dimensions[get_column_letter(col_idx)].width = w
# One sheet per host group
col_defs = [("主机名", 32), ("IP", 18), ("内存总量(GB)", 14),
("内存可用(GB)", 14), ("内存占用率(%)", 14), ("CPU占用率(%)", 13)]
for gname, gdata in sorted(group_rows.items()):
ws = wb.create_sheet(title=gname[:31])
ws.row_dimensions[1].height = 20
for col_idx, (hdr_text, _) in enumerate(col_defs, 1):
hdr(ws.cell(row=1, column=col_idx), hdr_text)
gdata.sort(key=lambda x: (-(x["mem_used_pct"] or 0), -(x["cpu_pct"] or 0)))
for row_idx, r in enumerate(gdata, start=2):
bg = "EEF2FF" if row_idx % 2 == 0 else "FFFFFF"
mem_bg, mem_fc = pct_color(r.get("mem_used_pct"), bg)
cpu_bg, cpu_fc = pct_color(r.get("cpu_pct"), bg)
vals = [
(r["name"], bg, "000000", None),
(r["ip"], bg, "000000", None),
(r["mem_total_gb"], bg, "000000", "0.0"),
(r["mem_avail_gb"], bg, "000000", "0.0"),
(r["mem_used_pct"], mem_bg, mem_fc, "0.0"),
(r["cpu_pct"], cpu_bg, cpu_fc, "0.0"),
]
for col_idx, (val, cbg, cfc, fmt) in enumerate(vals, 1):
cell = ws.cell(row=row_idx, column=col_idx)
if val is None:
cell.value = "N/A"
else:
cell.value = val
if fmt:
cell.number_format = fmt
cell.font = Font(name="微软雅黑", size=10, color=cfc)
cell.fill = PatternFill("solid", fgColor=cbg)
cell.alignment = Alignment(horizontal="center", vertical="center")
cell.border = thin_border()
for col_idx, (_, width) in enumerate(col_defs, 1):
ws.column_dimensions[get_column_letter(col_idx)].width = width
ws.freeze_panes = "A2"
# ========== TOPN Sheet ==========
topn = int(os.environ.get("TOPN", "50"))
if topn > 0:
ws_top = wb.create_sheet(title=f"TOP{topn}(内存+CPU)")
ws_top.row_dimensions[1].height = 20
for col_idx, (col_hdr, _) in enumerate(col_defs, start=1):
hdr(ws_top.cell(row=1, column=col_idx), col_hdr)
# Merge all rows, sorted by memory usage then CPU usage, descending
all_data = list(rows)
all_data.sort(key=lambda x: (-(x["mem_used_pct"] or 0), -(x["cpu_pct"] or 0)))
top_data = all_data[:topn]
for row_idx, r in enumerate(top_data, start=2):
bg = "EEF2FF" if row_idx % 2 == 0 else "FFFFFF"
mem_bg, mem_fc = pct_color(r.get("mem_used_pct"), bg)
cpu_bg, cpu_fc = pct_color(r.get("cpu_pct"), bg)
row_vals = [
(r["name"], bg, "000000"),
(r["ip"], bg, "000000"),
(r["mem_total_gb"],bg, "000000"),
(r["mem_avail_gb"],bg, "000000"),
(r["mem_used_pct"],mem_bg, mem_fc),
(r["cpu_pct"], cpu_bg, cpu_fc),
]
for col_idx, (val, cbg, cfc) in enumerate(row_vals, start=1):
if val is None:
display_val, fmt = "N/A", None
elif col_idx in (3, 4, 5, 6):
# Keep the numeric value so the '0.0' number format applies (a pre-formatted string is stored as text)
display_val, fmt = val, '0.0'
else:
display_val, fmt = val, None
cell = ws_top.cell(row=row_idx, column=col_idx, value=display_val)
cell.font = Font(name="微软雅黑", size=10, color=cfc)
cell.fill = PatternFill("solid", fgColor=cbg)
cell.alignment = Alignment(horizontal="center", vertical="center")
thin = Side(style="thin", color="CCCCCC")
cell.border = Border(left=thin, right=thin, top=thin, bottom=thin)
if fmt:
cell.number_format = fmt
for col_idx, (_, width) in enumerate(col_defs, start=1):
ws_top.column_dimensions[get_column_letter(col_idx)].width = width
ws_top.column_dimensions["A"].width = 36
ws_top.freeze_panes = "A2"
wb.save(XLSX_PATH)
print(f"XLSX: {XLSX_PATH}")
# ========== CSV generation (UTF-8 with BOM, for Windows Excel compatibility) ==========
def generate_csv(rows):
os.makedirs(os.path.dirname(CSV_PATH), exist_ok=True)
with open(CSV_PATH, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow(["主机组", "主机名", "IP", "内存总量(GB)",
"内存可用(GB)", "内存占用率(%)", "CPU占用率(%)"])
for r in rows:
writer.writerow([
r["group"],
r["name"],
r["ip"],
f"{r['mem_total_gb']:.1f}" if r['mem_total_gb'] is not None else "N/A",
f"{r['mem_avail_gb']:.1f}" if r['mem_avail_gb'] is not None else "N/A",
f"{r['mem_used_pct']:.1f}" if r['mem_used_pct'] is not None else "N/A",
f"{r['cpu_pct']:.1f}" if r['cpu_pct'] is not None else "N/A",
])
print(f"CSV: {CSV_PATH}")
# ========== Feishu message ==========
def build_feishu_summary(rows):
"""Build the Feishu summary message (Markdown format)."""
from collections import defaultdict
group_rows = defaultdict(list)
for r in rows:
group_rows[r["group"]].append(r)
# Hosts needing attention: memory usage ≥ 60% or CPU ≥ 60%
warnings = [r for r in rows
if (r["mem_used_pct"] or 0) >= 60 or (r["cpu_pct"] or 0) >= 60]
warnings.sort(key=lambda x: (-(x["mem_used_pct"] or 0), -(x["cpu_pct"] or 0)))
lines = ["## 服务器监控报告", ""]
lines.append(f"**采集时间**:{datetime.now().strftime('%Y-%m-%d %H:%M')}")
lines.append(f"共 **{len(rows)}** 台主机,覆盖 **{len(group_rows)}** 个主机组")
lines.append("")
if warnings:
lines.append("### ⚠ 重点关注(内存占用≥60% 或 CPU≥60%)")
lines.append("")
lines.append("| 主机名 | 主机组 | 内存占用率(%) | CPU占用率(%) |")
lines.append("|---|---|---|---|")
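for r in warnings[:20]: # show at most 20 rows
# Either value may be None when only the other metric crossed the threshold
mem = f"{r['mem_used_pct']:.1f}" if r["mem_used_pct"] is not None else "N/A"
cpu = f"{r['cpu_pct']:.1f}" if r["cpu_pct"] is not None else "N/A"
lines.append(f"| {r['name']} | {r['group']} | {mem} | {cpu} |")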
if len(warnings) > 20:
lines.append(f"...(共 {len(warnings)} 台,详见附件)")
lines.append("")
else:
lines.append("### ✅ 全部正常(无告警主机)")
lines.append("")
lines.append(f"完整数据:`{CSV_PATH}`")
return "\n".join(lines)
# ========== Email sending ==========
def load_env():
env_path = "/root/.hermes/.env"
if os.path.exists(env_path):
with open(env_path) as f:
for line in f:
line = line.strip()
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
# Strip the key as well, so "KEY = value" does not create a variable named "KEY "
os.environ[k.strip()] = v.strip()
def send_email(subject, html_body, attachments=None):
load_env()
smtp_host = os.environ.get("SMTP_HOST", "")
smtp_port = os.environ.get("SMTP_PORT", "465")
smtp_from = os.environ.get("SMTP_FROM", "")
smtp_token = os.environ.get("SMTP_TOKEN", "")
target = os.environ.get("TARGET_EMAIL", "")
if not all([smtp_host, smtp_from, smtp_token, target]):
print("Email configuration incomplete, skipping")
return
msg = MIMEMultipart()
msg["From"] = smtp_from
msg["To"] = target
msg["Subject"] = subject
msg.attach(MIMEText(html_body, "html", "utf-8"))
for fpath in (attachments or []):
if os.path.exists(fpath):
with open(fpath, "rb") as f:
part = MIMEBase("application", "octet-stream")
part.set_payload(f.read())
encoders.encode_base64(part)
# Passing filename as a keyword lets the email package quote/encode it correctly
part.add_header("Content-Disposition", "attachment", filename=os.path.basename(fpath))
msg.attach(part)
try:
if smtp_port == "465":
with smtplib.SMTP_SSL(smtp_host, int(smtp_port)) as s:
s.login(smtp_from, smtp_token)
s.sendmail(smtp_from, target, msg.as_string())
else:
with smtplib.SMTP(smtp_host, int(smtp_port)) as s:
s.starttls()
s.login(smtp_from, smtp_token)
s.sendmail(smtp_from, target, msg.as_string())
print(f"Email sent: {target}")
except Exception as e:
print(f"Failed to send email: {e}")
def build_html_body(rows):
group_rows = defaultdict(list)
for r in rows:
group_rows[r["group"]].append(r)
html = f"""<html><body>
<h2>服务器监控报告</h2>
<p><b>采集时间:</b>{datetime.now().strftime('%Y-%m-%d %H:%M')}</p>
<p><b>共 {len(rows)} 台主机,{len(group_rows)} 个主机组</b></p>"""
for gname, gdata in sorted(group_rows.items()):
html += f"<h3>{gname} ({len(gdata)} 台)</h3>"
html += ("<table border='1' cellpadding='4' cellspacing='0' "
"style='border-collapse:collapse;font-size:12px;'>")
html += ("<tr bgcolor='#4472C4' style='color:white;'>"
"<th>主机名</th><th>IP</th>"
"<th>内存总量(GB)</th><th>内存可用(GB)</th>"
"<th>内存占用率(%)</th><th>CPU占用率(%)</th></tr>")
for i, r in enumerate(gdata):
bg = "#EEF2FF" if i % 2 == 0 else "#FFFFFF"
mp = r["mem_used_pct"] or 0
cp = r["cpu_pct"] or 0
ms = ("background:#FF4444;color:white;" if mp >= 80 else
"background:#FFAA44;" if mp >= 60 else
"background:#FFEE88;" if mp >= 40 else "")
cs = ("background:#FF4444;color:white;" if cp >= 80 else
"background:#FFAA44;" if cp >= 60 else
"background:#FFEE88;" if cp >= 40 else "")
html += f"<tr bgcolor='{bg}'>"
html += f"<td>{r['name']}</td><td>{r['ip']}</td>"
# Compare against None explicitly so a legitimate 0.0 is not rendered as N/A
html += f"<td>{r['mem_total_gb']:.1f}</td>" if r['mem_total_gb'] is not None else "<td>N/A</td>"
html += f"<td>{r['mem_avail_gb']:.1f}</td>" if r['mem_avail_gb'] is not None else "<td>N/A</td>"
html += f"<td style='{ms}'>{r['mem_used_pct']:.1f}</td>" if r['mem_used_pct'] is not None else "<td>N/A</td>"
html += f"<td style='{cs}'>{r['cpu_pct']:.1f}</td>" if r['cpu_pct'] is not None else "<td>N/A</td>"
html += "</tr>"
html += "</table><br/>"
html += "</body></html>"
return html
# ========== Main flow ==========
def main():
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting inspection...")
# 1. Log in to Zabbix
auth = api_call("user.login", {
"user": ZABBIX_USER,
"password": ZABBIX_PASSWORD,
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Login succeeded")
# 2. Collect data
rows = fetch_all(auth)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Collection finished: {len(rows)} host(s)")
# 3. Generate files
generate_csv(rows)
generate_xlsx(rows)
# 4. Feishu message (delivered via the Hermes send_message API; this script only prints the summary)
summary = build_feishu_summary(rows)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Feishu summary:\n{summary[:500]}")
# 5. Email
subject = f"【监控报告】服务器巡检 {datetime.now().strftime('%Y-%m-%d %H:%M')}"
html = build_html_body(rows)
atts = [f for f in [XLSX_PATH, CSV_PATH] if os.path.exists(f)]
send_email(subject, html, atts)
print(f"[{datetime.now().strftime('%H:%M:%S')}] All done!")
if __name__ == "__main__":
main()
FILE:scripts/zabbix_monitor.py
#!/usr/bin/env python3
"""
Zabbix monitoring data collection → XLSX (one sheet per host group, sorted by memory/CPU usage, descending)
"""
import json
import csv
import sys
import os
from datetime import datetime
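from dotenv import load_dotenv
# load_dotenv() with no argument searches for a nearby .env; pass the path explicitly
load_dotenv(os.path.expanduser("~/.hermes/.env"))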
from urllib.request import urlopen, Request
from urllib.error import URLError
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
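# ========== Configuration ==========
# Credentials are read from the environment (e.g. ~/.hermes/.env) rather than hardcoded.
# The variable names ZABBIX_URL / ZABBIX_USER / ZABBIX_PASSWORD are assumed here.
ZABBIX_URL = os.environ.get("ZABBIX_URL", "http://zabbix.ops.qiyujoy.com/api_jsonrpc.php")
ZABBIX_USER = os.environ.get("ZABBIX_USER", "Admin")
ZABBIX_PASSWORD = os.environ.get("ZABBIX_PASSWORD", "")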
ITEMS_KEY = {
"memory_avail": "vm.memory.size[available]",
"memory_total": "vm.memory.size[total]",
"cpu": "system.cpu.util",
}
EXCLUDE_GROUPS = {"Templates", "Templates/Applications", "Templates/Databases",
"Templates/Modules", "Templates/Network devices",
"Templates/Operating systems", "Templates/Server hardware",
"Templates/Virtualization", "Discovered hosts"}
CSV_PATH = "/root/.hermes/cron/output/zabbix_monitor.csv"
XLSX_PATH = "/root/.hermes/cron/output/zabbix_monitor.xlsx"
# TOPN: number of top hosts to highlight (sorted by memory, then CPU); 0 disables the sheet
TOPN = int(os.environ.get("TOPN", "50"))
# ========== Zabbix API ==========
def api_call(method, params, auth=None):
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1,
}
if auth:
payload["auth"] = auth
data = json.dumps(payload).encode("utf-8")
req = Request(ZABBIX_URL, data=data, headers={"Content-Type": "application/json"})
try:
with urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode("utf-8"))
except URLError as e:
print(f"API request failed: {e}", file=sys.stderr)
sys.exit(1)
if "error" in result:
print(f"API error: {result['error']}", file=sys.stderr)
sys.exit(1)
return result.get("result", [])
def fetch_all(auth):
"""Fetch all hosts together with their monitoring data."""
# 1. Host groups
groups = api_call("hostgroup.get", {"output": ["groupid", "name"]}, auth=auth)
groups = [g for g in groups if g["name"] not in EXCLUDE_GROUPS]
print(f"Valid host groups: {len(groups)}")
group_ids = [g["groupid"] for g in groups]
# 2. Hosts
hosts = api_call("host.get", {
"output": ["hostid", "name", "host"],
"groupids": group_ids,
"selectGroups": ["groupid", "name"],
}, auth=auth)
print(f"Total hosts: {len(hosts)}")
# 3. Items (fetched in batches)
key_filters = list(ITEMS_KEY.values())
all_items = []
host_ids = [h["hostid"] for h in hosts]
BATCH = 100
for i in range(0, len(host_ids), BATCH):
batch_ids = host_ids[i:i+BATCH]
items = api_call("item.get", {
"output": ["itemid", "hostid", "key_", "lastvalue"],
"hostids": batch_ids,
"filter": {"key_": key_filters},
}, auth=auth)
all_items.extend(items)
print(f"Items fetched: {len(all_items)}")
# 4. Assemble rows
item_map = {}
for item in all_items:
item_map[(item["hostid"], item["key_"])] = item.get("lastvalue", "")
rows = []
for host in hosts:
hid = host["hostid"]
host_groups = host.get("groups", [])
gnames = [g["name"] for g in host_groups]
# Skip hosts that belong only to excluded groups (and to no valid group)
valid_gnames = [n for n in gnames if n not in EXCLUDE_GROUPS]
if not valid_gnames:
continue
# Use the first valid group name as the host's owning group
gname = valid_gnames[0]
mem_total = item_map.get((hid, ITEMS_KEY["memory_total"]), "")
mem_avail = item_map.get((hid, ITEMS_KEY["memory_avail"]), "")
cpu = item_map.get((hid, ITEMS_KEY["cpu"]), "")
mem_total_gb = float(mem_total) / (1024**3) if mem_total else None
mem_avail_gb = float(mem_avail) / (1024**3) if mem_avail else None
cpu_pct = float(cpu) if cpu else None
# Memory usage (%) = 100 - available (%)
if mem_avail and mem_total:
mem_used_pct = (1 - float(mem_avail) / float(mem_total)) * 100
else:
mem_used_pct = None
rows.append({
"group": gname,
"name": host["name"],
"ip": host["host"],
"mem_avail_gb": mem_avail_gb,
"mem_total_gb": mem_total_gb,
"mem_used_pct": mem_used_pct,
"cpu_pct": cpu_pct,
})
return rows
# ========== Excel generation ==========
def make_style(bold=False, size=11, color=None, bg_color=None, align="center"):
font = Font(name="微软雅黑", bold=bold, size=size, color=color or "000000")
if bg_color:
fill = PatternFill("solid", fgColor=bg_color)
else:
fill = None
align_obj = Alignment(horizontal=align, vertical="center", wrap_text=True)
return font, fill, align_obj
def style_header(cell, text):
cell.value = text
cell.font = Font(name="微软雅黑", bold=True, size=10, color="FFFFFF")
cell.fill = PatternFill("solid", fgColor="4472C4")
cell.alignment = Alignment(horizontal="center", vertical="center")
thin = Side(style="thin", color="CCCCCC")
cell.border = Border(left=thin, right=thin, top=thin, bottom=thin)
def style_data_cell(cell, value, bg="FFFFFF", font_color="000000", number_fmt=None):
cell.value = value
cell.font = Font(name="微软雅黑", size=10, color=font_color)
cell.fill = PatternFill("solid", fgColor=bg)
cell.alignment = Alignment(horizontal="center", vertical="center")
thin = Side(style="thin", color="CCCCCC")
cell.border = Border(left=thin, right=thin, top=thin, bottom=thin)
if number_fmt:
cell.number_format = number_fmt
def pct_color(pct, bg_base):
"""Return (background color, font color) for a usage percentage."""
if pct is None:
return bg_base, "000000"
if pct >= 80:
return "FF4444", "FFFFFF"
if pct >= 60:
return "FFAA44", "000000"
if pct >= 40:
return "FFEE88", "000000"
return bg_base, "000000"
def generate_xlsx(rows):
"""Generate the xlsx: one sheet per host group, each sorted by memory then CPU usage, descending."""
from collections import defaultdict
group_rows = defaultdict(list)
for r in rows:
group_rows[r["group"]].append(r)
wb = openpyxl.Workbook()
wb.remove(wb.active)
# Column definitions: (header, column width)
col_defs = [
("主机名", 32),
("IP", 18),
("内存总量(GB)", 14),
("内存可用(GB)", 14),
("内存占用率(%)", 14),
("CPU占用率(%)", 13),
]
# ========== Overview sheet ==========
ws_ov = wb.create_sheet(title="总览")
ws_ov.cell(row=1, column=1, value="服务器监控总览").font = Font(name="微软雅黑", bold=True, size=14)
ws_ov.cell(row=1, column=1).alignment = Alignment(horizontal="left")
ws_ov.row_dimensions[1].height = 24
ws_ov.cell(row=2, column=1, value=f"采集时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
ws_ov.cell(row=2, column=1).font = Font(name="微软雅黑", size=10, color="666666")
ws_ov.cell(row=3, column=1, value=f"共 {len(rows)} 台主机,{len(group_rows)} 个有效主机组")
ws_ov.cell(row=3, column=1).font = Font(name="微软雅黑", size=10, color="666666")
ov_headers = ["主机组", "主机数", "内存告警(≥80%)", "CPU告警(≥80%)"]
ov_widths = [22, 10, 16, 16]
for col_idx, hdr in enumerate(ov_headers, start=1):
style_header(ws_ov.cell(row=5, column=col_idx), hdr)
ws_ov.row_dimensions[5].height = 20
for row_idx, (gname, gdata) in enumerate(sorted(group_rows.items()), start=6):
valid_mem = [r for r in gdata if r["mem_used_pct"] is not None]
valid_cpu = [r for r in gdata if r["cpu_pct"] is not None]
mem_alarm = sum(1 for r in valid_mem if r["mem_used_pct"] >= 80)
cpu_alarm = sum(1 for r in valid_cpu if r["cpu_pct"] >= 80)
vals = [gname, len(gdata), mem_alarm, cpu_alarm]
for col_idx, val in enumerate(vals, start=1):
cell = ws_ov.cell(row=row_idx, column=col_idx, value=val)
cell.font = Font(name="微软雅黑", size=10)
cell.alignment = Alignment(horizontal="center", vertical="center")
thin = Side(style="thin", color="CCCCCC")
cell.border = Border(left=thin, right=thin, top=thin, bottom=thin)
if col_idx == 3 and mem_alarm > 0:
cell.fill = PatternFill("solid", fgColor="FF4444")
cell.font = Font(name="微软雅黑", size=10, bold=True, color="FFFFFF")
elif col_idx == 4 and cpu_alarm > 0:
cell.fill = PatternFill("solid", fgColor="FF4444")
cell.font = Font(name="微软雅黑", size=10, bold=True, color="FFFFFF")
for col_idx, width in enumerate(ov_widths, start=1):
ws_ov.column_dimensions[get_column_letter(col_idx)].width = width
ws_ov.column_dimensions["A"].width = 24
# ========== 各主机组 Sheet ==========
for gname, gdata in sorted(group_rows.items()):
ws = wb.create_sheet(title=gname[:31])
ws.row_dimensions[1].height = 20
# 表头
for col_idx, (hdr, _) in enumerate(col_defs, start=1):
style_header(ws.cell(row=1, column=col_idx), hdr)
# 排序:内存占用率降序,再 CPU 降序
gdata.sort(key=lambda x: (-(x["mem_used_pct"] or 0), -(x["cpu_pct"] or 0)))
# 数据行
for row_idx, r in enumerate(gdata, start=2):
bg = "EEF2FF" if row_idx % 2 == 0 else "FFFFFF"
mem_bg, mem_fc = pct_color(r.get("mem_used_pct"), bg)
cpu_bg, cpu_fc = pct_color(r.get("cpu_pct"), bg)
row_vals = [
(r["name"], bg, "000000"),
(r["ip"], bg, "000000"),
(r["mem_total_gb"], bg, "000000"),
(r["mem_avail_gb"], bg, "000000"),
(r["mem_used_pct"], mem_bg, mem_fc),
(r["cpu_pct"], cpu_bg, cpu_fc),
]
for col_idx, (val, cbg, cfc) in enumerate(row_vals, start=1):
if val is None:
display_val = "N/A"
fmt = None
elif col_idx in (3, 4):
display_val = f"{val:.1f}"
fmt = '0.0'
elif col_idx in (5, 6):
display_val = val
fmt = '0.0'
else:
display_val = val
fmt = None
cell = ws.cell(row=row_idx, column=col_idx, value=display_val)
cell.font = Font(name="微软雅黑", size=10, color=cfc)
cell.fill = PatternFill("solid", fgColor=cbg)
cell.alignment = Alignment(horizontal="center", vertical="center")
thin = Side(style="thin", color="CCCCCC")
cell.border = Border(left=thin, right=thin, top=thin, bottom=thin)
if fmt:
cell.number_format = fmt
# 列宽
for col_idx, (_, width) in enumerate(col_defs, start=1):
ws.column_dimensions[get_column_letter(col_idx)].width = width
ws.freeze_panes = "A2"
# ========== TOPN Sheet ==========
if TOPN > 0:
ws_top = wb.create_sheet(title=f"TOP{TOPN}(内存+CPU)")
ws_top.row_dimensions[1].height = 20
# 表头
for col_idx, (hdr, _) in enumerate(col_defs, start=1):
style_header(ws_top.cell(row=1, column=col_idx), hdr)
# 合并所有数据,按内存占用率+CPU占用率综合降序
all_data = []
for gname, gdata in group_rows.items():
for r in gdata:
r = dict(r) # 复制,避免跨组污染
r["group"] = gname
all_data.append(r)
all_data.sort(key=lambda x: (-(x["mem_used_pct"] or 0), -(x["cpu_pct"] or 0)))
top_data = all_data[:TOPN]
for row_idx, r in enumerate(top_data, start=2):
bg = "EEF2FF" if row_idx % 2 == 0 else "FFFFFF"
mem_bg, mem_fc = pct_color(r.get("mem_used_pct"), bg)
cpu_bg, cpu_fc = pct_color(r.get("cpu_pct"), bg)
row_vals = [
(r["name"], bg, "000000"),
(r["ip"], bg, "000000"),
(r["mem_total_gb"], bg, "000000"),
(r["mem_avail_gb"], bg, "000000"),
(r["mem_used_pct"], mem_bg, mem_fc),
(r["cpu_pct"], cpu_bg, cpu_fc),
]
for col_idx, (val, cbg, cfc) in enumerate(row_vals, start=1):
if val is None:
display_val = "N/A"
fmt = None
elif col_idx in (3, 4):
display_val = f"{val:.1f}"
fmt = '0.0'
elif col_idx in (5, 6):
display_val = val
fmt = '0.0'
else:
display_val = val
fmt = None
cell = ws_top.cell(row=row_idx, column=col_idx, value=display_val)
cell.font = Font(name="微软雅黑", size=10, color=cfc)
cell.fill = PatternFill("solid", fgColor=cbg)
cell.alignment = Alignment(horizontal="center", vertical="center")
thin = Side(style="thin", color="CCCCCC")
cell.border = Border(left=thin, right=thin, top=thin, bottom=thin)
if fmt:
cell.number_format = fmt
for col_idx, (_, width) in enumerate(col_defs, start=1):
ws_top.column_dimensions[get_column_letter(col_idx)].width = width
ws_top.column_dimensions["A"].width = 36 # 主机名列稍宽
ws_top.freeze_panes = "A2"
wb.save(XLSX_PATH)
print(f"XLSX 已写入: {XLSX_PATH}")
def main():
    # 1. Log in
    # Note: newer Zabbix releases expect "username" instead of "user" here; check your server version.
    auth = api_call("user.login", {
        "user": ZABBIX_USER,
        "password": ZABBIX_PASSWORD,
    })
    print("登录成功")
    # 2. Collect data
    rows = fetch_all(auth)
    # 3. Generate the xlsx report
    generate_xlsx(rows)
    # 4. Also keep a CSV copy (UTF-8 with BOM so Windows Excel detects the encoding)
    csv_dir = os.path.dirname(CSV_PATH)
    if csv_dir:  # dirname is "" for a bare filename, and os.makedirs("") raises
        os.makedirs(csv_dir, exist_ok=True)
    def fmt(value):
        # One decimal place, or "N/A" when the metric is missing
        return f"{value:.1f}" if value is not None else "N/A"
    with open(CSV_PATH, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["主机组", "主机名", "IP", "内存可用(GB)",
                         "内存总量(GB)", "内存占用率(%)", "CPU占用率(%)"])
        for r in rows:
            writer.writerow([
                r["group"], r["name"], r["ip"],
                fmt(r["mem_avail_gb"]),
                fmt(r["mem_total_gb"]),
                fmt(r["mem_used_pct"]),
                fmt(r["cpu_pct"]),
            ])
    print(f"CSV 已写入: {CSV_PATH}")
if __name__ == "__main__":
    main()
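Collects detailed data about a production incident from the information the user provides and generates a structured incident report, in Chinese or English.
# Quick Production Incident Report
## Language Rule
Report language matches the user's language. Chinese request → Chinese report. English → English.
## Trigger Conditions
Activate when the user asks for any of the following report types:
- 异常报告 (anomaly report)
- 线上报告 (production report)
- 故障报告 (fault report)
- 事故通报 (accident bulletin)
- 故障通报 (fault bulletin)
- 线上故障 (production incident)
- 异常通报 (anomaly bulletin)
## Behavior Rules
### Step 1: Collect information
Collect the following required items from the user in order (prompt for any the user has not provided):
1. **Start time**: format xxxx-xx-xx xx:xx:xx
2. **Recovery time**: format xxxx-xx-xx xx:xx:xx
3. **Incident description**: what was reported and what went wrong, in as much detail as possible
4. **Incident impact**: scope (number of servers/players affected) and how hard the business was hit
5. **Incident cause**: root cause analysis
6. **Responsible party**: which team or person is responsible
The following item is optional:
7. **Follow-up improvement plan or suggestions**: how to keep the same kind of problem from happening again
### Step 2: Generate the report
Fill in the following Markdown template:
```
Start time: {start time}
Recovery time: {recovery time}
Incident description:
{the description provided by the user}
Incident impact:
{the impact scope and business impact provided by the user}
Incident cause:
{the cause analysis provided by the user}
Responsible party:
{the responsible team or person provided by the user}
Follow-up improvement plan or suggestions:
{the improvement plan provided by the user; write "None yet" if there is none}
```
### Step 3: Output
After generating the report, ask the user whether they want to:
- Copy it directly as Markdown
- Send it to a Feishu group
- Export it as a file
## What Not to Do
- Do not add sections that are not in the template
- Do not invent times, causes, data or other key facts; the user must confirm them
- Do not use blame-oriented wording in the responsible-party field; stay focused on solving the problem
- Do not output an unfinished draft; wait until the user has supplied every required item before generating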
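The collect-then-render flow of Steps 1 and 2 can be sketched in Python as follows. This is only an illustration, assuming the collected answers arrive as a dict; `REQUIRED_FIELDS`, `missing_fields` and `render_report` are hypothetical names that are not part of this skill, and the template uses English labels for the seven items.

```python
# Hypothetical field names for the six required items of Step 1.
REQUIRED_FIELDS = ["start_time", "recovery_time", "description",
                   "impact", "cause", "owner"]

TEMPLATE = """Start time: {start_time}
Recovery time: {recovery_time}
Incident description:
{description}
Incident impact:
{impact}
Incident cause:
{cause}
Responsible party:
{owner}
Follow-up improvement plan or suggestions:
{follow_up}
"""


def missing_fields(info: dict) -> list:
    """Return the required fields that are absent or blank."""
    return [k for k in REQUIRED_FIELDS if not str(info.get(k) or "").strip()]


def render_report(info: dict) -> str:
    """Fill the template; refuse to render while required items are missing."""
    missing = missing_fields(info)
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    data = dict(info)
    # The follow-up plan is optional and falls back to a fixed placeholder.
    data["follow_up"] = str(info.get("follow_up") or "").strip() or "None yet"
    return TEMPLATE.format(**data)
```

Raising on missing fields mirrors the rule that no unfinished draft is produced; a caller would ask the user for the items named in the error and try again.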
Automatically routes tasks to local or cloud AI models based on complexity, ensuring context continuity and single-entry model selection.
# Model Router · OpenClaw
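> A local-first model selector that routes each task to a local or provider model based on its complexity
## Core Principles
1. **Choose the model exactly once, at task entry**
2. **Never switch models partway through a task** (this protects context continuity)
3. **Simple tasks go to a local model, complex tasks to a provider model**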
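
The first two principles can be illustrated with a small sketch in which the model is fixed when the task starts and cannot be reassigned afterwards. `TaskSession` and `start_task` are hypothetical names, not part of this skill; the tier labels "local" and "cloud" follow the configuration sections used in this document.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class TaskSession:
    """One task bound to one model tier for its whole lifetime (hypothetical)."""
    task: str
    model: str


def start_task(task: str, is_complex: bool) -> TaskSession:
    """Pick the tier once, at task entry; the frozen session forbids later changes."""
    return TaskSession(task=task, model="cloud" if is_complex else "local")
```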
---
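## Features
### Task complexity assessment
- **Simple tasks** → local model (Ollama / llama.cpp)
  - Can be completed in a single response
  - No multi-step reasoning or iteration needed
  - Types: translation, format conversion, looking up definitions, simple writing, text polishing
- **Complex tasks** → provider model (MiniMax / OpenRouter / others)
  - Need deep reasoning and multiple iterations
  - Long-context understanding
  - Types: code debugging, architecture design, strategy analysis, long-form writing
### Model configuration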
```yaml
providers:
  local:
    ollama:
      endpoint: "http://localhost:11434"
      default_model: "qwen2.5-coder:7b"
    llama.cpp:
      endpoint: "http://localhost:8080"
      default_model: "qwen2.5-coder-7b-q4"
  cloud:
    minmax:
      endpoint: "https://api.minimax.chat"
      default_model: "MiniMax-Text-01"
    openrouter:
      endpoint: "https://openrouter.ai/api/v1"
      default_model: "anthropic/claude-sonnet-4-5"
```
---
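## Usage
### Interactive classification
```bash
python3 scripts/classify_task.py "帮我把这段中文翻译成英文"
```
Example output (the script prints in Chinese: task, complexity "simple", route "local model", reason "completes in one pass, no iteration needed"):
```
📋 任务: 帮我把这段中文翻译成英文
⚡ 复杂度: 简单
🎯 路由: 本地模型 (Ollama)
💡 理由: 单次完成,无需迭代
```
### Verbose output
Adding `--verbose` (or `-v`) also prints the matched keywords and the recommended model:
```bash
python3 scripts/classify_task.py "分析这个产品的市场策略" --verbose
```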
---
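## Context Continuity Protection
| Situation | Handling |
|---|---|
| Switching within a task | ❌ Not allowed |
| A new task starts | ✅ Re-evaluate |
| Model unavailable | Automatically fall back to another model in the same tier |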
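
The "model unavailable" rule can be sketched as a lookup that only ever falls back within the task's own tier. This is a minimal illustration, assuming availability is already known as a set of provider names; `TIERS` and `pick_provider` are hypothetical, and the provider keys are the ones from the model configuration.

```python
# Provider keys per tier, in order of preference (taken from the config keys).
TIERS = {
    "local": ["ollama", "llama.cpp"],
    "cloud": ["minmax", "openrouter"],
}


def pick_provider(tier: str, available: set) -> str:
    """Return the first available provider of the given tier.

    Never crosses tiers: a local task is not silently promoted to a cloud model.
    """
    for name in TIERS[tier]:
        if name in available:
            return name
    raise RuntimeError(f"no {tier} provider available")
```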
---
## Adding Providers
To add a provider, add an entry to the configuration:
```yaml
providers:
  cloud:
    new-provider:
      endpoint: "API URL"
      default_model: "default model"
```
---
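## Decision Examples
| Task description | Verdict | Route |
|---|---|---|
| "翻译成英文" (translate into English) | Simple | Local model |
| "写一个快速排序" (write a quicksort) | Complex | Provider model |
| "调试内存泄漏" (debug a memory leak) | Complex | Provider model |
| "分析市场策略" (analyze market strategy) | Complex | Provider model |
| "润色邮件" (polish an email) | Simple | Local model |
| "设计后端架构" (design a backend architecture) | Complex | Provider model |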
---
## File Structure
```
openclaw-model-router/
├── SKILL.md # this file
├── package.json # OpenClaw manifest
├── install.sh # install script
└── scripts/
    ├── classify_task.py # task classifier
    └── setup_wizard.py # setup wizard
```
---
**Version**: v1.0.1
**License**: MIT
**Author**: freepengyang
FILE:_meta.json
{
"ownerId": "kn786az04ebxjs45db9bx4fydx84z33v",
"slug": "hermes-model-router",
"version": "v1.0.0",
"publishedAt": 1776934947138
}
FILE:install.sh
#!/bin/bash
#
# Model Router Skill Installer (OpenClaw Version)
# Install script for the local-first model selector
#
set -e
echo "🧠 Model Router Skill Installer (OpenClaw)"
echo "============================================"
echo ""
# Colors
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# Configuration
SKILL_DIR="$HOME/.clawdbot/skills/model-router"
WORKSPACE_SKILL_DIR="$HOME/clawd/skills/model-router"
# Determine where to install
if [ -d "$HOME/clawd/skills" ]; then
    TARGET_DIR="$WORKSPACE_SKILL_DIR"
    echo -e "${GREEN}📁${NC} Installing to workspace skills: $TARGET_DIR"
elif [ -d "$HOME/.clawdbot/skills" ]; then
    TARGET_DIR="$SKILL_DIR"
    echo -e "${GREEN}📁${NC} Installing to global skills: $TARGET_DIR"
else
    TARGET_DIR="$SKILL_DIR"
    echo -e "${GREEN}📁${NC} Creating global skills directory: $TARGET_DIR"
fi
# Make sure the target directory itself exists before copying into it
mkdir -p "$TARGET_DIR"
# Get current directory
CURRENT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo ""
echo -e "${GREEN}📦${NC} Installing from: $CURRENT_DIR"
# Copy files
echo -e "${GREEN}📋${NC} Copying skill files..."
cp -r "$CURRENT_DIR"/* "$TARGET_DIR/"
# Make scripts executable
echo -e "${GREEN}🔧${NC} Making scripts executable..."
chmod +x "$TARGET_DIR/scripts/"*.py 2>/dev/null || true
# Create config directory
mkdir -p "$HOME/.model-router"
chmod 700 "$HOME/.model-router"
echo ""
echo -e "${GREEN}✅${NC} Skill installed to: $TARGET_DIR"
echo ""
echo -e "${GREEN}🧠${NC} Model Router v1.0.0 (OpenClaw)"
echo ""
echo -e "${GREEN}📖${NC} Next steps:"
echo " 1. Run the setup wizard:"
echo " cd $TARGET_DIR"
echo " python3 scripts/setup_wizard.py"
echo ""
echo " 2. Configure your providers (local / cloud)"
echo ""
echo " 3. Start routing tasks optimally!"
echo ""
echo -e "${GREEN}📚${NC} Documentation: $TARGET_DIR/SKILL.md"
echo ""
echo -e "${GREEN}🎉${NC} Installation complete!"
FILE:package.json
{
  "name": "openclaw-model-router",
  "version": "1.0.1",
  "description": "Local-first model selector: routes simple tasks to local models and complex tasks to provider models",
  "main": "scripts/classify_task.py",
  "scripts": {
    "classify": "python3 scripts/classify_task.py",
    "setup": "python3 scripts/setup_wizard.py",
    "install": "bash install.sh"
  },
  "keywords": [
    "model",
    "router",
    "ai",
    "local",
    "ollama",
    "openrouter",
    "minimax",
    "cost-optimization",
    "routing",
    "model-selector"
  ],
  "author": "freepengyang",
  "license": "MIT",
  "bugs": {
    "url": "https://github.com/freepengyang/openclaw-model-router/issues"
  },
  "homepage": "https://github.com/freepengyang/openclaw-model-router",
  "clawdbot": {
    "category": "ai",
    "skills": {
      "model-router": {
        "name": "Model Router",
        "description": "Routes tasks to local or provider models: simple tasks use local models to save cost, complex tasks use provider models to protect quality"
      }
    }
  }
}
FILE:scripts/classify_task.py
#!/usr/bin/env python3
"""
Model Router - task classifier
Judges task complexity from the task description and recommends a model
"""
import sys
# Keywords for simple tasks
SIMPLE_KEYWORDS = [
    "翻译", "润色", "格式", "转换", "查", "定义",
    "写一个简单的", "quick", "simple", "translate", "polish",
    "总结", "摘要", "提取"
]
# Keywords for complex tasks
COMPLEX_KEYWORDS = [
    "调试", "分析", "架构", "设计", "战略", "报告",
    "debug", "analyze", "architecture", "design", "strategy",
    "3000字", "长文", "复杂", "推理", "证明", "为什么"
]
# Keywords for development / full-stack tasks (these are usually complex)
DEV_KEYWORDS = [
    "写一个", "写个", "开发", "前端", "后端", "全栈",
    "SPA", "App", "应用", "网站", "系统", "平台",
    "接口", "API", "数据库", "登录", "注册", "管理后台",
    "组件", "页面", "路由", "切图", "UI", "UE",
    "classify", "classifier", "script", "脚本", "工具",
    "自动化", "机器人", "bot", "爬虫", "抓取",
    "部署", "docker", "k8s", "kubernetes", "CI/CD",
]
def _contains(text: str, keyword: str) -> bool:
    """Case-insensitive keyword test against already-lowercased text.

    Keywords are lowercased too, otherwise mixed-case entries such as "API" or
    "SPA" could never match. ASCII keywords must start at a word boundary so
    that short ones like "UI" do not fire inside words such as "quick".
    """
    kw = keyword.lower()
    if not kw.isascii():
        return kw in text
    start = text.find(kw)
    while start != -1:
        prev = text[start - 1] if start > 0 else ""
        if not (prev.isascii() and prev.isalnum()):
            return True
        start = text.find(kw, start + 1)
    return False
def classify_task(task_description: str, verbose: bool = False) -> dict:
    """Judge task complexity and return the recommended model."""
    task_lower = task_description.lower()
    def matched(keywords):
        return [kw for kw in keywords if _contains(task_lower, kw)]
    # Reasoning tasks are complex by default
    has_reasoning = bool(matched(["推理", "证明", "为什么", "reason", "prove", "why"]))
    # Complex-task check
    has_complex = bool(matched(COMPLEX_KEYWORDS))
    # Development / full-stack check (these tasks are usually complex)
    has_dev = bool(matched(DEV_KEYWORDS))
    # Simple-task check
    has_simple = bool(matched(SIMPLE_KEYWORDS))
    if has_complex or has_reasoning or has_dev:
        complexity = "复杂"
        route = "服务商模型 (MiniMax / OpenRouter)"
        if has_dev:
            reason = "开发/全栈任务,需要多文件/多模块实现"
        else:
            reason = "需要深度推理或多步分析"
        local_model = None
        cloud_model = "MiniMax-Text-01"
    else:
        # No complexity signal: route locally, whether or not a simple keyword matched
        complexity = "简单"
        route = "本地模型 (Ollama)"
        reason = "单次完成,无需迭代" if has_simple else "默认简单任务"
        local_model = "qwen2.5-coder:7b"
        cloud_model = None
    result = {
        "task": task_description,
        "complexity": complexity,
        "route": route,
        "reason": reason
    }
    if verbose:
        result["local_model"] = local_model
        result["cloud_model"] = cloud_model
        result["matched_keywords"] = matched(COMPLEX_KEYWORDS + SIMPLE_KEYWORDS + DEV_KEYWORDS)
    return result
def main():
    flags = {"--verbose", "-v"}
    verbose = any(arg in flags for arg in sys.argv[1:])
    # The task description is every argument that is not an option flag
    words = [arg for arg in sys.argv[1:] if arg not in flags and not arg.startswith("--")]
    if not words:
        print("用法: python3 classify_task.py <任务描述> [--verbose]")
        print("")
        print("示例:")
        print(' python3 classify_task.py "帮我翻译成英文"')
        print(' python3 classify_task.py "分析市场策略" --verbose')
        # Exit code 2 for usage errors, so it cannot be mistaken for "complex" (1)
        sys.exit(2)
    task_description = " ".join(words)
    result = classify_task(task_description, verbose)
    print(f"\n📋 任务: {result['task']}")
    print(f"⚡ 复杂度: {result['complexity']}")
    print(f"🎯 路由: {result['route']}")
    print(f"💡 理由: {result['reason']}")
    if verbose and result.get("matched_keywords"):
        print(f"\n🔍 匹配关键词: {', '.join(result['matched_keywords'])}")
    if verbose:
        if result["local_model"]:
            print(f"\n📦 本地模型: {result['local_model']}")
        if result["cloud_model"]:
            print(f"☁️ 服务商模型: {result['cloud_model']}")
    print()
    # Exit code: 0 = simple, 1 = complex
    sys.exit(0 if result["complexity"] == "简单" else 1)
if __name__ == "__main__":
    main()
FILE:scripts/setup_wizard.py
#!/usr/bin/env python3
"""
Model Router - 配置向导
引导用户配置本地模型和服务商模型
"""
import json
import os
import sys
CONFIG_DIR = os.path.expanduser("~/.model-router")
CONFIG_FILE = os.path.join(CONFIG_DIR, "config.json")
def load_config() -> dict:
"""加载现有配置"""
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {
"local": {
"provider": "ollama",
"endpoint": "http://localhost:11434",
"model": "qwen2.5-coder:7b"
},
"cloud": {
"provider": "minmax",
"endpoint": "https://api.minimax.chat",
"model": "MiniMax-Text-01"
}
}
def save_config(config: dict):
"""保存配置"""
os.makedirs(CONFIG_DIR, exist_ok=True)
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
print(f"\n✅ 配置已保存到: {CONFIG_FILE}")
def setup_local_model(config: dict):
"""配置本地模型"""
print("\n" + "=" * 50)
print("🔧 本地模型配置")
print("=" * 50)
print("\n支持的本地模型提供商:")
print(" 1. Ollama (默认)")
print(" 2. llama.cpp")
provider = input("\n选择提供商 [1]: ").strip() or "1"
if provider == "2":
config["local"]["provider"] = "llama.cpp"
config["local"]["endpoint"] = input("endpoint [http://localhost:8080]: ").strip() or "http://localhost:8080"
config["local"]["model"] = input("模型名 [qwen2.5-coder-7b-q4]: ").strip() or "qwen2.5-coder-7b-q4"
else:
config["local"]["provider"] = "ollama"
config["local"]["endpoint"] = input("endpoint [http://localhost:11434]: ").strip() or "http://localhost:11434"
config["local"]["model"] = input("模型名 [qwen2.5-coder:7b]: ").strip() or "qwen2.5-coder:7b"
print(f"\n✅ 本地模型配置完成:")
print(f" 提供商: {config['local']['provider']}")
print(f" endpoint: {config['local']['endpoint']}")
print(f" 模型: {config['local']['model']}")
def setup_cloud_model(config: dict):
"""配置服务商模型"""
print("\n" + "=" * 50)
print("☁️ 服务商模型配置")
print("=" * 50)
print("\n支持的服务商:")
print(" 1. MiniMax (默认)")
print(" 2. OpenRouter")
print(" 3. 其他")
provider = input("\n选择服务商 [1]: ").strip() or "1"
if provider == "2":
config["cloud"]["provider"] = "openrouter"
config["cloud"]["endpoint"] = "https://openrouter.ai/api/v1"
config["cloud"]["model"] = input("模型 (如 anthropic/claude-sonnet-4-5) [anthropic/claude-sonnet-4-5]: ").strip() or "anthropic/claude-sonnet-4-5"
elif provider == "3":
config["cloud"]["provider"] = input("服务商名称: ").strip()
config["cloud"]["endpoint"] = input("API endpoint: ").strip()
config["cloud"]["model"] = input("默认模型: ").strip()
else:
config["cloud"]["provider"] = "minmax"
config["cloud"]["endpoint"] = "https://api.minimax.chat"
config["cloud"]["model"] = input("模型 [MiniMax-Text-01]: ").strip() or "MiniMax-Text-01"
print(f"\n✅ 服务商模型配置完成:")
print(f" 提供商: {config['cloud']['provider']}")
print(f" endpoint: {config['cloud']['endpoint']}")
print(f" 模型: {config['cloud']['model']}")
def main():
print("=" * 50)
print("🧠 Model Router 配置向导")
print("=" * 50)
print("\n本向导将帮助你配置:")
print(" 1. 本地模型 (用于简单任务)")
print(" 2. 服务商模型 (用于复杂任务)")
config = load_config()
print("\n当前配置:")
print(json.dumps(config, indent=2, ensure_ascii=False))
print("\n" + "-" * 50)
print("选择要配置的项目:")
print(" 1. 配置本地模型")
print(" 2. 配置服务商模型")
print(" 3. 全部配置")
print(" 4. 保存当前配置并退出")
print(" 5. 仅测试连接")
choice = input("\n选择 [3]: ").strip() or "3"
if choice == "1":
setup_local_model(config)
elif choice == "2":
setup_cloud_model(config)
elif choice == "3":
setup_local_model(config)
setup_cloud_model(config)
elif choice == "4":
save_config(config)
sys.exit(0)
elif choice == "5":
print("\n🔍 测试连接...")
# TODO: 实现连接测试
print("连接测试功能待实现")
sys.exit(0)
else:
print("无效选择")
sys.exit(1)
save_config(config)
print("\n" + "=" * 50)
print("🎉 配置完成!")
print("=" * 50)
print("\n使用示例:")
print(" python3 scripts/classify_task.py \"翻译成英文\"")
print(" python3 scripts/classify_task.py \"分析市场策略\" --verbose")
print()
if __name__ == "__main__":
main()
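Code review security-scanning tool for automated code audits; supports Django/Python, React+TypeScript and PHP. Automatically detects the changes in each SVN commit, runs bandit/pylint/eslint/phpcs for security scanning and coding-standard checks, and pushes the report to a Feishu group. Supports post-commit hook...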
---
name: ops-code-review
description: |
  Code review security-scanning tool for automated code audits; supports Django/Python, React+TypeScript and PHP.
  Automatically detects the changes in each SVN commit and runs bandit/pylint/eslint/phpcs for security scanning and coding-standard checks,
  then pushes the report to a Feishu group. Supports incremental scans via a post-commit hook and scheduled full scans.
  Keywords: Code Review SVN security scan Django React PHP bandit pylint eslint phpcs 代码审计 代码扫描 安全检测 CI/CD 自动化
metadata:
  openclaw:
    requires:
      bins: [svn, bandit, pylint, npx, phpcs]
    install:
      - id: svn
        kind: system
        label: "Install SVN (subversion)"
        command: "apt-get install subversion"
      - id: bandit
        kind: pip
        label: "Install Bandit (Python security scanner)"
        package: "bandit --break-system-packages"
      - id: pylint
        kind: pip
        label: "Install Pylint (Python linter)"
        package: "pylint --break-system-packages"
      - id: node
        kind: node
        label: "Install Node.js (for npx/eslint)"
        package: node
      - id: composer
        kind: system
        label: "Install Composer (PHP package manager)"
        command: "curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer"
      - id: phpcs
        kind: composer
        label: "Install PHP CodeSniffer"
        package: squizlabs/php_codesniffer
      - id: typescript-eslint
        kind: node_global
        label: "Install typescript-eslint (ESLint parser for TS/TSX)"
        package: "@typescript-eslint/parser @typescript-eslint/eslint-plugin typescript-eslint"
---
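# Code Review Skill
## Configuration Before First Use (Required Reading)
### 1. Set environment variables
**Linux/Mac** (add to `~/.bashrc` or `~/.zshrc`):
```bash
export CODE_REVIEW_SVN_USER="your-svn-username"
export CODE_REVIEW_SVN_PASS="your-svn-password"
export CODE_REVIEW_FEISHU_CHAT_ID="feishu-group-bot-chat_id"
```
**OpenClaw configuration** (recommended; stores the values as OpenClaw environment variables):
Add the following to the OpenClaw configuration file:
```json
{
  "env": {
    "CODE_REVIEW_SVN_USER": "your-svn-username",
    "CODE_REVIEW_SVN_PASS": "your-svn-password",
    "CODE_REVIEW_FEISHU_CHAT_ID": "feishu-group-bot-chat_id"
  }
}
```
### 2. Configure repository information
Create the configuration file (the full example lists every supported repository):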
```bash
cat > config.json << 'EOF'
{
"repos": {
"ops_api": {
"url": "http://your-svn/ops/dev/branches/branch_dev/ops_api",
"lang": "django",
"type": "incremental",
"local": "/tmp/svn_repos/ops_api"
},
"ops_web": {
"url": "http://your-svn/ops/dev/branches/branch_dev/ops_web",
"lang": "react",
"type": "incremental",
"local": "/tmp/svn_repos/ops_web"
},
"ops_api_trunk": {
"url": "http://your-svn/ops/dev/trunk/ops_api",
"lang": "django",
"type": "full",
"local": "/tmp/svn_repos/ops_api_trunk"
},
"ops_web_trunk": {
"url": "http://your-svn/ops/dev/trunk/ops_web",
"lang": "react",
"type": "full",
"local": "/tmp/svn_repos/ops_web_trunk"
},
"gm": {
"url": "http://your-svn/gm/trunk",
"lang": "mixed",
"type": "both",
"local": "/tmp/svn_repos/gm"
}
}
}
EOF
```
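Copy the configuration to the runtime directory (required):
```bash
cp -f config.json /tmp/code_review_config.json
```
**Note**: `config.json` is not uploaded with the skill; edit each repository's `url` field to match your actual SVN address.
### 3. Check tool dependencies
```bash
python3 scripts/code_review.py check-deps
```
If any tools are missing, install them automatically:
```bash
python3 scripts/code_review.py install-deps
```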
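
What a dependency check has to establish can be sketched with `shutil.which`. This is an assumption about the general approach, not the actual implementation behind `check-deps`; `REQUIRED_BINS` mirrors the `requires.bins` list in this skill's metadata, and `find_missing` is a hypothetical helper.

```python
import shutil

# Binaries listed under requires.bins in the skill metadata.
REQUIRED_BINS = ["svn", "bandit", "pylint", "npx", "phpcs"]


def find_missing(bins=REQUIRED_BINS, which=shutil.which) -> list:
    """Return the tools that cannot be found on PATH.

    `which` is injectable so the check can be exercised without the tools installed.
    """
    return [name for name in bins if which(name) is None]
```

An empty result means every tool resolved on `PATH`; anything else is the list that an installer step would need to handle.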
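## Capabilities
| Feature | Description |
|------|------|
| 🔍 Incremental scan | Triggered by post-commit; scans only the files in that commit |
| 📋 Full scan | Scheduled weekly; scans the entire repository |
| 🛠️ Multi-language support | Django / React+TS / PHP |
| 🔴 Issue severity | Three levels: ERROR / WARNING / INFO |
| 🛠️ Fix suggestions | Concrete code changes, not just "there is a problem" |
| 📤 Feishu push | Reports are sent to the configured Feishu group |
## Environment Variables
| Variable | Required | Description |
|--------|------|------|
| `CODE_REVIEW_SVN_USER` | ✅ | Read-only SVN username |
| `CODE_REVIEW_SVN_PASS` | ✅ | SVN password |
| `CODE_REVIEW_FEISHU_CHAT_ID` | ✅ | Feishu group chat_id (bot webhook or group ID) |
| `CODE_REVIEW_CONFIG` | ❌ | Path to config.json; defaults to `/tmp/code_review_config.json` |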
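
Reading these variables with an early, explicit failure might look like the sketch below. It is illustrative only: `load_settings` and the keys of the returned dict are hypothetical, while the variable names and the default config path come from the table above.

```python
import os

REQUIRED = ("CODE_REVIEW_SVN_USER", "CODE_REVIEW_SVN_PASS",
            "CODE_REVIEW_FEISHU_CHAT_ID")
DEFAULT_CONFIG = "/tmp/code_review_config.json"


def load_settings(env=None) -> dict:
    """Collect the settings, failing fast if a required variable is unset or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "svn_user": env["CODE_REVIEW_SVN_USER"],
        "svn_pass": env["CODE_REVIEW_SVN_PASS"],
        "feishu_chat_id": env["CODE_REVIEW_FEISHU_CHAT_ID"],
        # Optional variable: fall back to the documented default path.
        "config_path": env.get("CODE_REVIEW_CONFIG") or DEFAULT_CONFIG,
    }
```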
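## Usage
### Trigger a scan manually
Each command starts with the trigger phrase `审计代码` ("audit code"):
```
审计代码 incremental ops_api # incremental scan (changed files only)
审计代码 full gm # full scan of the given repository
审计代码 fullall # full scan of all repositories
审计代码 sync # sync all SVN repositories
审计代码 check-deps # check tool dependencies
```
### Incremental scan flow (post-commit hook)
```
User commits code
↓
SVN post-commit hook fires
↓
code_review.py incremental <repo_name>
↓
SVN Manager collects the files changed in this commit
↓
Analyzer analyzes them language by language
↓
Report Generator builds a Feishu-formatted report
↓
Pushed to the Feishu group
```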
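
The "collect changed files, then analyze language by language" step can be sketched as a parser for `svnlook changed` output, which prints a status column followed by the path. How the skill's SVN Manager really obtains the change list is not shown in this document, so treat this as one possible approach; `LANG_BY_EXT` and `changed_files_by_lang` are hypothetical names, and the language labels are the `lang` values used in `config.json`.

```python
import os

# File extension -> analyzer language (labels follow the config's "lang" values).
LANG_BY_EXT = {".py": "django", ".ts": "react", ".tsx": "react", ".php": "php"}


def changed_files_by_lang(svnlook_output: str) -> dict:
    """Group changed paths by language, skipping deletions and directories."""
    result = {}
    for line in svnlook_output.splitlines():
        # Status flags occupy the first columns; the path starts at column 5.
        status, path = line[:4].strip(), line[4:].strip()
        if not path or path.endswith("/") or status.startswith("D"):
            continue
        lang = LANG_BY_EXT.get(os.path.splitext(path)[1].lower())
        if lang:
            result.setdefault(lang, []).append(path)
    return result
```

Files with an extension that has no analyzer are dropped, so an incremental scan only pays for the tools it actually needs.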
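### Full scan flow (scheduled task)
```
Weekly schedule fires
↓
code_review.py fullall
↓
Iterate over all repositories and check out the full code
↓
Run a full scan per language
↓
Generate the report and push it to Feishu
```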
## Tool Dependencies
| Tool | Purpose | Install command |
|------|------|----------|
| `svn` | SVN client | `apt-get install subversion` |
| `bandit` | Python security scanning | `pip install --break-system-packages bandit` |
| `pylint` | Python code checks | `pip install --break-system-packages pylint` |
| `npx` | Runs ESLint | Bundled with npm (install Node.js) |
| `composer` | PHP package manager | `curl -sS https://getcomposer.org/installer \| php -- --install-dir=/usr/local/bin --filename=composer` |
| `phpcs` | PHP coding standards | `composer global require squizlabs/php_codesniffer` |
| `@typescript-eslint/*` | TypeScript parser for ESLint | `npm install -g @typescript-eslint/parser @typescript-eslint/eslint-plugin typescript-eslint` |
**Run the tool check before first use** to make sure every dependency is in place.
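## SVN Hook Deployment
### Generate the hook script
```
code_review.py setup-hooks
```
### Deployment steps
1. On the SVN server, locate the repository path
2. Put the generated `post-commit` script in the repository's `hooks/` directory
3. Make it executable: `chmod +x hooks/post-commit`
4. Make sure the SVN server can access this skill's scripts directory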
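
For orientation, a post-commit hook of this kind could be as small as the Python sketch below. It is not the script that `setup-hooks` generates, whose contents are not shown here; `SCRIPTS_DIR` and `REPO_NAME` are placeholder assumptions, and the only facts relied on are that Subversion passes the repository path and revision to `post-commit` and that the scan entry point is `code_review.py incremental <repo_name>`.

```python
#!/usr/bin/env python3
import subprocess
import sys

SCRIPTS_DIR = "/path/to/skill/scripts"  # assumption: where this skill's scripts live
REPO_NAME = "ops_api"                   # assumption: a repository key from config.json


def build_command(scripts_dir: str, repo_name: str) -> list:
    """Command line for an incremental scan of one configured repository."""
    return ["python3", f"{scripts_dir}/code_review.py", "incremental", repo_name]


def main(argv: list) -> int:
    # Subversion calls post-commit with: REPOS-PATH REV [TXN-NAME]
    if len(argv) < 3:
        print("usage: post-commit REPOS REV [TXN_NAME]", file=sys.stderr)
        return 1
    # Launch the scan in the background so the committer is not kept waiting.
    subprocess.Popen(build_command(SCRIPTS_DIR, REPO_NAME),
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return 0
```

A real hook file would end by calling `sys.exit(main(sys.argv))`; it is left out here so the functions can be imported and tested on their own.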
## Scheduled Task Configuration (OpenClaw Cron)
Using OpenClaw's built-in cron is recommended; it is easier to manage than the server's crontab:
```json
{
  "schedule": { "kind": "cron", "expr": "0 3 * * 1", "tz": "Asia/Shanghai" },
  "payload": { "kind": "agentTurn", "message": "Run a full code audit:\ncd /root/.openclaw/workspace/skills/code-review && python3 scripts/code_review.py fullall" },
  "sessionTarget": "isolated"
}
```
To create it, tell OpenClaw: "Create a scheduled task that runs a full code audit every Monday at 3:00 a.m."
## Sample Report
```
📊 代码审计报告
━━━━━━━━━━━━━━━━━━
🏷️ 仓库:运维后台后端 (Django)
📅 时间:2026-04-14 10:00
🔍 方式:🔄 增量扫描
📁 扫描文件:5
🐛 问题总数:3
🔴 严重/错误:1
🟡 警告:1
🟢 提示:1
━━━━━━━━━━━━━━━━━━
🔴 严重 | models/user.py:45
└ B105: Hardcoded password(s) detected
└ 🛠️ 修复:使用环境变量或密钥管理服务,不要硬编码
🟡 警告 | views/order.py:88
└ Possible SQL Injection
└ 🛠️ 修复:使用参数化查询,避免字符串拼接 SQL
🟢 提示 | utils/helper.py:12
└ Consider using 'secrets' module for cryptographic random numbers
└ 🛠️ 修复:使用 secrets 模块替代 random
━━━━━━━━━━━━━━━━━━
⚙️ 由 OpenClaw 代码审计 Skill 自动生成
```
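## Notes
1. **Sensitive information**: the SVN credentials and the Feishu group ID are injected through environment variables and are not uploaded with the skill
2. **First use**: set the environment variables → create config.json → copy it to `/tmp/code_review_config.json` → run `check-deps` → run `install-deps` → run `sync`
3. **Missing tools**: recorded in `/tmp/code_review_missing_tools.json`; run `install-deps` to install them automatically
4. **Scan performance**: full scans are slow, so schedule them for off-peak hours
5. **False positives**: if a class of findings is confirmed to be an acceptable idiom, note that in the report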
FILE:_meta.json
{
"ownerId": "kn786az04ebxjs45db9bx4fydx84z33v",
"slug": "ops-code-review",
"version": "1.0.3",
"publishedAt": 1776742989092
}
FILE:configs/eslint.react.ts.js
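/**
 * OPS Code Review - shared ESLint config (React + TypeScript)
 * Run with --no-config-lookup (ESLint v9+; it replaced --no-eslintrc) so the project's own config is ignored and this file is used on its own
 *
 * Compatible with ESLint v8/v9/v10; does not depend on the project's local typescript-eslint
 * Usage: npx eslint src --ext .ts,.tsx --no-config-lookup -c /path/to/this/file
 */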
module.exports = [
{
ignores: [
'**/node_modules/**',
'**/dist/**',
'**/build/**',
'**/.next/**',
'**/coverage/**',
'**/*.min.js',
'**/vendor/**',
],
},
{
files: ['**/*.ts', '**/*.tsx'],
languageOptions: {
parser: require('@typescript-eslint/parser'),
parserOptions: {
ecmaFeatures: { jsx: true },
ecmaVersion: 2022,
sourceType: 'module',
},
globals: {
console: 'readonly',
window: 'readonly',
document: 'readonly',
navigator: 'readonly',
setTimeout: 'readonly',
setInterval: 'readonly',
clearTimeout: 'readonly',
clearInterval: 'readonly',
Promise: 'readonly',
fetch: 'readonly',
module: 'writable',
require: 'readonly',
exports: 'writable',
__dirname: 'readonly',
__filename: 'readonly',
process: 'readonly',
},
},
rules: {
// Core ESLint rules (possible errors)
'no-undef': 'error',
'no-unused-vars': 'off',
'no-dupe-else-if': 'error',
'no-duplicate-case': 'error',
'no-empty': 'warn',
'no-extra-semi': 'off',
'no-inner-declarations': 'off',
'no-irregular-whitespace': 'warn',
'no-loss-of-precision': 'error',
'no-negated-in-lhs': 'off',
'no-obj-calls': 'error',
'no-prototype-builtins': 'off',
'no-regex-spaces': 'error',
'no-sparse-arrays': 'warn',
'no-template-curly-in-string': 'warn',
'no-unexpected-multiline': 'error',
'no-unsafe-finally': 'error',
'no-unsafe-negation': 'error',
'use-isnan': 'error',
'valid-typeof': 'error',
// React
'react/jsx-uses-react': 'off',
'react/react-in-jsx-scope': 'off',
'react/prop-types': 'off',
'react/display-name': 'off',
// General style
'no-console': 'off',
'prefer-const': 'warn',
'no-var': 'error',
'eqeqeq': ['error', 'always'],
'curly': ['error', 'all'],
'default-case': 'warn',
'no-else-return': 'warn',
'no-lone-blocks': 'warn',
'no-new-wrappers': 'error',
'no-redeclare': 'error',
'no-return-await': 'error',
'no-throw-literal': 'error',
'no-underscore-dangle': 'off',
'no-use-before-define': 'off',
'array-callback-return': 'error',
'consistent-return': 'warn',
'dot-notation': 'warn',
'no-eval': 'error',
'no-implied-eval': 'error',
'no-new': 'error',
'no-param-reassign': 'warn',
'no-proto': 'error',
'prefer-promise-reject-errors': 'warn',
'prefer-rest-params': 'warn',
'radix': 'error',
},
},
{
files: ['**/*.js', '**/*.jsx'],
languageOptions: {
globals: {
console: 'readonly',
window: 'readonly',
document: 'readonly',
navigator: 'readonly',
setTimeout: 'readonly',
setInterval: 'readonly',
Promise: 'readonly',
fetch: 'readonly',
module: 'writable',
require: 'readonly',
exports: 'writable',
},
},
rules: {
'no-console': 'off',
'no-undef': 'error',
'no-unused-vars': 'off',
'prefer-const': 'warn',
'no-var': 'error',
'eqeqeq': ['error', 'always'],
},
},
];
FILE:scripts/analyzer_runner.py
#!/usr/bin/env python3
"""
Multi-language code analysis dispatcher v2 (optimized)
Supports: Django/Python, React+TS, PHP
"""
import subprocess
import sys
import os
import json
import re
from pathlib import Path
from typing import List, Dict, Any
REPOS = {
"ops_api": {"lang": "django", "local": "/tmp/svn_repos/ops_api"},
"ops_web": {"lang": "react", "local": "/tmp/svn_repos/ops_web"},
"ops_api_trunk": {"lang": "django", "local": "/tmp/svn_repos/ops_api_trunk"},
"ops_web_trunk": {"lang": "react", "local": "/tmp/svn_repos/ops_web_trunk"},
"gm": {"lang": "mixed", "local": "/tmp/svn_repos/gm"},
# Keys used when a full scan of gm is split by language
"gm_php": {"lang": "php", "local": "/tmp/svn_repos/gm"},
"gm_react": {"lang": "react", "local": "/tmp/svn_repos/gm"},
}
# High-confidence Bandit rules (only these are reported, to reduce false positives)
BANDIT_HIGH_CONFIDENCE = {
"B101", "B102", "B103", "B104", "B105", "B106", "B107", "B108",
"B201", "B202", "B203",
"B301", "B302", "B303", "B304", "B305", "B306", "B307", "B308", "B309",
"B310", "B311", "B312", "B313", "B314", "B315", "B316", "B317",
"B401", "B402", "B403", "B404", "B405", "B406", "B407", "B408",
"B411", "B412", "B413", "B414", "B415", "B416", "B417", "B418", "B419",
"B501", "B502", "B503", "B504", "B505", "B506",
"B601", "B602", "B603", "B604", "B605", "B606", "B607", "B608",
"B701", "B702", "B703",
}
# The only severe Pylint message types we care about
PYLINT_SEVERE_MSGS = {
"fatal", "syntax-error", "parse-error", "import-error",
"no-member", "no-name-in-module", "unsubscriptable-object",
"not-callable", "invalid-unary-operand-type",
"unsupported-assignment-operation", "unsupported-delete-operation",
}
def run_bandit(base_path: str) -> Dict[str, Any]:
    """Run the Bandit security scan and keep only high-confidence findings."""
    result = subprocess.run(
        ["bandit", "-r", base_path, "-f", "json"],
        capture_output=True,
        text=True,
        cwd=base_path,
    )
    issues = []
    try:
        # Bandit pretty-prints one JSON document across many lines, so parse stdout
        # as a whole, starting at the first "{" to skip any leading log lines.
        stdout = result.stdout
        output = json.loads(stdout[stdout.index("{"):])
    except ValueError:
        # No JSON at all, or malformed JSON (JSONDecodeError is a ValueError)
        output = {}
    for finding in output.get("results", []):
        # Bandit's JSON report stores the rule ID under "test_id"
        test_id = finding.get("test_id", "")
        conf = finding.get("issue_confidence", "")
        sev = finding.get("issue_severity", "")
        # Report only security issues from the allow-list with medium or high confidence
        if test_id in BANDIT_HIGH_CONFIDENCE and conf in ("HIGH", "MEDIUM"):
            issues.append({
                "severity": "ERROR" if sev == "HIGH" else "WARNING",
                "file": finding.get("filename", ""),
                "line": finding.get("line_number", 0),
                "test_id": test_id,
                "message": finding.get("issue_text", ""),
            })
    return {"issues": issues, "summary": {"total": len(issues)}}
def run_pylint_heavy(base_path: str) -> Dict[str, Any]:
"""Pylint 只报严重错误(语法/导入/逻辑错误),不报风格问题"""
result = subprocess.run(
["pylint",
"--output-format=json",
"--disable=all",
"--enable=fatal,syntax-error,parse-error,import-error,no-member,"
"no-name-in-module,unsubscriptable-object,not-callable,"
"invalid-unary-operand-type,unsupported-assignment-operation,"
"unsupported-delete-operation,inherit-non-class,",
"--max-line-length=200",
base_path,
],
capture_output=True,
text=True,
cwd=base_path,
)
issues = []
try:
for item in json.loads(result.stdout):
issues.append({
"severity": "WARNING",
"file": item.get("path", ""),
"line": item.get("line", 0),
"message": item.get("message", "")[:120],
"symbol": item.get("symbol", ""),
})
except json.JSONDecodeError:
pass
return {"issues": issues, "summary": {"total": len(issues)}}
def check_syntax_errors(base_path: str) -> List[Dict]:
"""Python 语法检查 - 所有无法被 AST 解析的文件"""
issues = []
for root, _, files in os.walk(base_path):
# 跳过非源码目录
skip = ["__pycache__", ".git", "node_modules", "migrations", "venv", ".venv"]
if any(s in root for s in skip):
continue
for f in files:
if f.endswith(".py"):
path = os.path.join(root, f)
rc = subprocess.run(["python3", "-m", "py_compile", path],
capture_output=True, text=True)
if rc.returncode != 0:
# 提取行号
line_num = 0
for line in rc.stderr.splitlines():
m = re.search(r"line (\d+)", line)
if m:
line_num = int(m.group(1))
break
issues.append({
"severity": "ERROR",
"file": path,
"line": line_num,
"message": "Python syntax error",
})
return issues
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SKILL_DIR = os.path.dirname(SCRIPT_DIR)
ESLINT_CONFIG = os.path.join(SKILL_DIR, "configs", "eslint.react.ts.js")
def run_eslint(base_path: str) -> Dict[str, Any]:
"""ESLint + TypeScript 检查 - 使用内置配置,忽略项目原有 eslintrc"""
# 构建命令:使用 --no-config-lookup 忽略项目原有配置,用内置 config
# ESLint v9+ 用 --no-config-lookup 替代了 --no-eslintrc
import os
env = {**os.environ, "NODE_PATH": "/usr/lib/node_modules"}
cmd = [
"npx", "eslint", "src",
"--ext", ".ts,.tsx",
"--no-config-lookup",
"-c", ESLINT_CONFIG,
"--format=json",
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=base_path,
timeout=120,
env=env,
)
issues = []
try:
output = json.loads(result.stdout)
for file_result in output:
for msg in file_result.get("messages", []):
sev = msg.get("severity", 0)
if sev < 1: # 只报 warning + error
continue
rule = msg.get("ruleId", "")
# 过滤掉噪音规则(但保留 no-unused-vars,这是真实警告)
if rule in ("", "no-undef"):
continue
severity_map = {2: "ERROR", 1: "WARNING"}
issues.append({
"severity": severity_map.get(sev, "WARNING"),
"file": os.path.basename(file_result.get("filePath", "")),
"line": msg.get("line", 0),
"message": msg.get("message", ""),
"rule": rule,
})
except (json.JSONDecodeError, Exception):
pass
# TypeScript 类型检查默认跳过(tsc 需要完整 node_modules + 正确 tsconfig 路径配置)
# SVN 仓库通常没有完整 node_modules,且路径别名配置与本地不同,tsc 会报大量误报
# 如需开启,改为: tsc_result = subprocess.run(["npx", "tsc", "--noEmit", ...
return {"issues": issues[:50], "summary": {"total": len(issues)}}
def run_phpcs(files: List[str], base_path: str) -> Dict[str, Any]:
"""PHP CodeSniffer: report serious issues only."""
if not files:
return {"issues": [], "summary": "No PHP files"}
php_files = [f for f in files if f.endswith(".php")]
if not php_files:
return {"issues": [], "summary": "No PHP files"}
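# Report serious issues only. --report=emacs prints one "file:line:col: level - message"
# line per issue, which is the format the parser below expects.
result = subprocess.run(
["phpcs", "--standard=PSR12", "--severity=5,4,3", "--report=emacs", "-q"] + php_files,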
capture_output=True,
text=True,
cwd=base_path,
)
issues = []
for line in result.stdout.splitlines():
if ": error" in line or ": warning" in line.lower():
parts = line.split(":")
if len(parts) >= 4:
issues.append({
"severity": "WARNING",
"file": parts[0],
"line": parts[1],
"message": ":".join(parts[3:]).strip()[:100],
})
return {"issues": issues, "summary": {"total": len(issues)}}
def scan_repo(repo_name: str, files: List[str], base_path: str, lang: str = None) -> Dict[str, Any]:
"""Dispatch the analysis according to the repository type."""
if lang is None:
lang = REPOS.get(repo_name, {}).get("lang", "django")
all_issues = []
if lang in ("python", "django"):
# 1. Syntax error check
syntax_issues = check_syntax_errors(base_path)
all_issues.extend(syntax_issues)
# 2. Bandit security scan
bandit_result = run_bandit(base_path)
all_issues.extend(bandit_result.get("issues", []))
# 3. Pylint serious errors
pylint_result = run_pylint_heavy(base_path)
all_issues.extend(pylint_result.get("issues", []))
elif lang == "react":
eslint_result = run_eslint(base_path)
all_issues.extend(eslint_result.get("issues", []))
elif lang == "php":
phpcs_result = run_phpcs(files, base_path)
all_issues.extend(phpcs_result.get("issues", []))
elif lang == "mixed":
# PHP backend
phpcs_result = run_phpcs(files, base_path)
all_issues.extend(phpcs_result.get("issues", []))
# React frontend
eslint_result = run_eslint(base_path)
all_issues.extend(eslint_result.get("issues", []))
# Deduplicate by file name, line, and message prefix
seen = set()
unique = []
for issue in all_issues:
key = f"{os.path.basename(issue.get('file', ''))}:{issue.get('line', 0)}:{issue.get('message', '')[:50]}"
if key not in seen:
seen.add(key)
unique.append(issue)
return {
"issues": unique,
"summary": {
"total": len(unique),
"errors": sum(1 for i in unique if i.get("severity") == "ERROR"),
"warnings": sum(1 for i in unique if i.get("severity") == "WARNING"),
},
}
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: python3 analyzer_runner.py <repo_name> <base_path>", file=sys.stderr)
sys.exit(1)
repo_name = sys.argv[1]
base_path = sys.argv[2]
# Full scan: collect all files first
valid_exts = (".py", ".ts", ".tsx", ".js", ".jsx", ".php", ".java")
all_files = []
for root, dirs, files in os.walk(base_path):
dirs[:] = [d for d in dirs if d not in ("__pycache__", ".git", "node_modules", "migrations", "venv", ".venv")]
for f in files:
if f.endswith(valid_exts):
all_files.append(os.path.join(root, f))
result = scan_repo(repo_name, all_files, base_path)
print(json.dumps(result, ensure_ascii=False, indent=2))
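The ESLint branch above turns `--format=json` output into flat issue dicts. The following is a minimal standalone sketch of that conversion, not the script's actual code path: `parse_eslint_json` and the sample payload are hypothetical names introduced for illustration, and a null `ruleId` is normalized to an empty string before filtering, which is an assumption of this sketch.

```python
import json
import os

SEVERITY_MAP = {2: "ERROR", 1: "WARNING"}


def parse_eslint_json(stdout, ignored_rules=("", "no-undef")):
    """Convert ESLint JSON output into a flat list of issue dicts (illustrative helper)."""
    try:
        results = json.loads(stdout)
    except json.JSONDecodeError:
        # ESLint printed something other than JSON (for example a crash message)
        return []
    issues = []
    for file_result in results:
        for msg in file_result.get("messages", []):
            sev = msg.get("severity", 0)
            rule = msg.get("ruleId") or ""  # assumption: treat null ruleId as ""
            if sev < 1 or rule in ignored_rules:
                continue
            issues.append({
                "severity": SEVERITY_MAP.get(sev, "WARNING"),
                "file": os.path.basename(file_result.get("filePath", "")),
                "line": msg.get("line", 0),
                "message": msg.get("message", ""),
                "rule": rule,
            })
    return issues


# Hypothetical sample payload in the shape ESLint's JSON formatter emits
sample = json.dumps([{
    "filePath": "/repo/src/App.tsx",
    "messages": [
        {"ruleId": "no-unused-vars", "severity": 1, "line": 3,
         "message": "'x' is defined but never used."},
        {"ruleId": "no-undef", "severity": 2, "line": 9,
         "message": "'foo' is not defined."},
    ],
}])
print(parse_eslint_json(sample))
```

Keeping the parsing separate from the `subprocess.run` call makes it possible to exercise the filtering rules without Node.js installed.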
FILE:scripts/check_dependencies.py
#!/usr/bin/env python3
"""
Tool dependency check: must run before the skill is loaded
"""
import subprocess
import sys
import os
import json
import shlex
REQUIRED_TOOLS = {
"svn": {
"help": "SVN client, used for checkout and diff",
"install": "apt-get install subversion",
},
"bandit": {
"help": "Python security scanner",
"install": "pip install --break-system-packages bandit",
},
"pylint": {
"help": "Python linter",
"install": "pip install --break-system-packages pylint",
},
"npx": {
"help": "Node.js package runner (used for eslint)",
"install": "npm install -g npx",
},
"phpcs": {
"help": "PHP coding standard checker",
"install": "composer global require squizlabs/php_codesniffer",
"deps": ["composer"],
},
"composer": {
"help": "PHP package manager (used to install phpcs)",
"install": "curl -sS https://getcomposer.org/installer -o /tmp/composer-setup.php && php /tmp/composer-setup.php -- --install-dir=/usr/local/bin --filename=composer",
},
"typescript-eslint": {
"help": "ESLint TypeScript parser (required for frontend code audits)",
"install": "npm install -g @typescript-eslint/parser @typescript-eslint/eslint-plugin typescript-eslint",
},
}
MISSING_TOOLS_FILE = "/tmp/code_review_missing_tools.json"
def check_tool(name: str) -> bool:
"""Check whether a tool is available."""
# Special case for Node.js modules (@scope/package-name form)
if name == "typescript-eslint":
result = subprocess.run(
["node", "-e", "require('@typescript-eslint/parser')"],
capture_output=True,
text=True,
env={**os.environ, "NODE_PATH": "/usr/lib/node_modules"},
)
return result.returncode == 0
result = subprocess.run(
["which", name],
capture_output=True,
text=True,
)
return result.returncode == 0
def check_all() -> dict:
"""Check all tools and return the missing ones."""
missing = {}
for tool, info in REQUIRED_TOOLS.items():
if not check_tool(tool):
missing[tool] = info
return missing
def ensure_tools() -> bool:
"""Make sure all tools are installed; return whether everything is ready."""
missing = check_all()
if not missing:
print("[OK] All required tools are installed")
if os.path.exists(MISSING_TOOLS_FILE):
os.remove(MISSING_TOOLS_FILE)
return True
print(f"[WARN] Missing tools: {', '.join(missing.keys())}")
with open(MISSING_TOOLS_FILE, "w") as f:
json.dump(missing, f, ensure_ascii=False, indent=2)
return False
def install_tool(name: str) -> bool:
"""Install the given tool."""
if name not in REQUIRED_TOOLS:
print(f"[ERROR] Unknown tool: {name}")
return False
info = REQUIRED_TOOLS[name]
install_cmd = info["install"]
print(f"[INFO] Installing {name}: {install_cmd}")
deps = info.get("deps", [])
for dep in deps:
if not check_tool(dep):
print(f"[INFO] Installing dependency {dep} first...")
if not install_tool(dep):
print(f"[ERROR] Failed to install dependency {dep}")
return False
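# Run each "&&"-separated step in order; no shell is involved, so "&&" must not reach argv
for step in install_cmd.split("&&"):
result = subprocess.run(
shlex.split(step),
capture_output=True,
text=True,
)
if result.returncode != 0:
break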
if result.returncode == 0:
print(f"[OK] {name} installed successfully")
return True
else:
print(f"[ERROR] Failed to install {name}: {result.stderr}")
return False
def install_all() -> dict:
"""Try to install every missing tool and return the per-tool results."""
missing = check_all()
results = {}
for tool in missing:
results[tool] = install_tool(tool)
return results
if __name__ == "__main__":
if len(sys.argv) < 2:
missing = check_all()
if not missing:
print("All tools OK")
sys.exit(0)
print(f"Missing: {', '.join(missing.keys())}")
print("\nTrying to install missing tools...")
results = install_all()
failed = [t for t, ok in results.items() if not ok]
if failed:
print(f"\nFailed to install: {', '.join(failed)}")
sys.exit(1)
print("\nAll tools installed successfully")
sys.exit(0)
action = sys.argv[1]
if action == "check":
missing = check_all()
if missing:
for tool, info in missing.items():
print(f"MISSING: {tool} - {info['help']}")
print(f" Install: {info['install']}")
sys.exit(1)
print("All tools OK")
sys.exit(0)
elif action == "install" and len(sys.argv) >= 3:
ok = install_tool(sys.argv[2])
sys.exit(0 if ok else 1)
elif action == "install-all":
results = install_all()
failed = [t for t, ok in results.items() if not ok]
if failed:
print(f"Failed: {', '.join(failed)}")
sys.exit(1)
print("All installed")
sys.exit(0)
elif action == "ensure":
ok = ensure_tools()
sys.exit(0 if ok else 1)
else:
print(f"Usage: {sys.argv[0]} [check|install <tool>|install-all|ensure]")
sys.exit(1)
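Some install commands above chain two steps with `&&`, which only a shell interprets. The following is a minimal sketch of running such a chain without a shell, under the assumption that `&&` never appears inside a quoted argument; `split_steps` and `run_steps` are hypothetical helpers, not part of the script.

```python
import shlex
import subprocess


def split_steps(command):
    """Split 'a && b' into one argv list per step (naive: ignores quoted '&&')."""
    return [shlex.split(step) for step in command.split("&&") if step.strip()]


def run_steps(command):
    """Run each step in order and stop at the first failure.

    Returns True only if every step exits with status 0.
    """
    for argv in split_steps(command):
        result = subprocess.run(argv, capture_output=True, text=True)
        if result.returncode != 0:
            return False
    return True


# Inspect how a chained command would be split, without executing anything
print(split_steps("echo first && echo 'second step'"))
```

Avoiding `shell=True` keeps argument boundaries explicit, at the cost of not supporting pipes or redirections inside an install command.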
FILE:scripts/code_review.py
#!/usr/bin/env python3
"""
Main dispatcher script for code audits
Usage:
python3 code_review.py incremental <repo_name> # incremental scan (called by the post-commit hook)
python3 code_review.py full <repo_name> # full scan (called by a scheduled job)
python3 code_review.py fullall # full scan of all repositories
python3 code_review.py sync # sync all repositories
python3 code_review.py setup-hooks # generate the SVN post-commit hook script
python3 code_review.py check-deps # check tool dependencies
python3 code_review.py install-deps # install missing tools
"""
import sys
import os
import json
import subprocess
from datetime import datetime
# Add the script directory to the import path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Dependency checks
from check_dependencies import ensure_tools, check_all
# Deferred imports (so that environment variables are already loaded)
from svn_manager import get_repos, checkout_or_update, get_changed_files_local, update_scan_state, get_feishu_chat_id
from analyzer_runner import scan_repo
from report_generator import build_report
def send_feishu_message(message: str, chat_id: str = None):
"""Send a Feishu message."""
if chat_id is None:
chat_id = get_feishu_chat_id()
if not chat_id:
print("[WARN] No feishu chat_id configured. Message not sent.")
print(message)
return
# Save the message to a file; OpenClaw reads it and pushes it
msg_file = "/tmp/code_review_pending_msg.json"
with open(msg_file, "w") as f:
json.dump({"chat_id": chat_id, "message": message, "time": datetime.now().isoformat()}, f)
print(f"[OK] Message saved to {msg_file}")
def run_incremental_scan(repo_name: str):
"""Incremental scan, triggered by post-commit."""
print(f"[INFO] Starting incremental scan for {repo_name}")
# 0. Check tools
print(f"[STEP 0/5] Checking dependencies...")
if not ensure_tools():
print(f"[WARN] Some tools missing. Run 'code_review.py install-deps' first.")
# 1. Update the repository
print(f"[STEP 1/5] Syncing repo {repo_name}...")
ok = checkout_or_update(repo_name)
if not ok:
print(f"[ERROR] Failed to sync repo {repo_name}")
return
# 2. Get the changed files
print(f"[STEP 2/5] Getting changed files...")
changed_files = get_changed_files_local(repo_name)
if not changed_files:
print(f"[OK] No changes detected for {repo_name}")
return
print(f"[INFO] Found {len(changed_files)} changed files")
# Keep only scannable files
valid_extensions = (".py", ".ts", ".tsx", ".js", ".jsx", ".php", ".java")
valid_files = [f for f in changed_files if any(f.endswith(ext) for ext in valid_extensions)]
if not valid_files:
print(f"[OK] No scannable files changed")
return
print(f"[INFO] {len(valid_files)} files to scan")
# 3. Run the analysis
print(f"[STEP 3/5] Running analysis...")
repos = get_repos()
repo_info = repos.get(repo_name, {})
base_path = repo_info.get("local", f"/tmp/svn_repos/{repo_name}")
lang = repo_info.get("lang", "python")
if lang == "mixed":
all_results = {}
php_files = [f for f in valid_files if f.endswith(".php")]
react_files = [f for f in valid_files if any(f.endswith(ext) for ext in [".ts", ".tsx", ".js", ".jsx"])]
if php_files:
all_results["php"] = scan_repo("gm_php", php_files, base_path, lang="php")
if react_files:
all_results["react"] = scan_repo("gm_react", react_files, base_path, lang="react")
else:
all_results = scan_repo(repo_name, valid_files, base_path)
# 4. Generate the report
print(f"[STEP 4/5] Generating report...")
report = build_report(repo_name, lang, all_results, "incremental", valid_files)
# 5. Push to Feishu
print(f"[STEP 5/5] Sending to Feishu...")
send_feishu_message(report["text"])
# Save the report
report_file = f"/tmp/code_review_{repo_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, "w") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"[OK] Report saved to {report_file}")
# Update the scan state
update_scan_state(repo_name)
return report
def run_full_scan(repo_name: str):
"""Full scan, called by a scheduled job."""
print(f"[INFO] Starting full scan for {repo_name}")
# 0. Check tools
print(f"[STEP 0/4] Checking dependencies...")
if not ensure_tools():
print(f"[WARN] Some tools missing. Run 'code_review.py install-deps' first.")
# 1. Update the repository
print(f"[STEP 1/4] Syncing repo {repo_name}...")
ok = checkout_or_update(repo_name)
if not ok:
print(f"[ERROR] Failed to sync repo {repo_name}")
return
# 2. Collect all files
print(f"[STEP 2/4] Getting all files...")
repos = get_repos()
repo_info = repos.get(repo_name, {})
base_path = repo_info.get("local", f"/tmp/svn_repos/{repo_name}")
lang = repo_info.get("lang", "python")
valid_extensions = (".py", ".ts", ".tsx", ".js", ".jsx", ".php", ".java")
all_files = []
skip_dirs = ["__pycache__", ".git", "node_modules", ".svn", "migrations", "vendor", "target", "dist", "build", ".next", "venv", ".venv", "tests"]
for root, dirs, files in os.walk(base_path):
dirs[:] = [d for d in dirs if d not in skip_dirs]
for file in files:
if any(file.endswith(ext) for ext in valid_extensions):
all_files.append(os.path.join(root, file))
if not all_files:
print(f"[WARN] No files found in {repo_name}")
return
print(f"[INFO] Found {len(all_files)} files to scan")
# 3. Run the analysis
print(f"[STEP 3/4] Running analysis...")
if lang == "mixed":
all_results = {}
php_files = [f for f in all_files if f.endswith(".php")]
react_files = [f for f in all_files if any(f.endswith(ext) for ext in [".ts", ".tsx", ".js", ".jsx"])]
if php_files:
all_results["php"] = scan_repo("gm_php", php_files, base_path, lang="php")
if react_files:
all_results["react"] = scan_repo("gm_react", react_files, base_path, lang="react")
else:
all_results = scan_repo(repo_name, all_files, base_path)
# 4. Generate the report
print(f"[STEP 4/4] Generating report...")
report = build_report(repo_name, lang, all_results, "full", all_files)
send_feishu_message(report["text"])
# Save the report
report_file = f"/tmp/code_review_full_{repo_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, "w") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"[OK] Report saved to {report_file}")
return report
def generate_hook_script() -> str:
"""Generate the SVN post-commit hook script."""
# A plain (non-f) string: "$" is emitted literally and needs no escaping
hook_script = """#!/bin/bash
# SVN Post-Commit Hook - triggers the code audit
# Place this script in the SVN repository's hooks directory and rename it to post-commit
# Make sure it is executable: chmod +x post-commit
REPOS="$1"
REV="$2"
# Repository name mapping (adjust to the actual paths)
case "$REPOS" in
*/ops_api*) REPO_NAME="ops_api" ;;
*/ops_web*) REPO_NAME="ops_web" ;;
*/gm*) REPO_NAME="gm" ;;
*) echo "Unknown repo: $REPOS" && exit 0 ;;
esac
# Run the code audit in the background so the commit is not blocked
python3 /path/to/code-review/scripts/code_review.py incremental "$REPO_NAME" >> /var/log/svn_code_review.log 2>&1 &
exit 0
"""
return hook_script
if __name__ == "__main__":
if len(sys.argv) < 2:
print(__doc__)
sys.exit(1)
action = sys.argv[1]
if action == "incremental" and len(sys.argv) >= 3:
run_incremental_scan(sys.argv[2])
elif action == "full" and len(sys.argv) >= 3:
run_full_scan(sys.argv[2])
elif action == "fullall":
repos = get_repos()
for repo_name in repos:
repo_info = repos[repo_name]
if repo_info.get("type") in ("full", "both"):
run_full_scan(repo_name)
elif action == "sync":
from svn_manager import sync_all_repos
sync_all_repos()
elif action == "setup-hooks":
print(generate_hook_script())
elif action == "check-deps":
missing = check_all()
if missing:
for tool, info in missing.items():
print(f"MISSING: {tool} - {info['help']}")
print(f" Install: {info['install']}")
sys.exit(1)
print("All tools OK")
sys.exit(0)
elif action == "install-deps":
from check_dependencies import install_all
results = install_all()
failed = [t for t, ok in results.items() if not ok]
if failed:
print(f"Failed: {', '.join(failed)}")
sys.exit(1)
print("All installed")
else:
print(f"Unknown action: {action}", file=sys.stderr)
print(__doc__)
sys.exit(1)
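Both scan modes above first keep only files with scannable extensions and, for `mixed` repositories, split them into PHP and frontend groups. The following is a minimal sketch of that selection step; `filter_scannable` and `split_mixed` are hypothetical helper names, and the extension tuples simply mirror the ones used in the script.

```python
VALID_EXTENSIONS = (".py", ".ts", ".tsx", ".js", ".jsx", ".php", ".java")
FRONTEND_EXTENSIONS = (".ts", ".tsx", ".js", ".jsx")


def filter_scannable(paths, extensions=VALID_EXTENSIONS):
    """Keep only paths whose suffix is in the extension tuple."""
    # str.endswith accepts a tuple, so no inner loop is needed
    return [p for p in paths if p.endswith(extensions)]


def split_mixed(paths):
    """Group paths of a mixed repository by the analyzer that should see them."""
    return {
        "php": [p for p in paths if p.endswith(".php")],
        "react": [p for p in paths if p.endswith(FRONTEND_EXTENSIONS)],
    }


# Hypothetical changed-file list
changed = ["api/index.php", "web/src/App.tsx", "README.md", "tools/run.py"]
scannable = filter_scannable(changed)
print(scannable)
print(split_mixed(scannable))
```

Passing a tuple to `str.endswith` is equivalent to the `any(f.endswith(ext) for ext in ...)` form and is slightly shorter.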
FILE:scripts/report_generator.py
#!/usr/bin/env python3
"""
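Code audit report generator: produces Feishu message format
Sensitive configuration (the Feishu group ID) is read from environment variables, not hardcoded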
"""
import json
import sys
import os
from datetime import datetime
from typing import Dict, List, Any
# The Feishu group ID is read from an environment variable (not hardcoded)
FEISHU_GROUP_ID = os.environ.get("CODE_REVIEW_FEISHU_CHAT_ID", "")
def severity_emoji(severity: str) -> str:
"""Emoji for each severity level."""
mapping = {
"ERROR": "🔴",
"HIGH": "🔴",
"WARNING": "🟡",
"MEDIUM": "🟡",
"INFO": "🟢",
"LOW": "🟢",
}
return mapping.get(severity.upper(), "⚪")
def severity_label(severity: str) -> str:
"""Display label for each severity level."""
mapping = {
"ERROR": "Error",
"HIGH": "Critical",
"WARNING": "Warning",
"MEDIUM": "Medium",
"INFO": "Info",
"LOW": "Low",
}
return mapping.get(severity.upper(), severity)
def format_file_path(path: str) -> str:
"""Shorten a file path for display."""
if "/tmp/svn_repos/" in path:
parts = path.split("/tmp/svn_repos/")[1].split("/", 1)
if len(parts) > 1:
return parts[1]
return os.path.basename(path)
def group_issues_by_file(issues: List[Dict]) -> Dict[str, List[Dict]]:
"""Group issues by file."""
grouped = {}
for issue in issues:
file_key = issue.get("file", "unknown")
if file_key not in grouped:
grouped[file_key] = []
grouped[file_key].append(issue)
return grouped
def generate_fix_suggestion(issue: Dict, lang: str) -> str:
"""Generate a fix suggestion."""
message = issue.get("message", "")
test_id = issue.get("test_id", "")
symbol = issue.get("symbol", "")
rule = issue.get("rule", "")
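suggestions = {
# Django/Python security rules (Bandit test IDs)
"B101": "Avoid assert for runtime checks; asserts are stripped under python -O",
"B102": "Avoid exec(); use a safer way to run dynamic code",
"B103": "Tighten file permissions; avoid overly permissive chmod modes",
"B104": "Avoid binding to all interfaces (0.0.0.0); bind to a specific address",
"B105": "Move hardcoded passwords to a secrets manager",
"B106": "Do not pass hardcoded passwords as function arguments",
"B107": "Do not use hardcoded passwords as default arguments; read them from environment variables",
"B108": "Avoid hardcoded temp directories; use the tempfile module",
"B201": "Do not run Flask with debug=True in production",
"B301": "Check deserialization safety; do not unpickle untrusted data",
"B303": "Check hash algorithm strength; avoid MD5 and SHA1",
"B307": "Check dynamic code execution; avoid eval()",
"B310": "Check URL safety; validate the scheme before opening",
"B311": "Use a cryptographically secure random source for security purposes",
"B313": "Check XML parsing safety",
"B323": "Check HTTPS configuration; do not disable certificate verification",
"B324": "Check hash algorithm strength",
"B413": "Check crypto library usage; replace deprecated pycrypto",
"B419": "Check SSL certificate verification",
"B506": "Check YAML parsing safety; prefer yaml.safe_load",
"B601": "Check remote command execution for shell injection",
"B602": "Check subprocess calls; avoid shell=True",
"B603": "Check subprocess calls for untrusted input",
"B604": "Check function calls that use shell=True",
"B701": "Enable Jinja2 autoescape to prevent XSS",
}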
if test_id in suggestions:
return suggestions[test_id]
if symbol in suggestions:
return suggestions[symbol]
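# Generic suggestions derived from the message text (Chinese keywords are also matched)
msg_lower = message.lower()
if "hardcoded" in msg_lower or "硬编码" in msg_lower:
return "Manage sensitive values through environment variables or config files"
if "sql" in msg_lower or "注入" in msg_lower:
return "Use parameterized queries; avoid building SQL by string concatenation"
if "eval" in msg_lower:
return "Avoid eval(); use a safer alternative"
if "shell" in msg_lower:
return "Avoid shell=True; pass the command as an argument list"
if "password" in msg_lower or "secret" in msg_lower:
return "Use environment variables or a secrets manager; do not hardcode"
if "debug" in msg_lower:
return "Make sure DEBUG=False in production"
return "Manual review and fix recommended"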
def build_report(
repo_name: str,
lang: str,
scan_result: Dict[str, Any],
scan_type: str,
changed_files: List[str] = None,
) -> Dict[str, Any]:
"""Build the Feishu report content."""
all_issues = []
if isinstance(scan_result, dict) and "issues" in scan_result and "summary" in scan_result:
all_issues = scan_result.get("issues", [])
elif isinstance(scan_result, dict):
for lang_result in scan_result.values():
if isinstance(lang_result, dict):
all_issues.extend(lang_result.get("issues", []))
total = len(all_issues)
errors = sum(1 for i in all_issues if i.get("severity") in ("ERROR", "HIGH"))
warnings = sum(1 for i in all_issues if i.get("severity") in ("WARNING", "MEDIUM"))
info = sum(1 for i in all_issues if i.get("severity") in ("INFO", "LOW"))
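# Repository aliases (generic display names, no sensitive URLs)
repo_alias = {
"ops_api": "Ops console backend (Django)",
"ops_web": "Ops console frontend (React+TS)",
"ops_api_trunk": "Ops console backend (Django)",
"ops_web_trunk": "Ops console frontend (React+TS)",
"gm": "GM console (PHP+React)",
}.get(repo_name, repo_name)
# The repository URL is read dynamically from svn_manager (not hardcoded)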
try:
from svn_manager import get_repo_url
repo_url = get_repo_url(repo_name)
except Exception:
repo_url = ""
scan_type_label = "🔄 Incremental scan" if scan_type == "incremental" else "📋 Full scan"
time_str = datetime.now().strftime("%Y-%m-%d %H:%M")
url_line = f"\n🔗 {repo_url}" if repo_url else ""
header = f"""📊 **Code Audit Report**
━━━━━━━━━━━━━━━━━━
🏷️ Repository: {repo_alias}{url_line}
📅 Time: {time_str}
🔍 Mode: {scan_type_label}
📁 Files scanned: {len(changed_files) if changed_files else 'N/A'}
🐛 Total issues: {total}
🔴 Critical/Error: {errors}
🟡 Warning: {warnings}
🟢 Info: {info}
━━━━━━━━━━━━━━━━━━"""
if total == 0:
body = "\n\n✅ **No issues found in this scan; code quality looks good!**"
else:
sorted_issues = sorted(
all_issues,
key=lambda x: (
0 if x.get("severity") in ("ERROR", "HIGH") else
1 if x.get("severity") in ("WARNING", "MEDIUM") else 2
)
)
display_issues = sorted_issues[:20]
body_parts = []
for issue in display_issues:
emoji = severity_emoji(issue.get("severity", "INFO"))
file_p = format_file_path(issue.get("file", ""))
line = issue.get("line", 0)
message = issue.get("message", "")[:80]
suggestion = generate_fix_suggestion(issue, lang)
issue_text = f"""{emoji} **{severity_label(issue.get("severity", "INFO"))}** | `{file_p}`:{line}
└ {message}
└ 🛠️ Fix: {suggestion}"""
body_parts.append(issue_text)
body = "\n\n".join(body_parts)
if total > 20:
body += f"\n\n_...{total - 20} more issues not shown_"
footer = "\n━━━━━━━━━━━━━━━━━━\n⚙️ Generated automatically by the OpenClaw code audit skill"
full_report = header + body + footer
return {
"repo_name": repo_name,
"repo_alias": repo_alias,
"scan_type": scan_type,
"total_issues": total,
"errors": errors,
"warnings": warnings,
"info": info,
"changed_files_count": len(changed_files) if changed_files else 0,
"text": full_report,
"issues": all_issues[:50],
}
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python3 report_generator.py <repo_name> [scan_result_json]", file=sys.stderr)
sys.exit(1)
repo_name = sys.argv[1]
if len(sys.argv) >= 3:
scan_result = json.loads(sys.argv[2])
else:
scan_result = {}
scan_type = sys.argv[3] if len(sys.argv) >= 4 else "incremental"
changed_files_json = sys.argv[4] if len(sys.argv) >= 5 else "[]"
changed_files = json.loads(changed_files_json) if changed_files_json != "[]" else None
lang_map = {
"ops_api": "django",
"ops_web": "react",
"ops_api_trunk": "django",
"ops_web_trunk": "react",
"gm": "mixed",
}
report = build_report(
repo_name,
lang_map.get(repo_name, "unknown"),
scan_result,
scan_type,
changed_files,
)
print(json.dumps(report, ensure_ascii=False, indent=2))
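The report builder above counts issues in three severity buckets and lists the most severe ones first. The following is a minimal sketch of that bucketing and ordering; `BUCKETS`, `summarize`, and `sort_by_severity` are hypothetical names, and, as in the script, unknown severities are left uncounted and sorted last.

```python
from collections import Counter

# Bandit-style (HIGH/MEDIUM/LOW) and linter-style (ERROR/WARNING/INFO) levels share buckets
BUCKETS = {
    "ERROR": "errors", "HIGH": "errors",
    "WARNING": "warnings", "MEDIUM": "warnings",
    "INFO": "info", "LOW": "info",
}
ORDER = {"errors": 0, "warnings": 1, "info": 2}


def summarize(issues):
    """Count issues per bucket; unknown severities only affect the total."""
    counts = Counter(BUCKETS.get(i.get("severity")) for i in issues)
    return {
        "total": len(issues),
        "errors": counts["errors"],
        "warnings": counts["warnings"],
        "info": counts["info"],
    }


def sort_by_severity(issues):
    """Stable sort: errors first, then warnings, then everything else."""
    return sorted(issues, key=lambda i: ORDER.get(BUCKETS.get(i.get("severity")), 2))


# Hypothetical issue list mixing both severity vocabularies
issues = [{"severity": "LOW"}, {"severity": "ERROR"},
          {"severity": "MEDIUM"}, {"severity": "HIGH"}]
print(summarize(issues))
print([i["severity"] for i in sort_by_severity(issues)])
```

Because `sorted` is stable, issues within one bucket keep the order in which the analyzers reported them.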
FILE:scripts/svn_manager.py
#!/usr/bin/env python3
"""
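SVN repository management: checkout, update, and changed-file detection
Sensitive configuration is injected through environment variables or config.json and is not uploaded with the skill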
"""
import subprocess
import sys
import os
import re
import json
from typing import List, Dict, Optional
# === Sensitive configuration: read from environment variables ===
SVN_USER = os.environ.get("CODE_REVIEW_SVN_USER", "")
SVN_PASS = os.environ.get("CODE_REVIEW_SVN_PASS", "")
# === Repository configuration: read from environment variables or config.json ===
# config.json is not uploaded with the skill; users must create it themselves
_config_cache = None
def _load_config() -> dict:
"""Load the configuration file."""
global _config_cache
if _config_cache is not None:
return _config_cache
config_paths = [
os.environ.get("CODE_REVIEW_CONFIG", ""),
"/etc/code-review/config.json",
os.path.expanduser("~/.config/code-review/config.json"),
"/tmp/code_review_config.json",
]
for path in config_paths:
if path and os.path.exists(path):
with open(path) as f:
_config_cache = json.load(f)
return _config_cache
# Default when no config file exists (users must provide one)
_config_cache = {}
return _config_cache
def get_repo_url(repo_name: str) -> str:
"""Return the repository URL."""
config = _load_config()
repos = config.get("repos", {})
if repo_name in repos:
return repos[repo_name].get("url", "")
return ""
def get_feishu_chat_id() -> str:
"""Return the Feishu group ID, read from the environment variable."""
return os.environ.get("CODE_REVIEW_FEISHU_CHAT_ID", "")
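# === Repository configuration (generic template, sanitized for upload) ===
# URLs and credentials are configured in config.json
REPOS_TEMPLATE = {
# Template placeholder; the actual configuration lives in config.json
# Format: repo_name: { "lang": "python|react|mixed", "type": "incremental|full|both" }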
}
def get_repos() -> dict:
"""Return the repository configuration, loaded dynamically."""
config = _load_config()
return config.get("repos", {})
def svn_auth_cmd(cmd: List[str]) -> List[str]:
"""Append the SVN username and password to a command."""
if SVN_USER and SVN_PASS:
return cmd + ["--username", SVN_USER, "--password", SVN_PASS, "--no-auth-cache"]
return cmd
def run_cmd(cmd: List[str], cwd: Optional[str] = None) -> tuple:
"""Run a command and return (returncode, stdout, stderr)."""
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
)
return result.returncode, result.stdout, result.stderr
def checkout_or_update(repo_name: str) -> bool:
"""Check out or update a repository."""
repos = get_repos()
if repo_name not in repos:
print(f"[ERROR] Repo {repo_name} not configured. Add it to config.json", file=sys.stderr)
return False
repo = repos[repo_name]
url = repo.get("url", "")
local_path = repo.get("local", f"/tmp/svn_repos/{repo_name}")
if not url:
print(f"[ERROR] No URL configured for {repo_name}", file=sys.stderr)
return False
os.makedirs(os.path.dirname(local_path.rstrip("/")), exist_ok=True)
if os.path.exists(os.path.join(local_path, ".svn")):
cmd = svn_auth_cmd(["svn", "update", "--accept", "theirs-full"])
rc, out, err = run_cmd(cmd, cwd=local_path)
else:
cmd = svn_auth_cmd(["svn", "checkout", url, local_path])
rc, out, err = run_cmd(cmd)
if rc != 0:
print(f"[ERROR] SVN {repo_name} failed: {err}", file=sys.stderr)
return False
print(f"[OK] SVN {repo_name} updated")
return True
def get_changed_files_local(repo_name: str) -> List[str]:
"""Get the most recently changed files locally (used for incremental scans)."""
repos = get_repos()
if repo_name not in repos:
return []
repo = repos[repo_name]
local_path = repo.get("local", f"/tmp/svn_repos/{repo_name}")
if not os.path.exists(os.path.join(local_path, ".svn")):
print(f"[WARN] {repo_name} not checked out yet", file=sys.stderr)
return []
# Get the current revision
cmd = svn_auth_cmd(["svn", "info", "--show-item", "revision"])
rc, out, err = run_cmd(cmd, cwd=local_path)
if rc != 0:
return []
current_rev = out.strip()
# Compare against the last scanned revision
state_file = f"/tmp/svn_repos/.scan_state_{repo_name}.json"
last_rev = "1"
if os.path.exists(state_file):
with open(state_file) as f:
last_rev = json.load(f).get("last_rev", "1")
if last_rev == current_rev:
return []
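# --summarize prints one status-prefixed line per changed path instead of a full diff,
# which is what the status parsing below expects
cmd = svn_auth_cmd([
"svn", "diff", "--summarize",
"--revision", f"{last_rev}:{current_rev}",
local_path,
])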
rc, out, err = run_cmd(cmd)
if rc != 0:
return []
files = []
for line in out.splitlines():
if line.startswith(("A ", "M ", "MM ")):
f = line[4:].strip()
if f:
files.append(os.path.join(local_path, f))
return files
def update_scan_state(repo_name: str):
"""Update the scan state."""
repos = get_repos()
if repo_name not in repos:
return
local_path = repos[repo_name].get("local", f"/tmp/svn_repos/{repo_name}")
os.makedirs("/tmp/svn_repos", exist_ok=True)
state_file = f"/tmp/svn_repos/.scan_state_{repo_name}.json"
cmd = svn_auth_cmd(["svn", "info", "--show-item", "revision"])
rc, out, err = run_cmd(cmd, cwd=local_path)
if rc == 0:
with open(state_file, "w") as f:
json.dump({"last_rev": out.strip()}, f)
def sync_all_repos():
"""Sync all repositories."""
for repo_name in get_repos():
checkout_or_update(repo_name)
def list_repos():
"""List all configured repositories."""
repos = get_repos()
if not repos:
print("No repos configured. Create config.json first.")
return
for name, info in repos.items():
print(f" {name}: {info.get('lang')} - {info.get('url')}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python3 svn_manager.py <action> [args]")
print(" list - list all configured repositories")
print(" sync - sync all repositories")
print(" sync <repo_name> - sync the given repository")
print(" changed <repo> - list changed files")
sys.exit(1)
action = sys.argv[1]
if action == "list":
list_repos()
elif action == "sync":
if len(sys.argv) >= 3:
checkout_or_update(sys.argv[2])
else:
sync_all_repos()
elif action == "changed" and len(sys.argv) >= 3:
files = get_changed_files_local(sys.argv[2])
print("\n".join(files))
else:
print(f"Unknown action: {action}", file=sys.stderr)
sys.exit(1)
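The changed-file detection above selects paths from status-prefixed lines such as those printed by `svn diff --summarize`. The following is a minimal sketch of that parsing on a captured string, assuming the item status sits in the first column and the path follows after whitespace; `parse_summarize` and the sample output are hypothetical and nothing here calls `svn`.

```python
import os


def parse_summarize(output, base_path):
    """Return paths of added or modified items from status-prefixed lines."""
    files = []
    for line in output.splitlines():
        if len(line) < 3:
            continue
        status, rest = line[:2], line[2:].strip()
        # Column 1 is the item status; keep additions and modifications only
        if status[0] in ("A", "M") and rest:
            # os.path.join returns `rest` unchanged if it is already absolute
            files.append(os.path.join(base_path, rest))
    return files


# Hypothetical captured output: modified, deleted, added, and content+property modified
sample_output = "\n".join([
    "M       src/app.py",
    "D       old.py",
    "A       lib/new.php",
    "MM      conf.py",
])
print(parse_summarize(sample_output, "/tmp/svn_repos/demo"))
```

Deleted paths are skipped on purpose, since there is no file left on disk for an analyzer to read.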