@clawhub-biglizi775-0148e1bc57
---
name: quake-search-use
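description: Drives the Quake CLI for asset search with automatic paging and CSV/RAW export. Load this skill first when the user mentions Quake, quake.exe, quake_for_Linux, quake_for_Apple, asset mapping, batch queries, or paged export.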
---
# Quake Search Use
## First
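- The goal: the user supplies a key and query parameters, and the script pages through the results automatically and exports them.
- The script supports five modes: `search`, `domain`, `host`, `info`, and `honeypot`.
- It detects the operating system and picks the matching binary:
  - Windows: `quake.exe`
  - macOS: `quake_for_Apple`
  - Linux: `quake_for_Linux`
- Use `--quake-bin` to override the binary path manually.
- The skill is portable: copy the whole directory into any project and it works.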
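The binary selection described above can be sketched as a small lookup. This is a minimal illustration, not the bundled script's code: the helper name `pick_quake_binary` and its `system_name` parameter are hypothetical and exist only so the mapping can be exercised on any machine.

```python
import platform
from pathlib import Path
from typing import Optional

# File names come from the list above; everything else here is illustrative.
BINARY_BY_SYSTEM = {
    "windows": "quake.exe",
    "darwin": "quake_for_Apple",
    "linux": "quake_for_Linux",
}


def pick_quake_binary(
    base_dir: Path,
    override: Optional[str] = None,
    system_name: Optional[str] = None,
) -> Path:
    """Return the manual override if given, else the per-OS binary under base_dir."""
    if override:
        return Path(override).expanduser()
    key = (system_name or platform.system()).lower()
    if key not in BINARY_BY_SYSTEM:
        raise ValueError(f"unsupported system: {key}")
    return base_dir / BINARY_BY_SYSTEM[key]


print(pick_quake_binary(Path("scripts"), system_name="Linux"))
```

Passing `override` mirrors what `--quake-bin` does: the automatic choice is skipped entirely.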
## Entry script
- Main script: `scripts/quake_batch_cli.py`
- How to run:
  - Non-interactive (recommended for AI agents):
    - `python3 scripts/quake_batch_cli.py --no-interactive --mode search --key "$QUAKE_API_KEY" --query 'app:"exchange 2010"' --fields "ip,port,title" --page-size 100 --max-records 1000 --output-csv search.csv --output-raw search_raw.txt`
  - Interactive (kept for compatibility):
    - `python3 scripts/quake_batch_cli.py`
## Portability requirements
- Place at least one official Quake binary inside `scripts/`, next to `quake_batch_cli.py`:
  - `quake.exe`
  - `quake_for_Apple`
  - `quake_for_Linux`
- Alternatively, pass `--quake-bin` at run time; a relative path is resolved against the script's directory.
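## Parameters
- `--mode`: one of `search`, `domain`, `host`, `info`, `honeypot`
- `--key`: optional; when given, the script runs `quake init <key>` first
- `--quake-bin`: optional; path to the Quake binary, overriding auto-detection
- `--query`: query string, required only in `search` mode
- `--domain`: domain name, required only in `domain` mode
- `--ip`: IP or CIDR range, required only in `host` and `honeypot` modes
- `--fields`: fields to return in `search` and `domain` modes
- `--filter`: regex filter for `search` mode (maps to quake `-f`)
- `--page-size`: records per page (1-100, default 100)
- `--max-records`: maximum number of records to export (default 1000)
- `--output-csv`: CSV output file (default `quake_results.csv`)
- `--output-raw`: RAW output file (default `quake_results_raw.txt`)
- `--no-interactive`: disable prompts; a missing required argument is an error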
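To make the interplay of `--page-size` and `--max-records` concrete, here is a minimal sketch of the `(start, size)` pairs a paged export would request. The function name `page_plan` is hypothetical; the bundled script additionally stops early as soon as a page comes back short or empty.

```python
from typing import Iterator, Tuple


def page_plan(page_size: int, max_records: int) -> Iterator[Tuple[int, int]]:
    """Yield (start, size) pairs that cover at most max_records records."""
    if not 1 <= page_size <= 100:
        raise ValueError("page_size must be between 1 and 100")
    start = 0
    while start < max_records:
        # The last page shrinks so the total never exceeds max_records.
        size = min(page_size, max_records - start)
        yield start, size
        start += size


print(list(page_plan(100, 250)))  # [(0, 100), (100, 100), (200, 50)]
```

Because each request carries an explicit start offset, shrinking the final page is safe here: the offset does not depend on the page size.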
## CSV export help
- Custom export fields: in `search` and `domain` modes, pass `--fields`; the field order is the CSV column order.
- Common `search` fields:
  - `ip,port,title,country,province,city,owner,time,domain,ssldomain`
- Common `domain` fields:
  - `domain,ip,port,title`
- Non-interactive export example (`search`):
  - `python3 scripts/quake_batch_cli.py --no-interactive --mode search --key "$QUAKE_API_KEY" --query 'ip: "118.1XX.XX.191" AND port: "21008"' --fields "ip,port,title,country,province,city,time" --page-size 10 --max-records 50 --output-csv quake_custom.csv --output-raw quake_custom_raw.txt`
- Non-interactive export example (`domain`):
  - `python3 scripts/quake_batch_cli.py --no-interactive --mode domain --key "$QUAKE_API_KEY" --domain "360.cn" --fields "ip,port,domain,title" --page-size 100 --max-records 1000 --output-csv quake_domain.csv --output-raw quake_domain_raw.txt`
- When there are no results, the CSV contains only the header row; to troubleshoot, inspect the raw response in the `--output-raw` file.
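The two CSV rules above (column order follows `--fields`, and an empty result still produces a header row) can be illustrated with a short sketch. The helper `render_csv` is hypothetical and writes to an in-memory buffer instead of a file; the sample row uses a documentation-only IP address.

```python
import csv
import io
from typing import List


def render_csv(fields: str, rows: List[List[str]]) -> str:
    """Render rows as CSV text whose columns follow the order given in fields."""
    header = [name.strip() for name in fields.split(",") if name.strip()]
    buf = io.StringIO(newline="")
    writer = csv.writer(buf)
    writer.writerow(header)  # the header is written even when rows is empty
    writer.writerows(rows)
    return buf.getvalue()


print(render_csv("ip, port,title", [["192.0.2.1", "443", "Login"]]))
print(repr(render_csv("ip,port,title", [])))
```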
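## Security notes
- Do not commit a real API key to the repository.
- Inject the key through an environment variable such as `QUAKE_API_KEY`.
- When distributing the skill, ship only the scripts and documentation, never your key.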
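One way to follow the advice above is to read the key from the environment and mask it whenever it is logged. This is a sketch under assumptions: `load_api_key` and `mask` are hypothetical helpers, not part of the bundled script, and the `env` parameter exists only to make the function easy to test.

```python
import os
from typing import Mapping, Optional


def load_api_key(env: Optional[Mapping[str, str]] = None, name: str = "QUAKE_API_KEY") -> str:
    """Read the key from the environment and fail loudly if it is missing."""
    source = os.environ if env is None else env
    key = source.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set")
    return key


def mask(key: str) -> str:
    """Hide all but the last four characters so logs never show the full key."""
    if len(key) <= 4:
        return "*" * len(key)
    return "*" * (len(key) - 4) + key[-4:]


print(mask("abcd1234"))  # ****1234
```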
FILE:scripts/quake_batch_cli.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Quake CLI wrapper (single-file, portable).
Features:
- Wraps local quake executable
- Interactive input for key and query parameters
- Auto paging for search/domain/host
- Export CSV and raw text
"""
from __future__ import annotations
import argparse
import csv
import platform
import re
import subprocess
from pathlib import Path
from typing import List, Optional, Tuple
DEFAULT_PAGE_SIZE = 100
MAX_PAGE_SIZE = 100
class QuakeCliError(Exception):
pass
ANSI_ESCAPE_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
def script_dir() -> Path:
return Path(__file__).resolve().parent
def resolve_quake_binary(manual_path: Optional[str] = None) -> Path:
if manual_path:
candidate = Path(manual_path).expanduser()
if not candidate.is_absolute():
candidate = script_dir() / candidate
if candidate.exists():
return candidate
raise QuakeCliError(f"指定的 quake 二进制不存在: {candidate}")
system_name = platform.system().lower()
if "windows" in system_name:
candidate = script_dir() / "quake.exe"
if candidate.exists():
return candidate
elif "darwin" in system_name:
candidate = script_dir() / "quake_for_Apple"
if candidate.exists():
return candidate
elif "linux" in system_name:
candidate = script_dir() / "quake_for_Linux"
if candidate.exists():
return candidate
fallback_names = ["quake.exe", "quake_for_Apple", "quake_for_Linux", "quake"]
for name in fallback_names:
candidate = script_dir() / name
if candidate.exists():
return candidate
raise QuakeCliError(
f"未找到可用 Quake 二进制。当前系统: {platform.system()},"
f"请确认目录中存在 quake.exe / quake_for_Apple / quake_for_Linux,"
"或使用 --quake-bin 手动指定。"
)
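def run_quake(args: List[str], quake_bin: Optional[str] = None) -> str:
    """Run the quake binary with ``args`` and return its combined stdout/stderr."""
    exe = resolve_quake_binary(quake_bin)
    cmd = [str(exe)] + args
    proc = subprocess.run(
        cmd,
        cwd=str(script_dir()),
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    output = (proc.stdout or "") + ("\n" + proc.stderr if proc.stderr else "")
    if proc.returncode != 0:
        # Never echo the API key passed to `init` back to the terminal or logs.
        shown = ["init", "***"] if args[:1] == ["init"] else args
        raise QuakeCliError(f"Command failed: {' '.join(shown)}\n{output.strip()}")
    return output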
def parse_count_total(output: str) -> Tuple[int, Optional[int]]:
count = 0
total = None
clean_output = ANSI_ESCAPE_RE.sub("", output)
m_count = re.search(r"count:\s*(\d+)", clean_output, flags=re.IGNORECASE)
if m_count:
count = int(m_count.group(1))
m_total = re.search(r"total:\s*(\d+)", clean_output, flags=re.IGNORECASE)
if m_total:
total = int(m_total.group(1))
return count, total
def parse_domain_or_search_rows(output: str, fields: List[str]) -> List[List[str]]:
rows: List[List[str]] = []
collect = False
expected = len(fields)
for line in output.splitlines():
s = ANSI_ESCAPE_RE.sub("", line).strip()
if not s:
continue
if "count:" in s and "total:" in s:
collect = True
continue
if not collect:
continue
if s.startswith("[") or s.startswith("IP:") or s.startswith("|"):
continue
if s.startswith("+]") or s.startswith("[+]") or s.startswith("[-]") or s.startswith("[!]"):
continue
if "\t" in s:
raw_parts = [p.strip() for p in s.split("\t")]
parts = [p for p in raw_parts]
else:
parts = s.split(None, max(0, expected - 1))
if expected > 0 and len(parts) < expected:
parts += [""] * (expected - len(parts))
if expected > 0 and len(parts) > expected:
parts = parts[:expected]
if parts:
rows.append(parts[:expected] if expected > 0 else parts)
return rows
def parse_host_rows(output: str) -> List[List[str]]:
rows: List[List[str]] = []
current_ip = ""
for line in output.splitlines():
s = ANSI_ESCAPE_RE.sub("", line).strip()
if s.startswith("IP:"):
m = re.search(r"IP:\s*([0-9a-fA-F:\.\/]+)", s)
if m:
current_ip = m.group(1)
continue
if s.startswith("|"):
body = s.lstrip("|").strip()
parts = body.split()
if len(parts) >= 3:
port = parts[0]
protocol = parts[1]
time_value = parts[-1]
rows.append([current_ip, port, protocol, time_value])
return rows
def write_csv(path: Path, header: List[str], rows: List[List[str]]) -> None:
with path.open("w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow(header)
writer.writerows(rows)
def write_text(path: Path, text: str) -> None:
path.write_text(text, encoding="utf-8")
def init_key(api_key: str, quake_bin: Optional[str] = None) -> None:
print("[*] 正在初始化 Quake key ...")
out = run_quake(["init", api_key], quake_bin=quake_bin)
print(out.strip() or "[+] init 完成")
def run_info(quake_bin: Optional[str] = None) -> None:
out = run_quake(["info"], quake_bin=quake_bin)
print(out.strip())
def run_honeypot(ip: str, quake_bin: Optional[str] = None) -> None:
out = run_quake(["honeypot", ip], quake_bin=quake_bin)
print(out.strip())
def paged_domain(
domain: str,
fields: str,
page_size: int,
max_records: int,
quake_bin: Optional[str],
) -> Tuple[List[List[str]], str, Optional[int]]:
rows_all: List[List[str]] = []
raw_chunks: List[str] = []
fields_list = [x.strip() for x in fields.split(",") if x.strip()]
start = 0
total = None
while len(rows_all) < max_records:
size = min(page_size, max_records - len(rows_all))
out = run_quake(
["domain", domain, "--start", str(start), "--size", str(size), "-t", fields],
quake_bin=quake_bin,
)
raw_chunks.append(out)
count, page_total = parse_count_total(out)
if page_total is not None:
total = page_total
page_rows = parse_domain_or_search_rows(out, fields_list)
rows_all.extend(page_rows)
print(f"[*] domain 翻页: start={start}, count={count}, 累计={len(rows_all)}")
if count == 0 or len(page_rows) == 0 or count < size:
break
start += size
return rows_all[:max_records], "\n\n".join(raw_chunks), total
def paged_search(
query: str,
fields: str,
page_size: int,
max_records: int,
regex_filter: Optional[str],
quake_bin: Optional[str],
) -> Tuple[List[List[str]], str, Optional[int]]:
rows_all: List[List[str]] = []
raw_chunks: List[str] = []
fields_list = [x.strip() for x in fields.split(",") if x.strip()]
start = 0
total = None
while len(rows_all) < max_records:
size = min(page_size, max_records - len(rows_all))
args = ["search", query, "--start", str(start), "--size", str(size), "-t", fields]
if regex_filter:
args += ["-f", regex_filter]
out = run_quake(args, quake_bin=quake_bin)
raw_chunks.append(out)
count, page_total = parse_count_total(out)
if page_total is not None:
total = page_total
page_rows = parse_domain_or_search_rows(out, fields_list)
rows_all.extend(page_rows)
print(f"[*] search 翻页: start={start}, count={count}, 累计={len(rows_all)}")
if count == 0 or len(page_rows) == 0 or count < size:
break
start += size
return rows_all[:max_records], "\n\n".join(raw_chunks), total
def paged_host(
ip_or_cidr: str,
page_size: int,
max_records: int,
quake_bin: Optional[str],
) -> Tuple[List[List[str]], str, Optional[int]]:
rows_all: List[List[str]] = []
raw_chunks: List[str] = []
start = 0
total = None
while len(rows_all) < max_records:
size = min(page_size, max_records - len(rows_all))
out = run_quake(
["host", ip_or_cidr, "--start", str(start), "--size", str(size)],
quake_bin=quake_bin,
)
raw_chunks.append(out)
count, page_total = parse_count_total(out)
if page_total is not None:
total = page_total
page_rows = parse_host_rows(out)
rows_all.extend(page_rows)
print(f"[*] host 翻页: start={start}, count={count}, 累计={len(rows_all)}")
if count == 0 or len(page_rows) == 0 or count < size:
break
start += size
return rows_all[:max_records], "\n\n".join(raw_chunks), total
def prompt_required(msg: str) -> str:
while True:
v = input(msg).strip()
if v:
return v
print("该项必填,请重新输入。")
def prompt_int(msg: str, default: int, minimum: int, maximum: int) -> int:
while True:
raw = input(f"{msg}(默认 {default}): ").strip()
if not raw:
return default
if not raw.isdigit():
print("请输入整数。")
continue
v = int(raw)
if v < minimum or v > maximum:
print(f"请输入 {minimum}-{maximum} 之间的整数。")
continue
return v
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Quake CLI 批量查询脚本(单文件)")
parser.add_argument("--mode", choices=["search", "domain", "host", "info", "honeypot"], help="查询模式")
parser.add_argument("--key", help="可选,若提供则先执行 quake init <key>")
parser.add_argument("--quake-bin", help="可选,手动指定 quake 二进制路径")
parser.add_argument("--query", help="search 模式查询语句")
parser.add_argument("--domain", help="domain 模式的域名")
parser.add_argument("--ip", help="host/honeypot 模式的 IP 或网段")
parser.add_argument("--fields", help="search/domain 显示字段")
parser.add_argument("--filter", help="search 模式正则过滤")
parser.add_argument("--page-size", type=int, default=DEFAULT_PAGE_SIZE, help=f"每页数量(1-{MAX_PAGE_SIZE})")
parser.add_argument("--max-records", type=int, default=1000, help="最大拉取条数")
parser.add_argument("--output-csv", default="quake_results.csv", help="CSV 输出文件")
parser.add_argument("--output-raw", default="quake_results_raw.txt", help="原始输出文件")
parser.add_argument("--no-interactive", action="store_true", help="无交互模式")
return parser.parse_args()
def interactive_fill(args: argparse.Namespace) -> argparse.Namespace:
if args.mode is None:
args.mode = prompt_required("模式(search/domain/host/info/honeypot): ").lower()
if not args.key:
args.key = input("Quake API key(可选,回车跳过): ").strip() or None
if args.mode == "search":
args.query = args.query or prompt_required("请输入 search 查询语句: ")
args.fields = args.fields or input("fields(默认 ip,port,title): ").strip() or "ip,port,title"
elif args.mode == "domain":
args.domain = args.domain or prompt_required("请输入域名(如 360.cn): ")
args.fields = args.fields or input("fields(默认 domain,ip): ").strip() or "domain,ip"
elif args.mode in ("host", "honeypot"):
args.ip = args.ip or prompt_required("请输入 IP 或网段: ")
if args.mode in ("search", "domain", "host"):
args.page_size = prompt_int("每页数量", args.page_size, 1, MAX_PAGE_SIZE)
args.max_records = prompt_int("最大拉取条数", args.max_records, 1, 1_000_000)
if args.output_csv == "quake_results.csv":
custom_csv = input("CSV 输出文件名(默认 quake_results.csv): ").strip()
if custom_csv:
args.output_csv = custom_csv
if args.output_raw == "quake_results_raw.txt":
custom_raw = input("RAW 输出文件名(默认 quake_results_raw.txt): ").strip()
if custom_raw:
args.output_raw = custom_raw
return args
def validate_no_interactive(args: argparse.Namespace) -> None:
    """Fail fast on missing or out-of-range arguments instead of prompting."""
    if args.mode is None:
        raise QuakeCliError("--mode is required with --no-interactive")
    if args.mode == "search" and not args.query:
        raise QuakeCliError("search mode requires --query with --no-interactive")
    if args.mode == "domain" and not args.domain:
        raise QuakeCliError("domain mode requires --domain with --no-interactive")
    if args.mode in ("host", "honeypot") and not args.ip:
        raise QuakeCliError("host/honeypot mode requires --ip with --no-interactive")
    if args.mode in ("search", "domain", "host"):
        if args.page_size < 1 or args.page_size > MAX_PAGE_SIZE:
            raise QuakeCliError(f"--page-size must be between 1 and {MAX_PAGE_SIZE}")
        # A non-positive limit would silently produce a header-only CSV.
        if args.max_records < 1:
            raise QuakeCliError("--max-records must be at least 1")
def main() -> None:
args = parse_args()
if args.no_interactive:
validate_no_interactive(args)
else:
args = interactive_fill(args)
selected_bin = resolve_quake_binary(args.quake_bin)
print(f"[*] 使用 Quake 二进制: {selected_bin}")
if args.key:
init_key(args.key, quake_bin=str(selected_bin))
if args.mode == "info":
run_info(quake_bin=str(selected_bin))
return
if args.mode == "honeypot":
run_honeypot(args.ip, quake_bin=str(selected_bin))
return
if args.mode == "domain":
rows, raw, total = paged_domain(
domain=args.domain,
fields=args.fields or "domain,ip",
page_size=args.page_size,
max_records=args.max_records,
quake_bin=str(selected_bin),
)
header = [x.strip() for x in (args.fields or "domain,ip").split(",") if x.strip()]
elif args.mode == "search":
rows, raw, total = paged_search(
query=args.query,
fields=args.fields or "ip,port,title",
page_size=args.page_size,
max_records=args.max_records,
regex_filter=args.filter,
quake_bin=str(selected_bin),
)
header = [x.strip() for x in (args.fields or "ip,port,title").split(",") if x.strip()]
elif args.mode == "host":
rows, raw, total = paged_host(
ip_or_cidr=args.ip,
page_size=args.page_size,
max_records=args.max_records,
quake_bin=str(selected_bin),
)
header = ["ip", "port", "protocol", "time"]
else:
raise QuakeCliError(f"未知模式: {args.mode}")
csv_path = Path(args.output_csv)
raw_path = Path(args.output_raw)
write_csv(csv_path, header, rows)
write_text(raw_path, raw)
print(f"[+] CSV 已导出: {csv_path.resolve()}")
print(f"[+] RAW 已导出: {raw_path.resolve()}")
print(f"[+] 导出条数: {len(rows)}")
if total is not None:
print(f"[+] 目标总量(平台返回): {total}")
if __name__ == "__main__":
try:
main()
except QuakeCliError as exc:
print(f"[!] 错误: {exc}")
raise SystemExit(1)
except KeyboardInterrupt:
print("\n[!] 用户中断")
raise SystemExit(130)
FILE:scripts/README.md
# QuakeSearchskill Scripts
## 1. Files
- `quake_batch_cli.py`: Quake batch query and export script (interactive and non-interactive)
## 2. Binary placement
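Place at least one official Quake binary in this directory:
- `quake.exe` (Windows)
- `quake_for_Apple` (macOS)
- `quake_for_Linux` (Linux)
The script picks the binary for the current OS automatically; use `--quake-bin` to specify one manually.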
## 3. Quick start
```bash
python3 quake_batch_cli.py
```
Non-interactive example:
```bash
python3 quake_batch_cli.py --no-interactive --mode search --key "$QUAKE_API_KEY" --query 'ip: "118.114.241.191" AND port: "21008"' --fields "ip,port,title,country,province,city,time" --page-size 10 --max-records 50 --output-csv result.csv --output-raw result_raw.txt
```
---
name: qax-hunter-use
description: Calls the QiAnXin Hunter OpenAPI for batch asset export. Load this skill first when the user mentions Hunter, asset mapping, batch export, or downloading a file by task_id.
---
# QAX Hunter Use
## First
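- Uses API mode by default; file upload is not used.
- The user supplies the query in Hunter syntax, and the script applies RFC 4648 base64url encoding automatically.
- A batch task always follows three steps:
  1) Create the task: `POST /openApi/search/batch`
  2) Check progress: `/openApi/search/batch/{task_id}`
  3) Download the result: `/openApi/search/download/{task_id}`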
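The encoding step and the three endpoints above can be sketched as follows. The helper names are hypothetical; the host is the one used by the bundled script, and stripping the trailing `=` padding follows that script's behavior rather than anything confirmed here about the API itself.

```python
import base64

# Host taken from the bundled script; helper names are illustrative only.
BASE = "https://hunter.qianxin.com/openApi/search"


def encode_search(query: str) -> str:
    """base64url-encode the raw query (RFC 4648, section 5), dropping '=' padding."""
    token = base64.urlsafe_b64encode(query.encode("utf-8")).decode("ascii")
    return token.rstrip("=")


def decode_search(token: str) -> str:
    """Inverse of encode_search: restore the padding, then decode."""
    padded = token + "=" * (-len(token) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8")


def task_urls(task_id: str) -> dict:
    """Return the URL for each of the three steps of a batch task."""
    return {
        "create": f"{BASE}/batch",
        "status": f"{BASE}/batch/{task_id}",
        "download": f"{BASE}/download/{task_id}",
    }


token = encode_search('web.title="test"')
print(token, decode_search(token))
print(task_urls("12345")["download"])
```

The URL-safe alphabet replaces `+` and `/` with `-` and `_`, which is why the token can be placed in a query string without further escaping.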
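## Entry script
- Main script: `scripts/hunter_batch_cli.py`
- How to run:
  - Non-interactive (recommended for AI agents):
    - `python3 scripts/hunter_batch_cli.py --no-interactive --api-key "$HUNTER_API_KEY" --search 'web.title="test"' --check-delay 10 --json-output`
  - Interactive (kept for compatibility):
    - `python3 scripts/hunter_batch_cli.py`
## Parameters
- Required: `api-key` (`--api-key`, or the `HUNTER_API_KEY` environment variable)
- Common optional parameters: `search`, `start_time`, `end_time`, `is_web`, `status_code`, `fields`, `assets_limit`
- AI-friendly flags:
  - `--no-interactive`: disable prompts; a missing required argument is an error
  - `--check-delay`: seconds to wait after creating the task before the first download attempt (default 10)
  - `--poll-interval` / `--poll-timeout`: polling interval and timeout in seconds (defaults 5 and 300)
  - `--output-file`: name of the downloaded file
  - `--json-output`: print one line of JSON at the end for machine parsing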
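A caller that launches the script with `--json-output` can recover the result by parsing the last non-empty line of stdout. The sketch below assumes the `ok`, `task_id`, and `file` keys that the bundled script prints; the helper name `parse_result_line` is hypothetical.

```python
import json
from typing import Optional


def parse_result_line(stdout: str) -> Optional[dict]:
    """Return the JSON object on the last non-empty line, or None if there is none."""
    for line in reversed(stdout.splitlines()):
        line = line.strip()
        if not line:
            continue
        try:
            value = json.loads(line)
        except json.JSONDecodeError:
            return None
        return value if isinstance(value, dict) else None
    return None


sample = 'Task ID: 7\n{"ok": true, "task_id": "7", "file": "/tmp/out.csv"}\n'
print(parse_result_line(sample))
```

Returning `None` instead of raising matters because the script prints no JSON line when it fails before a task is created, so the caller should treat `None` as a failure.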
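## Security notes
- Do not write a real `api-key` into repository files.
- Inject it through the `HUNTER_API_KEY` environment variable.
- If the key must be persisted, use the system keyring or an encrypted configuration file, never plain text.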
FILE:scripts/hunter_batch_cli.py
#!/usr/bin/env python3
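"""
Command-line script for QiAnXin Hunter batch queries.
What it does:
- reads request parameters from command-line flags or interactive prompts;
- creates the export task with POST /openApi/search/batch;
- does not use file-upload mode;
- base64url-encodes `search` per RFC 4648;
- polls task progress and downloads the final export file.
"""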
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import requests
API_URL = "https://hunter.qianxin.com/openApi/search/batch"
DOWNLOAD_BASE_URL = "https://hunter.qianxin.com/openApi/search/download"
# Hunter 可选导出字段白名单,用于入参校验避免请求被服务端拒绝
ALLOWED_FIELDS = {
"ip",
"port",
"domain",
"ip_tag",
"url",
"web_title",
"is_risk_protocol",
"protocol",
"base_protocol",
"status_code",
"os",
"company",
"number",
"icp_exception",
"country",
"province",
"city",
"is_web",
"isp",
"as_org",
"cert_sha256",
"ssl_certificate",
"component",
"asset_tag",
"updated_at",
"header",
"header_server",
"banner",
}
def _prompt_required(prompt_text: str) -> str:
"""读取必填参数,直到用户输入非空内容。"""
while True:
value = input(prompt_text).strip()
if value:
return value
print("该参数必填,请重新输入。")
def _prompt_optional(prompt_text: str) -> Optional[str]:
"""读取可选参数,空输入返回 None。"""
value = input(prompt_text).strip()
return value or None
def _validate_date(date_str: str) -> bool:
"""校验日期格式是否为 YYYY-MM-DD。"""
try:
datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
return False
return True
def _encode_base64url(text: str) -> str:
# Hunter 要求 search 按 RFC 4648 base64url 编码,且去掉末尾填充 "="
raw = text.encode("utf-8")
encoded = base64.urlsafe_b64encode(raw).decode("utf-8")
return encoded.rstrip("=")
def _extract_task_id(data: Dict) -> Optional[str]:
# 兼容不同返回结构:顶层 / data 内层
for key in ("task_id", "taskId", "id"):
if key in data:
return str(data[key])
nested_data = data.get("data")
if isinstance(nested_data, dict):
for key in ("task_id", "taskId", "id"):
if key in nested_data:
return str(nested_data[key])
return None
def _extract_progress(data: Dict) -> Optional[int]:
# 兼容 progress/percent/rate 等不同字段命名
source = data.get("data") if isinstance(data.get("data"), dict) else data
for key in ("progress", "percent", "rate", "progress_rate"):
value = source.get(key)
if value is None:
continue
if isinstance(value, (int, float)):
return int(value)
if isinstance(value, str):
matched = re.search(r"\d+", value)
if matched:
return int(matched.group(0))
return None
def _is_task_finished(data: Dict) -> bool:
# 服务端状态字段存在多种形式,这里统一做宽松判断
source = data.get("data") if isinstance(data.get("data"), dict) else data
for key in ("is_finish", "is_finished", "finished", "done", "completed"):
value = source.get(key)
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return int(value) == 1
if isinstance(value, str):
if value.lower() in {"1", "true", "yes", "done", "success", "finished", "completed"}:
return True
status = source.get("status")
if isinstance(status, str) and status.lower() in {"done", "success", "finished", "completed"}:
return True
if isinstance(status, (int, float)) and int(status) in {2, 3, 100}:
return True
progress = _extract_progress(data)
return progress == 100
def _poll_task(task_id: str, api_key: str, interval_sec: int = 5, timeout_sec: int = 300) -> bool:
# 轮询导出任务,直到完成或超时
status_url = f"{API_URL}/{task_id}"
params = {"api-key": api_key}
deadline = time.time() + timeout_sec
print("\n开始轮询任务进度...")
while time.time() < deadline:
try:
# 查询当前任务进度
resp = requests.get(status_url, params=params, timeout=30)
resp.raise_for_status()
payload = resp.json()
except requests.RequestException as exc:
print(f"查询进度失败: {exc}")
time.sleep(interval_sec)
continue
except json.JSONDecodeError:
print("进度接口返回非 JSON,继续重试...")
time.sleep(interval_sec)
continue
progress = _extract_progress(payload)
if progress is not None:
print(f"任务 {task_id} 当前进度: {progress}%")
else:
print(f"任务 {task_id} 进度响应: {json.dumps(payload, ensure_ascii=False)}")
if _is_task_finished(payload):
print("任务已完成。")
return True
time.sleep(interval_sec)
print(f"任务轮询超时(>{timeout_sec}秒),请稍后手动查询:{API_URL}/{task_id}?api-key=你的key")
return False
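def _guess_filename(resp: requests.Response, fallback_name: str) -> str:
    """Return the server-suggested file name, or ``fallback_name`` if none is usable."""
    cd = resp.headers.get("Content-Disposition", "")
    match = re.search(r'filename="?([^";]+)"?', cd)
    if match:
        # Keep only the final path component so a crafted header such as
        # "../../x" cannot write outside the current directory.
        safe_name = Path(match.group(1).replace("\\", "/")).name
        if safe_name and safe_name not in {".", ".."}:
            return safe_name
    return fallback_name


def _download_result(task_id: str, api_key: str, output_file: Optional[str] = None) -> Optional[Path]:
    """Download the export file; return the local path on success, else None."""
    download_url = f"{DOWNLOAD_BASE_URL}/{task_id}"
    params = {"api-key": api_key}
    fallback_name = f"hunter_task_{task_id}.csv"
    try:
        resp = requests.get(download_url, params=params, timeout=60, stream=True)
        resp.raise_for_status()
    except requests.RequestException as exc:
        # The exception text can contain the request URL, so redact the key.
        print(f"Download failed: {str(exc).replace(api_key, '***')}")
        return None
    # The download endpoint may answer with a JSON error while the task is still running.
    content_type = (resp.headers.get("Content-Type") or "").lower()
    if "application/json" in content_type:
        try:
            payload = resp.json()
            print("Download endpoint returned JSON (the task may not be finished yet):")
            print(json.dumps(payload, ensure_ascii=False, indent=2))
        except ValueError:
            print("Download endpoint returned JSON that could not be parsed.")
        return None
    # An explicit --output-file always wins over the server-suggested name.
    filename = output_file or _guess_filename(resp, fallback_name)
    # The endpoint usually redirects to object storage; stream to disk to keep memory flat.
    path = Path(filename)
    with path.open("wb") as f:
        for chunk in resp.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
    return path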
def _build_params_from_args(args: argparse.Namespace) -> Dict[str, str]:
"""从命令行参数构建请求参数。"""
params: Dict[str, str] = {}
api_key = args.api_key or os.getenv("HUNTER_API_KEY")
if not api_key:
if args.no_interactive:
raise ValueError("缺少 api-key,请通过 --api-key 或环境变量 HUNTER_API_KEY 提供")
api_key = _prompt_required("api-key(必填): ")
params["api-key"] = api_key
search_raw = args.search
if search_raw is None and not args.no_interactive:
search_raw = _prompt_optional("搜索语法 search(可选,输入原始语法): ")
if search_raw:
params["search"] = _encode_base64url(search_raw)
start_time = args.start_time
if start_time is None and not args.no_interactive:
start_time = _prompt_optional("开始时间 start_time(可选,格式 YYYY-MM-DD): ")
if start_time:
if not _validate_date(start_time):
raise ValueError("start_time 格式错误,应为 YYYY-MM-DD")
params["start_time"] = start_time
end_time = args.end_time
if end_time is None and not args.no_interactive:
end_time = _prompt_optional("结束时间 end_time(可选,格式 YYYY-MM-DD): ")
if end_time:
if not _validate_date(end_time):
raise ValueError("end_time 格式错误,应为 YYYY-MM-DD")
params["end_time"] = end_time
is_web = args.is_web
if is_web is None and not args.no_interactive:
is_web = _prompt_optional("是否网站资产 is_web(可选,1=是,2=否): ")
if is_web:
if is_web not in {"1", "2"}:
raise ValueError("is_web 仅允许 1 或 2")
params["is_web"] = is_web
status_code = args.status_code
if status_code is None and not args.no_interactive:
status_code = _prompt_optional("状态码 status_code(可选,逗号分隔,如 200,401): ")
if status_code:
params["status_code"] = status_code
fields = args.fields
if fields is None and not args.no_interactive:
fields = _prompt_optional("返回字段 fields(可选,逗号分隔): ")
if fields:
field_list = [f.strip() for f in fields.split(",") if f.strip()]
invalid = [f for f in field_list if f not in ALLOWED_FIELDS]
if invalid:
raise ValueError(f"fields 包含不支持字段: {', '.join(invalid)}")
params["fields"] = ",".join(field_list)
assets_limit = args.assets_limit
if assets_limit is None and not args.no_interactive:
assets_limit = _prompt_optional("导出资产数量 assets_limit(可选,整数): ")
if assets_limit:
if not str(assets_limit).isdigit():
raise ValueError("assets_limit 必须为整数")
params["assets_limit"] = str(assets_limit)
return params
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="奇安信 Hunter 资产批量导出")
parser.add_argument("--api-key", help="Hunter API key(可配合环境变量 HUNTER_API_KEY)")
parser.add_argument("--search", help="原始搜索语法(脚本会自动 base64url 编码)")
parser.add_argument("--start-time", help="开始时间,格式 YYYY-MM-DD")
parser.add_argument("--end-time", help="结束时间,格式 YYYY-MM-DD")
parser.add_argument("--is-web", choices=["1", "2"], help="是否网站资产:1=是,2=否")
parser.add_argument("--status-code", help="状态码,逗号分隔")
parser.add_argument("--fields", help="返回字段,逗号分隔")
parser.add_argument("--assets-limit", help="导出资产数量,整数")
parser.add_argument("--output-file", help="下载文件名")
parser.add_argument("--no-interactive", action="store_true", help="启用无交互模式")
parser.add_argument("--check-delay", type=int, default=10, help="创建任务后等待多少秒再首次尝试下载")
parser.add_argument("--poll-interval", type=int, default=5, help="任务轮询间隔秒数")
parser.add_argument("--poll-timeout", type=int, default=300, help="任务轮询超时时间秒数")
parser.add_argument("--json-output", action="store_true", help="输出机器可读 JSON 结果")
return parser.parse_args()
def main() -> None:
# 流程:创建任务 -> 轮询进度 -> 下载文件
args = parse_args()
try:
params = _build_params_from_args(args)
except ValueError as exc:
print(f"\n参数错误: {exc}")
return
print("\n正在提交批量查询任务...")
try:
response = requests.post(API_URL, params=params, timeout=30)
response.raise_for_status()
except requests.RequestException as exc:
print(f"请求失败: {exc}")
return
try:
data = response.json()
except json.JSONDecodeError:
print("响应不是 JSON,原始内容如下:")
print(response.text)
return
task_id = _extract_task_id(data)
print("\n请求成功。")
if task_id:
print(f"任务 ID: {task_id}")
else:
print("未在响应中识别到任务 ID,请检查完整响应。")
print("\n完整响应:")
print(json.dumps(data, ensure_ascii=False, indent=2))
if not task_id:
return
api_key = params["api-key"]
saved_path: Optional[Path] = None
if args.check_delay > 0:
print(f"\n等待 {args.check_delay} 秒后尝试下载...")
time.sleep(args.check_delay)
saved_path = _download_result(task_id=task_id, api_key=api_key, output_file=args.output_file)
if saved_path is None:
poll_ok = _poll_task(
task_id=task_id,
api_key=api_key,
interval_sec=args.poll_interval,
timeout_sec=args.poll_timeout,
)
if not poll_ok:
return
output_name = args.output_file
if output_name is None and not args.no_interactive:
output_name = _prompt_optional("下载文件名(可选,默认自动命名): ")
saved_path = _download_result(task_id=task_id, api_key=api_key, output_file=output_name)
if saved_path:
print(f"\n下载成功,文件已保存到: {saved_path.resolve()}")
if args.json_output:
result = {
"ok": True,
"task_id": task_id,
"file": str(saved_path.resolve()),
}
print(json.dumps(result, ensure_ascii=False))
elif args.json_output:
result = {
"ok": False,
"task_id": task_id,
"file": None,
}
print(json.dumps(result, ensure_ascii=False))
if __name__ == "__main__":
main()
FILE:scripts/requirements.txt
requests
---
name: fofa-search-use
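description: Calls the FOFA OpenAPI for batch search with CSV/JSON export. Load this skill first when the user mentions FOFA, asset mapping, batch queries, automatic paging, CSV export, or supplies a key plus a query string.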
---
# FOFA Search Use
## First
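- The goal: the user provides only `key` and `query`, and the script searches and exports the data.
- The script chooses the paging strategy automatically:
  1) Prefer `search/next`
  2) Fall back to `search/all?page` when `search/next` is not available
- Exports `CSV` by default; `JSON` export is optional.
- The skill does not depend on machine-specific paths; copy the whole folder into any project and use it directly.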
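The two paging strategies above differ in how the next page is addressed: a cursor returned by the previous response versus a page number. The sketch below shows both loops against in-memory stand-ins (`fake_next`, `fake_page`, and `DATA` are made up for illustration and do not call FOFA); all function names are hypothetical.

```python
from typing import Callable, List, Optional, Tuple

Row = List[str]
DATA: List[Row] = [[f"192.0.2.{i}", "443"] for i in range(1, 8)]  # 7 fake rows


def fake_next(cursor: str, size: int) -> Tuple[List[Row], Optional[str]]:
    """Stand-in for cursor paging: returns a batch plus the next cursor (or None)."""
    offset = int(cursor or 0)
    batch = DATA[offset:offset + size]
    new_offset = offset + len(batch)
    return batch, (str(new_offset) if new_offset < len(DATA) else None)


def fake_page(page: int, size: int) -> List[Row]:
    """Stand-in for page-number paging: the offset is (page - 1) * size."""
    return DATA[(page - 1) * size:page * size]


def collect_with_cursor(fetch: Callable, max_records: int, page_size: int) -> List[Row]:
    rows: List[Row] = []
    cursor: Optional[str] = ""
    while len(rows) < max_records:
        size = min(page_size, max_records - len(rows))
        batch, cursor = fetch(cursor, size)
        rows.extend(batch)
        if len(batch) < size or not cursor:
            break
    return rows[:max_records]


def collect_with_pages(fetch: Callable, max_records: int, page_size: int) -> List[Row]:
    rows: List[Row] = []
    page = 1
    while len(rows) < max_records:
        # The size stays constant so that page offsets remain aligned.
        batch = fetch(page, page_size)
        rows.extend(batch)
        if len(batch) < page_size:
            break
        page += 1
    return rows[:max_records]


def collect(next_supported: bool, max_records: int, page_size: int) -> List[Row]:
    """Prefer cursor paging; fall back to page numbers when it is unavailable."""
    if next_supported:
        return collect_with_cursor(fake_next, max_records, page_size)
    return collect_with_pages(fake_page, max_records, page_size)


print(len(collect(True, 5, 2)), len(collect(False, 5, 2)))
```

With a cursor the final request can be shortened freely, while page-number paging trims the surplus rows after fetching instead.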
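## Entry script
- Main script: `scripts/fofa_search_cli.py`
- How to run:
  - Non-interactive (recommended for AI agents):
    - `python3 scripts/fofa_search_cli.py --no-interactive --key "$FOFA_KEY" --query 'title="test"' --output-file results.csv --json-output`
  - Interactive (kept for compatibility):
    - `python3 scripts/fofa_search_cli.py`
## Environment and installation (portable)
- A virtual environment is recommended:
  - `python3 -m venv .venv`
  - `source .venv/bin/activate` (Windows PowerShell: `.venv\Scripts\Activate.ps1`)
  - `pip install -r scripts/requirements.txt`
- Minimal dependency file: `scripts/requirements.txt`
## Parameters
- `--key`: FOFA API key
- `--query`: FOFA query string
- `--fields`: fields to return, default `host,ip,port,protocol,title`
- `--max-records`: maximum number of records to export, default `1000`
- `--page-size`: records requested per page, default `10000` (maximum `10000`)
- `--output-file`: CSV file name, default `fofa_results.csv`
- `--json-output`: also export JSON
- `--base-url`: FOFA base URL, default `https://fofa.info`
- `--no-interactive`: disable prompts; a missing required argument is an error
## Recommended workflow
- Test the query on a small sample first (for example `--max-records 10`).
- Keep the field list short to cut useless data and quota consumption.
- For large exports, prefer the non-interactive mode so runs can be automated and reproduced.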
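## CSV export help
- Custom export fields: pass `--fields`; the field order is the CSV column order.
- Example of common fields: `host,ip,port,protocol,title,country,city,server`
- Non-interactive export example:
  - `python3 scripts/fofa_search_cli.py --no-interactive --key "$FOFA_KEY" --query 'title="test"' --fields "ip,port,host,title,country,city" --max-records 500 --page-size 100 --output-file fofa_custom.csv`
- Interactive export notes:
  - After starting the script, enter comma-separated fields at the `fields` prompt.
  - If a column has no value, the corresponding CSV cell stays empty; this is expected.
- Garbled characters when opening in Excel:
  - The script writes CSV as `utf-8-sig` by default, which Excel normally detects correctly.
  - If the text is still garbled, choose `UTF-8` manually when importing.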
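The `utf-8-sig` codec mentioned above prepends a byte order mark (BOM), which is the hint Excel uses to detect UTF-8. A minimal sketch that writes a file this way and inspects the first bytes (the helper name and the sample row are made up for illustration):

```python
import csv
import tempfile
from pathlib import Path
from typing import List


def write_excel_friendly_csv(path: Path, header: List[str], rows: List[List[str]]) -> None:
    """Write a CSV file that starts with a UTF-8 BOM."""
    with path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "demo.csv"
    write_excel_friendly_csv(target, ["host", "title"], [["example.com", "测试"]])
    raw = target.read_bytes()
    has_bom = raw.startswith(b"\xef\xbb\xbf")
    text = target.read_text(encoding="utf-8-sig")  # reading with the same codec drops the BOM

print(has_bom)
```

Tools that read the file as plain `utf-8` will see the BOM as an extra character at the start of the first header name, so read it back with `utf-8-sig` as well.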
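## Security notes
- Do not write a real `key` into repository files or commit history.
- Inject it through the `FOFA_KEY` environment variable.
- If the key must be persisted, use local secure storage, never plain text in the code base.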
FILE:scripts/fofa_search_cli.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Portable FOFA batch search CLI.
Features:
- Interactive mode and no-interactive mode.
- Auto paging with /search/next fallback to /search/all?page.
- Export CSV and optional JSON.
"""
from __future__ import annotations
import argparse
import base64
import csv
import json
import locale
import re
import time
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import requests
DEFAULT_BASE_URL = "https://fofa.info"
DEFAULT_FIELDS = "host,ip,port,protocol,title"
DEFAULT_PAGE_SIZE = 10000
MAX_PAGE_SIZE = 10000
class FofaError(Exception):
def __init__(self, message: str):
self.message = message
self.code = self._extract_error_code(message)
super().__init__(message)
@staticmethod
def _extract_error_code(error_message: str) -> Optional[int]:
matched = re.search(r"\[(-?\d+)\]", error_message or "")
if matched:
return int(matched.group(1))
return None
def encode_query(query: str) -> str:
return base64.b64encode(query.encode("utf-8")).decode("utf-8")
def get_shell_language() -> str:
lang = locale.getlocale()[0]
if not lang:
try:
lang = locale.getdefaultlocale()[0] # fallback for old envs
except Exception:
lang = None
return lang or "en"
class FofaClient:
def __init__(self, key: str, base_url: str = DEFAULT_BASE_URL, timeout: int = 30):
self.key = key.strip()
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.session = requests.Session()
self.lang = "zh-CN" if get_shell_language().startswith("zh") else "en"
if not self.key:
raise FofaError("FOFA key is empty")
def _request(self, path: str, params: Optional[Dict] = None, retries: int = 3) -> Dict:
url = f"{self.base_url}{path}"
request_params = dict(params or {})
request_params["key"] = self.key
request_params["lang"] = self.lang
last_error = None
for _ in range(retries):
try:
resp = self.session.get(url, params=request_params, timeout=self.timeout)
resp.raise_for_status()
data = resp.json()
if data.get("error"):
raise FofaError(data.get("errmsg") or "FOFA API error")
return data
except (requests.RequestException, ValueError, FofaError) as exc:
last_error = exc
time.sleep(1)
raise FofaError(f"Request failed: {last_error}")
def search(self, query: str, fields: str, page: int, size: int) -> Dict:
return self._request(
"/api/v1/search/all",
{
"qbase64": encode_query(query),
"fields": fields,
"page": page,
"size": size,
},
)
def search_next(self, query: str, fields: str, size: int, next_token: str = "") -> Dict:
params = {
"qbase64": encode_query(query),
"fields": fields,
"size": size,
}
if next_token:
params["next"] = next_token
return self._request("/api/v1/search/next", params)
def can_use_next(self) -> bool:
try:
self.search_next("bad=query", fields="ip", size=1)
except FofaError as exc:
return exc.code == 820000
return False


def _prompt_required(prompt: str) -> str:
    while True:
        value = input(prompt).strip()
        if value:
            return value
        print("This field is required.")


def _prompt_int(prompt: str, default: int, minimum: int, maximum: int) -> int:
    while True:
        raw = input(f"{prompt} (default {default}): ").strip()
        if not raw:
            return default
        if not raw.isdigit():
            print("Please input an integer.")
            continue
        value = int(raw)
        if value < minimum or value > maximum:
            print(f"Please input {minimum} ~ {maximum}.")
            continue
        return value
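

def _rows_to_dicts(rows: Iterable[List], fields: List[str]) -> Iterable[Dict]:
    for row in rows:
        # A row may arrive as a bare scalar rather than a list (for example
        # when only one field is requested); wrap it so indexing is safe.
        if not isinstance(row, (list, tuple)):
            row = [row]
        item = {}
        for idx, field in enumerate(fields):
            item[field] = row[idx] if idx < len(row) else ""
        yield item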


def save_csv(path: Path, records: List[Dict], fields: List[str]) -> None:
    # utf-8-sig writes a BOM so spreadsheet tools detect the encoding.
    with path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for item in records:
            writer.writerow(item)


def save_json(path: Path, records: List[Dict]) -> None:
    with path.open("w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)


def fetch_all(
    client: FofaClient,
    query: str,
    fields: str,
    max_records: int,
    page_size: int,
) -> Tuple[List[Dict], int, int]:
    """Fetch up to `max_records` rows; returns (records, total_size, consumed_fpoint)."""
    field_list = [x.strip() for x in fields.split(",") if x.strip()]
    all_records: List[Dict] = []
    total_size = 0
    consumed_fpoint = 0
    use_next = client.can_use_next()
    print(f"[mode] {'search/next' if use_next else 'search/all?page'}")
    if use_next:
        # Cursor-based paging: each response carries the token for the next page.
        next_token = ""
        while len(all_records) < max_records:
            remain = max_records - len(all_records)
            size = min(page_size, remain)
            resp = client.search_next(query=query, fields=fields, size=size, next_token=next_token)
            rows = resp.get("results", [])
            if not rows:
                break
            all_records.extend(_rows_to_dicts(rows, field_list))
            total_size = resp.get("size", total_size)
            consumed_fpoint += int(resp.get("consumed_fpoint") or 0)
            next_token = resp.get("next", "")
            print(f"[progress] {len(all_records)}/{max_records}")
            # Stop on a short page, or when no cursor is returned; an empty
            # token would restart from the first page and duplicate rows.
            if len(rows) < size or not next_token:
                break
    else:
        # Page-number paging via /search/all.
        page = 1
        while len(all_records) < max_records:
            remain = max_records - len(all_records)
            size = min(page_size, remain)
            resp = client.search(query=query, fields=fields, page=page, size=size)
            rows = resp.get("results", [])
            if not rows:
                break
            all_records.extend(_rows_to_dicts(rows, field_list))
            total_size = resp.get("size", total_size)
            consumed_fpoint += int(resp.get("consumed_fpoint") or 0)
            print(f"[progress] page={page}, got={len(all_records)}/{max_records}")
            if len(rows) < size:
                break
            page += 1
    if len(all_records) > max_records:
        all_records = all_records[:max_records]
    return all_records, total_size, consumed_fpoint


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="FOFA portable batch search CLI")
    parser.add_argument("--key", help="FOFA API key")
    parser.add_argument("--query", help="FOFA query string")
    parser.add_argument("--fields", default=DEFAULT_FIELDS, help=f"fields list, default: {DEFAULT_FIELDS}")
    parser.add_argument("--max-records", type=int, default=1000, help="max records to fetch")
    parser.add_argument("--page-size", type=int, default=DEFAULT_PAGE_SIZE, help=f"page size (1-{MAX_PAGE_SIZE})")
    parser.add_argument("--output-file", default="fofa_results.csv", help="csv output file path")
    parser.add_argument("--json-output", action="store_true", help="also export json")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help=f"fofa base url, default: {DEFAULT_BASE_URL}")
    parser.add_argument("--no-interactive", action="store_true", help="do not ask input interactively")
    return parser


def gather_args(args: argparse.Namespace) -> argparse.Namespace:
    if args.no_interactive:
        if not args.key or not args.query:
            raise FofaError("In --no-interactive mode, --key and --query are required")
        return args
    print("=== FOFA Portable Search CLI ===")
    args.key = args.key or _prompt_required("FOFA key: ")
    args.query = args.query or _prompt_required("FOFA query: ")
    # Only prompt for options that were left at their defaults.
    if args.fields == DEFAULT_FIELDS:
        custom_fields = input(f"fields (default {DEFAULT_FIELDS}): ").strip()
        if custom_fields:
            args.fields = custom_fields
    args.max_records = _prompt_int("max records", args.max_records, 1, 1_000_000)
    args.page_size = _prompt_int("page size", args.page_size, 1, MAX_PAGE_SIZE)
    if args.output_file == "fofa_results.csv":
        output = input("csv output filename (default fofa_results.csv): ").strip()
        if output:
            args.output_file = output
    if not args.json_output:
        args.json_output = input("also export JSON? (y/N): ").strip().lower() == "y"
    return args


def main() -> None:
    parser = build_parser()
    args = gather_args(parser.parse_args())
    client = FofaClient(key=args.key, base_url=args.base_url)
    records, total, consumed = fetch_all(
        client=client,
        query=args.query,
        fields=args.fields,
        max_records=args.max_records,
        page_size=args.page_size,
    )
    if not records:
        print("[done] no data found.")
        return
    csv_path = Path(args.output_file)
    field_list = [x.strip() for x in args.fields.split(",") if x.strip()]
    save_csv(csv_path, records, field_list)
    print(f"[done] csv saved: {csv_path.resolve()}")
    json_path = None
    if args.json_output:
        # The JSON export sits next to the CSV, with the same base name.
        json_path = csv_path.with_suffix(".json")
        save_json(json_path, records)
        print(f"[done] json saved: {json_path.resolve()}")
    # Machine-readable summary on the final line, for callers that parse stdout.
    result = {
        "query": args.query,
        "mode": "next_or_page_auto",
        "fofa_total": total,
        "exported": len(records),
        "consumed_fpoint": consumed,
        "csv_file": str(csv_path.resolve()),
        "json_file": str(json_path.resolve()) if json_path else None,
    }
    print(json.dumps(result, ensure_ascii=False))


if __name__ == "__main__":
    main()
FILE:scripts/requirements.txt
requests>=2.31.0