@clawhub-zhangpengle-53c84bff66
团队工作计划周度/月度汇总分析。仅当用户**明确提供**了团队工作计划数据(粘贴内容或指向企微智能表格)并要求汇总/分析时触发。典型意图:"这是团队工作计划,帮我看下上周/本周情况"、"我把数据复制过来了,分析一下"、"帮我从企微表格拉取工作计划汇总"。**不触发**:仅提到"周报"、"本周工作"等通用词但未提供...
---
name: wecom-workplan-summary
description: 团队工作计划周度/月度汇总分析。仅当用户**明确提供**了团队工作计划数据(粘贴内容或指向企微智能表格)并要求汇总/分析时触发。典型意图:"这是团队工作计划,帮我看下上周/本周情况"、"我把数据复制过来了,分析一下"、"帮我从企微表格拉取工作计划汇总"。**不触发**:仅提到"周报"、"本周工作"等通用词但未提供数据或未明确指向企微表格。
allowed-tools: Bash(python3:*), Bash(mkdir:*), Write
---
# 团队工作计划汇总技能
按周度或月度汇总团队工作记录,支持两种输入方式:从企微智能表格读取(MCP 模式),或直接粘贴表格内容(粘贴模式)。脚本负责数据整理,你负责生成自然语言报告。
## 数据源(MCP 模式)
**智能表格**:
- docid: `dcrZNwuyF7QzK5GW4oQ9B3Y3i6Vdc_RzIxAc1zWMMVr9K4EWCEEza2Ea1XGGuQumBW2IKp9XR6au-lDtMi--j27A`
- sheet_id: `q979lj`
- 字段:`日期`、`今日计划`、`姓名`/`成员`/`提交人`、`岗位`/`职位`
## 执行步骤
### 分支 A — 粘贴模式(用户提供了工作计划内容)
**触发条件**:用户消息中包含多行表格内容(tab 或逗号分隔),并说明这是团队工作计划数据。
1. 将用户提供的表格内容原样写入 `/tmp/workplan_paste.tsv`
2. 根据用户是否指定时间范围调用脚本:
**用户未指定时间**(脚本自动从数据中检测):
```bash
python3 ~/.openclaw/workspace/skills/wecom-workplan-summary/scripts/summary.py 周度 --data /tmp/workplan_paste.tsv
```
**用户指定了"上周"/"本周"/"第X周"**(换算为周数后传入):
```bash
python3 ~/.openclaw/workspace/skills/wecom-workplan-summary/scripts/summary.py 周度 <week_num> <year> --data /tmp/workplan_paste.tsv
```
**用户指定了月份**:
```bash
python3 ~/.openclaw/workspace/skills/wecom-workplan-summary/scripts/summary.py 月度 <month> <year> --data /tmp/workplan_paste.tsv
```
3. 根据脚本输出的结构化原始数据,生成最终报告(见下方"报告生成规范")
**粘贴数据格式要求**(可在执行前提示用户):
- 必须包含列:`日期`、`姓名`(或`成员`/`提交人`)、`今日计划`(或`工作内容`/`工作计划`)
- 可选列:`岗位`/`职位`
- 支持 Tab 分隔(从企微/Excel/WPS 直接复制)或逗号分隔
### 分支 B — MCP 模式(用户未提供数据,要求从企微读取)
**触发条件**:用户明确说"从企微表格读取"、"帮我拉一下数据"等,未粘贴内容。
1. 调用 wecom_mcp 获取智能表格记录:
```
wecom_mcp call doc smartsheet_get_records '{"docid":"dcrZNwuyF7QzK5GW4oQ9B3Y3i6Vdc_RzIxAc1zWMMVr9K4EWCEEza2Ea1XGGuQumBW2IKp9XR6au-lDtMi--j27A","sheet_id":"q979lj"}'
```
2. 根据用户指定时间范围调用脚本:
```bash
python3 ~/.openclaw/workspace/skills/wecom-workplan-summary/scripts/summary.py 周度 <week_num> <year>
python3 ~/.openclaw/workspace/skills/wecom-workplan-summary/scripts/summary.py 月度 <month> <year>
```
3. 根据脚本输出生成最终报告(见下方"报告生成规范")
---
## 报告生成规范
脚本输出的是结构化原始数据(每人 + 每日计划明细),**你需要将其转化为自然语言报告**。
### 输出格式模板
```
📅 第N周工作周报(MM/DD-MM/DD)
{姓名} · {岗位}
本周:{自然语言汇总}
🔹 目标对齐:{一句话判断}
💡 建议:{针对性建议}
---
{姓名} · {岗位}
本周:...
🔹 目标对齐:...
💡 建议:...
---
注:{未提交情况说明}
```
### 生成原则
**本周汇总(最重要)**:
- 相同或高度相似的任务合并,标注天数:`HJJ驻场(4天)`
- 有明显进展逻辑的用 `→` 连接:`接口页面重构 → SOP重构 → 页面内容优化`
- 多个不同任务用 ` + ` 连接:`RAG知识库开发 + mineruAPI对接`
- 编号列表(1. 2. 3.)提炼为核心主题,不照搬编号
- 请假直接标注:`周三请假,其余时间:XX工作`
**🔹 目标对齐**:
- 判断该人工作与团队当前目标的关联,1句话,要有实质内容
- 不写"任务推进中"、"目标明确"等套话
- 例:`技术侧支撑AI标书工具开发` / `项目交付-前端重构推进` / `驻场支持客户现场`
**💡 建议**:
- 结合当周实际工作内容给出具体建议,不泛泛而谈
- 不写"持续推进"、"注意进度同步"等无意义话
- 例:`连续4天驻场,建议定期输出问题清单同步团队` / `单一任务深耕,可适当输出中间产物便于评估`
**结尾注释**:
- 全员提交:`注:本周全员均有记录`
- 有缺失:`注:以下人员本周未提交记录:XX、XX`
---
## 注意事项
1. 管理层(张鹏乐、王紫龙、付岩)放在最前面
2. 无记录人员单独在末尾说明,不猜测原因
3. 语气亲切、专业,不严厉
## 依赖工具
- wecom_mcp:MCP 模式下读取智能表格数据
- Bash(python3:*):执行汇总脚本
FILE:scripts/summary.py
#!/usr/bin/env python3
"""
团队工作计划汇总脚本
用法:
python3 summary.py [周度/月度] [周数/月数] [年] [--data <文件路径>]
python3 summary.py 周度 --data /tmp/workplan_paste.tsv # 粘贴模式,自动检测周期
python3 summary.py 周度 13 2026 # MCP模式,指定第13周
"""
import json
import subprocess
import sys
import re
import csv
import io
from collections import defaultdict
from datetime import datetime, timedelta
# 智能表格配置
DOCID = "dcrZNwuyF7QzK5GW4oQ9B3Y3i6Vdc_RzIxAc1zWMMVr9K4EWCEEza2Ea1XGGuQumBW2IKp9XR6au-lDtMi--j27A"
SHEET_ID = "q979lj"
# 管理层角色
ROLE_MAP = {
"张鹏乐": ("总经理", 0),
"王紫龙": ("项目总监", 1),
"付岩": ("技术总监", 2),
}
# 列名别名映射
COLUMN_ALIASES = {
"日期": ["日期", "date", "时间", "提交日期"],
"姓名": ["姓名", "成员", "提交人", "name", "人员"],
"岗位": ["岗位", "职位", "position", "角色", "role"],
"计划": ["今日计划", "工作内容", "计划内容", "工作计划", "今日工作", "本日计划", "任务"],
}
def call_smartsheet_get_records():
"""调用 wecom_mcp 获取智能表格记录"""
cmd = [
"wecom_mcp", "call", "doc", "smartsheet_get_records",
json.dumps({"docid": DOCID, "sheet_id": SHEET_ID})
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"MCP调用失败: {result.stderr}")
try:
data = json.loads(result.stdout)
if data.get("errcode") != 0:
raise Exception(f"API错误: {data.get('errmsg')}")
return data.get("data", {}).get("records", [])
except json.JSONDecodeError:
raise Exception(f"返回数据解析失败: {result.stdout}")
def detect_separator(text):
"""检测分隔符,优先 tab,其次逗号"""
first_line = text.split('\n')[0] if '\n' in text else text
if '\t' in first_line:
return '\t'
return ','
def match_column(header, field_key):
"""模糊匹配列名到字段键"""
header_lower = header.strip().lower()
for alias in COLUMN_ALIASES.get(field_key, []):
if header_lower == alias.lower():
return True
return False
def parse_data_file(filepath):
"""解析 TSV/CSV 文件,返回与 MCP 格式相同的 records 列表"""
with open(filepath, 'r', encoding='utf-8-sig') as f:
content = f.read()
sep = detect_separator(content)
reader = csv.reader(io.StringIO(content), delimiter=sep)
rows = list(reader)
if not rows:
raise Exception("数据文件为空")
# 识别表头行(找第一个非空行)
header_row = None
data_start = 0
for i, row in enumerate(rows):
if any(cell.strip() for cell in row):
header_row = row
data_start = i + 1
break
if header_row is None:
raise Exception("未找到表头行")
# 建立列名到索引的映射
col_map = {}
for field_key in COLUMN_ALIASES:
for j, header in enumerate(header_row):
if match_column(header, field_key):
col_map[field_key] = j
break
if "日期" not in col_map:
raise Exception(f"未找到日期列,当前表头:{header_row}")
if "姓名" not in col_map:
raise Exception(f"未找到姓名列,当前表头:{header_row}")
if "计划" not in col_map:
raise Exception(f"未找到计划列,当前表头:{header_row}")
date_col = col_map["日期"]
name_col = col_map["姓名"]
plan_col = col_map["计划"]
pos_col = col_map.get("岗位")
# 转换为内部 records 格式
# 多行计划处理:编号列表出现在最后一行(含姓名/日期)之前
# 策略:缓存无姓名/日期的孤立行,遇到有效命名行时将缓存合并为该记录的前置计划内容
max_col = max(col_map.values())
records = []
pending_lines = [] # 缓存等待归属的孤立行
for row in rows[data_start:]:
if not any(cell.strip() for cell in row):
continue
# 补齐短行
while len(row) <= max_col:
row.append("")
name = row[name_col].strip()
date_str = row[date_col].strip()
plan = row[plan_col].strip()
# 判断是否是有效记录行(有姓名且有日期)
has_name = bool(name)
has_date = bool(date_str) and parse_date(date_str) is not None
if has_name and has_date:
# 将缓存的孤立行 + 当前行的计划合并
all_parts = [p for p in pending_lines if p] + ([plan] if plan else [])
combined_plan = "\n".join(all_parts)
pending_lines = []
fields = {
"日期": date_str,
"姓名": name,
"今日计划": combined_plan,
}
if pos_col is not None:
fields["岗位"] = row[pos_col].strip()
records.append({"fields": fields, "record_id": name})
elif not has_name and not has_date and plan:
# 纯文本续行(无姓名也无日期):缓存,等待归属到下一个有名字的记录
pending_lines.append(plan)
# 有日期但无姓名的行(如机器人/匿名条目)直接跳过
return records
def get_week_range(week_num, year):
"""获取指定周数的日期范围(周一到周五)"""
jan_1 = datetime(year, 1, 1)
first_monday = jan_1 + timedelta(days=(7 - jan_1.weekday()) % 7)
target_monday = first_monday + timedelta(weeks=week_num - 1)
dates = []
for i in range(5):
d = target_monday + timedelta(days=i)
dates.append(d)
return dates
def get_month_range(year, month):
"""获取指定月份的日期范围(仅工作日)"""
from calendar import monthrange
start = datetime(year, month, 1)
_, last_day = monthrange(year, month)
end = datetime(year, month, last_day)
dates = []
d = start
while d <= end:
if d.weekday() < 5:
dates.append(d)
d += timedelta(days=1)
return dates
def get_week_num(date, year):
"""计算某个日期是该年第几周(与 get_week_range 对应)"""
jan_1 = datetime(year, 1, 1)
first_monday = jan_1 + timedelta(days=(7 - jan_1.weekday()) % 7)
delta = date - first_monday
if delta.days < 0:
return 1
return delta.days // 7 + 1
def parse_date(date_str):
"""解析日期字符串"""
formats = [
"%Y年%m月%d日",
"%Y-%m-%d",
"%Y/%m/%d",
"%m/%d/%Y",
"%m-%d-%Y",
]
for fmt in formats:
try:
return datetime.strptime(date_str.strip(), fmt)
except ValueError:
continue
return None
def detect_time_range(records, mode="周度"):
"""从记录中自动检测时间范围:找数据量最多的周/月"""
date_counts = defaultdict(int)
for record in records:
fields = record.get("fields", {})
date_val = fields.get("日期", "")
if isinstance(date_val, list):
date_val = date_val[0].get("text", "") if date_val else ""
elif isinstance(date_val, dict):
date_val = date_val.get("text", "")
parsed = parse_date(str(date_val))
if not parsed:
continue
if mode == "周度":
# 以"年-周数"为键
year = parsed.year
wn = get_week_num(parsed, year)
date_counts[(year, wn)] += 1
else:
date_counts[(parsed.year, parsed.month)] += 1
if not date_counts:
raise Exception("数据中未找到有效日期,无法自动检测时间范围")
# 取数据最多的,若相同则取最近的
best = max(date_counts.keys(), key=lambda k: (date_counts[k], k[0], k[1]))
if mode == "周度":
year, week_num = best
return get_week_range(week_num, year), week_num, year
else:
year, month = best
return get_month_range(year, month), month, year
def filter_by_range(records, date_range, date_field="日期"):
"""按日期范围筛选记录,按人员聚合,保留每条记录的日期"""
date_set = set(d.date() for d in date_range)
# 结构: {name: {'岗位': str, '计划': [(date_str, plan_str), ...]}}
filtered = defaultdict(lambda: {'岗位': '', '计划': []})
for record in records:
fields = record.get("fields", {})
date_val = fields.get(date_field, "")
if isinstance(date_val, list):
date_val = date_val[0].get("text", "") if date_val else ""
elif isinstance(date_val, dict):
date_val = date_val.get("text", "")
parsed_date = parse_date(str(date_val))
if not parsed_date:
continue
if parsed_date.date() not in date_set:
continue
date_label = parsed_date.strftime("%m/%d")
name = record.get("record_id", "未知")
name_field = fields.get("姓名") or fields.get("成员") or fields.get("提交人")
if name_field:
if isinstance(name_field, list):
name = name_field[0].get("text", name) if name_field else name
elif isinstance(name_field, dict):
name = name_field.get("text", name)
else:
name = str(name_field)
# 过滤无效记录:空姓名、智能助手等机器人条目
position_raw = fields.get("岗位", "")
if isinstance(position_raw, list):
position_raw = position_raw[0].get("text", "") if position_raw else ""
elif isinstance(position_raw, dict):
position_raw = position_raw.get("text", "")
if not name or name == "未知" or str(position_raw) == "智能助手":
continue
plan_field = fields.get("今日计划", "")
if isinstance(plan_field, list):
plan = plan_field[0].get("text", "") if plan_field else ""
elif isinstance(plan_field, dict):
plan = plan_field.get("text", "")
else:
plan = str(plan_field)
# 无论是否有计划都记录该人(便于识别未提交)
filtered[name]['计划'].append((date_label, plan.strip()))
position_field = fields.get("岗位") or fields.get("职位")
if position_field and not filtered[name]['岗位']:
if isinstance(position_field, list):
filtered[name]['岗位'] = position_field[0].get("text", "") if position_field else ""
elif isinstance(position_field, dict):
filtered[name]['岗位'] = position_field.get("text", "")
else:
filtered[name]['岗位'] = str(position_field)
return filtered
def format_raw(people, date_range, mode="周度", week_num=None):
"""输出结构化原始数据供 Claude 生成自然语言报告"""
def sort_key(item):
name = item[0]
if name in ROLE_MAP:
return (ROLE_MAP[name][1], name)
return (99, name)
if mode == "周度":
if week_num is None:
week_num = get_week_num(date_range[0], date_range[0].year)
start_str = date_range[0].strftime("%m/%d")
end_str = date_range[-1].strftime("%m/%d")
header = f"第{week_num}周({start_str}-{end_str})"
period_label = "本周"
else:
month = date_range[0].month
year = date_range[0].year
header = f"{year}年{month}月"
period_label = "本月"
lines = [f"=== {header}原始工作数据 ===", ""]
no_plan = []
for name, info in sorted(people.items(), key=sort_key):
role = ROLE_MAP[name][0] if name in ROLE_MAP else info['岗位']
plans = [(d, p) for d, p in info['计划'] if p]
if not plans:
no_plan.append(f"{name}({role})")
continue
lines.append(f"【{name} · {role}】")
for date_label, plan in plans:
plan_clean = plan.replace('\n', ' / ').strip()
lines.append(f" {date_label}: {plan_clean}")
lines.append("")
if no_plan:
lines.append(f"【{period_label}未提交计划】")
for p in no_plan:
lines.append(f" {p}")
else:
lines.append(f"【{period_label}未提交计划】")
lines.append(" 无,全员均有记录")
lines.append("")
lines.append("=== END ===")
return "\n".join(lines)
def main():
mode = "周度"
week_num = None
month = None
year = None
data_file = None
auto_detect = True
# 解析参数
args = sys.argv[1:]
i = 0
positional = []
while i < len(args):
if args[i] == "--data" and i + 1 < len(args):
data_file = args[i + 1]
i += 2
else:
positional.append(args[i])
i += 1
if positional and positional[0] in ["周度", "月度"]:
mode = positional[0]
positional = positional[1:]
if positional:
if mode == "周度":
week_num = int(positional[0])
else:
month = int(positional[0])
auto_detect = False
if len(positional) >= 2:
year = int(positional[1])
now = datetime.now()
if year is None:
year = now.year
try:
# 读取数据
if data_file:
records = parse_data_file(data_file)
print(f"已从文件读取 {len(records)} 条记录")
else:
print("正在读取智能表格数据…")
records = call_smartsheet_get_records()
print(f"共获取 {len(records)} 条记录")
# 计算日期范围
if auto_detect:
date_range, time_param, detected_year = detect_time_range(records, mode)
if mode == "周度":
week_num = time_param
year = detected_year
print(f"自动检测到:{year}年第{week_num}周")
else:
month = time_param
year = detected_year
print(f"自动检测到:{year}年{month}月")
else:
if mode == "周度":
if week_num is None:
week_num = get_week_num(datetime(year, now.month, now.day), year)
date_range = get_week_range(week_num, year)
else:
if month is None:
month = now.month
date_range = get_month_range(year, month)
if not date_range:
print("错误:无效的日期范围")
return
# 筛选数据
people = filter_by_range(records, date_range)
print(f"筛选后 {len(people)} 人有记录\n")
# 输出结构化原始数据
output = format_raw(people, date_range, mode, week_num)
print(output)
except Exception as e:
print(f"错误:{e}")
if not data_file:
print("请确认:\n1. MCP 工具已正确配置\n2. docid 和 sheet_id 正确")
if __name__ == "__main__":
main()
军政采招投标商机管理专用工具。负责项目登记/标书采购/封标/开标/结果录入/中标统计/胜算评估,不处理合同履约、发票、报销或其他非招投标事务。
---
name: bidding-tracker
description: 军政采招投标商机管理专用工具。负责项目登记/标书采购/封标/开标/结果录入/中标统计/胜算评估,不处理合同履约、发票、报销或其他非招投标事务。
metadata: {"openclaw":{"emoji":"📋","requires":{"bins":["bidding-tracker"]},"install":"pip install -e {baseDir}"}}
---
# bidding-tracker 使用指南
> 本文档面向 LLM,说明如何通过 `bidding-tracker` CLI 管理招投标项目全生命周期。
>
> **适用范围**:军队采购(军采)和政府采购(政采)的投标商机跟踪,包括项目登记、标书购买、封标投递、开标结果、中标统计、招标文件胜算评估。
> **不适用**:合同管理、发票报销、项目交付、非招投标的采购咨询。
---
## 首次初始化
系统首次使用时,必须先注册总监账号,否则所有非 init 命令均会报错。
```bash
bidding-tracker init --name "王总监"
```
执行后返回 `{"status": "ok"}` 即可继续使用其他命令。
---
## 核心操作流程
### 1. 注册项目
收到招标公告时,提取关键字段并调用 register。`--json` 传入结构化字段,`--manager-name` 指定负责人姓名。
```bash
bidding-tracker register \
--json '{"project_name":"XX系统采购","budget":500000,"bid_opening_time":"2026-05-10T14:00:00","doc_purchase_deadline":"2026-04-20T17:00:00"}' \
--manager-name "张经理" \
--travel-days 2
```
**`--json` 支持的字段(均为可选,`*` 为强烈建议填写):**
| 字段 | 类型 | 说明 |
|------|------|------|
| `project_name` * | string | 项目名称 |
| `bid_opening_time` * | ISO8601 | 开标时间(用于推算封标时间) |
| `doc_purchase_deadline` * | ISO8601 | 标书购买截止时间 |
| `budget` | number | 预算(元) |
| `procurer` | string | 采购方名称 |
| `bid_agency` | string | 招标代理机构 |
| `manager_contact` | string | 负责人联系方式 |
| `registration_deadline` | ISO8601 | 报名截止时间 |
| `registration_location` | string | 报名地点 |
| `doc_purchase_location` | string | 标书购买地点 |
| `doc_purchase_price` | number | 标书售价(0 表示免费) |
| `doc_required_materials` | string\|array | 报名所需材料 |
| `bid_opening_location` | string | 开标地点 |
成功返回:
```json
{"project_id": 1, "project_no": "2026-001", "project_name": "...", "suggested_seal_time": "2026-05-08T14:00:00", "attachment_dir": "..."}
```
若有招标公告文件,追加 `--file /path/to/file.pdf`,文件将自动移入附件目录。
### 2. 查看项目列表
```bash
bidding-tracker status # 查看所有活跃项目
bidding-tracker status --keyword "关键词" # 按项目名或编号搜索
bidding-tracker status --upcoming-days 7 # 查看7天内有关键节点的项目
```
### 3. 状态推进
根据用户反馈选择对应命令,关键字可以是项目名称片段或完整项目编号(如 `2026-001`):
| 用户场景 | 命令 |
|----------|------|
| 已购买标书 | `bidding-tracker purchased "项目名"` |
| 已封标/已寄出标书 | `bidding-tracker seal "项目名"` |
| 项目废标、放弃投标 | `bidding-tracker cancel "项目名"` |
### 4. 录入开标结果
开标后,根据结果录入。项目须处于 `opened` 状态(由 `seal` 推进而来)。
```bash
# 中标
bidding-tracker result "项目名" --won --our-price 980000 --winning-price 980000
# 未中标(尽量提供完整信息)
bidding-tracker result "项目名" --lost --our-price 1050000 --winning-price 980000 --winner "某某公司" --notes "报价偏高"
```
---
## 状态机约束
以下为合法流转路径,LLM 不可跳步操作:
```
registered → doc_pending → doc_purchased → preparing → sealed → opened → won
→ lost
任意非终态 → cancelled(终态,不可逆)
```
**实用映射:**
- `purchased` 命令 → 状态推进至 `doc_purchased`
- `seal` 命令 → 状态推进至 `sealed`(同时记录实际封标时间)
- `result` 命令 → 仅在 `opened` 状态可执行,推进至 `won` 或 `lost`
> `opened` 状态需通过 `update_project` 直接写库或由提醒 Cron 触发,`result` 命令本身不推进 `sealed → opened`。
---
## 团队管理
```bash
bidding-tracker users # 查看所有成员
bidding-tracker users --role manager # 仅看负责人
bidding-tracker adduser --new-user-id wx_uid --name "李经理" --contact "138xxxx"
```
> `adduser` 仅总监可执行。
---
## 统计分析
```bash
bidding-tracker stats # 全局汇总(胜率、平均预算等)
bidding-tracker stats --by-month # 按月趋势
bidding-tracker stats --by-manager # 按负责人分组
bidding-tracker stats --by-month --period 2026-Q1 # 限定季度范围
bidding-tracker stats --by-month --period 2026-03 # 限定月份范围
```
> `--by-manager` 与 `--by-month` 不可同时使用。
---
## 胜算评估
### evaluate - 解析招标文件
读取 PDF/Word/TXT 招标文件,提取文本并组装分析 prompt,供 LLM 进行深度博弈评估。
```bash
bidding-tracker evaluate --file /path/to/tender.pdf
bidding-tracker evaluate --file /path/to/tender.docx
```
成功返回:
```json
{
"status": "ok",
"message": "招标文件《xxx.pdf》已解析(12345 字),请按分析框架进行深度博弈评估",
"data": {
"analysis_prompt": "...",
"profiles": "...",
"document_text": "...",
"file_name": "xxx.pdf"
}
}
```
LLM 收到响应后,应将 `analysis_prompt` 作为分析框架指令,`profiles` 作为投标主体战略资产库上下文,`document_text` 作为待分析的招标文件原文,三者结合输出深度博弈报告。
> **自定义主体档案:** `profiles` 字段内容优先读取 `~/.config/bidding-tracker/profiles.md`(如存在),否则使用包内默认。用户可编辑该文件更新公司资质、人员与业绩信息,无需修改代码。
### bind-eval - 绑定胜算评估结果
将评估胜率(0-1)和报告摘要绑定到指定项目,记录评估时间。
```bash
bidding-tracker bind-eval "项目名" --probability 0.75
bidding-tracker bind-eval "项目名" --probability 0.75 --report "技术优势明显,但缺乏CMMI认证"
bidding-tracker bind-eval "2026-001" --probability 0.4 --report "疑似定向标,建议陪跑"
```
`--probability` 取值范围 `0.0~1.0`,`--report` 为可选的预测报告摘要(建议不超过200字)。
---
## 输出格式约定
所有命令成功时,stdout 输出 JSON,exit code 为 0:
```json
{"status": "ok", "message": "...", "data": {...}}
```
失败时,stderr 输出 JSON,exit code 为 1:
```json
{"error": "具体错误原因", "code": 1}
```
遇到错误时,将 `error` 字段内容直接反馈给用户即可。
---
## 环境配置
| 变量 | 说明 | 默认值 |
|------|------|--------|
| `DB_PATH` | SQLite 数据库路径 | `{CWD}/data/bids.db` |
| `ATTACHMENTS_DIR` | 附件存储根目录 | `{DB_PATH同级}/attachments` |
加载优先级:进程环境变量 > `{CWD}/.env` > `~/.config/bidding-tracker/.env` > 默认值。
FILE:README.md
# 军事招投标全周期追踪系统
集成企业微信(WeCom)的招投标全生命周期管理工具,基于 OpenClaw LLM 框架运行。
## 快速上手
### 环境要求
- Python 3.10+
- SQLite3
- 企业微信(WeCom)账号(用于接收通知)
### 安装
```bash
# 1. 克隆项目
git clone <repo-url>
cd bidding-tracker
# 2. 初始化数据库
python3 scripts/init_db.py
# 输出:数据库初始化完成:data/bids.db
# 3. 配置(可选)
cp .env.example .env
# 编辑 .env,设置 DB_PATH 等参数
```
### 初始化管理员
```bash
# 首位用户自动注册为系统总监
python3 scripts/manage_users.py \
--bootstrap --user-id <wecom_userid> --name "张三"
```
### 注册新项目
```bash
python3 scripts/register_project.py \
--json '{"project_name":"网络安全设备采购","budget":500000,"bid_agency":"军采中心","bid_opening_time":"2026-04-10T14:00:00"}' \
--manager-name "李经理" \
--travel-days 2
```
### 查询项目
```bash
# 总监查看全部活跃项目
python3 scripts/query_projects.py --user-id <wecom_userid> --active-only
# 经理查看本人项目
python3 scripts/query_projects.py --user-id <wecom_userid>
# 按关键词搜索
python3 scripts/query_projects.py --user-id <wecom_userid> --keyword "2026"
```
### 记录投标结果
```bash
python3 scripts/record_result.py \
--project-id 1 \
--our-price 480000 \
--winning-price 490000 \
--winner "我方" \
--won true
```
### 统计分析
```bash
# 全局统计
python3 scripts/stats.py
# 按负责人分组
python3 scripts/stats.py --by-manager
# 按月度趋势
python3 scripts/stats.py --by-month
# 指定季度
python3 scripts/stats.py --by-month --period 2026-Q1
```
### Cron 提醒设置
系统支持定时提醒(在开标前1天、截标前2天、文件购买截止前3天自动推送)。
```bash
# 查看当前定时任务
python3 scripts/reminder_check.py
```
## 项目状态说明
```
registered → doc_pending → doc_purchased → preparing → sealed → opened → won/lost/cancelled
```
## 目录结构
```
scripts/ Python CLI 工具脚本
tools/ OpenClaw Tool 函数(鉴权网关)
tests/ pytest 测试套件
data/ SQLite 数据库(gitignored)
docs/ 技术文档与接口规格
```
## 运行测试
```bash
DB_PATH=/tmp/test_bids.db python3 -m pytest tests/ -v
```
## 相关文档
- [技术设计文档](docs/technical-design.md)
- [接口规格文档](docs/api-interfaces.md)
- [文件结构说明](docs/file-structure.md)
FILE:bidding_tracker/__init__.py
# bidding_tracker - 招投标商机全周期追踪工具
FILE:bidding_tracker/config.py
"""
配置加载模块:环境变量与 .env 文件管理。
加载优先级(从高到低):
1. 进程启动时已有的环境变量(最高,永不被覆盖)
2. 当前工作目录下的 .env (项目级配置)
3. ~/.config/bidding-tracker/.env (用户级配置)
4. 内置默认值 (最低)
用法:
from bidding_tracker.config import get_db_path, get_attachments_dir
DB_PATH = get_db_path()
ATTACHMENTS_DIR = get_attachments_dir()
"""
import os
from pathlib import Path
_env_loaded: bool = False
def _read_dotenv(path: Path) -> dict[str, str]:
"""解析 .env 文件,返回键值字典(不修改 os.environ)。"""
result: dict[str, str] = {}
try:
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip()
# 去除可选的首尾引号
if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
value = value[1:-1]
if key:
result[key] = os.path.expanduser(value)
except OSError:
pass
return result
def load_env() -> None:
"""加载 .env 配置文件到 os.environ(幂等,多次调用只生效一次)。
规则:
- 进程启动前已存在的环境变量不会被任何 .env 覆盖
- CWD/.env 优先于 ~/.config/bidding-tracker/.env
"""
global _env_loaded
if _env_loaded:
return
_env_loaded = True
# 快照进程原有的键(这些键不会被覆盖)
original_keys: frozenset[str] = frozenset(os.environ.keys())
user_env = Path.home() / ".config" / "bidding-tracker" / ".env"
cwd_env = Path.cwd() / ".env"
# 读取两个文件,CWD 优先(后者覆盖前者相同的键)
merged = _read_dotenv(user_env)
merged.update(_read_dotenv(cwd_env))
# 仅注入进程原本没有的键
for key, value in merged.items():
if key not in original_keys:
os.environ[key] = value
def get_db_path() -> str:
"""返回数据库文件绝对路径(自动触发 .env 加载)。
优先级:进程环境变量 > CWD/.env > ~/.config/bidding-tracker/.env > 默认值
默认值:{CWD}/data/bids.db
"""
load_env()
return os.environ.get("DB_PATH", str(Path.cwd() / "data" / "bids.db"))
def get_attachments_dir() -> str:
"""返回附件存储目录路径(自动触发 .env 加载)。
优先级:进程环境变量 > CWD/.env > ~/.config/bidding-tracker/.env > 默认值
默认值:与 DB_PATH 同级的 attachments/ 子目录
"""
load_env()
default = str(Path(get_db_path()).parent / "attachments")
return os.environ.get("ATTACHMENTS_DIR", default)
def get_evaluate_prompt() -> str:
"""返回评估分析 prompt 内容(自动触发 .env 加载)。
优先级:~/.config/bidding-tracker/evaluate_prompt.md > 包内默认
"""
load_env()
user_prompt = Path.home() / ".config" / "bidding-tracker" / "evaluate_prompt.md"
if user_prompt.exists():
return user_prompt.read_text(encoding="utf-8")
default = Path(__file__).parent / "prompts" / "evaluate_prompt.md"
return default.read_text(encoding="utf-8")
def get_profiles() -> str:
"""返回投标主体战略资产库内容(自动触发 .env 加载)。
优先级:~/.config/bidding-tracker/profiles.md > 包内默认
用户可编辑 ~/.config/bidding-tracker/profiles.md 更新公司资质/业绩,无需修改代码。
"""
load_env()
user_profiles = Path.home() / ".config" / "bidding-tracker" / "profiles.md"
if user_profiles.exists():
return user_profiles.read_text(encoding="utf-8")
default = Path(__file__).parent / "prompts" / "profiles.md"
return default.read_text(encoding="utf-8")
FILE:bidding_tracker/prompts/__init__.py
FILE:bidding_tracker/prompts/evaluate_prompt.md
# Role
You are a Lead Strategic Bid Architect & Delivery Evaluator. Your objective is to dissect procurement documents and conduct a Two-Phase Game Theory Calculus.
# Core Directive: Entity Decoupling
Note: Always reference the provided profiles.md to select the most lethal entity. Do not assume capabilities; dynamically read moats, credentials, and track records strictly from the file.
# System Logic: Two-Phase "Interactive Sniper" Calculus
🛑 PHASE 1: 准入核验与交付灵魂拷问
Whenever the user provides a new bidding document, you MUST execute PHASE 1 ONLY and wait for the user's reply.
Step 1.1: Absolute Gatekeepers: Check for SME Exclusivity and Military Clearances. Scan profiles.md to see which entities survive.
Step 1.2: Extract Core Delivery Hurdles: Identify the 3 most difficult physical/technical deliverables (e.g., heavy hardware, specific API, rare certs like DCMM).
Step 1.3: Capability Mapping: Cross-reference hurdles strictly against profiles.md. Identify exactly what our entities LACK.
Step 1.4: THE QUERY (STOP & ASK): Present findings and explicitly ask the user for solution paths for missing capabilities.
[PHASE 1 Output Format - Mandatory Chinese]
⚠️ [项目名称] 投标准入核验与交付难点确认
1. 绝对门槛核验: > [说明资质限制,对照 profiles.md 指出存活/淘汰主体]
2. 核心交付难点与我方能力盲区:
难点一: [描述] -> 盲区: [基于 profiles.md 指出缺乏的能力]
难点二: [描述] -> 盲区: [基于 profiles.md 指出缺乏的能力]
难点三: [描述] -> 盲区: [基于 profiles.md 指出缺乏的能力]
3. 关键决策提问: [此处提出具体问题。例如:“我们在 profiles 中缺乏 LBS 底层数据,请问本次是否通过外部采购解决?”]
(Stop generation here. Wait for user input.)
🎯 PHASE 2: 全面战略精算与方案生成
Execute ONLY AFTER the user has replied to Phase 1.
Step 2.1: Entity Finalization: Lock in the most lethal entity or JV.
Step 2.2: Objective Score Attrition: Calculate exact point losses based on profiles.md.
Step 2.3: Subjective Proposal Blueprint: Provide a 3-point structural blueprint for "subjective points" (主观分).
Step 2.4: Hidden Costs & Price Delta: Audit on-site costs and calculate price drop % to recover lost points.
[PHASE 2 Output Format - Mandatory Chinese]
🎯 [项目名称] 投标狙击与战略精算最终报告
一、 最终出战主体与资源整合方案
出战决断: [引证 profiles.md 说明战略优势]
二、 客观分精算:硬损耗与“萝卜坑”
分值损耗: -X 分 [比对 profiles.md 说明具体扣分项]
总客观亏空: -X 分
三、 基于主体基因的满分标书骨架 (Subjective Blueprint)
隐性痛点洞察: [采购方真实痛点]
降维打击策略:
[策略1:主体护城河 + 用户确认方案]
[策略2]
[策略3]
四、 隐性成本预警与报价对冲模型 (Price Delta)
实施成本预警: [驻场、硬件垫资等]
价格对冲精算: [建议下浮 %,含中小企业政策计算]
五、 最终决断与行动指令 (Final Verdict)
综合中标胜率评估: [XX]%
下一步执行指令: [针对商务/技术团队的具体建议]
# Execution Command
Start with PHASE 1 immediately upon receiving a document.
Reference profiles.md for every logic gate.
Output exclusively in Chinese.
FILE:bidding_tracker/prompts/profiles.md
# 投标主体战略资产库 (Strategic Entity Profiles)
## 【主体 A】 湖南省先进技术研究院 (Hunan Advanced)
\> **属性**:省直属事业单位 | 核心护城河:高等级军工资资与体制内信任。
* **合规性资质 (Gatekeeper)**:
* **保密资格**:国家秘密载体印制/科研生产**【二级】**。
* **装备承制**:已入名录,具备**【核心/二类】**资格。
* **质量体系**:GJB 9001C 军工质量管理体系。
* **技术与人才 (Tech & Personnel)**:
* **核心领域**:数智化仿真、战损评估、任务规划算法、大模型 Agent。
* **梯队**:硕博比 > 85%。持证含:高级信息系统项目管理师 (3名)、系统分析师 (2名)、PMP、高级工。
* **历史业绩 (Track Record)**:
* **旗舰项目**:2024 年某部 **1300 万级**数智化决策平台开发(具备千万级交付证明)。
* **支撑项目**:多项百万级以上军队仿真、实兵对抗、无人集群协同合同。
---
## 【主体 B】 北京方略有数科技有限公司 (Beijing Formulas)
\> **属性**:民营/小微企业 | 核心护城河:前沿算法溢价与 SME 价格红利。
* **基本信息与政策 (SME Bonus)**:
* **成立/注资**:2020 年成立,注册资本 **3000 万**。
* **划型**:**小型/微型企业 (SME)**。在政采中享受 **10%-20% 价格扣除优惠**。
* **合规性资质 (Credentials)**:
* **体系认证**:**ISO 9001 质量管理体系认证** (2026年3月获证)。
* **资信**:2021-2024 连续四年财务审计报告;具备无欠税证明及社保缴纳记录。
* **技术方案溢价 (Tech Edge)**:
* **核心架构**:本体 (Ontology) + AI Agent + MCP (Model Context Protocol)。
* **应用方向**:非结构化数据治理、多模态情报洞察、数据资产价值运营。
* **团队画像 (Personnel)**:
* **配置**:精锐研发团队(社保约 5 人),核心成员为**本科/硕士**学历。
* **领域**:数据科学、NLP、系统架构、敏捷开发。
* **实战业绩 (Proven Track Record)**:
* **头部外协**:蚂蚁金服(绿色搜推模型)、快手(生成式推荐技术)。
* **防务/科研**:航天宏图(技术服务)、观想科技(情景分析工具)、大连理工(非结构化数据管理)、浙江理工(体系建模系统)、河南科技学院(智慧育种平台)。
---
## 【路由决策模型】
1. **强准入标**:要求“承制资格”或“涉密” -> 必须选 **主体 A**。
2. **政策红利标**:北京市/部委政采且鼓励中小企业 -> 优先选 **主体 B**(靠价格折扣对冲商务弱项)。
3. **技术外协标**:高校/研究所/军工集团外协 -> 选 **主体 B**(利用大厂业绩背书技术深度)。
FILE:bidding_tracker/scripts/init_db.py
#!/usr/bin/env python3
"""
初始化 SQLite 数据库,创建所有表结构。
幂等运行:多次执行安全,不会重复建表或丢失数据。
用法:python3 scripts/init_db.py
"""
import json
import sqlite3
import os
import sys
from bidding_tracker.config import get_db_path, get_attachments_dir
DB_PATH = get_db_path()
ATTACHMENTS_DIR = get_attachments_dir()
def get_conn() -> sqlite3.Connection:
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
DDL = """
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_no TEXT NOT NULL UNIQUE,
project_name TEXT NOT NULL,
budget REAL,
procurer TEXT,
bid_agency TEXT,
project_manager TEXT,
manager_contact TEXT,
registration_deadline TEXT,
registration_location TEXT,
doc_purchase_location TEXT,
doc_purchase_price REAL,
doc_purchase_deadline TEXT,
doc_required_materials TEXT,
doc_purchased_at TEXT,
doc_attachment_path TEXT,
bid_opening_time TEXT,
bid_opening_location TEXT,
travel_days INTEGER DEFAULT 0,
suggested_seal_time TEXT,
actual_seal_time TEXT,
announcement_path TEXT,
status TEXT NOT NULL DEFAULT 'registered',
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
updated_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
win_probability REAL,
win_prediction TEXT,
win_eval_at TEXT
);
CREATE TABLE IF NOT EXISTS bid_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL REFERENCES projects(id),
our_bid_price REAL,
winning_price REAL,
winner TEXT,
is_winner INTEGER NOT NULL DEFAULT 0,
notes TEXT,
recorded_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS reminders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL REFERENCES projects(id),
reminder_type TEXT NOT NULL,
sent_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
recipient_role TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
wecom_userid TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
role TEXT NOT NULL,
contact TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
"""
def main():
try:
os.makedirs(ATTACHMENTS_DIR, exist_ok=True)
db_path = os.path.abspath(DB_PATH)
os.makedirs(os.path.dirname(db_path), exist_ok=True)
conn = get_conn()
conn.executescript(DDL)
conn.commit()
# 迁移:为已有数据库安全添加评估字段
for col_name, col_type in [("win_probability", "REAL"), ("win_prediction", "TEXT"), ("win_eval_at", "TEXT")]:
try:
conn.execute(f"ALTER TABLE projects ADD COLUMN {col_name} {col_type}")
except sqlite3.OperationalError:
pass # 列已存在
conn.commit()
conn.close()
print(f"数据库初始化完成:{db_path}")
except Exception as e:
print(json.dumps({"error": f"数据库初始化失败: {e}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
FILE:bidding_tracker/scripts/manage_users.py
#!/usr/bin/env python3
"""
用户管理:Bootstrap 注册总监、添加负责人、列出用户。
用法:
python3 scripts/manage_users.py --bootstrap --user-id <id> --name <name>
python3 scripts/manage_users.py --add --caller-id <director_id> --user-id <id> --name <name> [--contact <c>]
python3 scripts/manage_users.py --list [--role <role>]
"""
import argparse
import json
import os
import sqlite3
import sys
from bidding_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
def cmd_bootstrap(user_id: str, name: str):
"""Bootstrap 模式:注册首位总监。幂等,同 userid 返回 ok;不同 userid 且已有总监则报错。"""
conn = get_conn()
try:
cur = conn.execute("SELECT * FROM users WHERE role = 'director'")
existing = cur.fetchone()
if existing:
if existing['wecom_userid'] == user_id:
print(json.dumps({"status": "ok", "message": f"总监已存在:{existing['name']}"}))
sys.exit(0)
else:
print(json.dumps({"error": "系统已初始化,总监已存在且非当前用户", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn.execute(
"INSERT INTO users (wecom_userid, name, role) VALUES (?, ?, 'director')",
(user_id, name))
conn.commit()
print(json.dumps({"status": "ok", "message": f"总监注册成功:{name}"}))
sys.exit(0)
finally:
conn.close()
def cmd_add(caller_id: str, user_id: str, name: str, contact: str | None):
"""添加负责人:验证 caller 是总监,再插入。"""
conn = get_conn()
try:
cur = conn.execute("SELECT role FROM users WHERE wecom_userid = ?", (caller_id,))
caller_row = cur.fetchone()
if not caller_row or caller_row['role'] != 'director':
print(json.dumps({"error": "权限不足:仅总监可添加用户", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
cur2 = conn.execute("SELECT id FROM users WHERE wecom_userid = ?", (user_id,))
if cur2.fetchone():
print(json.dumps({"error": f"用户 {user_id} 已存在", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn.execute(
"INSERT INTO users (wecom_userid, name, role, contact) VALUES (?, ?, 'manager', ?)",
(user_id, name, contact))
conn.commit()
print(json.dumps({"status": "ok", "message": f"用户添加成功:{name} (manager)"}))
sys.exit(0)
finally:
conn.close()
def cmd_list(role: str | None):
"""列出用户,可选按 role 过滤。"""
conn = get_conn()
try:
if role:
cur = conn.execute(
"SELECT id, wecom_userid, name, role, contact, created_at "
"FROM users WHERE role = ? ORDER BY id",
(role,))
else:
cur = conn.execute(
"SELECT id, wecom_userid, name, role, contact, created_at "
"FROM users ORDER BY id")
rows = cur.fetchall()
result = [dict(r) for r in rows]
print(json.dumps(result, ensure_ascii=False))
sys.exit(0)
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='用户管理')
parser.add_argument('--bootstrap', action='store_true')
parser.add_argument('--add', action='store_true')
parser.add_argument('--list', action='store_true')
parser.add_argument('--user-id', help='企业微信 userid')
parser.add_argument('--name', help='用户显示名称')
parser.add_argument('--caller-id', help='调用者的 wecom_userid(--add 模式必填)')
parser.add_argument('--contact', help='联系方式')
parser.add_argument('--role', choices=['director', 'manager'], help='按角色过滤(--list 模式)')
args = parser.parse_args()
mode = sum([args.bootstrap, args.add, args.list])
if mode != 1:
print(json.dumps({"error": "必须指定 --bootstrap、--add 或 --list 其一", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if args.bootstrap:
if not args.user_id or not args.name:
print(json.dumps({"error": "--bootstrap 模式需要 --user-id 和 --name", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
cmd_bootstrap(args.user_id, args.name)
elif args.add:
if not args.caller_id or not args.user_id or not args.name:
print(json.dumps({"error": "--add 模式需要 --caller-id、--user-id 和 --name", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
cmd_add(args.caller_id, args.user_id, args.name, args.contact)
elif args.list:
cmd_list(args.role)
if __name__ == '__main__':
main()
FILE:bidding_tracker/scripts/query_projects.py
#!/usr/bin/env python3
"""
查询项目列表或项目详情,按角色过滤,输出 JSON。
用法:
python3 scripts/query_projects.py --user-id WangDirector --active-only
python3 scripts/query_projects.py --user-id ZhangManager --active-only
python3 scripts/query_projects.py --user-id WangDirector --keyword "2026-001"
python3 scripts/query_projects.py --user-id ZhangManager --keyword "网安"
python3 scripts/query_projects.py --id 1
python3 scripts/query_projects.py --user-id WangDirector --status preparing
python3 scripts/query_projects.py --user-id WangDirector --upcoming-days 7
"""
import argparse
import json
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from bidding_tracker.config import get_db_path
DB_PATH = get_db_path()
ACTIVE_STATUSES = ('registered', 'doc_pending', 'doc_purchased', 'preparing', 'sealed')
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
def query_all(user_id: str | None,
keyword: str | None,
status: str | None,
active_only: bool,
upcoming_days: int | None) -> list:
"""
按用户身份和条件查询项目列表。
无 --user-id 时默认只返回活跃项目(向后兼容)。
"""
conn = get_conn()
try:
role, name = None, None
# 从 users 表解析角色和姓名
if user_id:
row = conn.execute(
"SELECT role, name FROM users WHERE wecom_userid = ?",
(user_id,)).fetchone()
if not row:
print(json.dumps({"error": "用户不存在", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
role, name = row['role'], row['name']
sql = "SELECT * FROM projects WHERE 1=1"
params = []
# --keyword:先精确匹配 project_no,再模糊匹配 project_name
if keyword:
exact = conn.execute(
"SELECT * FROM projects WHERE project_no = ?", (keyword,)).fetchone()
if exact:
cols = list(exact.keys())
return [dict(zip(cols, exact))]
sql += " AND project_name LIKE ?"
params.append(f"%{keyword}%")
# 角色过滤:manager 只看本人项目
if role == 'manager' and name:
sql += " AND project_manager = ?"
params.append(name)
# 状态过滤
if status:
sql += " AND status = ?"
params.append(status)
elif active_only or not status:
# 默认只返回活跃项目(active_only=True 或无 status 参数时)
placeholders = ','.join(['?'] * len(ACTIVE_STATUSES))
sql += f" AND status IN ({placeholders})"
params.extend(ACTIVE_STATUSES)
# 近期关键节点过滤
if upcoming_days is not None:
now_str = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
cutoff = (datetime.now() + timedelta(days=upcoming_days)).strftime("%Y-%m-%dT%H:%M:%S")
sql += (" AND (bid_opening_time BETWEEN ? AND ?"
" OR doc_purchase_deadline BETWEEN ? AND ?"
" OR suggested_seal_time BETWEEN ? AND ?)")
params.extend([now_str, cutoff, now_str, cutoff, now_str, cutoff])
sql += " ORDER BY bid_opening_time ASC"
cur = conn.execute(sql, params)
cols = [d[0] for d in cur.description]
rows = cur.fetchall()
return [dict(zip(cols, row)) for row in rows]
finally:
conn.close()
def query_by_id(project_id: int) -> dict | None:
"""查询单个项目详情。"""
conn = get_conn()
try:
cur = conn.execute("SELECT * FROM projects WHERE id=?", (project_id,))
cols = [d[0] for d in cur.description]
row = cur.fetchone()
return dict(zip(cols, row)) if row else None
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='查询招投标项目')
parser.add_argument('--user-id', help='当前用户 wecom_userid(从 users 表查角色)')
parser.add_argument('--keyword', help='搜索关键词:先精确匹配 project_no,再模糊匹配 project_name')
parser.add_argument('--id', type=int, help='查询指定项目详情')
parser.add_argument('--status', help='按状态过滤')
parser.add_argument('--active-only', action='store_true', help='仅返回活跃项目')
parser.add_argument('--upcoming-days', type=int, help='返回 N 天内有关键节点的项目')
args = parser.parse_args()
if args.id:
result = query_by_id(args.id)
else:
result = query_all(args.user_id, args.keyword, args.status,
args.active_only, args.upcoming_days)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:bidding_tracker/scripts/record_result.py
#!/usr/bin/env python3
"""
录入开标结果,同步更新项目状态为 won 或 lost。
用法:
python3 scripts/record_result.py \
--project-id 1 \
--our-price 980000 \
--winning-price 950000 \
--winner "某某公司" \
--won false \
--notes "排名第二,差距3万"
"""
import argparse
import json
import os
import sqlite3
import sys
from bidding_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def record(project_id: int, our_price: float | None, winning_price: float | None,
winner: str | None, is_won: bool, notes: str | None):
"""写入开标结果并更新项目状态。"""
conn = get_conn()
try:
row = conn.execute(
"SELECT status FROM projects WHERE id=?", (project_id,)).fetchone()
if row is None:
print(json.dumps({"error": f"项目不存在:{project_id}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if row[0] != 'opened':
print(json.dumps(
{"error": f"项目当前状态 '{row[0]}' 不允许录入结果,仅opened状态可录入", "code": 1},
ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn.execute(
"INSERT INTO bid_results"
" (project_id, our_bid_price, winning_price, winner, is_winner, notes)"
" VALUES (?, ?, ?, ?, ?, ?)",
(project_id, our_price, winning_price, winner, int(is_won), notes))
new_status = 'won' if is_won else 'lost'
conn.execute(
"UPDATE projects SET status=?, updated_at=datetime('now','localtime')"
" WHERE id=?",
(new_status, project_id))
conn.commit()
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='录入开标结果')
parser.add_argument('--project-id', type=int, required=True, help='项目 ID')
parser.add_argument('--our-price', type=float, help='我方报价(元)')
parser.add_argument('--winning-price', type=float, help='中标价格(元)')
parser.add_argument('--winner', help='中标单位名称')
parser.add_argument('--won', required=True, choices=['true', 'false'], help='是否中标')
parser.add_argument('--notes', help='备注(评分、排名等现场情况)')
args = parser.parse_args()
is_won = args.won == 'true'
record(args.project_id, args.our_price, args.winning_price, args.winner, is_won, args.notes)
result_text = "中标" if is_won else "未中标"
print(json.dumps({"status": "ok", "message": f"项目 {args.project_id} 开标结果已录入:{result_text}", "new_status": result_text}, ensure_ascii=False))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:bidding_tracker/scripts/register_project.py
#!/usr/bin/env python3
"""
注册新招投标项目。
接收从招标公告(PDF/PNG)中提取的结构化信息,写入数据库,
自动生成项目编号,推算建议封标时间,并将附件移动到正确路径。
用法:
python3 scripts/register_project.py --json '{"project_name": "...", ...}' --manager-name "张经理"
python3 scripts/register_project.py --json '...' --manager-name "张经理" --travel-days 3
python3 scripts/register_project.py --json '...' --manager-name "张经理" --announcement-file /tmp/xxx.pdf
"""
import argparse
import json
import os
import shutil
import sqlite3
import sys
from datetime import datetime, timedelta
from bidding_tracker.config import get_db_path, get_attachments_dir
DB_PATH = get_db_path()
ATTACHMENTS_DIR = get_attachments_dir()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
# 支持的字段(与数据库列名对应)
ALLOWED_FIELDS = [
'project_name', 'budget', 'procurer', 'bid_agency',
'manager_contact',
'registration_deadline', 'registration_location',
'doc_purchase_location', 'doc_purchase_price', 'doc_purchase_deadline',
'doc_required_materials',
'bid_opening_time', 'bid_opening_location',
]
def calc_suggested_seal_time(bid_opening_time: str, travel_days: int) -> str | None:
"""根据开标时间和运输天数推算建议封标时间,自动避开周末(退到周五)。"""
try:
opening = datetime.fromisoformat(bid_opening_time)
except (ValueError, TypeError):
return None
suggested = opening - timedelta(days=travel_days)
weekday = suggested.weekday() # 0=Mon, 5=Sat, 6=Sun
if weekday == 5: # 周六 → 周五
suggested -= timedelta(days=1)
elif weekday == 6: # 周日 → 周五
suggested -= timedelta(days=2)
return suggested.strftime("%Y-%m-%dT%H:%M:%S")
def generate_project_no(conn) -> str:
"""按 YYYY-NNN 格式自动生成 project_no。"""
year = datetime.now().year
cur = conn.execute(
"SELECT MAX(CAST(SUBSTR(project_no, 6) AS INTEGER)) FROM projects WHERE project_no LIKE ?",
(f"{year}-%",))
max_seq = cur.fetchone()[0]
seq = (max_seq or 0) + 1
return f"{year}-{seq:03d}"
def move_announcement(src: str, project_id: int) -> str:
"""将公告文件移动到 data/attachments/{project_id}/ 目录。"""
dest_dir = os.path.abspath(os.path.join(ATTACHMENTS_DIR, str(project_id)))
os.makedirs(dest_dir, exist_ok=True)
dest = os.path.join(dest_dir, os.path.basename(src))
shutil.move(src, dest)
return dest
def register(data: dict, manager_name: str, travel_days: int, announcement_file: str | None) -> dict:
"""
将项目数据写入数据库,返回包含 project_id, project_no 等信息的 dict。
"""
# 过滤只保留合法字段
fields = {k: v for k, v in data.items() if k in ALLOWED_FIELDS}
# 列表型字段序列化为 JSON 字符串
if isinstance(fields.get('doc_required_materials'), list):
fields['doc_required_materials'] = json.dumps(
fields['doc_required_materials'], ensure_ascii=False)
# 设置必填字段
fields['project_manager'] = manager_name
fields['travel_days'] = travel_days
# 推算建议封标时间
suggested = calc_suggested_seal_time(fields.get('bid_opening_time'), travel_days)
if suggested:
fields['suggested_seal_time'] = suggested
conn = get_conn()
attachment_dir = None
try:
# 生成 project_no(需要连接才能查询)
project_no = generate_project_no(conn)
fields['project_no'] = project_no
columns = list(fields.keys())
placeholders = ', '.join(['?'] * len(columns))
col_clause = ', '.join(columns)
values = [fields[c] for c in columns]
cur = conn.execute(
f"INSERT INTO projects ({col_clause}) VALUES ({placeholders})",
values)
conn.commit()
project_id = cur.lastrowid
# 移动公告文件
if announcement_file and os.path.exists(announcement_file):
dest = move_announcement(announcement_file, project_id)
conn.execute(
"UPDATE projects SET announcement_path=? WHERE id=?",
(dest, project_id))
conn.commit()
attachment_dir = os.path.dirname(dest)
else:
attachment_dir = os.path.join(os.path.abspath(ATTACHMENTS_DIR), str(project_id))
return {
"project_id": project_id,
"project_no": project_no,
"project_name": fields.get('project_name', ''),
"suggested_seal_time": suggested or '',
"attachment_dir": attachment_dir,
}
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='注册新招投标项目')
parser.add_argument('--json', required=True, help='项目信息 JSON 字符串')
parser.add_argument('--manager-name', required=True, help='指派的项目负责人姓名')
parser.add_argument('--travel-days', type=int, default=2, help='运输/路程天数(用于推算封标时间)')
parser.add_argument('--announcement-file', help='招标公告文件路径(PDF/PNG),由 Tool 层从 __context__ 拦截后传入')
args = parser.parse_args()
try:
data = json.loads(args.json)
except json.JSONDecodeError as e:
print(json.dumps({"error": f"JSON 解析失败:{e}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
result = register(data, args.manager_name, args.travel_days, args.announcement_file)
print(json.dumps(result, ensure_ascii=False))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:bidding_tracker/scripts/reminder_check.py
#!/usr/bin/env python3
"""
扫描所有活跃项目,判断是否需要发送提醒,输出 JSON 数组。
由 Cron 每日调用两次(8:47 / 17:53 工作日)。
提醒规则:
- 购买截止 ≤ 3 天 且 未购买 → doc_purchase → manager
- 封标建议时间 ≤ 2 天 且 未封标 → seal_warning → manager
- 开标时间 ≤ 1 天 → bid_opening → manager + director
输出格式(JSON 数组):
[
{
"project_id": 1,
"project_name": "xxx项目",
"reminder_type": "doc_purchase",
"recipient_role": "manager",
"project_manager": "张三",
"message": "..."
},
...
]
若无待提醒项,输出空数组 [],Agent 应静默退出。
"""
import json
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from bidding_tracker.config import get_db_path
DB_PATH = get_db_path()
ACTIVE_STATUSES = ('registered', 'doc_pending', 'doc_purchased', 'preparing', 'sealed')
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def is_already_sent(conn, project_id: int, reminder_type: str) -> bool:
"""检查今日是否已发送同类型提醒。"""
cur = conn.execute(
"SELECT COUNT(*) FROM reminders "
"WHERE project_id = ? AND reminder_type = ? "
"AND DATE(sent_at) = DATE('now', 'localtime')",
(project_id, reminder_type))
return cur.fetchone()[0] > 0
def record_sent(conn, project_id: int, reminder_type: str, recipient_role: str):
"""记录已发送的提醒。"""
conn.execute(
"INSERT INTO reminders (project_id, reminder_type, recipient_role) VALUES (?, ?, ?)",
(project_id, reminder_type, recipient_role))
def check_reminders() -> list:
"""扫描活跃项目,返回需要发送的提醒列表,并写入 reminders 表。"""
conn = get_conn()
try:
placeholders = ','.join(['?'] * len(ACTIVE_STATUSES))
cur = conn.execute(
f"SELECT id, project_name, project_manager, status,"
f" doc_purchase_deadline, suggested_seal_time, bid_opening_time"
f" FROM projects WHERE status IN ({placeholders})",
ACTIVE_STATUSES)
cols = [d[0] for d in cur.description]
projects = [dict(zip(cols, row)) for row in cur.fetchall()]
except Exception as e:
conn.close()
print(json.dumps({"error": f"数据库连接失败: {e}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
def parse_dt(s):
try:
return datetime.fromisoformat(s) if s else None
except ValueError:
return None
now = datetime.now()
reminders = []
for p in projects:
pid = p['id']
name = p['project_name']
manager = p['project_manager']
status = p['status']
# 规则 1:标书购买截止 ≤ 3 天且未购买
if status in ('registered', 'doc_pending'):
dl = parse_dt(p['doc_purchase_deadline'])
if dl and timedelta(0) <= (dl - now) <= timedelta(days=3):
if not is_already_sent(conn, pid, 'doc_purchase'):
reminders.append({
"project_id": pid,
"project_name": name,
"reminder_type": "doc_purchase",
"recipient_role": "manager",
"project_manager": manager,
"message": (
f"【标书购买提醒】{name} 购买截止 {dl.strftime('%Y-%m-%d')},"
f"请尽快办理"
),
})
record_sent(conn, pid, 'doc_purchase', 'manager')
# 规则 2:建议封标时间 ≤ 2 天且未封标
if status in ('registered', 'doc_pending', 'doc_purchased', 'preparing'):
seal = parse_dt(p['suggested_seal_time'])
if seal and timedelta(0) <= (seal - now) <= timedelta(days=2):
if not is_already_sent(conn, pid, 'seal_warning'):
reminders.append({
"project_id": pid,
"project_name": name,
"reminder_type": "seal_warning",
"recipient_role": "manager",
"project_manager": manager,
"message": (
f"【封标提醒】{name} 建议封标时间 {seal.strftime('%Y-%m-%d')},"
f"请确认制标进度"
),
})
record_sent(conn, pid, 'seal_warning', 'manager')
# 规则 3:开标时间 ≤ 1 天 → 同时通知 manager 和 director
opening = parse_dt(p['bid_opening_time'])
if opening and timedelta(0) <= (opening - now) <= timedelta(days=1):
if not is_already_sent(conn, pid, 'bid_opening'):
for role in ("manager", "director"):
reminders.append({
"project_id": pid,
"project_name": name,
"reminder_type": "bid_opening",
"recipient_role": role,
"project_manager": manager,
"message": (
f"【开标提醒】{name} 开标时间 "
f"{opening.strftime('%Y-%m-%d %H:%M')},请做好准备"
),
})
# 去重表只写1条(manager角色),is_already_sent 按 project_id+type+DATE 检查,与角色无关
record_sent(conn, pid, 'bid_opening', 'manager')
conn.commit()
conn.close()
return reminders
def main():
reminders = check_reminders()
print(json.dumps(reminders or [], ensure_ascii=False, indent=2))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:bidding_tracker/scripts/stats.py
#!/usr/bin/env python3
"""
统计招投标项目表现,输出 JSON。
用法:
python3 scripts/stats.py # 全局统计
python3 scripts/stats.py --by-manager # 按负责人分组统计
python3 scripts/stats.py --by-month # 按月度趋势统计
python3 scripts/stats.py --period 2026-Q1 # 指定季度
python3 scripts/stats.py --period 2026-03 # 指定月份
python3 scripts/stats.py --manager 张经理 # 指定负责人
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from bidding_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def parse_period(period: str | None) -> tuple[str, str] | None:
"""将 period 转为 (start, end) 日期范围字符串。"""
if not period:
return None
if re.match(r'^\d{4}-Q[1-4]$', period):
year = int(period[:4])
quarter = int(period[-1])
start_month = (quarter - 1) * 3 + 1
start = f"{year}-{start_month:02d}-01"
end_month = start_month + 2
if end_month in (1, 3, 5, 7, 8, 10, 12):
end = f"{year}-{end_month:02d}-31"
elif end_month in (4, 6, 9, 11):
end = f"{year}-{end_month:02d}-30"
else:
end = f"{year}-{end_month:02d}-28"
return start, end
elif re.match(r'^\d{4}-\d{2}$', period):
start = f"{period}-01"
year, month = int(period[:4]), int(period[5:7])
if month == 12:
end = f"{year + 1}-01-01"
else:
end = f"{year}-{month + 1:02d}-01"
return start, end
else:
print(json.dumps({"error": "无效的 period 格式,支持 YYYY-MM 或 YYYY-QN", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
def stats_global(period: str | None) -> dict:
"""全局统计:总数、中标数、胜率、平均报价差。"""
conn = get_conn()
try:
date_filter, params = "", []
date_range = parse_period(period)
if date_range:
date_filter = " AND p.bid_opening_time >= ? AND p.bid_opening_time < ?"
params = list(date_range)
sql = f"""
SELECT
COUNT(DISTINCT p.id) AS total,
COUNT(DISTINCT CASE WHEN r.is_winner = 1 THEN p.id END) AS won,
COUNT(DISTINCT CASE WHEN r.is_winner = 0 THEN p.id END) AS lost,
COUNT(DISTINCT CASE WHEN r.id IS NOT NULL AND p.status NOT IN ('won', 'lost', 'cancelled') THEN p.id END) AS active,
AVG(p.budget) AS avg_budget,
AVG(r.our_bid_price - r.winning_price) AS avg_price_diff
FROM projects p
LEFT JOIN bid_results r ON r.project_id = p.id
WHERE r.id IS NOT NULL
{date_filter}
"""
row = conn.execute(sql, params).fetchone()
total = row[0] or 0
won = row[1] or 0
lost = row[2] or 0
active = row[3] or 0
win_rate = round(won / (won + lost), 3) if (won + lost) > 0 else 0.0
return {
"total": total,
"won": won,
"lost": lost,
"active": active,
"win_rate": win_rate,
"avg_budget": row[4],
"avg_price_diff": row[5],
}
finally:
conn.close()
def stats_by_manager(period: str | None) -> list:
"""按项目负责人分组统计。"""
conn = get_conn()
try:
date_filter, params = "", []
date_range = parse_period(period)
if date_range:
date_filter = " AND p.bid_opening_time >= ? AND p.bid_opening_time < ?"
params = list(date_range)
sql = f"""
SELECT
p.project_manager AS manager,
COUNT(DISTINCT p.id) AS total,
COUNT(DISTINCT CASE WHEN r.is_winner = 1 THEN p.id END) AS won,
COUNT(DISTINCT CASE WHEN r.is_winner = 0 THEN p.id END) AS lost,
COUNT(DISTINCT CASE WHEN p.status NOT IN ('won', 'lost', 'cancelled') THEN p.id END) AS active
FROM projects p
LEFT JOIN bid_results r ON r.project_id = p.id
WHERE r.id IS NOT NULL
{date_filter}
GROUP BY p.project_manager
ORDER BY won DESC
"""
rows = conn.execute(sql, params).fetchall()
result = []
for row in rows:
won = row[2] or 0
lost = row[3] or 0
result.append({
"manager": row[0] or "(未指派)",
"total": row[1] or 0,
"won": won,
"lost": lost,
"active": row[4] or 0,
"win_rate": round(won / (won + lost), 3) if (won + lost) > 0 else 0.0,
})
return result
finally:
conn.close()
def stats_by_month(period: str | None) -> list:
"""按月度统计趋势(可选 period 过滤)。"""
conn = get_conn()
try:
date_filter, params = "", []
date_range = parse_period(period)
if date_range:
date_filter = " AND p.bid_opening_time >= ? AND p.bid_opening_time < ?"
params = list(date_range)
sql = f"""
SELECT
STRFTIME('%Y-%m', p.bid_opening_time) AS month,
COUNT(DISTINCT p.id) AS total,
COUNT(DISTINCT CASE WHEN r.is_winner = 1 THEN p.id END) AS won,
COUNT(DISTINCT CASE WHEN r.is_winner = 0 THEN p.id END) AS lost
FROM projects p
LEFT JOIN bid_results r ON r.project_id = p.id
WHERE p.bid_opening_time IS NOT NULL
{date_filter}
GROUP BY month
ORDER BY month ASC
"""
rows = conn.execute(sql, params).fetchall()
result = []
for row in rows:
if row[0] is None:
continue
won = row[2] or 0
lost = row[3] or 0
result.append({
"month": row[0],
"total": row[1] or 0,
"won": won,
"lost": lost,
"win_rate": round(won / (won + lost), 3) if (won + lost) > 0 else 0.0,
})
return result
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='招投标统计分析')
parser.add_argument('--by-manager', action='store_true', help='按负责人分组统计')
parser.add_argument('--by-month', action='store_true', help='按月度趋势统计')
parser.add_argument('--period', help='指定时间范围,格式 YYYY-MM 或 YYYY-QN')
args = parser.parse_args()
if args.by_month:
result = stats_by_month(args.period)
elif args.by_manager:
result = stats_by_manager(args.period)
else:
result = stats_global(args.period)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:bidding_tracker/scripts/update_project.py
#!/usr/bin/env python3
"""
更新项目字段,含状态机合法性验证。
用法:
python3 scripts/update_project.py --id 1 --field status --value doc_purchased
python3 scripts/update_project.py --id 1 --field actual_seal_time --value "2026-04-08T15:00:00"
状态流转(合法路径):
registered → doc_pending / doc_purchased / cancelled
doc_pending → doc_purchased / cancelled
doc_purchased → preparing / sealed / cancelled
preparing → sealed / cancelled
sealed → opened / cancelled
opened → won / lost
任意终态(won/lost/cancelled)不可流转
"""
import argparse
import json
import os
import sqlite3
import sys
from bidding_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
# 合法的状态流转路径(按 api-interfaces.md §5 修复)
VALID_TRANSITIONS = {
'registered': {'doc_pending', 'doc_purchased', 'cancelled'},
'doc_pending': {'doc_purchased', 'cancelled'},
'doc_purchased': {'preparing', 'sealed', 'cancelled'},
'preparing': {'sealed', 'cancelled'},
'sealed': {'opened', 'cancelled'},
'opened': {'won', 'lost'},
'won': set(),
'lost': set(),
'cancelled': set(),
}
# 允许通过此脚本更新的字段白名单(防止字段名 SQL 注入)
UPDATABLE_FIELDS = {
'status', 'doc_purchased_at', 'doc_attachment_path',
'actual_seal_time', 'project_manager', 'manager_contact',
'bid_opening_time', 'bid_opening_location', 'suggested_seal_time',
'announcement_path',
'win_probability', 'win_prediction', 'win_eval_at',
}
def validate_status_transition(current: str, new: str) -> bool:
"""验证状态流转是否合法。"""
return new in VALID_TRANSITIONS.get(current, set())
def update(project_id: int, field: str, value: str):
"""更新指定项目的指定字段。"""
if field not in UPDATABLE_FIELDS:
print(json.dumps({"error": f"不支持更新字段:{field}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn = get_conn()
try:
row = conn.execute(
"SELECT status FROM projects WHERE id=?", (project_id,)).fetchone()
if row is None:
print(json.dumps({"error": f"项目不存在:{project_id}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if field == 'status':
current_status = row[0]
if not validate_status_transition(current_status, value):
print(json.dumps({"error": f"非法状态流转:{current_status} → {value}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
# 自动记录封标时间
if value == 'sealed':
conn.execute(
"UPDATE projects SET actual_seal_time=datetime('now','localtime') WHERE id=?",
(project_id,))
conn.execute(
f"UPDATE projects SET {field}=?, updated_at=datetime('now','localtime')"
f" WHERE id=?",
(value, project_id))
conn.commit()
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='更新项目字段')
parser.add_argument('--id', type=int, required=True, help='项目 ID')
parser.add_argument('--field', required=True, help='要更新的字段名')
parser.add_argument('--value', required=True, help='新值')
args = parser.parse_args()
update(args.id, args.field, args.value)
print(json.dumps({"status": "ok", "message": f"项目 {args.id} 的 {args.field} 已更新为:{args.value}"}, ensure_ascii=False))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:bidding_tracker/skill.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
招投标商机全周期追踪工具 - Skill 入口
提供 CLI 和函数调用两种方式,统一权限控制:
- CLI 模式:通过命令行参数调用
- 函数模式:通过 bid_project_manager() 调用(OpenClaw Skill)
用法:
python -m bidding_tracker.skill --help
python -m bidding_tracker.skill init --name "王总监"
python -m bidding_tracker.skill register --json '{...}' --manager-name "张经理"
"""
import argparse
import json
import os
import sqlite3
import subprocess
import sys
from bidding_tracker.config import get_db_path, load_env, get_evaluate_prompt, get_profiles
SCRIPTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scripts')
HELP_TEXT = """招投标商机全周期追踪工具
用法:
bidding-tracker init --name "姓名" 初始化系统(首次使用,总监注册)
bidding-tracker register 注册新项目(交互式)
bidding-tracker register --json '...' 注册新项目(JSON 参数)
bidding-tracker status 查看项目列表
bidding-tracker status --keyword "关键词" 按关键词搜索项目
bidding-tracker purchased "项目名" 确认已购买标书
bidding-tracker seal "项目名" 确认已封标
bidding-tracker result "项目名" --won 录入中标结果
bidding-tracker result "项目名" --lost 录入未中标结果
bidding-tracker cancel "项目名" 取消项目
bidding-tracker users 查看团队成员
bidding-tracker adduser --new-user-id xxx --name "姓名" 添加负责人
bidding-tracker stats 查看统计(默认按月)
bidding-tracker stats --by-manager 按负责人统计
bidding-tracker stats --by-month --period 2026-Q1 按季度统计
bidding-tracker evaluate --file /path/to/tender.pdf 解析招标文件(胜算评估)
bidding-tracker bind-eval "项目名" --probability 0.75 绑定胜算到项目
bidding-tracker bind-eval "项目名" --probability 0.75 --report "技术优势明显"
示例:
bidding-tracker init --name "王总监"
bidding-tracker status
bidding-tracker result "XX系统采购" --won --our-price 980000 --winning-price 950000
bidding-tracker stats --by-month
"""
# 权限常量
DIRECTOR_ONLY = {'register', 'adduser', 'users', 'stats'}
def get_conn():
"""获取数据库连接"""
conn = sqlite3.connect(os.path.abspath(get_db_path()), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
def get_user(conn, user_id: str) -> dict | None:
"""查询用户身份"""
row = conn.execute(
"SELECT role, name FROM users WHERE wecom_userid = ?",
(user_id,)).fetchone()
return dict(row) if row else None
def check_permission(user_id: str, action: str) -> tuple[bool, str, str]:
"""
检查用户权限,返回 (允许, 错误信息, 用户角色)
"""
conn = get_conn()
try:
if action == 'init':
# init 操作:检查是否已有总监
director = conn.execute(
"SELECT name FROM users WHERE role = 'director'"
).fetchone()
if director:
return False, f"系统已初始化,总监:{director['name']}", ""
# init 无需用户存在,可直接注册
return True, "", "director"
# 非 init 操作:检查系统是否已初始化
director = conn.execute(
"SELECT wecom_userid, name FROM users WHERE role = 'director'"
).fetchone()
if not director:
return False, "系统尚未初始化,请先执行 init", ""
# 查询当前用户
user = get_user(conn, user_id)
if not user:
return False, "您尚未被添加为系统用户", ""
role, name = user['role'], user['name']
# 命令级权限校验
if action in DIRECTOR_ONLY and role != 'director':
return False, "仅总监可执行此操作", role
return True, "", role
finally:
conn.close()
def check_project_access(user_id: str, keyword: str, role: str) -> tuple[int | None, str]:
"""
检查项目访问权限,返回 (project_id, 错误信息)
"""
if not keyword:
return None, "缺少项目标识"
conn = get_conn()
try:
if role == 'director':
row = conn.execute(
"SELECT id FROM projects WHERE project_no = ? OR project_name LIKE ?",
(keyword, f"%{keyword}%")).fetchone()
else:
row = conn.execute(
"SELECT p.id FROM projects p "
"JOIN users u ON u.name = p.project_manager "
"WHERE u.wecom_userid = ? AND (p.project_no = ? OR p.project_name LIKE ?)",
(user_id, keyword, f"%{keyword}%")).fetchone()
if not row:
msg = "项目不存在" if role == 'director' else "该项目不在您的负责范围内"
return None, msg
return row['id'], ""
finally:
conn.close()
def run_script(script_name: str, args: list[str], db_path: str = None) -> subprocess.CompletedProcess:
"""运行 scripts/ 下的脚本"""
script_path = os.path.join(SCRIPTS_DIR, script_name)
cmd = [sys.executable, script_path] + args
env = os.environ.copy()
env['DB_PATH'] = db_path or get_db_path()
# 确保子进程能够导入 bidding_tracker 包
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
existing_pp = env.get('PYTHONPATH', '')
env['PYTHONPATH'] = f"{project_root}:{existing_pp}" if existing_pp else project_root
result = subprocess.run(cmd, capture_output=True, text=True, env=env)
return result
def extract_document_text(file_path: str) -> tuple[str, str]:
"""提取文档文本,返回 (文本内容, 错误信息)。支持 PDF、Word、纯文本。"""
path = os.path.abspath(file_path)
if not os.path.exists(path):
return "", f"文件不存在:{path}"
ext = os.path.splitext(path)[1].lower()
if ext == '.pdf':
try:
from pypdf import PdfReader
reader = PdfReader(path)
pages = [page.extract_text() or "" for page in reader.pages]
return "\n".join(pages), ""
except ImportError:
return "", "缺少依赖:请执行 pip install pypdf"
except Exception as e:
return "", f"PDF 解析失败:{e}"
elif ext == '.docx':
try:
import docx
doc = docx.Document(path)
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
return "\n".join(paragraphs), ""
except ImportError:
return "", "缺少依赖:请执行 pip install python-docx"
except Exception as e:
return "", f"Word 文档解析失败:{e}"
else:
try:
with open(path, encoding='utf-8') as f:
return f.read(), ""
except Exception as e:
return "", f"文件读取失败:{e}"
# ========== 命令处理函数 ==========
def cmd_init(args, user_id: str):
"""初始化系统(注册总监)"""
# 检查是否已有总监
conn = get_conn()
try:
director = conn.execute(
"SELECT name FROM users WHERE role = 'director'"
).fetchone()
if director:
return {"status": "error", "message": f"系统已初始化,总监:{director['name']}"}
finally:
conn.close()
result = run_script('manage_users.py', ['--bootstrap', '--user-id', user_id, '--name', args.name])
if result.returncode == 0:
return {"status": "ok", "message": f"总监注册成功:{args.name}"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "注册失败")}
def cmd_register(args, user_id: str):
"""注册新项目"""
if not args.json and not args.interactive:
return {"status": "error", "message": "需要提供 --json 或使用交互模式"}
if not args.manager_name:
return {"status": "error", "message": "需要提供 --manager-name 参数"}
travel_days = args.travel_days or 2
cmd_args = ['--json', args.json or '{}', '--manager-name', args.manager_name, '--travel-days', str(travel_days)]
if args.file:
cmd_args.extend(['--announcement-file', args.file])
result = run_script('register_project.py', cmd_args)
if result.returncode == 0:
data = json.loads(result.stdout)
return {"status": "ok", "message": f"项目注册成功: {data.get('project_no')}", "data": data}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "注册失败")}
def cmd_status(args, user_id: str, role: str):
"""查看项目列表"""
cmd_args = []
if args.keyword:
cmd_args.extend(['--keyword', args.keyword])
if args.active_only:
cmd_args.append('--active-only')
if args.upcoming_days:
cmd_args.extend(['--upcoming-days', str(args.upcoming_days)])
# 非总监需要传递 user-id 进行过滤
if role == 'manager':
cmd_args.extend(['--user-id', user_id])
result = run_script('query_projects.py', cmd_args)
if result.returncode == 0:
projects = json.loads(result.stdout)
return {"status": "ok", "data": projects}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "查询失败")}
def _update_project_status(user_id: str, keyword: str, status_value: str, role: str) -> dict:
"""更新项目状态"""
project_id, err = check_project_access(user_id, keyword, role)
if err:
return {"status": "error", "message": err}
result = run_script('update_project.py', ['--id', str(project_id), '--field', 'status', '--value', status_value])
if result.returncode == 0:
return {"status": "ok", "message": f"项目状态已更新为 {status_value}"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "更新失败")}
def cmd_purchased(args, user_id: str, role: str):
"""确认标书已购买"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
return _update_project_status(user_id, args.keyword, 'doc_purchased', role)
def cmd_seal(args, user_id: str, role: str):
"""确认已封标"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
return _update_project_status(user_id, args.keyword, 'sealed', role)
def cmd_result(args, user_id: str, role: str):
"""录入开标结果"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
if not args.won and not args.lost:
return {"status": "error", "message": "需要指定 --won 或 --lost"}
project_id, err = check_project_access(user_id, args.keyword, role)
if err:
return {"status": "error", "message": err}
cmd_args = ['--project-id', str(project_id), '--won', 'true' if args.won else 'false']
if args.our_price:
cmd_args.extend(['--our-price', str(args.our_price)])
if args.winning_price:
cmd_args.extend(['--winning-price', str(args.winning_price)])
if args.winner:
cmd_args.extend(['--winner', args.winner])
if args.notes:
cmd_args.extend(['--notes', args.notes])
result = run_script('record_result.py', cmd_args)
if result.returncode == 0:
return {"status": "ok", "message": "开标结果已录入"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "录入失败")}
def cmd_cancel(args, user_id: str, role: str):
"""取消项目"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
return _update_project_status(user_id, args.keyword, 'cancelled', role)
def cmd_users(args, user_id: str):
"""查看用户列表"""
cmd_args = ['--list']
if args.role:
cmd_args.extend(['--role', args.role])
result = run_script('manage_users.py', cmd_args)
if result.returncode == 0:
users = json.loads(result.stdout)
return {"status": "ok", "data": users}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "查询失败")}
def cmd_adduser(args, user_id: str):
"""添加用户"""
if not args.new_user_id or not args.name:
return {"status": "error", "message": "需要提供 --new-user-id 和 --name 参数"}
cmd_args = ['--add', '--caller-id', user_id, '--user-id', args.new_user_id, '--name', args.name]
if args.contact:
cmd_args.extend(['--contact', args.contact])
result = run_script('manage_users.py', cmd_args)
if result.returncode == 0:
return {"status": "ok", "message": f"用户添加成功:{args.name}"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "添加失败")}
def cmd_stats(args, user_id: str):
"""查看统计"""
if args.by_manager and args.by_month:
return {"status": "error", "message": "--by-manager 和 --by-month 不能同时使用"}
cmd_args = []
if args.by_manager:
cmd_args.append('--by-manager')
elif args.by_month:
cmd_args.append('--by-month')
else:
cmd_args.append('--by-month')
if args.period:
cmd_args.extend(['--period', args.period])
result = run_script('stats.py', cmd_args)
if result.returncode == 0:
stats = json.loads(result.stdout)
return {"status": "ok", "data": stats}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "统计失败")}
def cmd_evaluate(args, user_id: str):
"""解析招标文件并组装分析 prompt(供 OpenClaw LLM 消费)"""
if not args.file:
return {"status": "error", "message": "需要提供 --file 参数指定招标文件路径"}
text, err = extract_document_text(args.file)
if err:
return {"status": "error", "message": err}
if not text.strip():
return {"status": "error", "message": "文档内容为空,无法解析"}
prompt = get_evaluate_prompt()
file_name = os.path.basename(args.file)
return {
"status": "ok",
"message": f"招标文件《{file_name}》已解析({len(text)} 字),请按分析框架进行深度博弈评估",
"data": {
"analysis_prompt": prompt,
"profiles": get_profiles(),
"document_text": text,
"file_name": file_name,
}
}
def cmd_bind_eval(args, user_id: str, role: str):
"""将评估结果绑定到项目"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称或编号"}
if args.probability is None:
return {"status": "error", "message": "需要提供 --probability 参数"}
if not (0.0 <= args.probability <= 1.0):
return {"status": "error", "message": "--probability 取值范围 0.0~1.0"}
project_id, err = check_project_access(user_id, args.keyword, role)
if err:
return {"status": "error", "message": err}
from datetime import datetime
eval_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
result = run_script('update_project.py', ['--id', str(project_id), '--field', 'win_probability', '--value', str(args.probability)])
if result.returncode != 0:
return {"status": "error", "message": json.loads(result.stderr).get("error", "更新胜率失败")}
if args.report:
result2 = run_script('update_project.py', ['--id', str(project_id), '--field', 'win_prediction', '--value', args.report])
if result2.returncode != 0:
return {"status": "error", "message": json.loads(result2.stderr).get("error", "更新预测报告失败")}
run_script('update_project.py', ['--id', str(project_id), '--field', 'win_eval_at', '--value', eval_at])
prob_pct = f"{args.probability * 100:.0f}%"
report_preview = f",报告:{args.report[:50]}..." if args.report and len(args.report) > 50 else (f",报告:{args.report}" if args.report else "")
return {"status": "ok", "message": f"胜算评估已绑定:{prob_pct}{report_preview}"}
# ========== CLI 入口 ==========
def main():
"""CLI 入口函数"""
parser = argparse.ArgumentParser(
description='招投标商机全周期追踪工具',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=HELP_TEXT
)
parser.add_argument('--user-id', help='用户 ID(CLI 模式下使用环境变量)')
subparsers = parser.add_subparsers(dest='command', help='子命令')
# init
p_init = subparsers.add_parser('init', help='初始化系统(首次使用,总监注册)')
p_init.add_argument('--name', required=True, help='总监显示名称')
# register
p_reg = subparsers.add_parser('register', help='注册新项目')
p_reg.add_argument('--json', help='项目信息 JSON 字符串')
p_reg.add_argument('--manager-name', help='项目负责人姓名')
p_reg.add_argument('--travel-days', type=int, default=2, help='运输天数')
p_reg.add_argument('--file', help='招标公告文件路径')
p_reg.add_argument('--interactive', action='store_true', help='交互式输入')
# status
p_status = subparsers.add_parser('status', help='查看项目列表')
p_status.add_argument('--keyword', help='搜索关键词')
p_status.add_argument('--active-only', action='store_true', help='仅显示活跃项目')
p_status.add_argument('--upcoming-days', type=int, help='显示 N 天内有关键节点的项目')
# purchased
p_purchased = subparsers.add_parser('purchased', help='确认已购买标书')
p_purchased.add_argument('keyword', nargs='?', help='项目名称或编号')
# seal
p_seal = subparsers.add_parser('seal', help='确认已封标')
p_seal.add_argument('keyword', nargs='?', help='项目名称或编号')
# result
p_result = subparsers.add_parser('result', help='录入开标结果')
p_result.add_argument('keyword', nargs='?', help='项目名称或编号')
p_result.add_argument('--won', action='store_true', help='中标')
p_result.add_argument('--lost', action='store_true', help='未中标')
p_result.add_argument('--our-price', type=int, help='我方报价')
p_result.add_argument('--winning-price', type=int, help='中标价')
p_result.add_argument('--winner', help='中标单位')
p_result.add_argument('--notes', help='备注')
# cancel
p_cancel = subparsers.add_parser('cancel', help='取消项目')
p_cancel.add_argument('keyword', nargs='?', help='项目名称或编号')
# users
p_users = subparsers.add_parser('users', help='查看团队成员')
p_users.add_argument('--role', choices=['director', 'manager'], help='按角色过滤')
# adduser
p_adduser = subparsers.add_parser('adduser', help='添加负责人')
p_adduser.add_argument('--new-user-id', required=True, help='新用户 ID')
p_adduser.add_argument('--name', required=True, help='用户显示名称')
p_adduser.add_argument('--contact', help='联系方式')
# stats
p_stats = subparsers.add_parser('stats', help='查看统计')
p_stats.add_argument('--by-manager', action='store_true', help='按负责人统计')
p_stats.add_argument('--by-month', action='store_true', help='按月份统计')
p_stats.add_argument('--period', help='统计周期')
# evaluate
p_eval = subparsers.add_parser('evaluate', help='解析招标文件,生成胜算评估所需内容')
p_eval.add_argument('--file', required=True, help='招标文件路径 (PDF/Word/TXT)')
# bind-eval
p_bind = subparsers.add_parser('bind-eval', help='将评估胜率绑定到项目')
p_bind.add_argument('keyword', nargs='?', help='项目名称或编号')
p_bind.add_argument('--probability', type=float, required=True, help='胜率 0.0-1.0')
p_bind.add_argument('--report', help='预测报告摘要')
args = parser.parse_args()
# 确保数据目录存在
load_env()
os.makedirs(os.path.dirname(os.path.abspath(get_db_path())), exist_ok=True)
if not args.command:
parser.print_help()
sys.exit(0)
# 获取用户 ID
user_id = args.user_id or os.environ.get('USER', 'cli_user')
# 检查权限
ok, err_msg, role = check_permission(user_id, args.command)
if not ok:
print(json.dumps({"status": "error", "message": err_msg}, ensure_ascii=False))
sys.exit(1)
# 执行命令
result = None
if args.command == 'init':
result = cmd_init(args, user_id)
elif args.command == 'register':
result = cmd_register(args, user_id)
elif args.command == 'status':
result = cmd_status(args, user_id, role)
elif args.command == 'purchased':
result = cmd_purchased(args, user_id, role)
elif args.command == 'seal':
result = cmd_seal(args, user_id, role)
elif args.command == 'result':
result = cmd_result(args, user_id, role)
elif args.command == 'cancel':
result = cmd_cancel(args, user_id, role)
elif args.command == 'users':
result = cmd_users(args, user_id)
elif args.command == 'adduser':
result = cmd_adduser(args, user_id)
elif args.command == 'stats':
result = cmd_stats(args, user_id)
elif args.command == 'evaluate':
result = cmd_evaluate(args, user_id)
elif args.command == 'bind-eval':
result = cmd_bind_eval(args, user_id, role)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0 if result.get('status') == 'ok' else 1)
# ========== OpenClaw Skill 函数入口 ==========
def bid_project_manager(action_type: str, project_data: dict = None, **kwargs) -> dict:
"""
OpenClaw Skill Tool 函数入口
Args:
action_type: 操作类型
项目生命周期: init / register / status / purchased / seal / result / cancel
团队管理: users / adduser
统计分析: stats
胜算评估: evaluate / bind-eval
project_data: 业务参数 dict
**kwargs: OpenClaw 引擎注入,含 __context__
Returns:
{"status": "ok"|"error", "message": "...", "data": {...}}
"""
project_data = project_data or {}
# 从上下文获取用户身份
try:
user_id = kwargs['__context__']['body']['from']['userid']
except (KeyError, TypeError):
return {"status": "error", "message": "无法识别您的企业微信身份"}
# 附件路径处理
if action_type == 'register':
try:
attachments = kwargs['__context__']['body'].get('attachments', [])
if attachments:
project_data['_attachment_path'] = attachments[0].get('local_path')
except (KeyError, TypeError, IndexError):
pass
# 检查权限
ok, err_msg, role = check_permission(user_id, action_type)
if not ok:
return {"status": "error", "message": err_msg}
# 构建 args 对象
class Args:
pass
args = Args()
if action_type == 'init':
args.name = project_data.get('name', user_id)
elif action_type == 'register':
args.json = json.dumps(project_data.get('fields', {}), ensure_ascii=False)
args.manager_name = project_data.get('manager_name', '')
args.travel_days = project_data.get('travel_days', 2)
args.file = project_data.get('_attachment_path')
args.interactive = False
elif action_type in ('status', 'purchased', 'seal', 'result', 'cancel'):
args.keyword = project_data.get('keyword')
if action_type == 'status':
args.active_only = project_data.get('active_only', False)
args.upcoming_days = project_data.get('upcoming_days')
if action_type == 'result':
args.won = project_data.get('is_won')
args.lost = not project_data.get('is_won') if 'is_won' in project_data else False
args.our_price = project_data.get('our_price')
args.winning_price = project_data.get('winning_price')
args.winner = project_data.get('winner')
args.notes = project_data.get('notes')
elif action_type == 'users':
args.role = project_data.get('role')
elif action_type == 'adduser':
args.new_user_id = project_data.get('user_id', '')
args.name = project_data.get('name', '')
args.contact = project_data.get('contact')
elif action_type == 'stats':
args.by_manager = project_data.get('by_manager')
args.by_month = project_data.get('by_month')
args.period = project_data.get('period')
elif action_type == 'evaluate':
try:
attachments = kwargs['__context__']['body'].get('attachments', [])
if attachments:
project_data['file'] = attachments[0].get('local_path', '')
except (KeyError, TypeError, IndexError):
pass
args.file = project_data.get('file', '')
elif action_type == 'bind-eval':
args.keyword = project_data.get('keyword', '')
args.probability = project_data.get('probability')
args.report = project_data.get('report', '')
# 执行命令
if action_type == 'init':
return cmd_init(args, user_id)
elif action_type == 'register':
return cmd_register(args, user_id)
elif action_type == 'status':
return cmd_status(args, user_id, role)
elif action_type == 'purchased':
return cmd_purchased(args, user_id, role)
elif action_type == 'seal':
return cmd_seal(args, user_id, role)
elif action_type == 'result':
return cmd_result(args, user_id, role)
elif action_type == 'cancel':
return cmd_cancel(args, user_id, role)
elif action_type == 'users':
return cmd_users(args, user_id)
elif action_type == 'adduser':
return cmd_adduser(args, user_id)
elif action_type == 'stats':
return cmd_stats(args, user_id)
elif action_type == 'evaluate':
return cmd_evaluate(args, user_id)
elif action_type == 'bind-eval':
return cmd_bind_eval(args, user_id, role)
else:
return {"status": "error", "message": f"未知操作类型:{action_type}"}
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/__init__.py
# milb_tracker - 军工招投标商机全周期追踪工具
FILE:build/lib/milb_tracker/config.py
"""
配置加载模块:环境变量与 .env 文件管理。
加载优先级(从高到低):
1. 进程启动时已有的环境变量(最高,永不被覆盖)
2. 当前工作目录下的 .env (项目级配置)
3. ~/.config/milb-tracker/.env (用户级配置)
4. 内置默认值 (最低)
用法:
from milb_tracker.config import get_db_path, get_attachments_dir
DB_PATH = get_db_path()
ATTACHMENTS_DIR = get_attachments_dir()
"""
import os
from pathlib import Path
_env_loaded: bool = False
def _read_dotenv(path: Path) -> dict[str, str]:
"""解析 .env 文件,返回键值字典(不修改 os.environ)。"""
result: dict[str, str] = {}
try:
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip()
# 去除可选的首尾引号
if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
value = value[1:-1]
if key:
result[key] = value
except OSError:
pass
return result
def load_env() -> None:
"""加载 .env 配置文件到 os.environ(幂等,多次调用只生效一次)。
规则:
- 进程启动前已存在的环境变量不会被任何 .env 覆盖
- CWD/.env 优先于 ~/.config/milb-tracker/.env
"""
global _env_loaded
if _env_loaded:
return
_env_loaded = True
# 快照进程原有的键(这些键不会被覆盖)
original_keys: frozenset[str] = frozenset(os.environ.keys())
user_env = Path.home() / ".config" / "milb-tracker" / ".env"
cwd_env = Path.cwd() / ".env"
# 读取两个文件,CWD 优先(后者覆盖前者相同的键)
merged = _read_dotenv(user_env)
merged.update(_read_dotenv(cwd_env))
# 仅注入进程原本没有的键
for key, value in merged.items():
if key not in original_keys:
os.environ[key] = value
def get_db_path() -> str:
"""返回数据库文件绝对路径(自动触发 .env 加载)。
优先级:进程环境变量 > CWD/.env > ~/.config/milb-tracker/.env > 默认值
默认值:{CWD}/data/bids.db
"""
load_env()
return os.environ.get("DB_PATH", str(Path.cwd() / "data" / "bids.db"))
def get_attachments_dir() -> str:
"""返回附件存储目录路径(自动触发 .env 加载)。
优先级:进程环境变量 > CWD/.env > ~/.config/milb-tracker/.env > 默认值
默认值:与 DB_PATH 同级的 attachments/ 子目录
"""
load_env()
default = str(Path(get_db_path()).parent / "attachments")
return os.environ.get("ATTACHMENTS_DIR", default)
FILE:build/lib/milb_tracker/scripts/init_db.py
#!/usr/bin/env python3
"""
初始化 SQLite 数据库,创建所有表结构。
幂等运行:多次执行安全,不会重复建表或丢失数据。
用法:python3 scripts/init_db.py
"""
import json
import sqlite3
import os
import sys
from milb_tracker.config import get_db_path, get_attachments_dir
DB_PATH = get_db_path()
ATTACHMENTS_DIR = get_attachments_dir()
def get_conn() -> sqlite3.Connection:
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
DDL = """
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_no TEXT NOT NULL UNIQUE,
project_name TEXT NOT NULL,
budget REAL,
procurer TEXT,
bid_agency TEXT,
project_manager TEXT,
manager_contact TEXT,
registration_deadline TEXT,
registration_location TEXT,
doc_purchase_location TEXT,
doc_purchase_price REAL,
doc_purchase_deadline TEXT,
doc_required_materials TEXT,
doc_purchased_at TEXT,
doc_attachment_path TEXT,
bid_opening_time TEXT,
bid_opening_location TEXT,
travel_days INTEGER DEFAULT 0,
suggested_seal_time TEXT,
actual_seal_time TEXT,
announcement_path TEXT,
status TEXT NOT NULL DEFAULT 'registered',
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
updated_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS bid_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL REFERENCES projects(id),
our_bid_price REAL,
winning_price REAL,
winner TEXT,
is_winner INTEGER NOT NULL DEFAULT 0,
notes TEXT,
recorded_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS reminders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL REFERENCES projects(id),
reminder_type TEXT NOT NULL,
sent_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
recipient_role TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
wecom_userid TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
role TEXT NOT NULL,
contact TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
"""
def main():
try:
os.makedirs(ATTACHMENTS_DIR, exist_ok=True)
db_path = os.path.abspath(DB_PATH)
os.makedirs(os.path.dirname(db_path), exist_ok=True)
conn = get_conn()
conn.executescript(DDL)
conn.commit()
conn.close()
print(f"数据库初始化完成:{db_path}")
except Exception as e:
print(json.dumps({"error": f"数据库初始化失败: {e}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/scripts/manage_users.py
#!/usr/bin/env python3
"""
用户管理:Bootstrap 注册总监、添加负责人、列出用户。
用法:
python3 scripts/manage_users.py --bootstrap --user-id <id> --name <name>
python3 scripts/manage_users.py --add --caller-id <director_id> --user-id <id> --name <name> [--contact <c>]
python3 scripts/manage_users.py --list [--role <role>]
"""
import argparse
import json
import os
import sqlite3
import sys
from milb_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
def cmd_bootstrap(user_id: str, name: str):
"""Bootstrap 模式:注册首位总监。幂等,同 userid 返回 ok;不同 userid 且已有总监则报错。"""
conn = get_conn()
try:
cur = conn.execute("SELECT * FROM users WHERE role = 'director'")
existing = cur.fetchone()
if existing:
if existing['wecom_userid'] == user_id:
print(json.dumps({"status": "ok", "message": f"总监已存在:{existing['name']}"}))
sys.exit(0)
else:
print(json.dumps({"error": "系统已初始化,总监已存在且非当前用户", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn.execute(
"INSERT INTO users (wecom_userid, name, role) VALUES (?, ?, 'director')",
(user_id, name))
conn.commit()
print(json.dumps({"status": "ok", "message": f"总监注册成功:{name}"}))
sys.exit(0)
finally:
conn.close()
def cmd_add(caller_id: str, user_id: str, name: str, contact: str | None):
"""添加负责人:验证 caller 是总监,再插入。"""
conn = get_conn()
try:
cur = conn.execute("SELECT role FROM users WHERE wecom_userid = ?", (caller_id,))
caller_row = cur.fetchone()
if not caller_row or caller_row['role'] != 'director':
print(json.dumps({"error": "权限不足:仅总监可添加用户", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
cur2 = conn.execute("SELECT id FROM users WHERE wecom_userid = ?", (user_id,))
if cur2.fetchone():
print(json.dumps({"error": f"用户 {user_id} 已存在", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn.execute(
"INSERT INTO users (wecom_userid, name, role, contact) VALUES (?, ?, 'manager', ?)",
(user_id, name, contact))
conn.commit()
print(json.dumps({"status": "ok", "message": f"用户添加成功:{name} (manager)"}))
sys.exit(0)
finally:
conn.close()
def cmd_list(role: str | None):
"""列出用户,可选按 role 过滤。"""
conn = get_conn()
try:
if role:
cur = conn.execute(
"SELECT id, wecom_userid, name, role, contact, created_at "
"FROM users WHERE role = ? ORDER BY id",
(role,))
else:
cur = conn.execute(
"SELECT id, wecom_userid, name, role, contact, created_at "
"FROM users ORDER BY id")
rows = cur.fetchall()
result = [dict(r) for r in rows]
print(json.dumps(result, ensure_ascii=False))
sys.exit(0)
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='用户管理')
parser.add_argument('--bootstrap', action='store_true')
parser.add_argument('--add', action='store_true')
parser.add_argument('--list', action='store_true')
parser.add_argument('--user-id', help='企业微信 userid')
parser.add_argument('--name', help='用户显示名称')
parser.add_argument('--caller-id', help='调用者的 wecom_userid(--add 模式必填)')
parser.add_argument('--contact', help='联系方式')
parser.add_argument('--role', choices=['director', 'manager'], help='按角色过滤(--list 模式)')
args = parser.parse_args()
mode = sum([args.bootstrap, args.add, args.list])
if mode != 1:
print(json.dumps({"error": "必须指定 --bootstrap、--add 或 --list 其一", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if args.bootstrap:
if not args.user_id or not args.name:
print(json.dumps({"error": "--bootstrap 模式需要 --user-id 和 --name", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
cmd_bootstrap(args.user_id, args.name)
elif args.add:
if not args.caller_id or not args.user_id or not args.name:
print(json.dumps({"error": "--add 模式需要 --caller-id、--user-id 和 --name", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
cmd_add(args.caller_id, args.user_id, args.name, args.contact)
elif args.list:
cmd_list(args.role)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/scripts/query_projects.py
#!/usr/bin/env python3
"""
查询项目列表或项目详情,按角色过滤,输出 JSON。
用法:
python3 scripts/query_projects.py --user-id WangDirector --active-only
python3 scripts/query_projects.py --user-id ZhangManager --active-only
python3 scripts/query_projects.py --user-id WangDirector --keyword "2026-001"
python3 scripts/query_projects.py --user-id ZhangManager --keyword "网安"
python3 scripts/query_projects.py --id 1
python3 scripts/query_projects.py --user-id WangDirector --status preparing
python3 scripts/query_projects.py --user-id WangDirector --upcoming-days 7
"""
import argparse
import json
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from milb_tracker.config import get_db_path
DB_PATH = get_db_path()
ACTIVE_STATUSES = ('registered', 'doc_pending', 'doc_purchased', 'preparing', 'sealed')
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
def query_all(user_id: str | None,
keyword: str | None,
status: str | None,
active_only: bool,
upcoming_days: int | None) -> list:
"""
按用户身份和条件查询项目列表。
无 --user-id 时默认只返回活跃项目(向后兼容)。
"""
conn = get_conn()
try:
role, name = None, None
# 从 users 表解析角色和姓名
if user_id:
row = conn.execute(
"SELECT role, name FROM users WHERE wecom_userid = ?",
(user_id,)).fetchone()
if not row:
print(json.dumps({"error": "用户不存在", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
role, name = row['role'], row['name']
sql = "SELECT * FROM projects WHERE 1=1"
params = []
# --keyword:先精确匹配 project_no,再模糊匹配 project_name
if keyword:
exact = conn.execute(
"SELECT * FROM projects WHERE project_no = ?", (keyword,)).fetchone()
if exact:
cols = list(exact.keys())
return [dict(zip(cols, exact))]
sql += " AND project_name LIKE ?"
params.append(f"%{keyword}%")
# 角色过滤:manager 只看本人项目
if role == 'manager' and name:
sql += " AND project_manager = ?"
params.append(name)
# 状态过滤
if status:
sql += " AND status = ?"
params.append(status)
elif active_only or not status:
# 默认只返回活跃项目(active_only=True 或无 status 参数时)
placeholders = ','.join(['?'] * len(ACTIVE_STATUSES))
sql += f" AND status IN ({placeholders})"
params.extend(ACTIVE_STATUSES)
# 近期关键节点过滤
if upcoming_days is not None:
now_str = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
cutoff = (datetime.now() + timedelta(days=upcoming_days)).strftime("%Y-%m-%dT%H:%M:%S")
sql += (" AND (bid_opening_time BETWEEN ? AND ?"
" OR doc_purchase_deadline BETWEEN ? AND ?"
" OR suggested_seal_time BETWEEN ? AND ?)")
params.extend([now_str, cutoff, now_str, cutoff, now_str, cutoff])
sql += " ORDER BY bid_opening_time ASC"
cur = conn.execute(sql, params)
cols = [d[0] for d in cur.description]
rows = cur.fetchall()
return [dict(zip(cols, row)) for row in rows]
finally:
conn.close()
def query_by_id(project_id: int) -> dict | None:
"""查询单个项目详情。"""
conn = get_conn()
try:
cur = conn.execute("SELECT * FROM projects WHERE id=?", (project_id,))
cols = [d[0] for d in cur.description]
row = cur.fetchone()
return dict(zip(cols, row)) if row else None
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='查询招投标项目')
parser.add_argument('--user-id', help='当前用户 wecom_userid(从 users 表查角色)')
parser.add_argument('--keyword', help='搜索关键词:先精确匹配 project_no,再模糊匹配 project_name')
parser.add_argument('--id', type=int, help='查询指定项目详情')
parser.add_argument('--status', help='按状态过滤')
parser.add_argument('--active-only', action='store_true', help='仅返回活跃项目')
parser.add_argument('--upcoming-days', type=int, help='返回 N 天内有关键节点的项目')
args = parser.parse_args()
if args.id:
result = query_by_id(args.id)
else:
result = query_all(args.user_id, args.keyword, args.status,
args.active_only, args.upcoming_days)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/scripts/record_result.py
#!/usr/bin/env python3
"""
录入开标结果,同步更新项目状态为 won 或 lost。
用法:
python3 scripts/record_result.py \
--project-id 1 \
--our-price 980000 \
--winning-price 950000 \
--winner "某某公司" \
--won false \
--notes "排名第二,差距3万"
"""
import argparse
import json
import os
import sqlite3
import sys
from milb_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def record(project_id: int, our_price: float | None, winning_price: float | None,
winner: str | None, is_won: bool, notes: str | None):
"""写入开标结果并更新项目状态。"""
conn = get_conn()
try:
row = conn.execute(
"SELECT status FROM projects WHERE id=?", (project_id,)).fetchone()
if row is None:
print(json.dumps({"error": f"项目不存在:{project_id}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if row[0] != 'opened':
print(json.dumps(
{"error": f"项目当前状态 '{row[0]}' 不允许录入结果,仅opened状态可录入", "code": 1},
ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn.execute(
"INSERT INTO bid_results"
" (project_id, our_bid_price, winning_price, winner, is_winner, notes)"
" VALUES (?, ?, ?, ?, ?, ?)",
(project_id, our_price, winning_price, winner, int(is_won), notes))
new_status = 'won' if is_won else 'lost'
conn.execute(
"UPDATE projects SET status=?, updated_at=datetime('now','localtime')"
" WHERE id=?",
(new_status, project_id))
conn.commit()
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='录入开标结果')
parser.add_argument('--project-id', type=int, required=True, help='项目 ID')
parser.add_argument('--our-price', type=float, help='我方报价(元)')
parser.add_argument('--winning-price', type=float, help='中标价格(元)')
parser.add_argument('--winner', help='中标单位名称')
parser.add_argument('--won', required=True, choices=['true', 'false'], help='是否中标')
parser.add_argument('--notes', help='备注(评分、排名等现场情况)')
args = parser.parse_args()
is_won = args.won == 'true'
record(args.project_id, args.our_price, args.winning_price, args.winner, is_won, args.notes)
result_text = "中标" if is_won else "未中标"
print(json.dumps({"status": "ok", "message": f"项目 {args.project_id} 开标结果已录入:{result_text}", "new_status": result_text}, ensure_ascii=False))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/scripts/register_project.py
#!/usr/bin/env python3
"""
注册新招投标项目。
接收从招标公告(PDF/PNG)中提取的结构化信息,写入数据库,
自动生成项目编号,推算建议封标时间,并将附件移动到正确路径。
用法:
python3 scripts/register_project.py --json '{"project_name": "...", ...}' --manager-name "张经理"
python3 scripts/register_project.py --json '...' --manager-name "张经理" --travel-days 3
python3 scripts/register_project.py --json '...' --manager-name "张经理" --announcement-file /tmp/xxx.pdf
"""
import argparse
import json
import os
import shutil
import sqlite3
import sys
from datetime import datetime, timedelta
from milb_tracker.config import get_db_path, get_attachments_dir
DB_PATH = get_db_path()
ATTACHMENTS_DIR = get_attachments_dir()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
# 支持的字段(与数据库列名对应)
ALLOWED_FIELDS = [
'project_name', 'budget', 'procurer', 'bid_agency',
'manager_contact',
'registration_deadline', 'registration_location',
'doc_purchase_location', 'doc_purchase_price', 'doc_purchase_deadline',
'doc_required_materials',
'bid_opening_time', 'bid_opening_location',
]
def calc_suggested_seal_time(bid_opening_time: str, travel_days: int) -> str | None:
"""根据开标时间和运输天数推算建议封标时间,自动避开周末(退到周五)。"""
try:
opening = datetime.fromisoformat(bid_opening_time)
except (ValueError, TypeError):
return None
suggested = opening - timedelta(days=travel_days)
weekday = suggested.weekday() # 0=Mon, 5=Sat, 6=Sun
if weekday == 5: # 周六 → 周五
suggested -= timedelta(days=1)
elif weekday == 6: # 周日 → 周五
suggested -= timedelta(days=2)
return suggested.strftime("%Y-%m-%dT%H:%M:%S")
def generate_project_no(conn) -> str:
"""按 YYYY-NNN 格式自动生成 project_no。"""
year = datetime.now().year
cur = conn.execute(
"SELECT MAX(CAST(SUBSTR(project_no, 6) AS INTEGER)) FROM projects WHERE project_no LIKE ?",
(f"{year}-%",))
max_seq = cur.fetchone()[0]
seq = (max_seq or 0) + 1
return f"{year}-{seq:03d}"
def move_announcement(src: str, project_id: int) -> str:
"""将公告文件移动到 data/attachments/{project_id}/ 目录。"""
dest_dir = os.path.abspath(os.path.join(ATTACHMENTS_DIR, str(project_id)))
os.makedirs(dest_dir, exist_ok=True)
dest = os.path.join(dest_dir, os.path.basename(src))
shutil.move(src, dest)
return dest
def register(data: dict, manager_name: str, travel_days: int, announcement_file: str | None) -> dict:
"""
将项目数据写入数据库,返回包含 project_id, project_no 等信息的 dict。
"""
# 过滤只保留合法字段
fields = {k: v for k, v in data.items() if k in ALLOWED_FIELDS}
# 列表型字段序列化为 JSON 字符串
if isinstance(fields.get('doc_required_materials'), list):
fields['doc_required_materials'] = json.dumps(
fields['doc_required_materials'], ensure_ascii=False)
# 设置必填字段
fields['project_manager'] = manager_name
fields['travel_days'] = travel_days
# 推算建议封标时间
suggested = calc_suggested_seal_time(fields.get('bid_opening_time'), travel_days)
if suggested:
fields['suggested_seal_time'] = suggested
conn = get_conn()
attachment_dir = None
try:
# 生成 project_no(需要连接才能查询)
project_no = generate_project_no(conn)
fields['project_no'] = project_no
columns = list(fields.keys())
placeholders = ', '.join(['?'] * len(columns))
col_clause = ', '.join(columns)
values = [fields[c] for c in columns]
cur = conn.execute(
f"INSERT INTO projects ({col_clause}) VALUES ({placeholders})",
values)
conn.commit()
project_id = cur.lastrowid
# 移动公告文件
if announcement_file and os.path.exists(announcement_file):
dest = move_announcement(announcement_file, project_id)
conn.execute(
"UPDATE projects SET announcement_path=? WHERE id=?",
(dest, project_id))
conn.commit()
attachment_dir = os.path.dirname(dest)
else:
attachment_dir = os.path.join(os.path.abspath(ATTACHMENTS_DIR), str(project_id))
return {
"project_id": project_id,
"project_no": project_no,
"project_name": fields.get('project_name', ''),
"suggested_seal_time": suggested or '',
"attachment_dir": attachment_dir,
}
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='注册新招投标项目')
parser.add_argument('--json', required=True, help='项目信息 JSON 字符串')
parser.add_argument('--manager-name', required=True, help='指派的项目负责人姓名')
parser.add_argument('--travel-days', type=int, default=2, help='运输/路程天数(用于推算封标时间)')
parser.add_argument('--announcement-file', help='招标公告文件路径(PDF/PNG),由 Tool 层从 __context__ 拦截后传入')
args = parser.parse_args()
try:
data = json.loads(args.json)
except json.JSONDecodeError as e:
print(json.dumps({"error": f"JSON 解析失败:{e}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
result = register(data, args.manager_name, args.travel_days, args.announcement_file)
print(json.dumps(result, ensure_ascii=False))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/scripts/reminder_check.py
#!/usr/bin/env python3
"""
扫描所有活跃项目,判断是否需要发送提醒,输出 JSON 数组。
由 Cron 每日调用两次(8:47 / 17:53 工作日)。
提醒规则:
- 购买截止 ≤ 3 天 且 未购买 → doc_purchase → manager
- 封标建议时间 ≤ 2 天 且 未封标 → seal_warning → manager
- 开标时间 ≤ 1 天 → bid_opening → manager + director
输出格式(JSON 数组):
[
{
"project_id": 1,
"project_name": "xxx项目",
"reminder_type": "doc_purchase",
"recipient_role": "manager",
"project_manager": "张三",
"message": "..."
},
...
]
若无待提醒项,输出空数组 [],Agent 应静默退出。
"""
import json
import os
import sqlite3
import sys
from datetime import datetime, timedelta
from milb_tracker.config import get_db_path
DB_PATH = get_db_path()
ACTIVE_STATUSES = ('registered', 'doc_pending', 'doc_purchased', 'preparing', 'sealed')
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def is_already_sent(conn, project_id: int, reminder_type: str) -> bool:
"""检查今日是否已发送同类型提醒。"""
cur = conn.execute(
"SELECT COUNT(*) FROM reminders "
"WHERE project_id = ? AND reminder_type = ? "
"AND DATE(sent_at) = DATE('now', 'localtime')",
(project_id, reminder_type))
return cur.fetchone()[0] > 0
def record_sent(conn, project_id: int, reminder_type: str, recipient_role: str):
"""记录已发送的提醒。"""
conn.execute(
"INSERT INTO reminders (project_id, reminder_type, recipient_role) VALUES (?, ?, ?)",
(project_id, reminder_type, recipient_role))
def check_reminders() -> list:
"""扫描活跃项目,返回需要发送的提醒列表,并写入 reminders 表。"""
conn = get_conn()
try:
placeholders = ','.join(['?'] * len(ACTIVE_STATUSES))
cur = conn.execute(
f"SELECT id, project_name, project_manager, status,"
f" doc_purchase_deadline, suggested_seal_time, bid_opening_time"
f" FROM projects WHERE status IN ({placeholders})",
ACTIVE_STATUSES)
cols = [d[0] for d in cur.description]
projects = [dict(zip(cols, row)) for row in cur.fetchall()]
except Exception as e:
conn.close()
print(json.dumps({"error": f"数据库连接失败: {e}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
def parse_dt(s):
try:
return datetime.fromisoformat(s) if s else None
except ValueError:
return None
now = datetime.now()
reminders = []
for p in projects:
pid = p['id']
name = p['project_name']
manager = p['project_manager']
status = p['status']
# 规则 1:标书购买截止 ≤ 3 天且未购买
if status in ('registered', 'doc_pending'):
dl = parse_dt(p['doc_purchase_deadline'])
if dl and timedelta(0) <= (dl - now) <= timedelta(days=3):
if not is_already_sent(conn, pid, 'doc_purchase'):
reminders.append({
"project_id": pid,
"project_name": name,
"reminder_type": "doc_purchase",
"recipient_role": "manager",
"project_manager": manager,
"message": (
f"【标书购买提醒】{name} 购买截止 {dl.strftime('%Y-%m-%d')},"
f"请尽快办理"
),
})
record_sent(conn, pid, 'doc_purchase', 'manager')
# 规则 2:建议封标时间 ≤ 2 天且未封标
if status in ('registered', 'doc_pending', 'doc_purchased', 'preparing'):
seal = parse_dt(p['suggested_seal_time'])
if seal and timedelta(0) <= (seal - now) <= timedelta(days=2):
if not is_already_sent(conn, pid, 'seal_warning'):
reminders.append({
"project_id": pid,
"project_name": name,
"reminder_type": "seal_warning",
"recipient_role": "manager",
"project_manager": manager,
"message": (
f"【封标提醒】{name} 建议封标时间 {seal.strftime('%Y-%m-%d')},"
f"请确认制标进度"
),
})
record_sent(conn, pid, 'seal_warning', 'manager')
# 规则 3:开标时间 ≤ 1 天 → 同时通知 manager 和 director
opening = parse_dt(p['bid_opening_time'])
if opening and timedelta(0) <= (opening - now) <= timedelta(days=1):
if not is_already_sent(conn, pid, 'bid_opening'):
for role in ("manager", "director"):
reminders.append({
"project_id": pid,
"project_name": name,
"reminder_type": "bid_opening",
"recipient_role": role,
"project_manager": manager,
"message": (
f"【开标提醒】{name} 开标时间 "
f"{opening.strftime('%Y-%m-%d %H:%M')},请做好准备"
),
})
# 去重表只写1条(manager角色),is_already_sent 按 project_id+type+DATE 检查,与角色无关
record_sent(conn, pid, 'bid_opening', 'manager')
conn.commit()
conn.close()
return reminders
def main():
reminders = check_reminders()
print(json.dumps(reminders or [], ensure_ascii=False, indent=2))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/scripts/stats.py
#!/usr/bin/env python3
"""
统计招投标项目表现,输出 JSON。
用法:
python3 scripts/stats.py # 全局统计
python3 scripts/stats.py --by-manager # 按负责人分组统计
python3 scripts/stats.py --by-month # 按月度趋势统计
python3 scripts/stats.py --period 2026-Q1 # 指定季度
python3 scripts/stats.py --period 2026-03 # 指定月份
python3 scripts/stats.py --manager 张经理 # 指定负责人
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from milb_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def parse_period(period: str | None) -> tuple[str, str] | None:
"""将 period 转为 (start, end) 日期范围字符串。"""
if not period:
return None
if re.match(r'^\d{4}-Q[1-4]$', period):
year = int(period[:4])
quarter = int(period[-1])
start_month = (quarter - 1) * 3 + 1
start = f"{year}-{start_month:02d}-01"
end_month = start_month + 2
if end_month in (1, 3, 5, 7, 8, 10, 12):
end = f"{year}-{end_month:02d}-31"
elif end_month in (4, 6, 9, 11):
end = f"{year}-{end_month:02d}-30"
else:
end = f"{year}-{end_month:02d}-28"
return start, end
elif re.match(r'^\d{4}-\d{2}$', period):
start = f"{period}-01"
year, month = int(period[:4]), int(period[5:7])
if month == 12:
end = f"{year + 1}-01-01"
else:
end = f"{year}-{month + 1:02d}-01"
return start, end
else:
print(json.dumps({"error": "无效的 period 格式,支持 YYYY-MM 或 YYYY-QN", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
def stats_global(period: str | None) -> dict:
"""全局统计:总数、中标数、胜率、平均报价差。"""
conn = get_conn()
try:
date_filter, params = "", []
date_range = parse_period(period)
if date_range:
date_filter = " AND p.bid_opening_time >= ? AND p.bid_opening_time < ?"
params = list(date_range)
sql = f"""
SELECT
COUNT(DISTINCT p.id) AS total,
COUNT(DISTINCT CASE WHEN r.is_winner = 1 THEN p.id END) AS won,
COUNT(DISTINCT CASE WHEN r.is_winner = 0 THEN p.id END) AS lost,
COUNT(DISTINCT CASE WHEN r.id IS NOT NULL AND p.status NOT IN ('won', 'lost', 'cancelled') THEN p.id END) AS active,
AVG(p.budget) AS avg_budget,
AVG(r.our_bid_price - r.winning_price) AS avg_price_diff
FROM projects p
LEFT JOIN bid_results r ON r.project_id = p.id
WHERE r.id IS NOT NULL
{date_filter}
"""
row = conn.execute(sql, params).fetchone()
total = row[0] or 0
won = row[1] or 0
lost = row[2] or 0
active = row[3] or 0
win_rate = round(won / (won + lost), 3) if (won + lost) > 0 else 0.0
return {
"total": total,
"won": won,
"lost": lost,
"active": active,
"win_rate": win_rate,
"avg_budget": row[4],
"avg_price_diff": row[5],
}
finally:
conn.close()
def stats_by_manager(period: str | None) -> list:
"""按项目负责人分组统计。"""
conn = get_conn()
try:
date_filter, params = "", []
date_range = parse_period(period)
if date_range:
date_filter = " AND p.bid_opening_time >= ? AND p.bid_opening_time < ?"
params = list(date_range)
sql = f"""
SELECT
p.project_manager AS manager,
COUNT(DISTINCT p.id) AS total,
COUNT(DISTINCT CASE WHEN r.is_winner = 1 THEN p.id END) AS won,
COUNT(DISTINCT CASE WHEN r.is_winner = 0 THEN p.id END) AS lost,
COUNT(DISTINCT CASE WHEN p.status NOT IN ('won', 'lost', 'cancelled') THEN p.id END) AS active
FROM projects p
LEFT JOIN bid_results r ON r.project_id = p.id
WHERE r.id IS NOT NULL
{date_filter}
GROUP BY p.project_manager
ORDER BY won DESC
"""
rows = conn.execute(sql, params).fetchall()
result = []
for row in rows:
won = row[2] or 0
lost = row[3] or 0
result.append({
"manager": row[0] or "(未指派)",
"total": row[1] or 0,
"won": won,
"lost": lost,
"active": row[4] or 0,
"win_rate": round(won / (won + lost), 3) if (won + lost) > 0 else 0.0,
})
return result
finally:
conn.close()
def stats_by_month(period: str | None) -> list:
"""按月度统计趋势(可选 period 过滤)。"""
conn = get_conn()
try:
date_filter, params = "", []
date_range = parse_period(period)
if date_range:
date_filter = " AND p.bid_opening_time >= ? AND p.bid_opening_time < ?"
params = list(date_range)
sql = f"""
SELECT
STRFTIME('%Y-%m', p.bid_opening_time) AS month,
COUNT(DISTINCT p.id) AS total,
COUNT(DISTINCT CASE WHEN r.is_winner = 1 THEN p.id END) AS won,
COUNT(DISTINCT CASE WHEN r.is_winner = 0 THEN p.id END) AS lost
FROM projects p
LEFT JOIN bid_results r ON r.project_id = p.id
WHERE p.bid_opening_time IS NOT NULL
{date_filter}
GROUP BY month
ORDER BY month ASC
"""
rows = conn.execute(sql, params).fetchall()
result = []
for row in rows:
if row[0] is None:
continue
won = row[2] or 0
lost = row[3] or 0
result.append({
"month": row[0],
"total": row[1] or 0,
"won": won,
"lost": lost,
"win_rate": round(won / (won + lost), 3) if (won + lost) > 0 else 0.0,
})
return result
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='招投标统计分析')
parser.add_argument('--by-manager', action='store_true', help='按负责人分组统计')
parser.add_argument('--by-month', action='store_true', help='按月度趋势统计')
parser.add_argument('--period', help='指定时间范围,格式 YYYY-MM 或 YYYY-QN')
args = parser.parse_args()
if args.by_month:
result = stats_by_month(args.period)
elif args.by_manager:
result = stats_by_manager(args.period)
else:
result = stats_global(args.period)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/scripts/update_project.py
#!/usr/bin/env python3
"""
更新项目字段,含状态机合法性验证。
用法:
python3 scripts/update_project.py --id 1 --field status --value doc_purchased
python3 scripts/update_project.py --id 1 --field actual_seal_time --value "2026-04-08T15:00:00"
状态流转(合法路径):
registered → doc_pending / doc_purchased / cancelled
doc_pending → doc_purchased / cancelled
doc_purchased → preparing / sealed / cancelled
preparing → sealed / cancelled
sealed → opened / cancelled
opened → won / lost
任意终态(won/lost/cancelled)不可流转
"""
import argparse
import json
import os
import sqlite3
import sys
from milb_tracker.config import get_db_path
DB_PATH = get_db_path()
def get_conn():
conn = sqlite3.connect(os.path.abspath(DB_PATH), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
# 合法的状态流转路径(按 api-interfaces.md §5 修复)
VALID_TRANSITIONS = {
'registered': {'doc_pending', 'doc_purchased', 'cancelled'},
'doc_pending': {'doc_purchased', 'cancelled'},
'doc_purchased': {'preparing', 'sealed', 'cancelled'},
'preparing': {'sealed', 'cancelled'},
'sealed': {'opened', 'cancelled'},
'opened': {'won', 'lost'},
'won': set(),
'lost': set(),
'cancelled': set(),
}
# 允许通过此脚本更新的字段白名单(防止字段名 SQL 注入)
UPDATABLE_FIELDS = {
'status', 'doc_purchased_at', 'doc_attachment_path',
'actual_seal_time', 'project_manager', 'manager_contact',
'bid_opening_time', 'bid_opening_location', 'suggested_seal_time',
'announcement_path',
}
def validate_status_transition(current: str, new: str) -> bool:
"""验证状态流转是否合法。"""
return new in VALID_TRANSITIONS.get(current, set())
def update(project_id: int, field: str, value: str):
"""更新指定项目的指定字段。"""
if field not in UPDATABLE_FIELDS:
print(json.dumps({"error": f"不支持更新字段:{field}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
conn = get_conn()
try:
row = conn.execute(
"SELECT status FROM projects WHERE id=?", (project_id,)).fetchone()
if row is None:
print(json.dumps({"error": f"项目不存在:{project_id}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
if field == 'status':
current_status = row[0]
if not validate_status_transition(current_status, value):
print(json.dumps({"error": f"非法状态流转:{current_status} → {value}", "code": 1}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)
# 自动记录封标时间
if value == 'sealed':
conn.execute(
"UPDATE projects SET actual_seal_time=datetime('now','localtime') WHERE id=?",
(project_id,))
conn.execute(
f"UPDATE projects SET {field}=?, updated_at=datetime('now','localtime')"
f" WHERE id=?",
(value, project_id))
conn.commit()
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description='更新项目字段')
parser.add_argument('--id', type=int, required=True, help='项目 ID')
parser.add_argument('--field', required=True, help='要更新的字段名')
parser.add_argument('--value', required=True, help='新值')
args = parser.parse_args()
update(args.id, args.field, args.value)
print(json.dumps({"status": "ok", "message": f"项目 {args.id} 的 {args.field} 已更新为:{args.value}"}, ensure_ascii=False))
sys.exit(0)
if __name__ == '__main__':
main()
FILE:build/lib/milb_tracker/skill.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
军工招投标商机全周期追踪工具 - Skill 入口
提供 CLI 和函数调用两种方式,统一权限控制:
- CLI 模式:通过命令行参数调用
- 函数模式:通过 bid_project_manager() 调用(OpenClaw Skill)
用法:
python -m milb_tracker.skill --help
python -m milb_tracker.skill init --name "王总监"
python -m milb_tracker.skill register --json '{...}' --manager-name "张经理"
"""
import argparse
import json
import os
import sqlite3
import subprocess
import sys
from milb_tracker.config import get_db_path, load_env
SCRIPTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scripts')
HELP_TEXT = """军工招投标商机全周期追踪工具
用法:
milb-tracker init --name "姓名" 初始化系统(首次使用,总监注册)
milb-tracker register 注册新项目(交互式)
milb-tracker register --json '...' 注册新项目(JSON 参数)
milb-tracker status 查看项目列表
milb-tracker status --keyword "关键词" 按关键词搜索项目
milb-tracker purchased "项目名" 确认已购买标书
milb-tracker seal "项目名" 确认已封标
milb-tracker result "项目名" --won 录入中标结果
milb-tracker result "项目名" --lost 录入未中标结果
milb-tracker cancel "项目名" 取消项目
milb-tracker users 查看团队成员
milb-tracker adduser --user-id xxx --name "姓名" 添加负责人
milb-tracker stats 查看统计(默认按月)
milb-tracker stats --by-manager 按负责人统计
milb-tracker stats --by-month --period 2026-Q1 按季度统计
示例:
milb-tracker init --name "王总监"
milb-tracker status
milb-tracker result "XX系统采购" --won --our-price 980000 --winning-price 950000
milb-tracker stats --by-month
"""
# 权限常量
DIRECTOR_ONLY = {'register', 'adduser', 'users', 'stats'}
def get_conn():
"""获取数据库连接"""
conn = sqlite3.connect(os.path.abspath(get_db_path()), timeout=10.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
def get_user(conn, user_id: str) -> dict | None:
"""查询用户身份"""
row = conn.execute(
"SELECT role, name FROM users WHERE wecom_userid = ?",
(user_id,)).fetchone()
return dict(row) if row else None
def check_permission(user_id: str, action: str) -> tuple[bool, str, str]:
"""
检查用户权限,返回 (允许, 错误信息, 用户角色)
"""
conn = get_conn()
try:
if action == 'init':
# init 操作:检查是否已有总监
director = conn.execute(
"SELECT name FROM users WHERE role = 'director'"
).fetchone()
if director:
return False, f"系统已初始化,总监:{director['name']}", ""
# init 无需用户存在,可直接注册
return True, "", "director"
# 非 init 操作:检查系统是否已初始化
director = conn.execute(
"SELECT wecom_userid, name FROM users WHERE role = 'director'"
).fetchone()
if not director:
return False, "系统尚未初始化,请先执行 init", ""
# 查询当前用户
user = get_user(conn, user_id)
if not user:
return False, "您尚未被添加为系统用户", ""
role, name = user['role'], user['name']
# 命令级权限校验
if action in DIRECTOR_ONLY and role != 'director':
return False, "仅总监可执行此操作", role
return True, "", role
finally:
conn.close()
def check_project_access(user_id: str, keyword: str, role: str) -> tuple[int | None, str]:
"""
检查项目访问权限,返回 (project_id, 错误信息)
"""
if not keyword:
return None, "缺少项目标识"
conn = get_conn()
try:
if role == 'director':
row = conn.execute(
"SELECT id FROM projects WHERE project_no = ? OR project_name LIKE ?",
(keyword, f"%{keyword}%")).fetchone()
else:
row = conn.execute(
"SELECT p.id FROM projects p "
"JOIN users u ON u.name = p.project_manager "
"WHERE u.wecom_userid = ? AND (p.project_no = ? OR p.project_name LIKE ?)",
(user_id, keyword, f"%{keyword}%")).fetchone()
if not row:
msg = "项目不存在" if role == 'director' else "该项目不在您的负责范围内"
return None, msg
return row['id'], ""
finally:
conn.close()
def run_script(script_name: str, args: list[str], db_path: str = None) -> subprocess.CompletedProcess:
"""运行 scripts/ 下的脚本"""
script_path = os.path.join(SCRIPTS_DIR, script_name)
cmd = [sys.executable, script_path] + args
env = os.environ.copy()
env['DB_PATH'] = db_path or get_db_path()
result = subprocess.run(cmd, capture_output=True, text=True, env=env)
return result
# ========== 命令处理函数 ==========
def cmd_init(args, user_id: str):
"""初始化系统(注册总监)"""
# 检查是否已有总监
conn = get_conn()
try:
director = conn.execute(
"SELECT name FROM users WHERE role = 'director'"
).fetchone()
if director:
return {"status": "error", "message": f"系统已初始化,总监:{director['name']}"}
finally:
conn.close()
result = run_script('manage_users.py', ['--bootstrap', '--user-id', user_id, '--name', args.name])
if result.returncode == 0:
return {"status": "ok", "message": f"总监注册成功:{args.name}"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "注册失败")}
def cmd_register(args, user_id: str):
"""注册新项目"""
if not args.json and not args.interactive:
return {"status": "error", "message": "需要提供 --json 或使用交互模式"}
if not args.manager_name:
return {"status": "error", "message": "需要提供 --manager-name 参数"}
travel_days = args.travel_days or 2
cmd_args = ['--json', args.json or '{}', '--manager-name', args.manager_name, '--travel-days', str(travel_days)]
if args.file:
cmd_args.extend(['--announcement-file', args.file])
result = run_script('register_project.py', cmd_args)
if result.returncode == 0:
data = json.loads(result.stdout)
return {"status": "ok", "message": f"项目注册成功: {data.get('project_no')}", "data": data}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "注册失败")}
def cmd_status(args, user_id: str, role: str):
"""查看项目列表"""
cmd_args = []
if args.keyword:
cmd_args.extend(['--keyword', args.keyword])
if args.active_only:
cmd_args.append('--active-only')
if args.upcoming_days:
cmd_args.extend(['--upcoming-days', str(args.upcoming_days)])
# 非总监需要传递 user-id 进行过滤
if role == 'manager':
cmd_args.extend(['--user-id', user_id])
result = run_script('query_projects.py', cmd_args)
if result.returncode == 0:
projects = json.loads(result.stdout)
return {"status": "ok", "data": projects}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "查询失败")}
def _update_project_status(user_id: str, keyword: str, status_value: str, role: str) -> dict:
"""更新项目状态"""
project_id, err = check_project_access(user_id, keyword, role)
if err:
return {"status": "error", "message": err}
result = run_script('update_project.py', ['--id', str(project_id), '--field', 'status', '--value', status_value])
if result.returncode == 0:
return {"status": "ok", "message": f"项目状态已更新为 {status_value}"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "更新失败")}
def cmd_purchased(args, user_id: str, role: str):
"""确认标书已购买"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
return _update_project_status(user_id, args.keyword, 'doc_purchased', role)
def cmd_seal(args, user_id: str, role: str):
"""确认已封标"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
return _update_project_status(user_id, args.keyword, 'sealed', role)
def cmd_result(args, user_id: str, role: str):
"""录入开标结果"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
if not args.won and not args.lost:
return {"status": "error", "message": "需要指定 --won 或 --lost"}
project_id, err = check_project_access(user_id, args.keyword, role)
if err:
return {"status": "error", "message": err}
cmd_args = ['--project-id', str(project_id), '--won', 'true' if args.won else 'false']
if args.our_price:
cmd_args.extend(['--our-price', str(args.our_price)])
if args.winning_price:
cmd_args.extend(['--winning-price', str(args.winning_price)])
if args.winner:
cmd_args.extend(['--winner', args.winner])
if args.notes:
cmd_args.extend(['--notes', args.notes])
result = run_script('record_result.py', cmd_args)
if result.returncode == 0:
return {"status": "ok", "message": "开标结果已录入"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "录入失败")}
def cmd_cancel(args, user_id: str, role: str):
"""取消项目"""
if not args.keyword:
return {"status": "error", "message": "需要提供项目名称"}
return _update_project_status(user_id, args.keyword, 'cancelled', role)
def cmd_users(args, user_id: str):
"""查看用户列表"""
cmd_args = ['--list']
if args.role:
cmd_args.extend(['--role', args.role])
result = run_script('manage_users.py', cmd_args)
if result.returncode == 0:
users = json.loads(result.stdout)
return {"status": "ok", "data": users}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "查询失败")}
def cmd_adduser(args, user_id: str):
"""添加用户"""
if not args.user_id or not args.name:
return {"status": "error", "message": "需要提供 --user-id 和 --name 参数"}
cmd_args = ['--add', '--caller-id', user_id, '--user-id', args.user_id, '--name', args.name]
if args.contact:
cmd_args.extend(['--contact', args.contact])
result = run_script('manage_users.py', cmd_args)
if result.returncode == 0:
return {"status": "ok", "message": f"用户添加成功:{args.name}"}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "添加失败")}
def cmd_stats(args, user_id: str):
"""查看统计"""
if args.by_manager and args.by_month:
return {"status": "error", "message": "--by-manager 和 --by-month 不能同时使用"}
cmd_args = []
if args.by_manager:
cmd_args.append('--by-manager')
elif args.by_month:
cmd_args.append('--by-month')
else:
cmd_args.append('--by-month')
if args.period:
cmd_args.extend(['--period', args.period])
result = run_script('stats.py', cmd_args)
if result.returncode == 0:
stats = json.loads(result.stdout)
return {"status": "ok", "data": stats}
else:
return {"status": "error", "message": json.loads(result.stderr).get("error", "统计失败")}
# ========== CLI 入口 ==========
def main():
"""CLI 入口函数"""
parser = argparse.ArgumentParser(
description='军工招投标商机全周期追踪工具',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=HELP_TEXT
)
parser.add_argument('--user-id', help='用户 ID(CLI 模式下使用环境变量)')
subparsers = parser.add_subparsers(dest='command', help='子命令')
# init
p_init = subparsers.add_parser('init', help='初始化系统(首次使用,总监注册)')
p_init.add_argument('--name', required=True, help='总监显示名称')
# register
p_reg = subparsers.add_parser('register', help='注册新项目')
p_reg.add_argument('--json', help='项目信息 JSON 字符串')
p_reg.add_argument('--manager-name', help='项目负责人姓名')
p_reg.add_argument('--travel-days', type=int, default=2, help='运输天数')
p_reg.add_argument('--file', help='招标公告文件路径')
p_reg.add_argument('--interactive', action='store_true', help='交互式输入')
# status
p_status = subparsers.add_parser('status', help='查看项目列表')
p_status.add_argument('--keyword', help='搜索关键词')
p_status.add_argument('--active-only', action='store_true', help='仅显示活跃项目')
p_status.add_argument('--upcoming-days', type=int, help='显示 N 天内有关键节点的项目')
# purchased
p_purchased = subparsers.add_parser('purchased', help='确认已购买标书')
p_purchased.add_argument('keyword', nargs='?', help='项目名称或编号')
# seal
p_seal = subparsers.add_parser('seal', help='确认已封标')
p_seal.add_argument('keyword', nargs='?', help='项目名称或编号')
# result
p_result = subparsers.add_parser('result', help='录入开标结果')
p_result.add_argument('keyword', nargs='?', help='项目名称或编号')
p_result.add_argument('--won', action='store_true', help='中标')
p_result.add_argument('--lost', action='store_true', help='未中标')
p_result.add_argument('--our-price', type=int, help='我方报价')
p_result.add_argument('--winning-price', type=int, help='中标价')
p_result.add_argument('--winner', help='中标单位')
p_result.add_argument('--notes', help='备注')
# cancel
p_cancel = subparsers.add_parser('cancel', help='取消项目')
p_cancel.add_argument('keyword', nargs='?', help='项目名称或编号')
# users
p_users = subparsers.add_parser('users', help='查看团队成员')
p_users.add_argument('--role', choices=['director', 'manager'], help='按角色过滤')
# adduser
p_adduser = subparsers.add_parser('adduser', help='添加负责人')
p_adduser.add_argument('--user-id', required=True, help='用户 ID')
p_adduser.add_argument('--name', required=True, help='用户显示名称')
p_adduser.add_argument('--contact', help='联系方式')
# stats
p_stats = subparsers.add_parser('stats', help='查看统计')
p_stats.add_argument('--by-manager', action='store_true', help='按负责人统计')
p_stats.add_argument('--by-month', action='store_true', help='按月份统计')
p_stats.add_argument('--period', help='统计周期')
args = parser.parse_args()
# 确保数据目录存在
load_env()
os.makedirs(os.path.dirname(os.path.abspath(get_db_path())), exist_ok=True)
if not args.command:
parser.print_help()
sys.exit(0)
# 获取用户 ID
user_id = args.user_id or os.environ.get('USER', 'cli_user')
# 检查权限
ok, err_msg, role = check_permission(user_id, args.command)
if not ok:
print(json.dumps({"status": "error", "message": err_msg}, ensure_ascii=False))
sys.exit(1)
# 执行命令
result = None
if args.command == 'init':
result = cmd_init(args, user_id)
elif args.command == 'register':
result = cmd_register(args, user_id)
elif args.command == 'status':
result = cmd_status(args, user_id, role)
elif args.command == 'purchased':
result = cmd_purchased(args, user_id, role)
elif args.command == 'seal':
result = cmd_seal(args, user_id, role)
elif args.command == 'result':
result = cmd_result(args, user_id, role)
elif args.command == 'cancel':
result = cmd_cancel(args, user_id, role)
elif args.command == 'users':
result = cmd_users(args, user_id)
elif args.command == 'adduser':
result = cmd_adduser(args, user_id)
elif args.command == 'stats':
result = cmd_stats(args, user_id)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0 if result.get('status') == 'ok' else 1)
# ========== OpenClaw Skill 函数入口 ==========
def bid_project_manager(action_type: str, project_data: dict = None, **kwargs) -> dict:
"""
OpenClaw Skill Tool 函数入口
Args:
action_type: 操作类型 (init/register/status/purchased/seal/result/cancel/users/adduser/stats)
project_data: 业务参数 dict
**kwargs: OpenClaw 引擎注入,含 __context__
Returns:
{"status": "ok"|"error", "message": "...", "data": {...}}
"""
project_data = project_data or {}
# 从上下文获取用户身份
try:
user_id = kwargs['__context__']['body']['from']['userid']
except (KeyError, TypeError):
return {"status": "error", "message": "无法识别您的企业微信身份"}
# 附件路径处理
if action_type == 'register':
try:
attachments = kwargs['__context__']['body'].get('attachments', [])
if attachments:
project_data['_attachment_path'] = attachments[0].get('local_path')
except (KeyError, TypeError, IndexError):
pass
# 检查权限
ok, err_msg, role = check_permission(user_id, action_type)
if not ok:
return {"status": "error", "message": err_msg}
# 构建 args 对象
class Args:
pass
args = Args()
if action_type == 'init':
args.name = project_data.get('name', user_id)
elif action_type == 'register':
args.json = json.dumps(project_data.get('fields', {}), ensure_ascii=False)
args.manager_name = project_data.get('manager_name', '')
args.travel_days = project_data.get('travel_days', 2)
args.file = project_data.get('_attachment_path')
args.interactive = False
elif action_type in ('status', 'purchased', 'seal', 'result', 'cancel'):
args.keyword = project_data.get('keyword')
if action_type == 'result':
args.won = project_data.get('is_won')
args.lost = not project_data.get('is_won') if 'is_won' in project_data else False
args.our_price = project_data.get('our_price')
args.winning_price = project_data.get('winning_price')
args.winner = project_data.get('winner')
args.notes = project_data.get('notes')
elif action_type == 'users':
args.role = project_data.get('role')
elif action_type == 'adduser':
args.user_id = project_data.get('user_id', '')
args.name = project_data.get('name', '')
args.contact = project_data.get('contact')
elif action_type == 'stats':
args.by_manager = project_data.get('by_manager')
args.by_month = project_data.get('by_month')
args.period = project_data.get('period')
# 执行命令
if action_type == 'init':
return cmd_init(args, user_id)
elif action_type == 'register':
return cmd_register(args, user_id)
elif action_type == 'status':
return cmd_status(args, user_id, role)
elif action_type == 'purchased':
return cmd_purchased(args, user_id, role)
elif action_type == 'seal':
return cmd_seal(args, user_id, role)
elif action_type == 'result':
return cmd_result(args, user_id, role)
elif action_type == 'cancel':
return cmd_cancel(args, user_id, role)
elif action_type == 'users':
return cmd_users(args, user_id)
elif action_type == 'adduser':
return cmd_adduser(args, user_id)
elif action_type == 'stats':
return cmd_stats(args, user_id)
else:
return {"status": "error", "message": f"未知操作类型:{action_type}"}
if __name__ == '__main__':
main()
FILE:milb_tracker.egg-info/SOURCES.txt
README.md
pyproject.toml
milb_tracker/__init__.py
milb_tracker/config.py
milb_tracker/skill.py
milb_tracker.egg-info/PKG-INFO
milb_tracker.egg-info/SOURCES.txt
milb_tracker.egg-info/dependency_links.txt
milb_tracker.egg-info/entry_points.txt
milb_tracker.egg-info/requires.txt
milb_tracker.egg-info/top_level.txt
milb_tracker/scripts/init_db.py
milb_tracker/scripts/manage_users.py
milb_tracker/scripts/query_projects.py
milb_tracker/scripts/record_result.py
milb_tracker/scripts/register_project.py
milb_tracker/scripts/reminder_check.py
milb_tracker/scripts/stats.py
milb_tracker/scripts/update_project.py
tests/test_config.py
tests/test_init_db.py
tests/test_manage_users.py
tests/test_query_projects.py
tests/test_record_result.py
tests/test_register_project.py
tests/test_reminder_check.py
tests/test_stats.py
tests/test_update_project.py
FILE:milb_tracker.egg-info/dependency_links.txt
FILE:milb_tracker.egg-info/entry_points.txt
[console_scripts]
milb-tracker = milb_tracker.skill:main
FILE:milb_tracker.egg-info/top_level.txt
milb_tracker
FILE:pyproject.toml
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "bidding-tracker"
version = "0.1.0"
description = "招投标商机全周期追踪工具"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"pypdf>=4.0",
"python-docx>=1.1",
]
[project.optional-dependencies]
dev = ["pytest"]
[project.scripts]
bidding-tracker = "bidding_tracker.skill:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["bidding_tracker*"]
[tool.pytest.ini_options]
testpaths = ["tests"]
政府采购商机专用推报工具。根据商机数据自动生成 Excel 并通过 SMTP 发送邮件。当用户说"govb-email"、"政府采购商机邮件"、"推送政府采购商机"、"政采商机通报"时触发。注意:这不是通用邮件客户端,仅用于执行 govb 业务逻辑。
---
name: govb-email
description: 政府采购商机专用推报工具。根据商机数据自动生成 Excel 并通过 SMTP 发送邮件。当用户说"govb-email"、"政府采购商机邮件"、"推送政府采购商机"、"政采商机通报"时触发。注意:这不是通用邮件客户端,仅用于执行 govb 业务逻辑。
metadata: {"openclaw":{"emoji":"📧","requires":{"bins":["govb-email","govb-fetcher"]},"install":"pip install -e {baseDir}"}}
---
# Govb Email
自动抓取政府采购商机并发送邮件报告。
## 环境变量要求
该技能必须在 `.env` 中配置以下核心参数才能激活:
- `EMAIL_TO`, `EMAIL_CC`, `EMAIL_FROM`: 收发件地址
- `EMAIL_SMTP_HOST`, `EMAIL_SMTP_PORT`: SMTP 服务器信息
- `EMAIL_SMTP_USER`, `EMAIL_SMTP_PASSWORD`: 认证信息
- `EMAIL_SUBJECT_PREFIX`, `EMAIL_BODY_INTRO`: 邮件模板配置
- `EMAIL_RECIPIENT_NAME`, `EMAIL_SENDER_NAME`: 称呼和签名
## 快速使用
- `/govb-email` → 发送昨日报告(默认)
- `/govb-email --help` → 显示帮助信息
- `/govb-email --today` → 发送今日报告
- `/govb-email --date 2026-03-23` → 发送指定日期报告
- `/govb-email --keywords "模型,仿真"` → 使用自定义关键词筛选
- `/govb-email --to [email protected]` → 测试发送至指定收件人
## 参数说明
| 参数 | 说明 | 默认值 |
|------|------|--------|
| 无参数 | 默认昨日 | 启用 |
| `--today` | 今日 | - |
| `--date YYYY-MM-DD` | 指定日期 | - |
| `--keywords WORDS` | 关键词,逗号分隔 | govb_fetcher 配置的默认关键词 |
| `--to ADDRESS` | 测试发送至指定收件人 | .env 中的配置 |
## 数据源
- 北京政采(zbcg-bjzc.zhongcy.com)
- 湖南政采(ccgp-hunan.gov.cn)
## 触发词
发送邮件、推送报告、邮件通知、政采商机通报
## 配置文件
配置文件位于 `~/.config/govb-email/.env`,可配置以下参数:
| 环境变量 | 用途 |
|----------|------|
| `EMAIL_TO` | 收件人,逗号分隔 |
| `EMAIL_CC` | 抄送人,逗号分隔 |
| `EMAIL_FROM` | 发件人 |
| `EMAIL_RECIPIENT_NAME` | 收件人称呼 |
| `EMAIL_SENDER_NAME` | 发件人签名 |
| `EMAIL_SUBJECT_PREFIX` | 邮件主题前缀 |
| `EMAIL_BODY_INTRO` | 邮件正文开头 |
| `EMAIL_SMTP_HOST` | SMTP 服务器 |
| `EMAIL_SMTP_PORT` | SMTP 端口 |
| `EMAIL_SMTP_USER` | SMTP 用户名 |
| `EMAIL_SMTP_PASSWORD` | SMTP 密码 |
创建配置文件可复制 `.env.example` 为 `~/.config/govb-email/.env` 后修改。
## 技术说明
- 使用 SMTP 直接发送邮件(配置 EMAIL_SMTP_* 环境变量)
- 使用文件锁防止并发执行
FILE:CLAUDE.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
`govb-email` is a Chinese government procurement opportunity email reporting tool. It fetches bidding information from two data sources (北京政采, 湖南政采), generates Excel reports, and sends them via email.
## Commands
```bash
# Install package
pip install -e .
# Run the tool (sends yesterday's report by default)
govb-email
# Send today's report
govb-email --today
# Send report for specific date
govb-email --date 2026-03-23
# Use custom keywords to filter opportunities
govb-email --keywords "模型,仿真,AI"
# Test send to specific recipient
govb-email --to [email protected]
```
## Architecture
**Core Modules:**
- `govb_email/fetcher.py` - Main entry point, handles CLI arguments, fetches data, sends emails
- `govb_email/config.py` - Configuration loader, reads from `.env` file
**Data Flow:**
1. CLI parses arguments (`--date`, `--today`, `--keywords`, `--to`)
2. Calls `govb_fetcher.fetcher.fetch_all_bidding(target_date)` to fetch data from two sources
3. Generates email body with high-recommendation items
4. Sends email via SMTP (smtplib)
5. Attaches Excel report from `~/.openclaw/workspace/govb-bidding/`
**Email Sending:**
- SMTP via smtplib (SMTP_SSL, port 465)
- File lock at `/tmp/govb_bidding_email.lock` prevents concurrent execution
## Configuration
All config via `.env` file (see `.env.example`):
- Email addresses: `EMAIL_TO`, `EMAIL_CC`, `EMAIL_FROM`
- SMTP: `EMAIL_SMTP_HOST`, `EMAIL_SMTP_PORT`, `EMAIL_SMTP_USER`, `EMAIL_SMTP_PASSWORD`
- Templates: `EMAIL_SUBJECT_PREFIX`, `EMAIL_BODY_INTRO`, `EMAIL_RECIPIENT_NAME`, `EMAIL_SENDER_NAME`
Config file search order:
1. `.env` in current working directory
2. `~/.config/govb-email/.env`
FILE:_meta.json
{
"ownerId": "kn7egg3a3hx7nftrgxb1s3aj7s838fg5",
"slug": "goverment-bidding-email",
"version": "0.1.0"
}
FILE:govb_email/__init__.py
# govb_email - 政府采购商机邮件发送工具
FILE:govb_email/config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
配置加载模块 (govb-email)
从 .env 文件读取配置,所有配置项必填,缺失时直接报错
只加载 EMAIL_* 前缀的配置
"""
from pathlib import Path
from typing import Dict
# 必填配置项列表
REQUIRED_CONFIG_KEYS = [
'EMAIL_TO',
'EMAIL_CC',
'EMAIL_FROM',
'EMAIL_SMTP_HOST',
'EMAIL_SMTP_PORT',
'EMAIL_SMTP_USER',
'EMAIL_SMTP_PASSWORD',
]
def load_config() -> Dict:
"""
从 .env 文件加载配置
Returns:
Dict: 配置字典,包含所有配置项
"""
# 按优先级查找:当前工作目录 → ~/.config/govb-email/
candidates = [
Path.cwd() / '.env',
Path.home() / '.config' / 'govb-email' / '.env',
]
env_path = next((p for p in candidates if p.exists()), None)
config = {}
if env_path:
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
if '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
return config
def get_config(key: str) -> str:
"""
获取配置项,缺失时抛出异常
Args:
key: 配置键名
Returns:
配置值字符串
Raises:
ValueError: 配置项未在 .env 中设置
"""
config = load_config()
value = config.get(key)
if not value:
raise ValueError(f"配置项 {key} 未在 .env 中设置,请参考 .env.example")
return value
def get_email_config() -> Dict:
"""
获取邮件配置,缺失任何必填项则报错
Returns:
Dict: 邮件配置字典
Raises:
ValueError: 任何必填配置项缺失时
"""
# 先验证必填项
for key in REQUIRED_CONFIG_KEYS:
get_config(key) # 缺失会抛异常
return {
'to': get_config('EMAIL_TO').split(','),
'cc': get_config('EMAIL_CC').split(','),
'from': get_config('EMAIL_FROM'),
'recipient_name': get_config('EMAIL_RECIPIENT_NAME'),
'sender_name': get_config('EMAIL_SENDER_NAME'),
'subject_prefix': get_config('EMAIL_SUBJECT_PREFIX'),
'body_intro': get_config('EMAIL_BODY_INTRO'),
'smtp_host': get_config('EMAIL_SMTP_HOST'),
'smtp_port': get_config('EMAIL_SMTP_PORT'),
'smtp_user': get_config('EMAIL_SMTP_USER'),
'smtp_password': get_config('EMAIL_SMTP_PASSWORD'),
}
if __name__ == '__main__':
# 测试输出
print("=== govb-email 配置加载测试 ===")
try:
email_cfg = get_email_config()
print(f"收件人: {email_cfg['to']}")
print(f"抄送人: {email_cfg['cc']}")
print(f"发件人: {email_cfg['from']}")
print(f"收件人称呼: {email_cfg['recipient_name']}")
print(f"发件人签名: {email_cfg['sender_name']}")
print(f"主题前缀: {email_cfg['subject_prefix']}")
except ValueError as e:
print(f"[ERROR] {e}")
FILE:govb_email/fetcher.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
政府采购商机邮件发送工具 (govb-email)
自动抓取北京政采、湖南政采的商机数据,
生成 Excel 汇总文件,并通过邮件发送报告。
用法:
govb-email [选项]
示例:
govb-email # 发送昨日报告(默认)
govb-email --today # 发送今日报告
govb-email --date 2026-03-23 # 发送指定日期报告
govb-email --keywords "模型,仿真" # 使用自定义关键词筛选
govb-email --to [email protected] # 测试发送至指定收件人
"""
import sys
import os
from pathlib import Path
from govb_email.config import get_email_config
def _import_fetcher():
"""延迟导入避免循环依赖"""
from govb_fetcher.fetcher import fetch_all_bidding, save_to_excel
from govb_fetcher.config import get_output_dir
return fetch_all_bidding, save_to_excel, get_output_dir
def get_high_recommend_items(results: dict) -> dict:
"""
从各渠道数据中提取高推荐等级的项目。
Args:
results: fetch_all_bidding 返回的字典,key 为渠道名,value 为记录列表
Returns:
dict: 包含各渠道高推荐项目的字典,结构为 {'bjzc': [], 'hnzc': []}
"""
result = {'bjzc': [], 'hnzc': []}
# 北京政采
for record in results.get('北京政采', []):
if record.get('推荐等级') == '高':
lot_name = record.get('标段名称', '').strip()
proj_name = record.get('项目名称', '').strip()
result['bjzc'].append({
'title': (lot_name or proj_name)[:50],
'method': record.get('招标方式', ''),
'budget': record.get('合同估价(元)', ''),
'purchaser': record.get('采购人', ''),
})
# 湖南政采
for record in results.get('湖南政采', []):
if record.get('推荐等级') == '高':
result['hnzc'].append({
'title': record.get('项目名称', '')[:50],
'method': record.get('招标方式', ''),
'budget': record.get('合同估价(元)', ''),
'purchaser': record.get('采购人', ''),
})
return result
def send_email(date_str, results, excel_path, to_override=None):
"""
发送商机报告邮件。
Args:
date_str: 报告日期字符串,格式 YYYY-MM-DD
results: fetch_all_bidding 返回的字典
excel_path: Excel 附件文件路径(由 get_output_dir() 计算)
to_override: 覆盖默认收件人,用于测试发送
"""
env_config = get_email_config()
recipient_name = env_config['recipient_name']
sender_name = env_config['sender_name']
subject_prefix = env_config['subject_prefix']
body_intro = env_config['body_intro']
# 构建邮件主题
subject = f"【{subject_prefix}】{date_str}"
# 获取高推荐项目
high_items = get_high_recommend_items(results)
bjzc_list = results.get('北京政采', [])
hnzc_list = results.get('湖南政采', [])
# 构建邮件正文
body = f"""{recipient_name},{body_intro}
汇总如下:
【北京政采】{len(bjzc_list)}条({date_str})
高推荐项目:
"""
if high_items['bjzc']:
for i, item in enumerate(high_items['bjzc'], 1):
budget = f" | 预算{item['budget']}" if item.get('budget') else ""
purchaser = f" | {item['purchaser']}" if item.get('purchaser') else ""
body += f"{i}. {item['title']}... | {item['method']}{budget}{purchaser}\n"
else:
body += "(暂无高推荐项目)\n"
body += f"""
【湖南政采】{len(hnzc_list)}条({date_str})
高推荐项目:
"""
if high_items['hnzc']:
for i, item in enumerate(high_items['hnzc'], 1):
budget = f" | 预算{item['budget']}" if item.get('budget') else ""
purchaser = f" | {item['purchaser']}" if item.get('purchaser') else ""
body += f"{i}. {item['title']}... | {item['method']}{budget}{purchaser}\n"
else:
body += "(暂无高推荐项目)\n"
body += f"""
详情请见附件Excel。
{sender_name}"""
# 确定收件人/抄送人
if to_override:
to_list = [to_override]
cc_list = []
else:
to_list = env_config['to']
cc_list = env_config['cc']
try:
send_email_via_smtp(subject, body, excel_path, to_list, cc_list)
print("[SUCCESS] 邮件发送成功")
return True
except Exception as e:
print(f"[ERROR] 邮件发送失败: {e}")
return False
def send_email_via_smtp(subject, body, excel_path, to_list, cc_list):
"""
使用 SMTP 直接发送邮件。
Args:
subject: 邮件主题字符串
body: 邮件正文纯文本字符串
excel_path: Excel 附件文件路径(文件不存在时跳过附件)
to_list: 收件人列表
cc_list: 抄送人列表
Returns:
bool: 发送成功返回 True
"""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.header import Header
from email.utils import formatdate
env_config = get_email_config()
SMTP_HOST = env_config['smtp_host']
SMTP_PORT = int(env_config['smtp_port'])
SMTP_USER = env_config['smtp_user']
SMTP_PASSWORD = env_config['smtp_password']
# 创建邮件,主题使用 RFC 2047 编码防止中文乱码
msg = MIMEMultipart()
msg['From'] = SMTP_USER
msg['To'] = ', '.join(to_list)
msg['Cc'] = ', '.join(cc_list)
msg['Subject'] = Header(subject, 'utf-8').encode()
msg['Date'] = formatdate()
msg.attach(MIMEText(body.strip(), 'plain', 'utf-8'))
# 添加附件(文件不存在时跳过)
if excel_path and os.path.exists(excel_path):
with open(excel_path, 'rb') as f:
part = MIMEApplication(f.read(), _subtype='xlsx')
filename_encoded = Header(os.path.basename(excel_path), 'utf-8').encode()
part.add_header('Content-Disposition', 'attachment', filename=filename_encoded)
msg.attach(part)
elif excel_path:
print(f"[WARNING] 附件不存在,跳过: {excel_path}")
# 发送邮件
smtp = smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT)
smtp.login(SMTP_USER, SMTP_PASSWORD)
smtp.send_message(msg)
smtp.quit()
print(f"[INFO] 邮件已发送(SMTP)至: {', '.join(to_list)}, 抄送: {', '.join(cc_list)}")
return True
def send_bidding_report(date=None, keywords=None, to_override=None):
"""
发送商机报告邮件。
Args:
date: 指定日期,格式 YYYY-MM-DD。默认为昨日
keywords: 关键词列表,用于筛选商机。默认为 None,使用 govb_fetcher 配置的默认关键词
to_override: 覆盖默认收件人,用于测试发送
Returns:
dict: fetch_all_bidding 返回的结果字典
"""
from datetime import datetime, timedelta
# 确定查询日期
if date is None:
date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
print(f"[INFO] 默认获取昨日商机: {date}")
print(f"[INFO] 开始抓取商机信息 - 日期: {date}")
# 抓取数据
fetch_all_bidding, save_to_excel, get_output_dir = _import_fetcher()
results = fetch_all_bidding(date, keywords=keywords)
bjzc_count = len(results.get('北京政采', []))
hnzc_count = len(results.get('湖南政采', []))
print(f"[INFO] 采集完成: 北京政采{bjzc_count}条, 湖南政采{hnzc_count}条")
# 保存 Excel
output_path = get_output_dir() / f'政府采购商机汇总_{date}.xlsx'
save_to_excel(results, output_path)
print(f"[INFO] Excel 已保存: {output_path}")
# 发送邮件
send_email(date, results, output_path, to_override=to_override)
return results
HELP_TEXT = """政府采购商机邮件发送工具
用法:
govb-email [选项]
选项:
--date DATE 日期,格式 YYYY-MM-DD
--keywords WORDS 关键词,逗号分隔,如: 模型,仿真,AI
--to ADDRESS 测试发送至指定收件人
--today 发送今日报告
--help 显示此帮助信息
示例:
govb-email # 发送昨日报告(默认)
govb-email --today # 发送今日报告
govb-email --date 2026-03-23 # 发送指定日期报告
govb-email --keywords "模型,仿真" # 使用自定义关键词
govb-email --to [email protected] # 测试发送
"""
def main():
"""
CLI 入口函数。
支持的参数:
--date DATE: 指定日期,格式 YYYY-MM-DD
--keywords WORDS: 关键词,逗号分隔
--to ADDRESS: 测试发送至指定收件人
--today: 发送今日报告
--help: 显示帮助信息
"""
import argparse
import fcntl
from datetime import datetime
parser = argparse.ArgumentParser(
description='政府采购商机邮件发送工具',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=HELP_TEXT
)
parser.add_argument('--date', type=str, help='日期,格式 YYYY-MM-DD')
parser.add_argument('--keywords', type=str, help='关键词,逗号分隔')
parser.add_argument('--to', type=str, help='测试发送至指定收件人(覆盖配置)')
parser.add_argument('--today', action='store_true', help='发送今日报告')
args = parser.parse_args()
# --help 时显示帮助
if '--help' in sys.argv:
print(HELP_TEXT)
sys.exit(0)
# 确定日期
date = args.date
if args.today and date is None:
date = datetime.now().strftime('%Y-%m-%d')
# 锁文件防止并发执行
LOCK_FILE = '/tmp/govb_bidding_email.lock'
try:
lock_fd = open(LOCK_FILE, 'w')
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (IOError, OSError):
print("[INFO] 已有任务在运行,跳过本次执行")
sys.exit(0)
# 解析关键词
keywords = None
if args.keywords:
keywords = args.keywords.split(',')
# 发送报告
try:
send_bidding_report(date=date, keywords=keywords, to_override=args.to)
finally:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
lock_fd.close()
try:
os.remove(LOCK_FILE)
except Exception:
pass
if __name__ == '__main__':
main()
FILE:pyproject.toml
[project]
name = "govb-email"
version = "0.1.0"
description = "政府采购商机邮件发送工具"
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"requests",
]
[project.scripts]
govb-email = "govb_email.fetcher:main"
[tool.setuptools.packages.find]
where = ["."]
地方政府采购商机自动抓取工具(非军队采购)。从北京中建云智、湖南政府采购网等地方政府采购平台抓取招标公告,按关键词过滤,补全采购人、代理机构、预算、时间节点等详情,生成 Excel 报表。与 milb-fetcher(军队采购)互补,本工具专注地方政府渠道。
---
name: govb-fetcher
description: 地方政府采购商机自动抓取工具(非军队采购)。从北京中建云智、湖南政府采购网等地方政府采购平台抓取招标公告,按关键词过滤,补全采购人、代理机构、预算、时间节点等详情,生成 Excel 报表。与 milb-fetcher(军队采购)互补,本工具专注地方政府渠道。
metadata: {"openclaw":{"emoji":"🏛️","requires":{govb-fetcher},"install":"uv pip install -e {baseDir}"}}
---
# govb-fetcher
从多个地方政府采购平台自动抓取招标公告,关键词过滤后生成 Excel 报表。
## 快速使用
- `/govb-fetcher` → 抓取今日数据(默认)
- `/govb-fetcher --help` → 显示帮助信息
日期选择(三选一,不指定则默认今日)
- `/govb-fetcher --today` → 抓取今日
- `/govb-fetcher --yesterday` → 抓取昨日
- `/govb-fetcher --date 2026-03-30` → 抓取指定日期
输出控制
- `--no-detail` → 仅输出列表字段(更快,跳过详情接口,Excel 只保留有值的列)
- `--output /path/to/file.xlsx` → 指定输出路径
筛选参数
- `--keywords "关键词1,关键词2"` → 覆盖核心关键词
- `--exclude-keywords "词1,词2"` → 覆盖排除关键词
- `--high-value-keywords "词1,词2"` → 覆盖高价值关键词(影响推荐等级)
凭证更新
- `/govb-fetcher --set-cookie --source bjzc --bearer "Bearer xxx" --session "YGCG_TBSESSION=xxx; JSESSIONID=xxx; jcloud_alb_route=xxx"`
## 数据源
| 标识 | 平台 | 认证 |
|------|------|------|
| `bjzc` | 北京中建云智政府采购网 | 需 Cookie + Bearer |
| `hnzc` | 湖南政府采购网 | 免认证 |
## 推荐等级
- **高**:命中高价值关键词(模型/仿真/数据/AI/软件等)或含「意向」
- **中**:命中「系统」关键词
- **空**:其他匹配项
## 触发词
政府采购商机、地方政府采购、地方招标、北京政府采购、湖南政府采购、政府商机
## 配置文件
配置文件搜索顺序(高优先级在前):
1. 当前目录 `.env`
2. `~/.config/govb-fetcher/.env`
| 环境变量 | 用途 |
|---------|------|
| `FETCHER_BJZC_BEARER_TOKEN` | 北京政采 Bearer token |
| `FETCHER_BJZC_TBSESSION` | 北京政采 YGCG_TBSESSION(自动刷新) |
| `FETCHER_BJZC_JSESSIONID` | 北京政采 JSESSIONID |
| `FETCHER_BJZC_ALB_ROUTE` | 北京政采负载均衡路由 |
| `FETCHER_KEYWORDS` | 核心关键词,逗号分隔 |
| `FETCHER_EXCLUDE_KEYWORDS` | 排除关键词,逗号分隔 |
| `FETCHER_HIGH_VALUE_KEYWORDS` | 高价值关键词,逗号分隔 |
| `FETCHER_OUTPUT_DIR` | Excel 输出目录(默认 `~/.openclaw/workspace/govb-bidding`)|
| `FETCHER_USE_PROXY` | 是否启用代理,`true` / `false`(默认 `false`)|
| `FETCHER_PROXY` | 代理地址,格式 `http://user:pass@host:port` |
FILE:CLAUDE.md
# govb-fetcher
地方政府采购商机抓取工具,openClaw CLI skill 插件。
## 安装
```bash
uv pip install -e .
```
## 常用命令
```bash
govb-fetcher --today # 抓取今日,含详情补全
govb-fetcher --today --no-detail # 快速预览,只调列表接口
govb-fetcher --date 2026-03-28 # 指定日期
govb-fetcher --set-cookie --source bjzc \
--bearer "Bearer xxx" \
--session "YGCG_TBSESSION=xxx; JSESSIONID=xxx; jcloud_alb_route=xxx"
```
## 项目结构
```
govb_fetcher/
├── config.py # 配置加载(.env 优先级、关键词、凭证读取)
└── fetcher.py # 核心逻辑(抓取、过滤、详情补全、Excel 输出、CLI)
SKILL.md # openClaw skill 定义(触发词、用法说明)
.env.example # 配置模板
```
## 数据源
### 北京中建云智(bjzc)
- 列表接口:`POST /gt-jy-toubiao/api/cggg/gonggao/queryZBGongGaoList.do`
- 详情接口①:`GET /cggg/gonggao/queryGgBdList.do?ggGuid=xxx`(分包+时间)
- 详情接口②:`GET /cggg/gonggao/queryPurchaserInfo.do?gcGuid=xxx`(采购人+代理机构)
- **需要 Cookie + Bearer**,凭证通过 `--set-cookie` 写入 `~/.config/govb-fetcher/.env`
- 每次请求后服务器下发的新 `YGCG_TBSESSION` 自动写回 `.env`
### 湖南政府采购网(hnzc)
- 列表接口:`POST http://www.ccgp-hunan.gov.cn/mvc/getNoticeList4Web.do`
- 详情接口:`GET /mvc/viewNoticeContent.do?noticeId=xxx`(返回 HTML,regex 解析)
- **免认证**,无需配置凭证
## 过滤体系
```
列表全量抓取
↓
排除关键词过滤(FETCHER_EXCLUDE_KEYWORDS)→ 丢弃
↓
核心关键词过滤(FETCHER_KEYWORDS)→ 不命中则丢弃
↓
仅对命中记录调详情接口(--no-detail 跳过此步)
↓
推荐评级 + 备注生成 → Excel
```
## 配置系统
变量命名规范:`FETCHER_{SOURCE}_{PARAM}`
- 通用参数:`FETCHER_KEYWORDS` / `FETCHER_EXCLUDE_KEYWORDS` / `FETCHER_HIGH_VALUE_KEYWORDS` / `FETCHER_OUTPUT_DIR`
- 代理:`FETCHER_USE_PROXY`(`true`/`false`,默认 `false`)/ `FETCHER_PROXY`(格式 `http://user:pass@host:port`)
- 北京政采凭证:`FETCHER_BJZC_BEARER_TOKEN` / `FETCHER_BJZC_TBSESSION` / `FETCHER_BJZC_JSESSIONID` / `FETCHER_BJZC_ALB_ROUTE`
优先级(高→低):环境变量 > 当前目录 `.env` > `~/.config/govb-fetcher/.env` > 硬编码默认值(仅关键词)
## 新增数据源步骤
1. **`fetcher.py`**:新增 `_build_{src}_session()`、`_fetch_{src}_page()`、`_fetch_{src}_detail()`、`fetch_{src}_bidding()` 四个函数
2. **`fetch_all_bidding()`**:追加调用并加入返回 dict(key = Sheet 名)
3. **`SOURCE_COOKIE_MAP`**:若需认证,在注册表中添加一条(key = `--source` 标识)
4. **`config.py`**:新增 `get_{src}_xxx()` 凭证读取函数(若需认证)
5. **`.env.example`**:补充凭证变量示例
## Excel 输出
- 文件名:`政府采购商机汇总_{日期}.xlsx`
- 每个数据源一个 Sheet
- `--no-detail` 模式:自动去掉全为空的列,保持表格整洁
- 推荐等级高(黄色)/中(蓝色)行着色
FILE:govb_fetcher/__init__.py
FILE:govb_fetcher/config.py
"""
配置加载模块。
命名规范:FETCHER_{SOURCE}_{PARAM}
- 通用参数:FETCHER_KEYWORDS / FETCHER_EXCLUDE_KEYWORDS / ...
- 北京中建云智:FETCHER_BJZC_BEARER_TOKEN / FETCHER_BJZC_TBSESSION / ...
- 后续新增数据源:FETCHER_CCGP_xxx / FETCHER_BJGP_xxx / ...
优先级(高→低):
1. 当前运行目录的 .env
2. ~/.config/govb-fetcher/.env
3. 硬编码默认值(仅关键词等非敏感配置)
"""
import os
import re
import threading
from pathlib import Path
from typing import Optional
DEFAULTS = {
'FETCHER_KEYWORDS': '体系,模型,仿真,数据,决策,规划,分析,智能,AI,软件,系统,信息,算法,效能',
'FETCHER_EXCLUDE_KEYWORDS': '体能,训练鞋,鞋类,服装,被装,医疗,药品,器械,膝关节,光纤,电梯,物业,绿化,装修,工程,建材,食材,食品,副食',
'FETCHER_HIGH_VALUE_KEYWORDS': '模型,仿真,数据,决策,分析,智能,AI,软件,意向',
'FETCHER_OUTPUT_DIR': '~/.openclaw/workspace/govb-bidding',
'FETCHER_USE_PROXY': 'false',
'FETCHER_PROXY': '',
# 凭证类不设默认值,必须由用户通过 --set-cookie 或 .env 提供
}
_config_cache: dict = {}
_env_lock = threading.Lock()
def _load_env_file(path: Path) -> dict:
result = {}
if not path.exists():
return result
for line in path.read_text(encoding='utf-8').splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
k, _, v = line.partition('=')
result[k.strip()] = v.strip()
return result
def load_config() -> dict:
global _config_cache
if _config_cache:
return _config_cache
cfg = dict(DEFAULTS)
# 全局配置(低优先级)
global_env = Path.home() / '.config' / 'govb-fetcher' / '.env'
cfg.update(_load_env_file(global_env))
# 局部配置(高优先级)
local_env = Path.cwd() / '.env'
cfg.update(_load_env_file(local_env))
# 环境变量最高优先级
for k in list(cfg.keys()) + list(os.environ.keys()):
if k.startswith('FETCHER_') and k in os.environ:
cfg[k] = os.environ[k]
_config_cache = cfg
return cfg
def get_config(key: str, default: str = '') -> str:
return load_config().get(key, default)
# ── 通用关键词配置 ──────────────────────────────
def get_keywords() -> list:
return [k.strip() for k in get_config('FETCHER_KEYWORDS').split(',') if k.strip()]
def get_exclude_keywords() -> list:
return [k.strip() for k in get_config('FETCHER_EXCLUDE_KEYWORDS').split(',') if k.strip()]
def get_high_value_keywords() -> list:
return [k.strip() for k in get_config('FETCHER_HIGH_VALUE_KEYWORDS').split(',') if k.strip()]
def get_output_dir() -> Path:
return Path(get_config('FETCHER_OUTPUT_DIR')).expanduser()
def get_proxies() -> Optional[dict]:
"""返回 requests 用的 proxies 字典,FETCHER_USE_PROXY=true 时生效,否则返回 None。"""
use_proxy = get_config('FETCHER_USE_PROXY', 'false').lower() == 'true'
if not use_proxy:
return None
proxy_url = get_config('FETCHER_PROXY')
if not proxy_url:
return None
return {'http': proxy_url, 'https': proxy_url}
# ── 北京中建云智(BJZC)凭证 ───────────────────
def get_bjzc_bearer_token() -> str:
return get_config('FETCHER_BJZC_BEARER_TOKEN')
def get_bjzc_tbsession() -> str:
return get_config('FETCHER_BJZC_TBSESSION')
def get_bjzc_jsessionid() -> str:
return get_config('FETCHER_BJZC_JSESSIONID')
def get_bjzc_alb_route() -> str:
return get_config('FETCHER_BJZC_ALB_ROUTE')
# ── .env 文件操作 ───────────────────────────────
def get_env_path() -> Path:
"""返回当前生效的 .env 文件路径(优先局部,其次全局)。"""
local = Path.cwd() / '.env'
if local.exists():
return local
return Path.home() / '.config' / 'govb-fetcher' / '.env'
def save_to_env(updates: dict) -> Path:
"""
将 updates 中的 key=value 写入 .env(存在则替换,不存在则追加)。
不影响其他已有配置项。返回写入的文件路径。线程安全。
"""
with _env_lock:
return _save_to_env_unsafe(updates)
def _save_to_env_unsafe(updates: dict) -> Path:
env_path = get_env_path()
if not env_path.exists():
example = Path(__file__).parent.parent / '.env.example'
env_path.parent.mkdir(parents=True, exist_ok=True)
if example.exists():
env_path.write_text(example.read_text(encoding='utf-8'), encoding='utf-8')
else:
env_path.write_text('', encoding='utf-8')
lines = env_path.read_text(encoding='utf-8').splitlines(keepends=True)
updated_keys = set()
for i, line in enumerate(lines):
for k, v in updates.items():
if re.match(rf'^\s*{re.escape(k)}\s*=', line):
lines[i] = f'{k}={v}\n'
updated_keys.add(k)
for k, v in updates.items():
if k not in updated_keys:
lines.append(f'{k}={v}\n')
env_path.write_text(''.join(lines), encoding='utf-8')
# 清缓存,下次 get_config 重新读取
global _config_cache
_config_cache = {}
return env_path
FILE:govb_fetcher/fetcher.py
"""
govb-fetcher 核心抓取逻辑。
数据源:
- 北京中建云智政府采购网(zbcg-bjzc.zhongcy.com)
"""
import argparse
import os
import re
import sys
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Optional
import requests
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from govb_fetcher.config import (
get_keywords, get_exclude_keywords, get_high_value_keywords,
get_bjzc_bearer_token, get_bjzc_tbsession, get_bjzc_jsessionid, get_bjzc_alb_route,
get_output_dir, save_to_env, get_proxies,
)
_DETAIL_WORKERS = 5 # 详情并发数,避免触发服务端限速
_PRINT_LOCK = threading.Lock()
BASE_URL = 'http://zbcg-bjzc.zhongcy.com/gt-jy-toubiao/api'
DETAIL_BASE = 'http://zbcg-bjzc.zhongcy.com/bjczj-jy-toubiao/index.html'
HNZC_LIST_URL = 'http://www.ccgp-hunan.gov.cn/mvc/getNoticeList4Web.do'
HNZC_DETAIL_URL = 'http://www.ccgp-hunan.gov.cn/mvc/viewNoticeContent.do'
HNZC_PAGE_URL = 'http://www.ccgp-hunan.gov.cn/page/notice/notice.jsp'
# ──────────────────────────────────────────────
# Session 管理
# ──────────────────────────────────────────────
def _build_session() -> requests.Session:
session = requests.Session()
proxies = get_proxies()
if proxies:
session.proxies.update(proxies)
session.cookies.update({
'YGCG_TBSESSION': get_bjzc_tbsession(),
'JSESSIONID': get_bjzc_jsessionid(),
'jcloud_alb_route': get_bjzc_alb_route(),
})
session.headers.update({
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Authorization': f'Bearer {get_bjzc_bearer_token()}',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded',
'Origin': 'http://zbcg-bjzc.zhongcy.com',
'Pragma': 'no-cache',
'Referer': 'http://zbcg-bjzc.zhongcy.com/bjczj-jy-toubiao/index.html',
'User-Agent': (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/146.0.0.0 Safari/537.36'
),
'contentType': 'formType',
})
return session
def _refresh_session_cookie(session: requests.Session) -> None:
"""清除重复的 YGCG_TBSESSION,保留最新值,并同步写回 .env。"""
values = [c.value for c in session.cookies if c.name == 'YGCG_TBSESSION']
if len(values) > 1:
latest = values[-1]
others = [c for c in session.cookies if c.name != 'YGCG_TBSESSION']
session.cookies.clear()
for c in others:
session.cookies.set(c.name, c.value)
session.cookies.set('YGCG_TBSESSION', latest)
# 自动写回 .env,保证下次启动时使用最新值
try:
save_to_env({'FETCHER_BJZC_TBSESSION': latest})
except Exception:
pass
def _t() -> int:
return int(time.time())
# ──────────────────────────────────────────────
# 北京中建云智政府采购网
# ──────────────────────────────────────────────
def _fetch_bjzc_page(session: requests.Session, page: int, rows: int = 100) -> dict:
url = f'{BASE_URL}/cggg/gonggao/queryZBGongGaoList.do'
data = {
'ggName': '', 'gcBH': '', 'gcName': '',
'bdBH': '', 'bdName': '', 'xmStatus': '',
'page': str(page), 'rows': str(rows),
}
resp = session.post(url, data=data, timeout=15)
_refresh_session_cookie(session)
return resp.json()
def _fetch_bjzc_gg_bd_list(session: requests.Session, gg_guid: str) -> list:
"""
获取公告下的分包列表(queryGgBdList)。
每条记录含 bdGuid、文件获取时间、开标时间等关键字段。
"""
url = f'{BASE_URL}/cggg/gonggao/queryGgBdList.do'
params = {'_t': _t(), 'ggGuid': gg_guid}
try:
resp = session.get(url, params=params, timeout=15)
_refresh_session_cookie(session)
data = resp.json()
if isinstance(data, dict):
# 响应结构:{"data": {"ggBdList": [...]}}
inner = data.get('data', {})
if isinstance(inner, dict):
bd_list = inner.get('ggBdList', [])
if isinstance(bd_list, list):
return bd_list
except Exception as e:
print(f' [warn] 获取分包列表失败 ggGuid={gg_guid}: {e}')
return []
def _fetch_bjzc_purchaser_info(session: requests.Session, gc_guid: str) -> dict:
"""
获取采购人及代理机构信息(queryPurchaserInfo)。
确认字段:zbRName、lianXiRenPhone、zbDLName、zbDLZBFuZeRenMobile/zbDLZBFuZeRenPhone
"""
url = f'{BASE_URL}/cggg/gonggao/queryPurchaserInfo.do'
params = {'_t': _t(), 'gcGuid': gc_guid}
try:
resp = session.get(url, params=params, timeout=15)
_refresh_session_cookie(session)
data = resp.json()
if isinstance(data, dict) and isinstance(data.get('data'), dict):
return data['data']
except Exception as e:
print(f' [warn] 获取采购人信息失败 gcGuid={gc_guid}: {e}')
return {}
def _fetch_bjzc_project_overview(session: requests.Session, gc_guid: str, gg_guid: str) -> str:
"""
获取项目概况文本(projectOverview)。
尝试字段:ggNeiRong、zbGkfw、zbRequire(按优先级取第一个非空值)
"""
url = f'{BASE_URL}/cggg/gonggao/projectOverview.do'
params = {'_t': _t(), 'gcGuid': gc_guid, 'ggGuid': gg_guid}
try:
resp = session.get(url, params=params, timeout=15)
_refresh_session_cookie(session)
data = resp.json()
inner = data.get('data', {}) if isinstance(data, dict) else {}
text = ''
for field in ('ggNeiRong', 'zbGkfw', 'zbRequire', 'bgContent'):
val = inner.get(field)
if isinstance(val, str) and val.strip():
text = val.strip()
break
text = re.sub(r'<[^>]+>', '', text)
text = re.sub(r'&[a-zA-Z#0-9]+;', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text[:100] if text else ''
except Exception as e:
print(f' [warn] 获取项目概况失败 gcGuid={gc_guid}: {e}')
return ''
def _build_detail_url(bd_guid: str, gc_guid: str, gg_guid: str) -> str:
return (
f'{DETAIL_BASE}#/steps/noticepageyg'
f'?bdGuid={bd_guid}&gcGuid={gc_guid}&ggGuid={gg_guid}'
)
def _ts_to_date(ts_ms: Optional[int]) -> str:
if not ts_ms:
return ''
try:
return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d')
except Exception:
return ''
def _ts_to_datetime(ts_ms) -> str:
if not ts_ms:
return ''
try:
return datetime.fromtimestamp(int(ts_ms) / 1000).strftime('%Y-%m-%d %H:%M')
except Exception:
return str(ts_ms)
def _extract_field(d: dict, *keys) -> str:
"""从字典中按优先顺序取第一个非空字符串值。"""
for k in keys:
v = d.get(k)
if v is not None and str(v).strip() and str(v).strip() not in ('null', 'None', '0'):
return str(v).strip()
return ''
# ──────────────────────────────────────────────
# 湖南政府采购网(HNZC)
# ──────────────────────────────────────────────
def _build_hnzc_session() -> requests.Session:
session = requests.Session()
proxies = get_proxies()
if proxies:
session.proxies.update(proxies)
session.headers.update({
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'User-Agent': (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/146.0.0.0 Safari/537.36'
),
'X-Requested-With': 'XMLHttpRequest',
})
return session
def _fetch_hnzc_page(session: requests.Session, target_date: str, page: int, page_size: int = 100) -> dict:
data = {
'nType': 'prcmNotices', 'pType': '', 'prcmPrjName': '',
'prcmItemCode': '', 'prcmOrgName': '', 'startDate': target_date,
'endDate': target_date, 'prcmPlanNo': '',
'page': str(page), 'pageSize': str(page_size),
}
resp = session.post(HNZC_LIST_URL, data=data, timeout=15)
return resp.json()
def _fetch_hnzc_detail(session: requests.Session, notice_id, notice_category_code: str) -> dict:
"""抓取湖南政采详情 HTML,用 regex 提取关键字段。"""
try:
resp = session.get(
HNZC_DETAIL_URL,
params={'noticeId': notice_id, 'area_id': '', 'isKJXY': 'null'},
timeout=15,
)
resp.encoding = 'utf-8'
html = resp.text
# 去掉 script/style 块
html = re.sub(r'<(script|style)[^>]*>.*?</\1>', '', html, flags=re.S)
# 去掉所有 HTML 标签
text = re.sub(r'<[^>]+>', '', html)
# 合并连续空白(保留换行以便定位章节)
text = re.sub(r'[ \t]+', ' ', text)
def _rex(pattern, default=''):
m = re.search(pattern, text, re.S)
return m.group(1).strip() if m else default
# 采购预算(两种写法均有)
budget = _rex(r'(?:采购项目预算|采购预算)[::]\s*([\d,]+)元')
# 文件获取时间(第五章节 "有意参加投标者,于...至...")
file_start = _rex(r'于(\d{4}年\d{1,2}月\d{1,2}日)\s*至')
file_end = _rex(r'于.+?至(\d{4}年\d{1,2}月\d{1,2}日)')
# 开标时间(第六章节 "3、开标时间:")
open_bid = _rex(r'开标时间[::]\s*(\d{4}年\d{1,2}月\d{1,2}日\s*\d{2}:\d{2})')
# 采购人电话(十一章 采购人信息 (5)电话)
purchaser_phone = _rex(r'1、采购人信息.+?(5)电\s+话[::]\s*([^\s(\n]+)')
# 代理机构名称
agency_name = _rex(r'2、采购代理机构信息.+?(1)名\s+称[::]\s*([^\n(]+)')
# 代理机构电话
agency_phone = _rex(r'2、采购代理机构信息.+?(5)电\s+话[::]\s*([^\s(\n]+)')
return {
'budget': budget,
'file_start': file_start,
'file_end': file_end,
'open_bid': open_bid,
'purchaser_phone': purchaser_phone,
'agency_name': agency_name.strip(),
'agency_phone': agency_phone,
}
except Exception as e:
print(f' [warn] 获取湖南详情失败 noticeId={notice_id}: {e}')
return {}
def fetch_hnzc_bidding(
session: requests.Session,
target_date: str,
keywords: list,
exclude_kw: list,
high_value_kw: list,
fetch_detail: bool = True,
) -> list:
"""抓取湖南政府采购网指定日期的公告,先过滤关键词,再对匹配项查详情。"""
import math
print(f'[hnzc] 抓取日期 {target_date},逐页获取列表...')
# 1. 翻页抓取
raw_rows = []
page = 1
total_pages = 1
while page <= total_pages:
print(f' 第 {page} 页...', end=' ', flush=True)
try:
result = _fetch_hnzc_page(session, target_date, page)
except Exception as e:
print(f'请求失败: {e},停止')
break
rows = result.get('rows') or []
total = result.get('total') or 0
total_pages = math.ceil(total / 100) if total > 0 else 1
print(f'{len(rows)} 条 (共 {total} 条)')
raw_rows.extend(rows)
if len(rows) < 100 or page >= total_pages:
break
page += 1
print(f'[hnzc] 当日原始记录 {len(raw_rows)} 条,开始关键词过滤...')
# 2. 关键词过滤
filtered = []
for row in raw_rows:
title = row.get('NOTICE_TITLE', '') or ''
if any(ex in title for ex in exclude_kw):
continue
matched = [kw for kw in keywords if kw in title]
if not matched:
continue
row['_matched_kw'] = matched
filtered.append(row)
print(f'[hnzc] 过滤后剩余 {len(filtered)} 条,{"开始补全详情..." if fetch_detail else "跳过详情补全"}')
# 3. 构建基础记录
def _build_base(row: dict) -> dict:
notice_id = row.get('NOTICE_ID', '')
cat_code = row.get('NOTICE_CATEGORY_CODE', '')
title = row.get('NOTICE_TITLE', '')
matched_kw = row.get('_matched_kw', [])
detail_link = f'{HNZC_PAGE_URL}?noticeId={notice_id}¬iceTypeCode={cat_code}'
return {
'项目名称': title,
'标段名称': '',
'招标方式': row.get('PRCM_MODE_NAME', ''),
'合同估价(元)': '',
'文件获取开始时间': '',
'文件获取截止时间': '',
'开标时间': '',
'采购人': row.get('ORG_NAME', ''),
'采购人电话': '',
'代理机构': '',
'详情链接': detail_link,
'发布日期': row.get('NEWWORK_DATE', ''),
'匹配关键词': ','.join(matched_kw),
'推荐等级': _get_recommendation(title, matched_kw, high_value_kw),
'备注': _generate_remarks(title, matched_kw),
}
if not fetch_detail:
return [_build_base(row) for row in filtered]
# 并行补全详情
def _fetch_one(args):
i, row = args
notice_id = row.get('NOTICE_ID', '')
cat_code = row.get('NOTICE_CATEGORY_CODE', '')
title = row.get('NOTICE_TITLE', '')
record = _build_base(row)
d = _fetch_hnzc_detail(session, notice_id, cat_code)
record.update({
'合同估价(元)': d.get('budget', ''),
'文件获取开始时间': d.get('file_start', ''),
'文件获取截止时间': d.get('file_end', ''),
'开标时间': d.get('open_bid', ''),
'采购人电话': d.get('purchaser_phone', ''),
'代理机构': d.get('agency_name', ''),
})
with _PRINT_LOCK:
print(f' [{i + 1}/{len(filtered)}] 完成: {title[:30]}...')
return i, record
ordered: list = [None] * len(filtered)
with ThreadPoolExecutor(max_workers=_DETAIL_WORKERS) as executor:
for i, record in executor.map(_fetch_one, enumerate(filtered)):
ordered[i] = record
return [r for r in ordered if r is not None]
# ──────────────────────────────────────────────
# 关键词过滤与评级
# ──────────────────────────────────────────────
def _filter_by_keywords(rows: list, keywords: list, exclude_kw: list) -> list:
results = []
for row in rows:
title = row.get('ggName', '') or ''
bd_name = row.get('bdNames', '') or ''
combined = title + bd_name
if any(ex in combined for ex in exclude_kw):
continue
matched = [kw for kw in keywords if kw in combined]
if not matched:
continue
row['_matched_kw'] = matched
results.append(row)
return results
def _get_recommendation(title: str, matched_kw: list, high_value_kw: list) -> str:
if '意向' in title:
return '高'
if any(kw in matched_kw for kw in high_value_kw):
return '高'
if '系统' in matched_kw:
return '中'
return '低'
def _generate_remarks(title: str, matched_kw: list) -> str:
remarks = []
if '意向' in title:
remarks.append('采购意向前期可跟进')
if '体系' in title:
remarks.append('体系设计相关')
if '模型' in title:
remarks.append('模型开发可参与')
if '仿真' in title:
remarks.append('仿真项目可做')
if '数据' in title:
remarks.append('数据分析需求')
if 'AI' in title or '智能' in title:
remarks.append('AI智能项目')
if '软件' in title:
remarks.append('软件定制开发')
if '系统' in title:
remarks.append('系统集成类')
if '算法' in title:
remarks.append('算法研发类')
if '平台' in title:
remarks.append('平台建设类')
if '决策' in title or '规划' in title:
remarks.append('决策规划类')
if not remarks:
remarks.append('可关注')
return '; '.join(remarks[:2])
# ──────────────────────────────────────────────
# 北京中建云智 主抓取流程
# ──────────────────────────────────────────────
def fetch_bjzc_bidding(
session: requests.Session,
target_date: str,
keywords: list,
exclude_kw: list,
high_value_kw: list,
fetch_detail: bool = True,
) -> list:
"""
抓取北京中建云智政府采购网指定日期的公告,
先过滤关键词,再对匹配项查询详情。
"""
# 日期时间戳范围
d = datetime.strptime(target_date, '%Y-%m-%d')
day_start = int(datetime(d.year, d.month, d.day, 0, 0, 0).timestamp() * 1000)
day_end = int(datetime(d.year, d.month, d.day, 23, 59, 59).timestamp() * 1000)
# 1. 抓取当日列表
print(f'[bjzc] 抓取日期 {target_date},逐页获取列表...')
raw_rows = []
page = 1
while True:
print(f' 第 {page} 页...', end=' ', flush=True)
result = _fetch_bjzc_page(session, page)
if not result.get('success') or not result.get('data', {}).get('rows'):
print('无数据,停止')
break
rows = result['data']['rows']
print(f'{len(rows)} 条')
stop = False
for row in rows:
ts = row.get('ggStartTime')
if ts is None:
continue
if ts < day_start:
stop = True
break
if day_start <= ts <= day_end:
raw_rows.append(row)
if stop or len(rows) < 100:
break
page += 1
print(f'[bjzc] 当日原始记录 {len(raw_rows)} 条,开始关键词过滤...')
# 2. 关键词过滤
filtered = _filter_by_keywords(raw_rows, keywords, exclude_kw)
print(f'[bjzc] 过滤后剩余 {len(filtered)} 条,{"开始补全详情..." if fetch_detail else "跳过详情补全"}')
# 3. 详情补全
if not fetch_detail:
results = []
for row in filtered:
title = row.get('ggName', '')
matched_kw = row.get('_matched_kw', [])
results.append({
'项目名称': title,
'标段名称': row.get('bdNames', ''),
'招标方式': row.get('zbFangShiName', ''),
'合同估价(元)': '',
'文件获取开始时间': '',
'文件获取截止时间': '',
'开标时间': '',
'采购人': '',
'采购人电话': '',
'代理机构': '',
'详情链接': '',
'发布日期': _ts_to_date(row.get('ggStartTime')),
'匹配关键词': ','.join(matched_kw),
'推荐等级': _get_recommendation(title, matched_kw, high_value_kw),
'备注': _generate_remarks(title, matched_kw),
})
return results
# 串行补全详情(bjzc 使用滚动式 session cookie,并发请求会导致旧 token 失效,字段为空)
results = []
for idx, row in enumerate(filtered, 1):
gg_guid = row.get('ggGuid', '')
gc_guid = row.get('gcGuid', '')
title = row.get('ggName', '')
matched_kw = row.get('_matched_kw', [])
publish_date = _ts_to_date(row.get('ggStartTime'))
base_record = {
'项目名称': title,
'标段名称': row.get('bdNames', ''),
'招标方式': row.get('zbFangShiName', ''),
'合同估价(元)': '',
'文件获取开始时间': '',
'文件获取截止时间': '',
'开标时间': '',
'采购人': '',
'采购人电话': '',
'代理机构': '',
'详情链接': '',
'发布日期': publish_date,
'匹配关键词': ','.join(matched_kw),
'推荐等级': _get_recommendation(title, matched_kw, high_value_kw),
'备注': _generate_remarks(title, matched_kw),
}
print(f' [{idx}/{len(filtered)}] 补全详情: {title[:30]}...')
# Step1: queryGgBdList — 分包列表 + 文件获取时间 + 开标时间
bd_list = _fetch_bjzc_gg_bd_list(session, gg_guid)
if not bd_list:
bd_list = [{}]
# Step2: queryPurchaserInfo — 采购人 + 代理机构(每个公告只查一次)
purchaser_info = _fetch_bjzc_purchaser_info(session, gc_guid) if gc_guid else {}
purchaser_name = purchaser_info.get('zbRName', '')
purchaser_phone = purchaser_info.get('lianXiRenPhone', '')
agency_name = purchaser_info.get('zbDLName', '')
agency_phone = (purchaser_info.get('zbDLZBFuZeRenMobile')
or purchaser_info.get('zbDLZBFuZeRenPhone') or '')
for bd in bd_list:
bd_guid = bd.get('bdGuid', '')
file_start = _ts_to_datetime(bd.get('zbWJHuoQuStartTime'))
file_end = _ts_to_datetime(bd.get('zbWJHuoQuEndTime'))
open_bid = _ts_to_datetime(bd.get('kbTime'))
detail_url = _build_detail_url(bd_guid, gc_guid, gg_guid) if bd_guid else ''
record = dict(base_record)
contract_price = bd.get('bdHeTongGuJia')
# 接口返回单位为分,转换为元并格式化为逗号分隔
if contract_price:
try:
yuan = int(contract_price) // 100
contract_price_fmt = f'{yuan:,}'
except (ValueError, TypeError):
contract_price_fmt = str(contract_price)
else:
contract_price_fmt = ''
record.update({
'标段名称': bd.get('bdName') or base_record['标段名称'],
'合同估价(元)': contract_price_fmt,
'文件获取开始时间': file_start,
'文件获取截止时间': file_end,
'开标时间': open_bid,
'采购人': purchaser_name,
'采购人电话': purchaser_phone,
'代理机构': agency_name,
'详情链接': detail_url,
})
results.append(record)
return results
# ──────────────────────────────────────────────
# 汇总入口
# ──────────────────────────────────────────────
def fetch_all_bidding(
target_date: str,
keywords: list = None,
exclude_kw: list = None,
high_value_kw: list = None,
fetch_detail: bool = True,
) -> dict:
keywords = keywords or get_keywords()
exclude_kw = exclude_kw or get_exclude_keywords()
high_value_kw = high_value_kw or get_high_value_keywords()
def _run_bjzc():
return fetch_bjzc_bidding(
_build_session(), target_date, keywords, exclude_kw, high_value_kw, fetch_detail
)
def _run_hnzc():
return fetch_hnzc_bidding(
_build_hnzc_session(), target_date, keywords, exclude_kw, high_value_kw, fetch_detail
)
with ThreadPoolExecutor(max_workers=2) as executor:
f_bjzc = executor.submit(_run_bjzc)
f_hnzc = executor.submit(_run_hnzc)
bjzc_results = f_bjzc.result()
hnzc_results = f_hnzc.result()
return {
'北京政采': bjzc_results,
'湖南政采': hnzc_results,
}
# ──────────────────────────────────────────────
# Excel 输出
# ──────────────────────────────────────────────
COLUMNS = [
('序号', 8),
('项目名称', 55),
('标段名称', 40),
('招标方式', 12),
('合同估价(元)', 15),
('文件获取开始时间', 18),
('文件获取截止时间', 18),
('开标时间', 18),
('采购人', 25),
('采购人电话', 15),
('代理机构', 25),
('详情链接', 80),
('发布日期', 12),
('匹配关键词', 20),
('推荐等级', 10),
('备注', 30),
]
_HEADER_FONT = Font(bold=True, color='FFFFFF')
_HEADER_FILL = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
_HEADER_ALIGN = Alignment(horizontal='center', vertical='center', wrap_text=True)
_CELL_ALIGN = Alignment(vertical='center', wrap_text=True)
_THIN_BORDER = Border(
left=Side(style='thin'), right=Side(style='thin'),
top=Side(style='thin'), bottom=Side(style='thin'),
)
_HIGH_FILL = PatternFill(start_color='FFE699', end_color='FFE699', fill_type='solid')
_MID_FILL = PatternFill(start_color='D9E1F2', end_color='D9E1F2', fill_type='solid')
_LOW_FILL = PatternFill(start_color='E2EFDA', end_color='E2EFDA', fill_type='solid')
def _active_columns(rows: list) -> list:
"""返回至少有一行有值的列(序号列始终保留)。用于 --no-detail 模式。"""
result = []
for col_name, col_width in COLUMNS:
if col_name == '序号':
result.append((col_name, col_width))
continue
if any(record.get(col_name) for record in rows):
result.append((col_name, col_width))
return result
def _write_sheet(ws, rows: list, columns: list = None) -> None:
cols = columns if columns is not None else COLUMNS
# 表头
for col_idx, (col_name, col_width) in enumerate(cols, 1):
cell = ws.cell(row=1, column=col_idx, value=col_name)
cell.font = _HEADER_FONT
cell.fill = _HEADER_FILL
cell.alignment = _HEADER_ALIGN
cell.border = _THIN_BORDER
ws.column_dimensions[cell.column_letter].width = col_width
ws.row_dimensions[1].height = 20
# 数据行
for row_idx, record in enumerate(rows, 2):
recommendation = record.get('推荐等级', '')
row_fill = (_HIGH_FILL if recommendation == '高' else
_MID_FILL if recommendation == '中' else
_LOW_FILL if recommendation == '低' else None)
ws.cell(row=row_idx, column=1, value=row_idx - 1)
for col_idx, (col_name, _) in enumerate(cols[1:], 2):
cell = ws.cell(row=row_idx, column=col_idx, value=record.get(col_name, ''))
cell.alignment = _CELL_ALIGN
cell.border = _THIN_BORDER
if row_fill:
cell.fill = row_fill
# 序号列样式
seq_cell = ws.cell(row=row_idx, column=1)
seq_cell.alignment = Alignment(horizontal='center', vertical='center')
seq_cell.border = _THIN_BORDER
if row_fill:
seq_cell.fill = row_fill
ws.freeze_panes = 'A2'
def save_to_excel(all_results: dict, output_path: Path, no_detail: bool = False) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
wb = Workbook()
wb.remove(wb.active)
for sheet_name, rows in all_results.items():
ws = wb.create_sheet(title=sheet_name[:31]) # sheet名最长31字符
cols = _active_columns(rows) if no_detail else COLUMNS
_write_sheet(ws, rows, cols)
wb.save(output_path)
print(f'\n[done] Excel 已保存: {output_path}')
# ──────────────────────────────────────────────
# 数据源凭证注册表
# 新增数据源时在此处添加一条记录即可
# ──────────────────────────────────────────────
# SOURCE_COOKIE_MAP[source_id] = {cookie_key_in_session_str: env_var_name}
SOURCE_COOKIE_MAP = {
'bjzc': {
'_bearer': 'FETCHER_BJZC_BEARER_TOKEN',
'YGCG_TBSESSION': 'FETCHER_BJZC_TBSESSION',
'JSESSIONID': 'FETCHER_BJZC_JSESSIONID',
'jcloud_alb_route': 'FETCHER_BJZC_ALB_ROUTE',
},
# 后续新增示例:
# 'ccgp': {
# '_bearer': 'FETCHER_CCGP_BEARER_TOKEN',
# 'SESSION_ID': 'FETCHER_CCGP_SESSION',
# },
}
# --set-cookie 子命令
def cmd_set_cookie(source: str, bearer: str, session_str: str) -> None:
"""解析并写入指定数据源的凭证到 .env 文件。"""
source = source.lower().strip()
if source not in SOURCE_COOKIE_MAP:
known = ', '.join(SOURCE_COOKIE_MAP.keys())
print(f'[error] 未知数据源 "{source}",当前支持: {known}')
sys.exit(1)
cookie_map = SOURCE_COOKIE_MAP[source]
updates = {}
if bearer:
token = bearer.strip()
if token.lower().startswith('bearer '):
token = token[7:].strip()
if '_bearer' in cookie_map:
updates[cookie_map['_bearer']] = token
if session_str:
for part in session_str.split(';'):
part = part.strip()
if '=' not in part:
continue
k, _, v = part.partition('=')
k, v = k.strip(), v.strip()
if k in cookie_map:
updates[cookie_map[k]] = v
if not updates:
print('[error] 未能解析任何凭证,请检查 --bearer 和 --session 参数。')
sys.exit(1)
env_path = save_to_env(updates)
print(f'[ok] [{source}] 凭证已写入: {env_path}')
for k, v in updates.items():
print(f' {k} = {v[:12]}...' if len(v) > 12 else f' {k} = {v}')
# ──────────────────────────────────────────────
# CLI 入口
# ──────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
prog='govb-fetcher',
description='政府采购商机自动抓取工具',
)
# 凭证更新子命令
parser.add_argument('--set-cookie', action='store_true',
help='更新 .env 中的凭证信息(不执行抓取)')
parser.add_argument('--source', default='bjzc',
help=f'数据源标识,与 --set-cookie 配合使用,可选: {", ".join(SOURCE_COOKIE_MAP.keys())}(默认: bjzc)')
parser.add_argument('--bearer', default='',
help='Bearer token,格式:"Bearer xxx" 或 "xxx"')
parser.add_argument('--session', default='',
help='Cookie 字符串,格式:"YGCG_TBSESSION=xxx; JSESSIONID=xxx; ..."')
# 日期参数(互斥)
date_group = parser.add_mutually_exclusive_group()
date_group.add_argument('--today', action='store_true', help='抓取今日数据')
date_group.add_argument('--yesterday', action='store_true', help='抓取昨日数据(默认)')
date_group.add_argument('--date', metavar='YYYY-MM-DD', help='抓取指定日期数据')
# 过滤参数
parser.add_argument('--keywords', help='核心关键词,逗号分隔')
parser.add_argument('--exclude-keywords', dest='exclude_keywords',
help='排除关键词,逗号分隔')
parser.add_argument('--high-value-keywords', dest='high_value_keywords',
help='高价值关键词,逗号分隔')
# 输出参数
parser.add_argument('--output', help='指定输出 Excel 路径')
parser.add_argument('--no-detail', dest='no_detail', action='store_true',
help='跳过详情 API,仅保存列表字段(速度更快)')
args = parser.parse_args()
# 凭证更新模式
if args.set_cookie:
cmd_set_cookie(args.source, args.bearer, args.session)
return
# 确定目标日期(默认昨日)
if args.today:
target_date = datetime.now().strftime('%Y-%m-%d')
elif args.date:
target_date = args.date
else:
target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
# 关键词
keywords = [k.strip() for k in args.keywords.split(',')] if args.keywords else None
exclude_kw = [k.strip() for k in args.exclude_keywords.split(',')] if args.exclude_keywords else None
high_value_kw = [k.strip() for k in args.high_value_keywords.split(',')] if args.high_value_keywords else None
print(f'govb-fetcher 日期: {target_date} 详情补全: {"否" if args.no_detail else "是"}')
print(f'关键词: {keywords or get_keywords()}')
# 执行抓取
all_results = fetch_all_bidding(
target_date=target_date,
keywords=keywords,
exclude_kw=exclude_kw,
high_value_kw=high_value_kw,
fetch_detail=not args.no_detail,
)
# 统计
print('\n[result]')
total = 0
for source, rows in all_results.items():
print(f' {source}: {len(rows)} 条')
total += len(rows)
print(f' 合计: {total} 条')
if total == 0:
print('[info] 无匹配数据,不生成 Excel。')
return
# 输出路径
if args.output:
output_path = Path(args.output)
else:
output_dir = get_output_dir()
output_path = output_dir / f'政府采购商机汇总_{target_date}.xlsx'
save_to_excel(all_results, output_path, no_detail=args.no_detail)
if __name__ == '__main__':
main()
FILE:pyproject.toml
[project]
name = "govb-fetcher"
version = "0.1.0"
description = "政府采购商机自动抓取工具"
requires-python = ">=3.8"
dependencies = [
"requests",
"openpyxl",
"python-dotenv",
]
[project.scripts]
govb-fetcher = "govb_fetcher.fetcher:main"
[tool.setuptools.packages.find]
where = ["."]
将工作空间 git 仓库备份到 GitHub 各分支。通过 .env 配置目录列表,每天 03:00 自动执行。当用户说"workspace-backup"、"备份工作空间"、"工作空间备份"时触发。
---
name: workspace-backup
description: 将工作空间 git 仓库备份到 GitHub 各分支。通过 .env 配置目录列表,每天 03:00 自动执行。当用户说"workspace-backup"、"备份工作空间"、"工作空间备份"时触发。
metadata: {"openclaw":{"emoji":"💾","requires":{"bins":["git"]},"install":"pip install -e {baseDir}"}}
---
# Workspace Backup
将多个本地 git 工作空间自动备份到 GitHub,每个目录对应一个同名远程分支。
## 使用
```bash
workspace # 备份所有工作空间
workspace --backup --force # 强制推送(解决 non-fast-forward 错误)
workspace --status # 查看各工作空间状态及最近备份日志
```
每天 03:00 由 OpenClaw cron 自动执行。
## 配置
在 `workspace_backup/.env`(或 `~/.config/workspace/.env`)中配置备份目录:
```dotenv
WORKSPACE_main=/home/ubuntu/.openclaw/workspace
WORKSPACE_formulas=/home/ubuntu/.openclaw/workspace-formulas
```
`WORKSPACE_<id>` 的 `<id>` 即为目标 GitHub 分支名。用户级 `.env` 优先于包级 `.env`。
复制 `workspace_backup/.env.example` 为 `.env` 后修改路径即可。
## 前提条件
- SSH Key 已配置,可免密 `git push`
- 各工作空间目录已初始化为 git 仓库并设置远程
FILE:CLAUDE.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
`workspace-backup` is an OpenClaw workspace backup skill. Backs up each configured workspace git repo to a dedicated remote branch. Runs automatically at 03:00 daily via OpenClaw cron.
## Commands
```bash
# Install
pip install -e .
# Backup all workspaces (default)
workspace
workspace --backup --force # force push to avoid non-fast-forward errors
# Status
workspace --status
workspace --help
```
## Architecture
**Entry point:** `workspace_backup/cli.py`
- `load_env()` — merges `workspace_backup/.env` + `~/.config/workspace/.env` (user wins)
- `load_workspaces()` — extracts `WORKSPACE_<id>=<path>` entries → `[{id, workspace}]`
- `git()` — thin subprocess wrapper for all git calls
- `backup_workspace(path, branch)` — add → diff → commit → push
- `cmd_backup()` / `cmd_status()` — command handlers
- All events logged to `~/.openclaw/logs/backup.log`
## Configuration
`workspace_backup/.env` (or `~/.config/workspace/.env`):
```dotenv
WORKSPACE_main=/home/ubuntu/.openclaw/workspace
WORKSPACE_formulas=/home/ubuntu/.openclaw/workspace-formulas
```
`<id>` is used as the git branch name. See `workspace_backup/.env.example` for reference.
**Prerequisites:** SSH key for passwordless push; each workspace must be a git repo with a remote set.
FILE:_meta.json
{
"ownerId": "kn7egg3a3hx7nftrgxb1s3aj7s838fg5",
"slug": "openclaw-workspace-backup",
"version": "0.1.1",
"publishedAt": 1774173287517
}
FILE:pyproject.toml
[project]
name = "workspace-backup"
version = "0.1.2"
description = "OpenClaw 工作空间备份工具"
requires-python = ">=3.8"
dependencies = []
[project.scripts]
workspace = "workspace_backup.cli:main"
FILE:workspace_backup/__init__.py
FILE:workspace_backup/cli.py
import argparse
import subprocess
import sys
from datetime import datetime
from pathlib import Path
LOG_FILE = Path.home() / ".openclaw" / "logs" / "backup.log"
ENV_FILE_PKG = Path(__file__).parent / ".env"
ENV_FILE_USER = Path.home() / ".config" / "workspace" / ".env"
def log(message: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{timestamp}] {message}"
print(entry)
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f:
f.write(entry + "\n")
def load_env() -> dict:
def parse_file(path: Path) -> dict:
if not path.exists():
return {}
result = {}
for line in path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
key, value = key.strip(), value.strip()
if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
value = value[1:-1]
if key:
result[key] = value
return result
merged = parse_file(ENV_FILE_PKG)
merged.update(parse_file(ENV_FILE_USER)) # user-level wins on conflict
return merged
def load_workspaces() -> list:
prefix = "WORKSPACE_"
return [
{"id": key[len(prefix):], "workspace": val}
for key, val in load_env().items()
if key.startswith(prefix) and len(key) > len(prefix) and val
]
def git(args: list, cwd: str, **kwargs) -> subprocess.CompletedProcess:
return subprocess.run(["git"] + args, cwd=cwd, capture_output=True, **kwargs)
def backup_workspace(path: str, branch: str, force: bool = False) -> bool:
log(f"Starting backup for {branch} (path: {path})")
if not Path(path).is_dir():
log(f"WARNING: Directory does not exist: {path}")
return True
if git(["rev-parse", "--git-dir"], path).returncode != 0:
log(f"WARNING: Not a git repository: {path}")
return True
git(["add", "-A"], path)
if git(["diff", "--cached", "--quiet"], path).returncode == 0:
log(f"No changes to commit for {branch}")
return True
r = git(["commit", "-m", f"Backup {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"], path, text=True)
if r.returncode != 0:
log(f"ERROR: Failed to commit for {branch}: {r.stderr.strip()}")
return False
push_args = ["push", "origin", branch] + (["--force"] if force else [])
r = git(push_args, path, text=True)
if r.returncode != 0:
log(f"ERROR: Failed to push {branch}: {r.stderr.strip()}")
return False
log(f"Successfully backed up {branch}")
return True
def cmd_backup(force: bool = False) -> int:
log("========== Workspace Backup Started ==========")
if force:
log("Force push enabled")
errors = sum(
0 if backup_workspace(ws["workspace"], ws["id"], force=force) else 1
for ws in load_workspaces()
)
log("========== Workspace Backup Completed ==========")
return 1 if errors else 0
def cmd_status() -> int:
print("=== Workspace Backup Status ===\n")
for ws in load_workspaces():
branch, path = ws["id"], ws["workspace"]
print(f"--- {branch} ({path}) ---")
if not Path(path).is_dir():
print("Directory does not exist\n")
continue
r = git(["status", "--short"], path, text=True)
print(r.stdout.strip() if r.stdout.strip() else "(clean)")
print()
print("=== Last Backup Log ===")
if LOG_FILE.exists():
print("\n".join(LOG_FILE.read_text().splitlines()[-20:]))
else:
print("No backup log found")
return 0
def main():
parser = argparse.ArgumentParser(prog="workspace", description="OpenClaw 工作空间备份工具")
parser.add_argument("--backup", action="store_true", help="备份所有工作空间(默认行为)")
parser.add_argument("--status", action="store_true", help="查看各工作空间 git 状态及最近备份日志")
parser.add_argument("--force", action="store_true", help="强制推送(git push --force),与 --backup 配合使用")
args = parser.parse_args()
sys.exit(cmd_status() if args.status else cmd_backup(force=args.force))
if __name__ == "__main__":
main()
军工采购(军采)商机专用推报工具。汇总三大军采平台数据,生成 Excel 并通过 SMTP 发送邮件报告。与政府采购(政采)工具无关,仅处理军队采购渠道。当用户说"milb-email"、"军工商机邮件"、"推送军工商机"、"军工商机通报"时触发。注意:这不是通用邮件客户端,仅用于执行 milb 业务逻辑。
---
name: milb-email
description: 军工采购(军采)商机专用推报工具。汇总三大军采平台数据,生成 Excel 并通过 SMTP 发送邮件报告。与政府采购(政采)工具无关,仅处理军队采购渠道。当用户说"milb-email"、"军工商机邮件"、"推送军工商机"、"军工商机通报"时触发。注意:这不是通用邮件客户端,仅用于执行 milb 业务逻辑。
metadata: {"openclaw":{"emoji":"📧","requires":{"bins":["milb-email","milb-fetcher"]},"install":"pip install -e {baseDir}"}}
---
# Milb Email
自动抓取商机并发送邮件报告。
## 环境变量要求
该技能必须在 `.env` 中配置以下核心参数才能激活:
- `EMAIL_TO`, `EMAIL_CC`, `EMAIL_FROM`: 收发件地址
- `EMAIL_SMTP_HOST`, `EMAIL_SMTP_PORT`: SMTP 服务器信息
- `EMAIL_SMTP_USER`, `EMAIL_SMTP_PASSWORD`: 认证信息
- `EMAIL_SUBJECT_PREFIX`, `EMAIL_BODY_INTRO`: 邮件模板配置
- `EMAIL_RECIPIENT_NAME`, `EMAIL_SENDER_NAME`: 称呼和签名
## 快速使用
- `/milb-email` → 发送昨日报告(默认,解决军队采购网白天更新的问题)
- `/milb-email --help` → 显示帮助信息
- `/milb-email --today` → 发送今日报告(获取各渠道最新数据)
- `/milb-email --date 2026-03-23` → 发送指定日期报告
- `/milb-email --keywords "模型,仿真"` → 使用自定义关键词筛选
- `/milb-email --to [email protected]` → 测试发送至指定收件人
## 参数说明
| 参数 | 说明 | 默认值 |
|------|------|--------|
| 无参数 | 默认昨日 | 启用 |
| `--today` | 今日(获取各渠道最新数据) | - |
| `--date YYYY-MM-DD` | 指定日期 | - |
| `--keywords WORDS` | 关键词,逗号分隔 | 配置中的默认关键词 |
| `--to ADDRESS` | 测试发送至指定收件人 | .env 中的配置 |
## 数据源
- 全军武器装备采购信息网
- 军队采购网
- 国防科大采购信息网
## 触发词
发送邮件、推送报告、邮件通知、商机通报
## 配置文件
配置文件位于 `milb_email/.env`(独立配置),可配置以下参数:
| 环境变量 | 用途 |
|----------|------|
| `EMAIL_TO` | 收件人,逗号分隔 |
| `EMAIL_CC` | 抄送人,逗号分隔 |
| `EMAIL_FROM` | 发件人 |
| `EMAIL_RECIPIENT_NAME` | 收件人称呼 |
| `EMAIL_SENDER_NAME` | 发件人签名 |
| `EMAIL_SUBJECT_PREFIX` | 邮件主题前缀 |
| `EMAIL_BODY_INTRO` | 邮件正文开头 |
| `EMAIL_SMTP_HOST` | SMTP 服务器 |
| `EMAIL_SMTP_PORT` | SMTP 端口 |
| `EMAIL_SMTP_USER` | SMTP 用户名 |
| `EMAIL_SMTP_PASSWORD` | SMTP 密码 |
创建配置文件可复制 `milb_email/.env.example` 为 `milb_email/.env` 后修改。
## 技术说明
- 数据来源:全军武器装备采购信息网、军队采购网、国防科大采购信息网(均为军采渠道,非政采)
- 使用 SMTP 直接发送邮件(配置 EMAIL_SMTP_* 环境变量)
- 使用文件锁防止并发执行
FILE:CLAUDE.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
`milb-email` is a Chinese military bidding opportunity email reporting tool. It fetches bidding information from three data sources, generates Excel reports, and sends them via email.
## Commands
```bash
# Install package
pip install -e .
# Run the tool (sends yesterday's report by default)
milb-email
# Send today's report (fetches latest data from all channels)
milb-email --today
# Send report for specific date
milb-email --date 2026-03-23
# Use custom keywords to filter opportunities
milb-email --keywords "模型,仿真,AI"
# Test send to specific recipient
milb-email --to [email protected]
```
## Architecture
**Core Modules:**
- `milb_email/fetcher.py` - Main entry point, handles CLI arguments, fetches data, sends emails
- `milb_email/config.py` - Configuration loader, reads from `.env` file
**Data Flow:**
1. CLI parses arguments (`--date`, `--today`, `--keywords`, `--to`)
2. Calls `milb_fetcher.fetcher.fetch_all_bidding()` to fetch data from three sources
3. Generates email body with high-recommendation items
4. Sends email via SMTP (smtplib)
5. Attaches Excel report from `/home/ubuntu/.openclaw/workspace/military-bidding/`
**Email Sending:**
- SMTP via smtplib (SMTP_SSL, port 465)
- File lock at `/tmp/bidding_email.lock` prevents concurrent execution
## Configuration
All config via `.env` file (see `.env.example`):
- Email addresses: `EMAIL_TO`, `EMAIL_CC`, `EMAIL_FROM`
- SMTP: `EMAIL_SMTP_HOST`, `EMAIL_SMTP_PORT`, `EMAIL_SMTP_USER`, `EMAIL_SMTP_PASSWORD`
- Templates: `EMAIL_SUBJECT_PREFIX`, `EMAIL_BODY_INTRO`, `EMAIL_RECIPIENT_NAME`, `EMAIL_SENDER_NAME`
FILE:_meta.json
{
"ownerId": "kn7egg3a3hx7nftrgxb1s3aj7s838fg5",
"slug": "military-bidding-email",
"version": "0.1.2",
"publishedAt": 1774075348626
}
FILE:milb_email/__init__.py
# milb_email - 军工采购商机邮件发送工具
FILE:milb_email/config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
配置加载模块 (milb-email)
从 .env 文件读取配置,所有配置项必填,缺失时直接报错
只加载 EMAIL_* 前缀的配置
"""
from pathlib import Path
from typing import Dict
# 必填配置项列表
REQUIRED_CONFIG_KEYS = [
'EMAIL_TO',
'EMAIL_CC',
'EMAIL_FROM',
'EMAIL_SMTP_HOST',
'EMAIL_SMTP_PORT',
'EMAIL_SMTP_USER',
'EMAIL_SMTP_PASSWORD',
]
def load_config() -> Dict:
"""
从 .env 文件加载配置
Returns:
Dict: 配置字典,包含所有配置项
"""
# 按优先级查找:当前工作目录 → ~/.config/milb-email/
candidates = [
Path.cwd() / '.env',
Path.home() / '.config' / 'milb-email' / '.env',
]
env_path = next((p for p in candidates if p.exists()), None)
config = {}
if env_path:
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
if '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
return config
def get_config(key: str) -> str:
"""
获取配置项,缺失时抛出异常
Args:
key: 配置键名
Returns:
配置值字符串
Raises:
ValueError: 配置项未在 .env 中设置
"""
config = load_config()
value = config.get(key)
if not value:
raise ValueError(f"配置项 {key} 未在 .env 中设置,请参考 .env.example")
return value
def get_email_config() -> Dict:
"""
获取邮件配置,缺失任何必填项则报错
Returns:
Dict: 邮件配置字典
Raises:
ValueError: 任何必填配置项缺失时
"""
# 先验证必填项
for key in REQUIRED_CONFIG_KEYS:
get_config(key) # 缺失会抛异常
return {
'to': get_config('EMAIL_TO').split(','),
'cc': get_config('EMAIL_CC').split(','),
'from': get_config('EMAIL_FROM'),
'recipient_name': get_config('EMAIL_RECIPIENT_NAME'),
'sender_name': get_config('EMAIL_SENDER_NAME'),
'subject_prefix': get_config('EMAIL_SUBJECT_PREFIX'),
'body_intro': get_config('EMAIL_BODY_INTRO'),
'smtp_host': get_config('EMAIL_SMTP_HOST'),
'smtp_port': get_config('EMAIL_SMTP_PORT'),
'smtp_user': get_config('EMAIL_SMTP_USER'),
'smtp_password': get_config('EMAIL_SMTP_PASSWORD'),
}
if __name__ == '__main__':
# 测试输出
print("=== milb-email 配置加载测试 ===")
try:
email_cfg = get_email_config()
print(f"收件人: {email_cfg['to']}")
print(f"抄送人: {email_cfg['cc']}")
print(f"发件人: {email_cfg['from']}")
print(f"收件人称呼: {email_cfg['recipient_name']}")
print(f"发件人签名: {email_cfg['sender_name']}")
print(f"主题前缀: {email_cfg['subject_prefix']}")
except ValueError as e:
print(f"[ERROR] {e}")
FILE:milb_email/fetcher.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
军工采购商机邮件发送工具 (milb-email)
自动抓取全军武器装备采购信息网、军队采购网、国防科大采购信息网的商机数据,
生成 Excel 汇总文件,并通过邮件发送报告。
用法:
milb-email [选项]
示例:
milb-email # 发送昨日报告(默认)
milb-email --today # 发送今日报告
milb-email --date 2026-03-23 # 发送指定日期报告
milb-email --keywords "模型,仿真" # 使用自定义关键词筛选
milb-email --to [email protected] # 测试发送至指定收件人
"""
import sys
import os
import subprocess
import json
import pandas as pd
from pathlib import Path
from milb_email.config import get_email_config
def _import_fetcher():
"""延迟导入避免循环依赖"""
from milb_fetcher.fetcher import fetch_all_bidding
return fetch_all_bidding
def get_high_recommend_items(df_weain, df_military, df_nudt):
"""
从各渠道数据中提取高推荐等级的项目。
Args:
df_weain: 全军武器装备采购信息网 DataFrame
df_military: 军队采购网 DataFrame
df_nudt: 国防科大采购信息网 DataFrame
Returns:
dict: 包含三个渠道高推荐项目的字典,结构为 {'weain': [], 'military': [], 'nudt': []}
"""
result = {'weain': [], 'military': [], 'nudt': []}
# 全军武器装备采购信息网
if len(df_weain) > 0:
high = df_weain[df_weain['推荐等级'] == '高']
for _, row in high.iterrows():
result['weain'].append({
'title': row['项目名称'][:50],
'type': row['公告类型'],
'deadline': row.get('截止日期', '')
})
# 军队采购网
if len(df_military) > 0:
high = df_military[df_military['推荐等级'] == '高']
for _, row in high.iterrows():
result['military'].append({
'title': row['项目名称'][:50],
'type': row['采购方式'],
'region': row.get('地区', '')
})
# 国防科大采购信息网
if len(df_nudt) > 0:
high = df_nudt[df_nudt['推荐等级'] == '高']
for _, row in high.iterrows():
result['nudt'].append({
'title': row['项目名称'][:50],
'type': row['公告类型']
})
return result
def send_email(date_str, df_weain, df_military, df_nudt, dates_info, to_override=None, cc_override=None):
"""
发送商机报告邮件。
Args:
date_str: 报告日期字符串,格式 YYYY-MM-DD
df_weain: 全军武器装备采购信息网 DataFrame
df_military: 军队采购网 DataFrame
df_nudt: 国防科大采购信息网 DataFrame
dates_info: 各渠道更新日期信息字典
to_override: 覆盖默认收件人,用于测试发送
cc_override: 覆盖默认抄送人
"""
env_config = get_email_config()
recipient_name = env_config['recipient_name']
sender_name = env_config['sender_name']
subject_prefix = env_config['subject_prefix']
body_intro = env_config['body_intro']
# 构建邮件主题
subject = f"【{subject_prefix}】{date_str}"
# 获取高推荐项目
high_items = get_high_recommend_items(df_weain, df_military, df_nudt)
# 构建邮件正文
body = f"""{recipient_name},{body_intro}
汇总如下:
【全军武器装备采购信息网】{len(df_weain)}条(更新日期: {dates_info.get('weain') or date_str})
高推荐项目:
"""
for i, item in enumerate(high_items['weain'], 1):
deadline = f" | 截止{item['deadline']}" if item.get('deadline') else ""
body += f"{i}. {item['title']}... | {item['type']}{deadline}\n"
body += f"""
【军队采购网】{len(df_military)}条(更新日期: {dates_info.get('military') or date_str})
高推荐项目:
"""
for i, item in enumerate(high_items['military'], 1):
region = f" | {item['region']}" if item.get('region') else ""
body += f"{i}. {item['title']}... | {item['type']}{region}\n"
body += f"""
【国防科大采购信息网】{len(df_nudt)}条(更新日期: {dates_info.get('nudt') or date_str})
高推荐项目:
"""
for i, item in enumerate(high_items['nudt'], 1):
body += f"{i}. {item['title']}... | {item['type']}\n"
body += f"""
详情请见附件Excel。
{sender_name}"""
# 获取Excel文件路径
excel_path = os.path.expanduser(f"~/.openclaw/workspace/military-bidding/军队采购商机汇总_{date_str}.xlsx")
# 确定收件人/抄送人
env_config = get_email_config()
if to_override:
to_list = [to_override]
cc_list = []
else:
to_list = env_config['to']
cc_list = env_config['cc']
try:
send_email_via_smtp(subject, body, excel_path, to_list, cc_list)
print("[SUCCESS] 邮件发送成功")
return True
except Exception as e:
print(f"[ERROR] 邮件发送失败: {e}")
return False
def send_email_via_smtp(subject, body, excel_path, to_list, cc_list):
"""
使用 SMTP 直接发送邮件(回退方案)。
接收原始参数,不依赖 MML 解析,避免格式歧义导致的乱码和附件丢失。
Args:
subject: 邮件主题字符串
body: 邮件正文纯文本字符串
excel_path: Excel 附件文件路径(文件不存在时跳过附件)
to_list: 收件人列表
cc_list: 抄送人列表
Returns:
bool: 发送成功返回 True
"""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.header import Header
from email.utils import formatdate
env_config = get_email_config()
SMTP_HOST = env_config['smtp_host']
SMTP_PORT = int(env_config['smtp_port'])
SMTP_USER = env_config['smtp_user']
SMTP_PASSWORD = env_config['smtp_password']
# 创建邮件,主题使用 RFC 2047 编码防止中文乱码
msg = MIMEMultipart()
msg['From'] = SMTP_USER
msg['To'] = ', '.join(to_list)
msg['Cc'] = ', '.join(cc_list)
msg['Subject'] = Header(subject, 'utf-8').encode()
msg['Date'] = formatdate()
msg.attach(MIMEText(body.strip(), 'plain', 'utf-8'))
# 添加附件(直接使用原始路径,无需从 MML 中解析)
if excel_path and os.path.exists(excel_path):
with open(excel_path, 'rb') as f:
part = MIMEApplication(f.read(), _subtype='xlsx')
filename_encoded = Header(os.path.basename(excel_path), 'utf-8').encode()
part.add_header('Content-Disposition', 'attachment', filename=filename_encoded)
msg.attach(part)
elif excel_path:
print(f"[WARNING] 附件不存在,跳过: {excel_path}")
# 发送邮件
smtp = smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT)
smtp.login(SMTP_USER, SMTP_PASSWORD)
smtp.send_message(msg)
smtp.quit()
print(f"[INFO] 邮件已发送(SMTP)至: {', '.join(to_list)}, 抄送: {', '.join(cc_list)}")
return True
def send_bidding_report(date=None, keywords=None, auto_latest=False, to_override=None):
"""
发送商机报告邮件。
Args:
date: 指定日期,格式 YYYY-MM-DD。默认为 None,配合 auto_latest 使用
keywords: 关键词列表,用于筛选商机。默认为 None,使用配置中的默认关键词
auto_latest: 是否自动获取各渠道最新日期。默认为 False(默认获取昨日数据)
to_override: 覆盖默认收件人,用于测试发送。默认为 None
Returns:
tuple: (df_weain, df_military, df_nudt) 三个 DataFrame
"""
from datetime import datetime, timedelta
# 确定查询日期
if not auto_latest and date is None:
# 默认获取昨日数据(解决军队采购网白天更新的问题)
yesterday_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
date = yesterday_date
print(f"[INFO] 默认获取昨日商机: {date}")
print(f"[INFO] 开始抓取商机信息 - 自动获取最新日期: {auto_latest}")
# 抓取数据
fetch_all_bidding = _import_fetcher()
result = fetch_all_bidding(keywords=keywords, date=date, auto_latest=auto_latest)
if len(result) == 4:
df_weain, df_military, df_nudt, dates_info = result
else:
df_weain, df_military, df_nudt = result
dates_info = {'weain': date, 'military': date, 'nudt': date}
file_date = date if date else datetime.now().strftime('%Y-%m-%d')
print(f"[INFO] 采集完成: 全军{len(df_weain)}条, 军队{len(df_military)}条, 国防科大{len(df_nudt)}条")
# 发送邮件
send_email(file_date, df_weain, df_military, df_nudt, dates_info, to_override=to_override)
return df_weain, df_military, df_nudt
HELP_TEXT = """军工采购商机邮件发送工具
用法:
milb-email [选项]
选项:
--date DATE 日期,格式 YYYY-MM-DD
--keywords WORDS 关键词,逗号分隔,如: 模型,仿真,AI
--to ADDRESS 测试发送至指定收件人
--today 发送今日报告(获取各渠道最新数据)
--help 显示此帮助信息
示例:
milb-email # 发送昨日报告(默认)
milb-email --today # 发送今日报告
milb-email --date 2026-03-23 # 发送指定日期报告
milb-email --keywords "模型,仿真" # 使用自定义关键词
milb-email --to [email protected] # 测试发送
"""
def main():
"""
CLI 入口函数。
支持的参数:
--date DATE: 指定日期,格式 YYYY-MM-DD
--keywords WORDS: 关键词,逗号分隔
--to ADDRESS: 测试发送至指定收件人
--today: 发送今日报告
--help: 显示帮助信息
"""
import argparse
import fcntl
from datetime import datetime, timedelta
parser = argparse.ArgumentParser(
description='军工采购商机邮件发送工具',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=HELP_TEXT
)
parser.add_argument('--date', type=str, help='日期,格式 YYYY-MM-DD')
parser.add_argument('--keywords', type=str, help='关键词,逗号分隔')
parser.add_argument('--to', type=str, help='测试发送至指定收件人(覆盖配置)')
parser.add_argument('--today', action='store_true',
help='发送今日报告(获取各渠道最新数据)')
args = parser.parse_args()
# --help 时显示帮助
if '--help' in sys.argv:
print(HELP_TEXT)
sys.exit(0)
# 确定日期和模式
date = args.date
auto_latest = args.today # --today 表示获取各渠道最新数据
# 锁文件防止并发执行
LOCK_FILE = '/tmp/bidding_email.lock'
try:
lock_fd = open(LOCK_FILE, 'w')
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (IOError, OSError):
print("[INFO] 已有任务在运行,跳过本次执行")
sys.exit(0)
# 解析关键词
keywords = None
if args.keywords:
keywords = args.keywords.split(',')
# 发送报告
try:
send_bidding_report(date=date, keywords=keywords, auto_latest=auto_latest, to_override=args.to)
finally:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
lock_fd.close()
try:
os.remove(LOCK_FILE)
except:
pass
if __name__ == '__main__':
main()
FILE:pyproject.toml
[project]
name = "milb-email"
version = "0.1.0"
description = "军工采购商机邮件发送工具"
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"requests",
"pandas",
]
[project.scripts]
milb-email = "milb_email.fetcher:main"
[tool.setuptools.packages.find]
where = ["."]
军工采招商机自动抓取工具。从全军武器装备采购信息网、军队采购网、国防科大采购网抓取招标信息,过滤并生成 Excel 报表。当用户说"抓取商机"、"查新"、"采集招标"时触发。
---
name: milb-fetcher
description: 军工采招商机自动抓取工具。从全军武器装备采购信息网、军队采购网、国防科大采购网抓取招标信息,过滤并生成 Excel 报表。当用户说"抓取商机"、"查新"、"采集招标"时触发。
metadata: {"openclaw":{"emoji":"🕷️","requires":{milb-fetcher},"install":"pip install -e {baseDir}"}}
---
# Milb Fetcher
从三大军工采购平台自动抓取招标信息。
## 快速使用
- `/milb-fetcher` → 自动检测各渠道最新可用日期并抓取(默认)
- `/milb-fetcher --help` → 显示帮助信息
日期选择(三选一,不指定则自动检测)
- `/milb-fetcher --today` → 抓取今日
- `/milb-fetcher --yesterday` → 抓取昨日
- `/milb-fetcher --date 2026-03-23` → 抓取指定日期
筛选参数
- `--keywords "关键词1,关键词2"` → 核心关键词
- `--exclude-keywords "排除词1,排除词2"` → 排除关键词
- `--high-value-keywords "高价值词1,高价值词2"` → 高价值关键词(用于推荐评级)
- `--regions "地区1,地区2"` → 地区筛选(仅对军队采购网生效)
输出控制
- `--output /path/to/file.xlsx` → 指定输出路径(默认存至 `~/.openclaw/workspace/military-bidding/军队采购商机汇总_{date}.xlsx`)
- `--no-auto-latest` → 禁用自动检测最新日期(未指定日期时改用今日)
## 数据源
- 全军武器装备采购信息网
- 军队采购网
- 国防科大采购信息网
## 推荐等级
基于 `FETCHER_HIGH_VALUE_KEYWORDS` 配置自动评定:
- **高**:标题命中高价值关键词
- **中**:标题命中核心关键词但未命中高价值词
- **低**:其他匹配项
## 过滤词
通过 `FETCHER_EXCLUDE_KEYWORDS` 配置,命中排除词的条目将被过滤掉。
## 触发词
抓取、采集、爬虫、查新、每日商机
## 配置文件
配置文件位于 `milb_fetcher/.env`(独立配置),可配置以下参数:
| 环境变量 | 用途 | 格式 |
|----------|------|------|
| `FETCHER_KEYWORDS` | 核心关键词,逗号分隔 | `词1,词2,...` |
| `FETCHER_EXCLUDE_KEYWORDS` | 排除关键词,逗号分隔 | `词1,词2,...` |
| `FETCHER_HIGH_VALUE_KEYWORDS` | 高价值关键词,逗号分隔 | `词1,词2,...` |
| `FETCHER_REGIONS` | 地区,逗号分隔 | `省份1,省份2,...` |
创建配置文件可复制 `milb_fetcher/.env.example` 为 `milb_fetcher/.env` 后修改。
FILE:_meta.json
{
"ownerId": "kn7egg3a3hx7nftrgxb1s3aj7s838fg5",
"slug": "military-bidding-fetcher",
"version": "0.1.3",
"publishedAt": 1774070358983
}
FILE:milb_fetcher/__init__.py
FILE:milb_fetcher/config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
配置加载模块 (milb-fetcher)
按优先级读取 .env 配置,缺失时使用默认值:
1. ~/.config/milb-fetcher/.env (用户全局配置)
2. ./.env (当前运行目录,最高优先级)
"""
from pathlib import Path
from typing import Dict, List, Optional
# 默认配置值 (FETCHER_* 前缀)
DEFAULTS = {
'FETCHER_KEYWORDS': '体系,模型,仿真,数据,决策,规划,分析,智能,AI,软件,系统,信息,算法,效能',
'FETCHER_EXCLUDE_KEYWORDS': '体能,训练鞋,鞋类,服装,被装,医疗,药品,器械,膝关节,光纤,电梯,物业,绿化,装修,工程,建材,食材,食品,副食',
'FETCHER_HIGH_VALUE_KEYWORDS': '模型,仿真,数据,决策,分析,智能,AI,软件,意向',
'FETCHER_REGIONS': '北京,天津,河北,湖北,湖南',
}
def _parse_env_file(path: Path) -> Dict:
"""解析单个 .env 文件,返回配置字典"""
config = {}
with open(path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
return config
def load_config() -> Dict:
"""
按优先级加载 .env 配置,后加载的优先级更高:
1. ~/.config/milb-fetcher/.env (用户全局配置)
2. ./.env (当前运行目录,最高优先级)
Returns:
Dict: 合并后的配置字典
"""
candidates = [
Path.home() / '.config' / 'milb-fetcher' / '.env',
Path.cwd() / '.env',
]
config = {}
for path in candidates:
if path.exists():
config.update(_parse_env_file(path))
return config
def get_config(key: str, default: Optional[str] = None) -> Optional[str]:
"""
获取单个配置项
Args:
key: 配置键名
default: 默认值,如果 .env 中未定义则使用此值
Returns:
配置值字符串
"""
config = load_config()
return config.get(key, default or DEFAULTS.get(key))
def get_keywords() -> List[str]:
"""获取核心关键词列表"""
value = get_config('FETCHER_KEYWORDS')
return [k.strip() for k in value.split(',') if k.strip()] if value else []
def get_exclude_keywords() -> List[str]:
"""获取排除关键词列表"""
value = get_config('FETCHER_EXCLUDE_KEYWORDS')
return [k.strip() for k in value.split(',') if k.strip()] if value else []
def get_high_value_keywords() -> List[str]:
"""获取高价值关键词列表"""
value = get_config('FETCHER_HIGH_VALUE_KEYWORDS')
return [k.strip() for k in value.split(',') if k.strip()] if value else []
def get_proxies() -> Optional[dict]:
"""返回 requests 用的 proxies 字典,FETCHER_USE_PROXY=true 时生效,否则返回 None"""
use_proxy = get_config('FETCHER_USE_PROXY', 'false').lower() == 'true'
if not use_proxy:
return None
proxy_url = get_config('FETCHER_PROXY')
if not proxy_url:
return None
return {'http': proxy_url, 'https': proxy_url}
def get_output_dir() -> str:
"""获取输出目录"""
return get_config('FETCHER_OUTPUT_DIR', '~/.openclaw/workspace/military-bidding')
def get_regions() -> Dict[str, str]:
"""获取地区字典"""
value = get_config('FETCHER_REGIONS')
region_codes = {
'北京': '110000', '天津': '120000', '河北': '130000',
'山西': '140000', '内蒙古': '150000', '辽宁': '210000',
'吉林': '220000', '黑龙江': '230000', '上海': '310000',
'江苏': '320000', '浙江': '330000', '安徽': '340000',
'福建': '350000', '江西': '360000', '山东': '370000',
'河南': '410000', '湖北': '420000', '湖南': '430000',
'广东': '440000', '广西': '450000', '海南': '460000',
'重庆': '500000', '四川': '510000', '贵州': '520000',
'云南': '530000', '西藏': '540000', '陕西': '610000',
'甘肃': '620000', '青海': '630000', '宁夏': '640000',
'新疆': '650000'
}
if not value:
return {}
region_names = [r.strip() for r in value.split(',') if r.strip()]
return {name: region_codes.get(name, '') for name in region_names if name in region_codes}
if __name__ == '__main__':
# 测试输出
print("=== milb-fetcher 配置加载测试 ===")
print(f"核心关键词: {get_keywords()}")
print(f"排除关键词: {get_exclude_keywords()}")
print(f"高价值关键词: {get_high_value_keywords()}")
print(f"地区: {get_regions()}")
FILE:milb_fetcher/fetcher.py
#!/usr/bin/env python3
"""
军工采购商机抓取工具 - 整合版
功能:
1. 军队采购网商机抓取
2. 全军武器装备采购信息网商机抓取
3. 国防科大采购信息网商机抓取
4. 整合输出Excel(Sheet1: 全军装备采购网, Sheet2: 军队采购网,Sheet3:国防科大信息采购网)
输入参数:
keywords: 核心关键词列表,用于匹配项目名称
exclude_keywords: 排除关键词列表,用于过滤无关项目
regions: 地区字典,key=地区名, value=regionCode (仅军队采购网使用)
date: 日期,格式 YYYY-MM-DD,默认今天
推荐等级规则:
高: 包含模型/仿真/数据/决策/分析/智能/AI/软件/意向等高价值关键词
中: 包含系统/信息等一般关键词
空: 其他匹配项
特殊规则:
- 包含"意向"关键词的项目优先推荐
- 包含"中标公告"/"结果公示"的项目过滤掉(已结束的项目)
"""
import os
import requests
import pandas as pd
from datetime import datetime
from typing import Dict, List, Optional
import time
import re
import json
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
# 从配置模块加载默认参数
from .config import get_keywords, get_exclude_keywords, get_high_value_keywords, get_regions, get_proxies, get_output_dir
def _clean_description(text):
"""清洗项目描述:去除HTML/CSS乱码,提取20-50字简述"""
if pd.isna(text) or str(text).strip() == '':
return ''
text = str(text)
# 移除HTML/CSS
text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.DOTALL)
text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL)
text = re.sub(r'<[^>]+>', ' ', text)
text = re.sub(r'\{[^}]*\}', ' ', text)
text = re.sub(r'\.[a-zA-Z][a-zA-Z0-9_]*', ' ', text)
text = re.sub(r'&[a-zA-Z]+;', ' ', text)
text = re.sub(r'\s+', ' ', text)
text = text.strip()
# 提取核心内容
lines = text.split('。')
result = []
for line in lines[:3]:
line = line.strip()
if line and len(line) > 5:
if any(kw in line for kw in ['项目名称', '项目编号', '预算', '采购', '服务', '物资', '系统', '软件', '数据', '分析', '智能']):
result.append(line[:30])
text = ' '.join(result) if result else ''
if len(text) > 50:
text = text[:47] + '...'
return text if len(text) >= 5 else ''
def _generate_remarks(title: str, matched_kw: List[str]) -> str:
"""根据标题生成简明的分析备注"""
if not title:
return ""
remarks = []
# 意向项目优先标注
if '意向' in title:
remarks.append("采购意向前期可跟进")
# 根据关键词匹配生成备注
if '体系' in title:
remarks.append("体系设计相关")
if '模型' in title:
remarks.append("模型开发可参与")
if '仿真' in title:
remarks.append("仿真项目可做")
if '数据' in title:
remarks.append("数据分析需求")
if 'AI' in title or '智能' in title:
remarks.append("AI智能项目")
if '软件' in title:
remarks.append("软件定制开发")
if '系统' in title:
remarks.append("系统集成类")
if '训练' in title or '实验' in title:
remarks.append("训练实验相关")
if '决策' in title or '规划' in title:
remarks.append("决策规划类")
if not remarks:
remarks.append("可关注")
# 最多返回2条
return "; ".join(remarks[:2])
# 默认配置(从 config.py 加载,保留向后兼容)
DEFAULT_REGIONS = get_regions()
DEFAULT_HIGH_VALUE_KW = get_high_value_keywords()
DEFAULT_CORE_KW = get_keywords()
DEFAULT_EXCLUDE_KW = get_exclude_keywords()
# 采购方式枚举
MANNER_MAP = {
'1': '公开招标',
'2': '邀请招标',
'3': '竞争性谈判',
'4': '询价',
'5': '单一来源',
'6': '其他'
}
# 项目类别枚举
NATURE_MAP = {
'1': '物资',
'2': '工程',
'3': '服务'
}
# 采购方式映射
WEAIN_PURCHASE_TYPE_MAP = {
'公开招标': '公开招标',
'邀请招标': '邀请招标',
'竞争性谈判': '竞争性谈判',
'询价': '询价'
}
# ============ 全军武器装备采购信息网获取函数 ============
def fetch_weain_bidding(
keywords: Optional[List[str]] = None,
date: Optional[str] = None,
save_csv: bool = False
) -> pd.DataFrame:
"""
获取全军武器装备采购信息网商机信息
Args:
keywords: 核心关键词列表
date: 日期,默认今天,格式 YYYY-MM-DD
save_csv: 是否保存CSV(此函数不保存CSV,统一在主函数输出Excel)
Returns:
DataFrame: 筛选后的项目列表
"""
if keywords is None:
keywords = DEFAULT_CORE_KW
if date is None:
date = datetime.now().strftime('%Y-%m-%d')
# 转换日期格式为API所需的日期(去掉-)
api_date = date.replace('-', '')
# 获取当前时间戳
try:
result = subprocess.run(['date', '+%s%3N'], capture_output=True, text=True)
_t = result.stdout.strip()
except:
_t = str(int(datetime.now().timestamp() * 1000))
all_results = []
print(f"[INFO] 开始抓取全军武器装备采购信息网 - 日期: {date}")
print("-" * 60)
# 采购类型
purchase_types = [
("公开招标", "%E5%85%AC%E5%BC%80%E6%8B%9B%E6%A0%87"),
("邀请招标", "%E9%82%80%E8%AF%B7%E6%8B%9B%E6%A0%87"),
("竞争性谈判", "%E7%AB%9E%E4%BA%89%E6%80%A7%E8%B0%88%E5%88%A4"),
("询价", "%E8%AF%A2%E4%BB%B7")
]
for ptype, encoded_ptype in purchase_types:
url = f"https://www.weain.mil.cn/api/front/list/cggg/list?LMID=1149231276155707394&pageNo=1&purchaseType={encoded_ptype}&_t={_t}"
try:
resp = requests.get(url, timeout=10, proxies=get_proxies())
data = resp.json()
content_list = data.get('list', {}).get('contentList', [])
for item in content_list:
publish_time = item.get('publishTime', '')
# 只取当日数据
if date not in publish_time:
continue
title = item.get('nonSecretTitle', '')
# 检查关键词匹配
matched_kw = [kw for kw in keywords if kw in title]
if matched_kw:
# 获取字段
purchase_type = item.get('purchaseType', ptype)
secret_grade = item.get('secretGrade', '公开')
pc_url = item.get('pcUrl', '')
deadline = item.get('deadline', '')
# 推荐等级判断 - 意向优先
recommendation = ""
if '意向' in title:
recommendation = "高"
elif any(kw in matched_kw for kw in DEFAULT_HIGH_VALUE_KW):
recommendation = "高"
elif '系统' in matched_kw:
recommendation = "中"
# 生成备注
remarks = _generate_remarks(title, matched_kw)
all_results.append({
'项目名称': title,
'公告类型': purchase_type,
'涉密等级': secret_grade,
'详情链接': f'https://www.weain.mil.cn{pc_url}' if pc_url else '',
'截止日期': deadline,
'发布日期': publish_time[:10] if publish_time else '',
'匹配关键词': ','.join(matched_kw),
'推荐等级': recommendation,
'备注': remarks
})
except Exception as e:
print(f"[ERROR] 获取{ptype}数据失败: {e}")
continue
# 创建DataFrame
df = pd.DataFrame(all_results)
if len(df) > 0:
df.index = range(1, len(df) + 1)
df.index.name = '序号'
# 添加序号列到第一列
df.reset_index(inplace=True)
print(f"[INFO] 全军武器装备采购信息网: 共 {len(df)} 条匹配项目")
return df
# ============ 军队采购网获取函数(更新版) ============
def fetch_military_bidding(
keywords: Optional[List[str]] = None,
exclude_keywords: Optional[List[str]] = None,
regions: Optional[Dict[str, str]] = None,
date: Optional[str] = None,
high_value_keywords: Optional[List[str]] = None,
save_csv: bool = True,
output_path: str = None
) -> pd.DataFrame:
"""
获取并筛选军队采购网商机信息
Args:
keywords: 核心关键词列表,默认使用 DEFAULT_CORE_KW
exclude_keywords: 排除关键词列表,默认使用 DEFAULT_EXCLUDE_KW
regions: 地区字典,默认使用 DEFAULT_REGIONS
date: 日期,默认今天,格式 YYYY-MM-DD
high_value_keywords: 高价值关键词列表,用于评级
save_csv: 是否保存CSV文件(此版本保留参数兼容,实际不使用)
output_path: 输出文件路径(此版本保留参数兼容)
Returns:
DataFrame: 筛选后的项目列表
"""
# 参数初始化
if keywords is None:
keywords = DEFAULT_CORE_KW
if exclude_keywords is None:
exclude_keywords = DEFAULT_EXCLUDE_KW
if regions is None:
regions = DEFAULT_REGIONS
if high_value_keywords is None:
high_value_keywords = DEFAULT_HIGH_VALUE_KW
if date is None:
date = datetime.now().strftime('%Y-%m-%d')
# API基础参数
base_url = 'https://www.plap.mil.cn/freecms/rest/v1/notice/selectInfoMoreChannel.do'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'https://www.plap.mil.cn/freecms/site/juncai/cggg/index.html'
}
site_id = '404bb030-5be9-4070-85bd-c94b1473e8de'
channel_id = 'c5bff13f-21ca-4dac-b158-cb40accd3035'
all_results = []
print(f"[INFO] 开始抓取军队采购网 - 日期: {date}")
print(f"[INFO] 目标地区: {', '.join(regions.keys())}")
print(f"[INFO] 关键词: {', '.join(keywords)}")
print("-" * 60)
for region_name, region_code in regions.items():
page = 1
region_items = []
consecutive_empty = 0
# 分页获取数据
max_pages = 10
while page <= max_pages:
params = {
'siteId': site_id,
'channel': channel_id,
'currPage': page,
'pageSize': 20,
'regionCode': region_code,
}
try:
resp = requests.get(base_url, params=params, headers=headers, timeout=10, proxies=get_proxies())
data = resp.json()
items = data.get('data', [])
if not items:
break
# 筛选当日数据
today_items = [item for item in items if date in (item.get('noticeTime') or '')]
if today_items:
region_items.extend(today_items)
consecutive_empty = 0
page += 1
else:
consecutive_empty += 1
if consecutive_empty >= 2:
break
page += 1
except Exception as e:
print(f"[ERROR] 地区 {region_name} 第{page}页获取失败: {e}")
break
print(f"[INFO] {region_name}: 获取 {len(region_items)} 条数据")
# 筛选匹配项目
for item in region_items:
title = item.get('title', '')
# ====== 新增规则:过滤中标公告/结果公示 ======
if '中标公告' in title or '结果公示' in title:
continue
# 排除无关项目
if any(ex in title for ex in exclude_keywords):
continue
# 检查关键词匹配
matched_kw = [kw for kw in keywords if kw in title]
if matched_kw:
# 获取字段
purchase_manner = str(item.get('purchaseManner', ''))
purchase_nature = str(item.get('purchaseNature', ''))
pageurl = item.get('pageurl', '')
notice_time = item.get('noticeTime', '')[:10] if item.get('noticeTime') else ''
description = item.get('description', '')[:200] if item.get('description') else ''
# ====== 新增规则:意向优先推荐 ======
recommendation = ""
if '意向' in title:
recommendation = "高"
elif any(kw in matched_kw for kw in high_value_keywords):
recommendation = "高"
elif '系统' in matched_kw:
recommendation = "中"
# 生成备注
remarks = _generate_remarks(title, matched_kw)
all_results.append({
'项目名称': title,
'项目描述': description,
'地区': region_name,
'采购方式': MANNER_MAP.get(purchase_manner, purchase_manner),
'项目类别': NATURE_MAP.get(purchase_nature, purchase_nature),
'发布时间': notice_time,
'详情链接': f'https://www.plap.mil.cn{pageurl}' if pageurl else '',
'推荐等级': recommendation,
'匹配关键词': ','.join(matched_kw),
'备注': remarks
})
# 创建DataFrame
df = pd.DataFrame(all_results)
if len(df) > 0:
df.index = range(1, len(df) + 1)
df.index.name = '序号'
df.reset_index(inplace=True)
# 调整列顺序: 序号,项目名称,项目描述,地区,采购方式,项目类别,详情链接,发布时间,匹配关键词,推荐等级,备注
cols = ['序号', '项目名称', '项目描述', '地区', '采购方式', '项目类别', '详情链接', '发布时间', '匹配关键词', '推荐等级', '备注']
df = df[cols]
# 清洗项目描述(去除HTML/CSS乱码,20-50字简述)
df['项目描述'] = df['项目描述'].apply(_clean_description)
print("-" * 60)
print(f"[INFO] 军队采购网: 共 {len(df)} 条匹配项目")
return df
# ============ 国防科大采购信息网获取函数 ============
def _get_nudt_purchase_type(title: str) -> Optional[str]:
"""根据项目名称判断公告类型"""
# 剔除结果公示/中标公告
if '结果公示' in title or '中标公告' in title:
return None
# 判断类型(按优先级)
if '流标' in title or '废标' in title:
return '废标公告'
elif '招标公告' in title or '招标变更' in title or '变更公告' in title:
return '招标公告'
elif '比价公告' in title:
return '比价公告'
elif '询价公告' in title:
return '询价公告'
elif '竞争性谈判' in title:
return '竞争性谈判'
elif '意向' in title:
return '意向公开'
else:
return '其他'
def _get_nudt_recommend_level(matched_kw: List[str]) -> str:
"""根据匹配关键词返回推荐等级"""
high_kw = ['体系', '模型', '仿真', '数据', '决策', '分析', '智能', 'AI', '软件']
if any(k in matched_kw for k in high_kw):
return "高"
elif '系统' in matched_kw:
return "中"
return ""
def _get_nudt_remarks(title: str) -> str:
"""生成简明分析备注"""
remarks = []
if '体系' in title:
remarks.append("体系相关")
if '模型' in title:
remarks.append("模型开发")
if '仿真' in title:
remarks.append("仿真项目")
if '数据' in title:
remarks.append("数据处理")
if '软件' in title or '系统' in title:
remarks.append("软件/系统类")
if '智能' in title or 'AI' in title:
remarks.append("智能AI")
if not remarks:
remarks.append("可关注")
return "; ".join(remarks[:2])
def _fetch_nudt_page(page: int) -> List[Dict]:
"""获取国防科大采购信息网指定页的数据"""
if page == 1:
url = "https://www.nudt.edu.cn/cgxx/index.htm"
else:
url = f"https://www.nudt.edu.cn/cgxx/index{page-1}.htm"
try:
resp = requests.get(url, timeout=10, proxies=get_proxies())
resp.encoding = 'utf-8'
html = resp.text
li_pattern = r'<li>\s*<a href="([^"]+)">\s*<div class="date"><span class="day[^"]*">(\d+)</span><span class="year[^"]*">(\d+-\d+)</span></div>\s*<div class="title[^"]*">(.*?)</div>'
matches = re.findall(li_pattern, html)
results = []
for href, day, year_month, title in matches:
year = "20" + year_month.split("-")[0]
month = year_month.split("-")[1]
full_date = f"{year}-{month}-{day}"
results.append({
'href': 'https://www.nudt.edu.cn/cgxx/' + href,
'date': full_date,
'title': title.strip()
})
return results
except Exception as e:
return []
def fetch_nudt_bidding(
keywords: Optional[List[str]] = None,
date: Optional[str] = None
) -> pd.DataFrame:
"""
获取国防科大采购信息网商机信息
Args:
keywords: 核心关键词列表
date: 日期,默认今天,格式 YYYY-MM-DD
Returns:
DataFrame: 筛选后的项目列表
"""
if keywords is None:
keywords = DEFAULT_CORE_KW
if date is None:
date = datetime.now().strftime('%Y-%m-%d')
all_results = []
print(f"[INFO] 开始抓取国防科大采购信息网 - 日期: {date}")
print("-" * 60)
# 获取多页直到不是目标日期
page = 1
max_pages = 5
while page <= max_pages:
results = _fetch_nudt_page(page)
if not results:
break
# 检查是否有目标日期的数据
has_target_date = any(r['date'] == date for r in results)
if not has_target_date:
break
# 收集目标日期的数据
for r in results:
if r['date'] == date:
all_results.append(r)
page += 1
# 处理数据
processed_results = []
for item in all_results:
title = item['title']
# 判断公告类型,剔除结果公示/中标公告
purchase_type = _get_nudt_purchase_type(title)
if purchase_type is None:
continue
# 匹配关键词
matched_kw = [kw for kw in keywords if kw in title]
# 只保留匹配的项目
if matched_kw:
recommend = _get_nudt_recommend_level(matched_kw)
remarks = _get_nudt_remarks(title)
processed_results.append({
'项目名称': title,
'公告类型': purchase_type,
'详情链接': item['href'],
'发布日期': item['date'],
'匹配关键词': ','.join(matched_kw),
'推荐等级': recommend,
'备注': remarks
})
# 创建DataFrame
df = pd.DataFrame(processed_results)
if len(df) > 0:
df.index = range(1, len(df) + 1)
df.index.name = '序号'
df.reset_index(inplace=True)
print(f"[INFO] 国防科大采购信息网: 共 {len(df)} 条匹配项目")
return df
# ============ 整合输出Excel函数 ============
def _get_weain_latest_date() -> str:
"""获取全军武器装备采购信息网的最新发布日期"""
try:
result = subprocess.run(['date', '+%s%3N'], capture_output=True, text=True)
_t = result.stdout.strip()
# 获取公开招标第一页,检查最新日期
url = f"https://www.weain.mil.cn/api/front/list/cggg/list?LMID=1149231276155707394&pageNo=1&purchaseType=%E5%85%AC%E5%BC%80%E6%8B%9B%E6%A0%87&_t={_t}"
resp = requests.get(url, timeout=10, proxies=get_proxies())
data = resp.json()
content_list = data.get('list', {}).get('contentList', [])
if content_list:
publish_time = content_list[0].get('publishTime', '')[:10]
return publish_time
except:
pass
return datetime.now().strftime('%Y-%m-%d')
def _get_nudt_latest_date() -> str:
"""获取国防科大采购信息网的最新发布日期"""
try:
url = "https://www.nudt.edu.cn/cgxx/index.htm"
resp = requests.get(url, timeout=10, proxies=get_proxies())
resp.encoding = 'utf-8'
html = resp.text
# 匹配所有日期记录
li_pattern = r'<li>\s*<a href="[^"]+">\s*<div class="date"><span class="day[^"]*">(\d+)</span><span class="year[^"]*">(\d+-\d+)</span></div>'
matches = re.findall(li_pattern, html)
if matches:
# 找到最新的日期(列表是按时间正序排列的,第一条是最旧的,最后一条是最新的)
latest = matches[-1]
day = latest[0]
year_month = latest[1]
year = "20" + year_month.split("-")[0]
month = year_month.split("-")[1]
return f"{year}-{month}-{day}"
except Exception as e:
print(f"获取国防科大最新日期失败: {e}")
pass
return datetime.now().strftime('%Y-%m-%d')
def fetch_all_bidding(
keywords: Optional[List[str]] = None,
exclude_keywords: Optional[List[str]] = None,
regions: Optional[Dict[str, str]] = None,
date: Optional[str] = None,
high_value_keywords: Optional[List[str]] = None,
output_path: str = None,
auto_latest: bool = True
) -> tuple:
"""
整合获取全军武器装备采购信息网和军队采购网的商机信息,输出Excel
Args:
keywords: 核心关键词列表
exclude_keywords: 排除关键词列表(仅军队采购网使用)
regions: 地区字典(仅军队采购网使用)
date: 日期,默认今天,如果auto_latest为True则自动获取各渠道最新日期
high_value_keywords: 高价值关键词列表
output_path: 输出Excel路径
auto_latest: 是否自动获取各渠道最新日期(默认True)
Returns:
tuple: (df_weain, df_military, df_nudt) 三个DataFrame
"""
if keywords is None:
keywords = DEFAULT_CORE_KW
if exclude_keywords is None:
exclude_keywords = DEFAULT_EXCLUDE_KW
if regions is None:
regions = DEFAULT_REGIONS
if high_value_keywords is None:
high_value_keywords = DEFAULT_HIGH_VALUE_KW
today = datetime.now().strftime('%Y-%m-%d')
# 确定各渠道日期:auto_latest 时并行预检,否则直接用指定日期
if auto_latest and date is None:
print("[INFO] 自动检测各渠道最新日期(并行)...")
with ThreadPoolExecutor(max_workers=2) as ex:
f_weain_date = ex.submit(_get_weain_latest_date)
f_nudt_date = ex.submit(_get_nudt_latest_date)
weain_date = f_weain_date.result()
nudt_date = f_nudt_date.result()
military_date = today
file_date = today
print(f"[INFO] 全军装备采购网最新: {weain_date}")
print(f"[INFO] 军队采购网最新: {military_date}")
print(f"[INFO] 国防科大采购网最新: {nudt_date}")
print("=" * 60)
else:
if date is None:
date = today
weain_date = date
military_date = date
nudt_date = date
file_date = date
print("=" * 60)
print(f"开始整合抓取 - 日期: {date}")
print("=" * 60)
if output_path is None:
output_dir = os.path.expanduser(get_output_dir())
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f'军队采购商机汇总_{file_date}.xlsx')
# 并行抓取三个数据源
print("[INFO] 并行抓取三个数据源...")
with ThreadPoolExecutor(max_workers=3) as ex:
f_weain = ex.submit(fetch_weain_bidding, keywords, weain_date)
f_military = ex.submit(fetch_military_bidding, keywords, exclude_keywords,
regions, military_date, high_value_keywords, False)
f_nudt = ex.submit(fetch_nudt_bidding, keywords, nudt_date)
df_weain = f_weain.result()
df_military = f_military.result()
df_nudt = f_nudt.result()
weain_actual_date = weain_date if len(df_weain) > 0 else ""
nudt_actual_date = nudt_date if len(df_nudt) > 0 else ""
print()
# 输出Excel
_save_to_excel(df_weain, df_military, df_nudt, output_path)
# 返回各渠道实际使用的日期
return df_weain, df_military, df_nudt, {
'weain': weain_actual_date,
'military': military_date,
'nudt': nudt_actual_date
}
def _save_to_excel(df_weain: pd.DataFrame, df_military: pd.DataFrame, df_nudt: pd.DataFrame, output_path: str):
"""保存到Excel文件(三个sheet)"""
# 设置样式
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
# 创建工作簿
wb = Workbook()
# ========== Sheet 1: 全军武器装备采购信息网 ==========
ws1 = wb.active
ws1.title = "全军武器装备采购信息网"
# 写入标题行 - 使用自己的字段
if len(df_weain) > 0:
weain_headers = list(df_weain.columns)
for col, header in enumerate(weain_headers, 1):
cell = ws1.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
# 写入数据
for row_idx, row_data in enumerate(df_weain.values, 2):
for col_idx, value in enumerate(row_data, 1):
ws1.cell(row=row_idx, column=col_idx, value=value).border = thin_border
# 调整列宽
ws1.column_dimensions['A'].width = 8
ws1.column_dimensions['B'].width = 60
ws1.column_dimensions['C'].width = 12
ws1.column_dimensions['D'].width = 10
ws1.column_dimensions['E'].width = 70
ws1.column_dimensions['F'].width = 12
ws1.column_dimensions['G'].width = 12
ws1.column_dimensions['H'].width = 20
ws1.column_dimensions['I'].width = 10
ws1.column_dimensions['J'].width = 30
# ========== Sheet 2: 军队采购网 ==========
ws2 = wb.create_sheet(title="军队采购网")
# 写入标题行 - 使用自己的字段
if len(df_military) > 0:
military_headers = list(df_military.columns)
for col, header in enumerate(military_headers, 1):
cell = ws2.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
# 写入数据
for row_idx, row_data in enumerate(df_military.values, 2):
for col_idx, value in enumerate(row_data, 1):
ws2.cell(row=row_idx, column=col_idx, value=value).border = thin_border
# 调整列宽
ws2.column_dimensions['A'].width = 8
ws2.column_dimensions['B'].width = 50
ws2.column_dimensions['C'].width = 35
ws2.column_dimensions['D'].width = 10
ws2.column_dimensions['E'].width = 12
ws2.column_dimensions['F'].width = 10
ws2.column_dimensions['G'].width = 60
ws2.column_dimensions['H'].width = 12
ws2.column_dimensions['I'].width = 20
ws2.column_dimensions['J'].width = 10
ws2.column_dimensions['K'].width = 30
# ========== Sheet 3: 国防科大采购信息网 ==========
ws3 = wb.create_sheet(title="国防科大采购信息网")
# 写入标题行 - 使用自己的字段
if len(df_nudt) > 0:
nudt_headers = list(df_nudt.columns)
for col, header in enumerate(nudt_headers, 1):
cell = ws3.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
# 写入数据
for row_idx, row_data in enumerate(df_nudt.values, 2):
for col_idx, value in enumerate(row_data, 1):
ws3.cell(row=row_idx, column=col_idx, value=value).border = thin_border
# 调整列宽
ws3.column_dimensions['A'].width = 8
ws3.column_dimensions['B'].width = 60
ws3.column_dimensions['C'].width = 12
ws3.column_dimensions['D'].width = 70
ws3.column_dimensions['E'].width = 12
ws3.column_dimensions['F'].width = 20
ws3.column_dimensions['G'].width = 10
ws3.column_dimensions['H'].width = 30
# 保存文件
wb.save(output_path)
print(f"[INFO] Excel已保存: {output_path}")
# ============ 兼容旧接口 ============
def fetch_with_custom_params(
region_codes: Dict[str, str],
target_date: str,
custom_keywords: List[str],
custom_exclude: List[str] = None
) -> pd.DataFrame:
"""
自定义参数抓取接口
Args:
region_codes: 地区编码字典
target_date: 目标日期 YYYY-MM-DD
custom_keywords: 自定义关键词列表
custom_exclude: 自定义排除词列表
Returns:
DataFrame: 筛选结果
"""
return fetch_military_bidding(
keywords=custom_keywords,
exclude_keywords=custom_exclude,
regions=region_codes,
date=target_date,
save_csv=True
)
HELP_TEXT = """军工采购商机抓取工具
用法:
python fetcher.py [选项]
选项:
--today 抓取今日数据
--yesterday 抓取昨日数据(默认)
--date DATE 抓取指定日期,格式 YYYY-MM-DD
--keywords WORDS 核心关键词,逗号分隔,如: 模型,仿真,AI
--exclude-keywords WORDS 排除关键词,逗号分隔,如: 服装,医疗
--high-value-keywords WORDS 高价值关键词,逗号分隔
--regions REGIONS 地区,逗号分隔,如: 北京,湖北,湖南
--output PATH 输出文件路径
--no-auto-latest 禁用自动获取最新日期
--help 显示此帮助信息
示例:
python fetcher.py # 抓取昨日(默认)
python fetcher.py --today # 抓取今日
python fetcher.py --date 2026-03-23 # 抓取指定日期
python fetcher.py --keywords "模型,仿真" --exclude-keywords "服装,医疗"
"""
def main():
import argparse
import sys
parser = argparse.ArgumentParser(
description='军工采购商机抓取工具',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=HELP_TEXT
)
group = parser.add_mutually_exclusive_group()
group.add_argument('--today', action='store_true', help='抓取今日数据')
group.add_argument('--yesterday', action='store_true', help='抓取昨日数据(默认)')
group.add_argument('--date', type=str, help='抓取指定日期,格式 YYYY-MM-DD')
parser.add_argument('--keywords', type=str,
help='核心关键词,逗号分隔')
parser.add_argument('--exclude-keywords', type=str,
help='排除关键词,逗号分隔')
parser.add_argument('--high-value-keywords', type=str,
help='高价值关键词,逗号分隔')
parser.add_argument('--regions', type=str,
help='地区,逗号分隔')
parser.add_argument('--output', type=str,
help='输出文件路径')
parser.add_argument('--no-auto-latest', action='store_true',
help='禁用自动获取最新日期')
args = parser.parse_args()
# 无参数或 --help 时显示帮助
if len(sys.argv) == 1 or '--help' in sys.argv:
print(HELP_TEXT)
sys.exit(0)
# 确定日期
target_date = None
auto_latest = not args.no_auto_latest
if args.today:
target_date = datetime.now().strftime('%Y-%m-%d')
auto_latest = False
elif args.yesterday:
from datetime import timedelta
target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
auto_latest = False
elif args.date:
target_date = args.date
# 解析关键词
keywords = None
if args.keywords:
keywords = args.keywords.split(',')
exclude_keywords = None
if args.exclude_keywords:
exclude_keywords = args.exclude_keywords.split(',')
high_value_keywords = None
if args.high_value_keywords:
high_value_keywords = args.high_value_keywords.split(',')
# 解析地区
regions = None
if args.regions:
region_map = {
'北京': '110000', '天津': '120000', '河北': '130000',
'山西': '140000', '内蒙古': '150000', '辽宁': '210000',
'吉林': '220000', '黑龙江': '230000', '上海': '310000',
'江苏': '320000', '浙江': '330000', '安徽': '340000',
'福建': '350000', '江西': '360000', '山东': '370000',
'河南': '410000', '湖北': '420000', '湖南': '430000',
'广东': '440000', '广西': '450000', '海南': '460000',
'重庆': '500000', '四川': '510000', '贵州': '520000',
'云南': '530000', '西藏': '540000', '陕西': '610000',
'甘肃': '620000', '青海': '630000', '宁夏': '640000',
'新疆': '650000'
}
region_names = args.regions.split(',')
regions = {name: region_map.get(name, '') for name in region_names if name in region_map}
# 执行抓取
result = fetch_all_bidding(
keywords=keywords,
exclude_keywords=exclude_keywords,
regions=regions,
date=target_date,
high_value_keywords=high_value_keywords,
output_path=args.output,
auto_latest=auto_latest
)
if len(result) == 4:
df_weain, df_military, df_nudt, dates_info = result
else:
df_weain, df_military, df_nudt = result
print("\n" + "=" * 60)
print("[RESULT] 汇总结果:")
print(f" 全军武器装备采购信息网: {len(df_weain)} 条")
print(f" 军队采购网: {len(df_military)} 条")
print(f" 国防科大采购信息网: {len(df_nudt)} 条")
print("=" * 60)
# CLI入口
if __name__ == '__main__':
main()
FILE:pyproject.toml
[project]
name = "milb-fetcher"
version = "0.1.0"
description = "军工采购商机自动抓取工具"
requires-python = ">=3.8"
dependencies = [
"requests",
"pandas",
"openpyxl",
]
[project.scripts]
milb-fetcher = "milb_fetcher.fetcher:main"
[tool.setuptools.packages.find]
where = ["."]