@clawhub-jiansiting-089abefb4e
自动配置 FortiGate 防火墙,支持基础策略管理及工控协议(Modbus、IEC104、S7等)的安全配置。
---
name: fortigate-config
description: 自动配置 FortiGate 防火墙,支持基础策略管理及工控协议(Modbus、IEC104、S7等)的安全配置。
author: jiansiting
version: 2.0.0
tags: [fortigate, firewall, industrial, security, automation]
---
# FortiGate 自动配置技能(工控增强版)
## 功能说明
本技能通过 FortiGate REST API 实现防火墙的自动化配置,特别增加了对工业控制系统(ICS)协议的支持。您可以:
- 管理防火墙策略(增、删、改、查)
- 管理地址对象
- 配置 Industrial Connectivity(协议转换,仅 Rugged 系列)
- 创建基于工控协议的服务对象(Modbus TCP、IEC104 等)
- 创建针对工控协议的 IPS 配置文件
- 添加工控协议策略并关联安全配置文件
## 许可证要求
使用工控协议签名功能需要以下许可证之一:
- FortiGuard 工业安全服务
- ATP(高级威胁防护)套装
- UTP(统一威胁防护)套装
## 配置项
在使用本技能前,需要在 OpenClaw 的环境变量或配置文件中设置以下项:
| 配置项 | 说明 | 必填 | 默认值 |
| :--- | :--- | :--- | :--- |
| `FORTIGATE_HOST` | FortiGate 设备的 IP 地址或域名 | 是 | 无 |
| `FORTIGATE_TOKEN` | API 访问令牌 | 是 | 无 |
| `FORTIGATE_PORT` | API 端口 | 否 | 443 |
| `FORTIGATE_VERIFY_SSL` | 是否验证 SSL 证书 | 否 | false |
## 使用方法
@openclaw fortigate-config <操作> <参数(JSON 格式)>
### 基础操作
| 操作 | 说明 | 参数示例 |
|------|------|----------|
| `list-policies` | 列出所有防火墙策略 | `{}` |
| `add-address` | 添加地址对象 | `{"name": "web-server", "subnet": "192.168.1.10/32"}` |
| `delete-address` | 删除地址对象 | `{"name": "web-server"}` |
| `add-policy` | 添加防火墙策略 | `{"name": "allow-web", "srcintf": "port1", "dstintf": "port2", "srcaddr": "all", "dstaddr": "all", "action": "accept"}` |
| `update-policy` | 更新防火墙策略 | `{"policyid": 1, "action": "deny", "name": "new-name"}` |
### 工控协议操作
| 操作 | 说明 | 参数示例 |
|------|------|----------|
| `configure-icond` | 配置 Industrial Connectivity 服务(协议转换) | `{"interface": "internal1", "protocol_type": "modbus-serial-tcp", "tty_device": "serial0"}` |
| `create-industrial-service` | 创建工控协议服务对象 | `{"name": "modbus-tcp", "protocol": "TCP", "port": 502}` |
| `create-industrial-ips` | 创建工控 IPS 配置文件 | `{"name": "plc-protection", "protocols": ["Modbus", "IEC104"], "action": "monitor"}` |
| `add-industrial-policy` | 添加工控协议策略(带安全防护) | `{"name": "hmi-to-plc", "srcintf": "port1", "dstintf": "port2", "srcaddr": "hmi-net", "dstaddr": "plc-net", "service": "modbus-tcp", "ips_profile": "plc-protection"}` |
## 常见工控协议端口
| 协议 | 端口 | 描述 |
|------|------|------|
| Modbus TCP | TCP 502 | 工业自动化常用协议 |
| IEC 104 | TCP 2404 | 电力系统远程控制协议 |
| S7 Plus | TCP 102 | 西门子 PLC 协议 |
| Ethernet/IP | TCP 44818 | Rockwell/ODVA 工业协议 |
| DNP3 | TCP 20000 | 电力/水务自动化协议 |
## 注意事项
- 所有写操作(添加、删除、更新)均会先检查对象是否存在,避免重复创建或误删。
- 输出格式优先使用表格(需安装 `tabulate`),否则使用简单文本对齐。
- 生产环境请将 `FORTIGATE_VERIFY_SSL` 设为 `true` 并使用有效证书。
## 反馈与支持
如有问题,请联系 [email protected]
FILE:requirements.txt
requests>=2.25.1
tabulate>=0.8.9
FILE:scripts/main.py
#!/usr/bin/env python3
"""
FortiGate 配置 Skill - 工控增强版
支持基础防火墙配置及工控协议(Modbus、IEC104、S7等)的安全策略管理。
"""
import os
import sys
import json
import argparse
import requests
import urllib3
# 尝试导入 tabulate,用于美化输出
try:
from tabulate import tabulate
TABULATE_AVAILABLE = True
except ImportError:
TABULATE_AVAILABLE = False
# 禁用 SSL 证书警告(如果使用自签名证书)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ---------- 配置读取 ----------
FGT_HOST = os.environ.get("FORTIGATE_HOST")
FGT_TOKEN = os.environ.get("FORTIGATE_TOKEN")
FGT_PORT = os.environ.get("FORTIGATE_PORT", "443")
FGT_VERIFY_SSL = os.environ.get("FORTIGATE_VERIFY_SSL", "false").lower() == "true"
if not FGT_HOST or not FGT_TOKEN:
print("错误:必须设置环境变量 FORTIGATE_HOST 和 FORTIGATE_TOKEN")
sys.exit(1)
BASE_URL = f"https://{FGT_HOST}:{FGT_PORT}/api/v2"
HEADERS = {
"Authorization": f"Bearer {FGT_TOKEN}",
"Content-Type": "application/json"
}
# ---------- API 调用工具 ----------
def call_api(method, path, data=None, params=None):
"""发送 API 请求并返回解析后的 JSON 响应,出错时返回包含 error 字段的字典"""
url = f"{BASE_URL}/{path.lstrip('/')}"
try:
response = requests.request(
method=method,
url=url,
headers=HEADERS,
json=data,
params=params,
verify=FGT_VERIFY_SSL,
timeout=30
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
error_detail = ""
if response is not None and response.text:
try:
error_json = response.json()
error_detail = error_json.get("error", {}).get("message", response.text)
except:
error_detail = response.text
return {"error": f"HTTP {response.status_code}: {error_detail}"}
except requests.exceptions.RequestException as e:
return {"error": f"网络请求失败: {e}"}
except json.JSONDecodeError:
return {"error": "API 返回了非 JSON 格式的响应"}
# ---------- 对象存在性检查 ----------
def address_exists(name):
"""检查地址对象是否存在,返回布尔值"""
result = call_api("GET", f"cmdb/firewall/address/{name}")
if "error" in result or result.get("http_status") == 404:
return False
return bool(result.get("results"))
def policy_exists(policy_id):
"""检查策略是否存在,返回布尔值"""
result = call_api("GET", "cmdb/firewall/policy/", params={"policyid": policy_id})
if "error" in result:
return False
results = result.get("results", [])
return len(results) > 0
def service_exists(name):
"""检查自定义服务对象是否存在"""
result = call_api("GET", f"cmdb/firewall.service/custom/{name}")
return not ("error" in result) and bool(result.get("results"))
def ips_sensor_exists(name):
"""检查 IPS 传感器是否存在"""
result = call_api("GET", f"cmdb/ips/sensor/{name}")
return not ("error" in result) and bool(result.get("results"))
# ---------- 基础操作函数 ----------
def list_policies():
"""列出所有防火墙策略,并使用表格输出"""
result = call_api("GET", "cmdb/firewall/policy/")
if "error" in result:
print(f"错误: {result['error']}")
return 1
policies = result.get("results", [])
if not policies:
print("没有找到任何防火墙策略。")
return 0
table_data = []
for p in policies:
table_data.append([
p.get("policyid"),
p.get("name", ""),
p.get("action"),
p.get("status"),
p.get("srcintf", [{}])[0].get("name") if p.get("srcintf") else "",
p.get("dstintf", [{}])[0].get("name") if p.get("dstintf") else "",
])
headers = ["ID", "名称", "动作", "状态", "源接口", "目的接口"]
if TABULATE_AVAILABLE:
print(tabulate(table_data, headers=headers, tablefmt="grid"))
else:
# 降级输出:简单的文本对齐
print("\t".join(headers))
for row in table_data:
print("\t".join(str(cell) for cell in row))
return 0
def add_address(name, subnet):
"""添加地址对象,添加前检查是否存在"""
if address_exists(name):
print(f"警告: 地址对象 '{name}' 已存在,跳过创建。")
return 0
# 处理 CIDR 格式
if "/" in subnet:
ip, mask = subnet.split("/")
# 简单映射常见掩码,生产环境建议使用 ipaddress 库
cidr_to_mask = {
"24": "255.255.255.0",
"32": "255.255.255.255",
"16": "255.255.0.0",
"8": "255.0.0.0"
}
netmask = cidr_to_mask.get(mask, "255.255.255.0")
subnet_list = [ip, netmask]
else:
subnet_list = subnet.split()
data = {
"name": name,
"subnet": subnet_list
}
result = call_api("POST", "cmdb/firewall/address/", data=data)
if "error" in result:
print(f"错误: 添加地址对象失败: {result['error']}")
return 1
print(f"成功: 地址对象 '{name}' 已添加。")
return 0
def delete_address(name):
"""删除地址对象,删除前检查是否存在"""
if not address_exists(name):
print(f"错误: 地址对象 '{name}' 不存在,无法删除。")
return 1
result = call_api("DELETE", f"cmdb/firewall/address/{name}")
if "error" in result:
print(f"错误: 删除地址对象失败: {result['error']}")
return 1
print(f"成功: 地址对象 '{name}' 已删除。")
return 0
def add_policy(params):
"""添加防火墙策略,添加前检查名称或ID是否已存在"""
# 解析参数
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError:
print("错误: 添加策略需要有效的 JSON 参数")
return 1
# 构建策略数据
data = {
"name": params.get("name"),
"srcintf": [{"name": params.get("srcintf")}] if params.get("srcintf") else [],
"dstintf": [{"name": params.get("dstintf")}] if params.get("dstintf") else [],
"srcaddr": [{"name": params.get("srcaddr")}] if params.get("srcaddr") else [],
"dstaddr": [{"name": params.get("dstaddr")}] if params.get("dstaddr") else [],
"action": params.get("action", "accept"),
"status": params.get("status", "enable")
}
# 可选字段
if params.get("schedule"):
data["schedule"] = params.get("schedule")
if params.get("service"):
data["service"] = [{"name": params.get("service")}]
# 简单检查同名策略(仅作提示,不阻止创建)
policy_name = params.get("name")
if policy_name:
check = call_api("GET", "cmdb/firewall/policy/", params={"filter": f"name=@{policy_name}"})
if not check.get("error") and check.get("results"):
print(f"注意: 存在同名策略 '{policy_name}',将创建新策略(名称可能重复)。")
result = call_api("POST", "cmdb/firewall/policy/", data=data)
if "error" in result:
print(f"错误: 添加策略失败: {result['error']}")
return 1
print(f"成功: 策略 '{policy_name}' 已添加。")
return 0
def update_policy(policy_id, updates):
"""更新防火墙策略"""
if not policy_exists(policy_id):
print(f"错误: 策略 ID {policy_id} 不存在,无法更新。")
return 1
result = call_api("PUT", f"cmdb/firewall/policy/{policy_id}", data=updates)
if "error" in result:
print(f"错误: 更新策略失败: {result['error']}")
return 1
print(f"成功: 策略 ID {policy_id} 已更新。")
return 0
# ---------- 工控协议操作函数 ----------
def configure_icond(params):
"""
配置 Industrial Connectivity 服务(协议转换)
注意:此功能仅适用于 FortiGate Rugged 系列,API 路径可能因版本而异。
此处使用接口的 allowaccess 配置,实际 icond 配置可能需要通过 CLI 或特定 API。
"""
interface = params.get("interface")
protocol_type = params.get("protocol_type")
tty_device = params.get("tty_device")
if not interface or not protocol_type or not tty_device:
print("错误: configure-icond 需要提供 interface, protocol_type, tty_device 参数")
return 1
# 1. 启用接口的 icond 访问权限
interface_data = {
"allowaccess": "ping https ssh http icond"
}
iface_result = call_api("PUT", f"cmdb/system/interface/{interface}", data=interface_data)
if "error" in iface_result:
print(f"错误: 配置接口 {interface} 失败: {iface_result['error']}")
return 1
# 2. 配置 icond(假设 API 路径为 cmdb/system/icond)
# 如果此 API 不存在,用户需通过 CLI 手动配置。这里仅给出框架。
icond_data = {
"status": "enable",
"type": protocol_type,
"tty-device": tty_device
}
icond_result = call_api("PUT", "cmdb/system/icond", data=icond_data)
if "error" in icond_result:
# 如果 API 不可用,给出提示
print(f"警告: 无法通过 API 配置 icond({icond_result['error']}),请手动在 CLI 下执行:")
print(f"config system icond")
print(f" set status enable")
print(f" set type {protocol_type}")
print(f" set tty-device {tty_device}")
print(f"end")
return 0
print(f"成功: 已启用 {protocol_type} 协议转换,使用串口 {tty_device}")
return 0
def create_industrial_service(params):
"""创建工控协议服务对象"""
name = params.get("name")
protocol = params.get("protocol", "TCP").upper()
port = params.get("port")
if not name or not port:
print("错误: create-industrial-service 需要提供 name 和 port 参数")
return 1
if service_exists(name):
print(f"警告: 服务对象 '{name}' 已存在,跳过创建。")
return 0
data = {
"name": name,
"protocol": protocol,
"tcp-portrange": str(port) if protocol == "TCP" else "",
"udp-portrange": str(port) if protocol == "UDP" else ""
}
result = call_api("POST", "cmdb/firewall.service/custom", data=data)
if "error" in result:
print(f"错误: 创建服务对象失败: {result['error']}")
return 1
print(f"成功: 服务对象 '{name}' 已创建。")
return 0
def create_industrial_ips(params):
"""创建针对工控协议的 IPS 配置文件"""
name = params.get("name")
protocols = params.get("protocols", [])
action = params.get("action", "monitor") # 默认监控模式,避免影响业务
if not name or not protocols:
print("错误: create-industrial-ips 需要提供 name 和 protocols 列表")
return 1
if ips_sensor_exists(name):
print(f"警告: IPS 传感器 '{name}' 已存在,跳过创建。")
return 0
# 构建过滤器
filters = []
for proto in protocols:
filters.append({
"name": f"industrial.{proto.lower()}",
"action": action
})
data = {
"name": name,
"filters": filters
}
result = call_api("POST", "cmdb/ips/sensor", data=data)
if "error" in result:
print(f"错误: 创建 IPS 传感器失败: {result['error']}")
return 1
print(f"成功: IPS 传感器 '{name}' 已创建,动作为 {action}。")
return 0
def add_industrial_policy(params):
"""添加工控协议防火墙策略(带安全配置文件)"""
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError:
print("错误: add-industrial-policy 需要有效的 JSON 参数")
return 1
# 检查必须参数
required = ["name", "srcintf", "dstintf", "srcaddr", "dstaddr", "service"]
for r in required:
if r not in params:
print(f"错误: 缺少必要参数 '{r}'")
return 1
# 构建策略数据
data = {
"name": params.get("name"),
"srcintf": [{"name": params.get("srcintf")}],
"dstintf": [{"name": params.get("dstintf")}],
"srcaddr": [{"name": params.get("srcaddr")}],
"dstaddr": [{"name": params.get("dstaddr")}],
"service": [{"name": params.get("service")}],
"action": params.get("action", "accept"),
"status": params.get("status", "enable")
}
# 关联安全配置文件(可选)
if params.get("ips_profile"):
# 检查 IPS 传感器是否存在
if not ips_sensor_exists(params["ips_profile"]):
print(f"警告: IPS 传感器 '{params['ips_profile']}' 不存在,策略将不关联 IPS。")
else:
data["ips-sensor"] = params["ips_profile"]
if params.get("app_profile"):
data["application-list"] = params["app_profile"]
# 检查同名策略(可选)
policy_name = params.get("name")
if policy_name:
check = call_api("GET", "cmdb/firewall/policy/", params={"filter": f"name=@{policy_name}"})
if not check.get("error") and check.get("results"):
print(f"注意: 存在同名策略 '{policy_name}',将创建新策略。")
result = call_api("POST", "cmdb/firewall/policy/", data=data)
if "error" in result:
print(f"错误: 添加工控策略失败: {result['error']}")
return 1
print(f"成功: 工控策略 '{data['name']}' 已添加。")
return 0
# ---------- 主入口 ----------
def main():
parser = argparse.ArgumentParser(description="FortiGate 配置技能(工控增强版)")
parser.add_argument("action", help="操作")
parser.add_argument("params", nargs="?", default="{}", help="操作所需的参数(JSON 格式)")
args = parser.parse_args()
action = args.action
params_str = args.params
# 尝试解析 params 为 JSON
try:
params = json.loads(params_str) if params_str else {}
except json.JSONDecodeError:
print("错误: params 必须是有效的 JSON 字符串")
return 1
# 操作映射
actions = {
# 基础操作
"list-policies": lambda: list_policies(),
"add-address": lambda: add_address(params.get("name"), params.get("subnet")),
"delete-address": lambda: delete_address(params.get("name")),
"add-policy": lambda: add_policy(params),
"update-policy": lambda: update_policy(params.get("policyid"), {k: v for k, v in params.items() if k != "policyid"}),
# 工控操作
"configure-icond": lambda: configure_icond(params),
"create-industrial-service": lambda: create_industrial_service(params),
"create-industrial-ips": lambda: create_industrial_ips(params),
"add-industrial-policy": lambda: add_industrial_policy(params),
}
if action not in actions:
print(f"错误: 未知操作 '{action}'")
print("支持的操作: " + ", ".join(actions.keys()))
return 1
return actions[action]()
if __name__ == "__main__":
sys.exit(main())基于TIA Openness API的完整PLC自动化技能,支持项目创建、硬件配置、SCL编程、编译下载
---
name: tia-openness-complete-skill
description: 基于TIA Openness API的完整PLC自动化技能,支持项目创建、硬件配置、SCL编程、编译下载
author: jiansiting
version: 1.0.0
---
# TIA Openness 完整自动化技能
## 功能概述
本技能通过TIA Portal Openness API实现PLC工程的端到端自动化:
- 创建TIA Portal项目
- 添加PLC设备(支持S7-1200/1500)
- 根据工艺描述自动生成SCL程序块(OB/FC/FB/DB)
- 编译PLC软件
- 下载到PLC
- 支持单步操作和完整流程一键执行
## 环境要求
- Windows 10/11
- TIA Portal V16/V17/V18 已安装,并启用Openness组件
- Python 3.9+,已安装 `pythonnet` 和 `jinja2`
- 当前用户需属于 `Siemens TIA Openness` 用户组
## 安装依赖
```bash
pip install pythonnet jinja2
FILE:return.json
{
"success": true/false,
"message": "操作结果描述",
... // 额外数据
}
FILE:readme.txt
tia-openness-complete-skill/
├── __init__.py # OpenClaw入口
├── tia_core.py # TIA Openness核心封装(加载程序集、连接管理)
├── hardware.py # 硬件配置助手(设备类型定义)
├── block_generator.py # 程序块生成器(根据描述生成SCL)
├── actions.py # 操作函数(创建项目、添加设备、创建块、编译、下载)
├── utils.py # 辅助工具(路径处理、注册表读取)
├── templates/ # SCL代码模板
│ ├── ob_template.scl
│ ├── fb_template.scl
│ ├── fc_template.scl
│ └── db_template.scl
├── SKILL.md # 技能文档(元数据、参数说明、使用示例)
└── requirements.txt # 依赖项
注意事项
首次运行时会弹出Openness防火墙确认,请点击“是”允许连接。
下载时需要正确的PG/PC接口名称,本技能默认使用第一个可用接口,如有多个请确保配置正确。
如果项目受密码保护,请使用UMAC参数(本技能暂未集成,可扩展)。
硬件目录路径可能因TIA版本不同而略有差异,请根据实际情况调整。
FILE:_init_.py
"""
OpenClaw技能:TIA Openness 完整自动化技能
支持项目创建、硬件配置、SCL编程、编译下载完整流程
"""
import logging
from typing import Dict, Any
from .actions import (
get_tia, close_tia,
action_create_project,
action_add_plc,
action_create_block,
action_compile_software,
action_download,
action_save_project,
action_close_project,
action_full_automation
)
logger = logging.getLogger(__name__)
def run(action: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""
OpenClaw标准入口函数
:param action: 操作名称
:param params: 参数字典
:return: 结果字典
"""
try:
# 路由操作
if action == "create_project":
return action_create_project(params)
elif action == "add_plc":
return action_add_plc(params)
elif action == "create_block":
return action_create_block(params)
elif action == "compile_software":
return action_compile_software(params)
elif action == "download":
return action_download(params)
elif action == "save_project":
return action_save_project(params)
elif action == "close_project":
return action_close_project(params)
elif action == "full_automation":
return action_full_automation(params)
else:
return {"success": False, "message": f"未知操作: {action}"}
except Exception as e:
logger.exception("技能执行异常")
return {"success": False, "message": f"执行异常: {str(e)}"}
finally:
if params.get("close_after", False):
close_tia()
FILE:full_automation.json
{
"action": "full_automation",
"path": "C:\\TIA_Projects",
"name": "AutoProject",
"device_name": "PLC_1",
"cpu_model": "CPU 1511-1 PN",
"family": "S7-1500",
"firmware": "V4.0",
"plc_name": "PLC_1",
"block_type": "OB",
"block_name": "Main",
"description": "主程序循环",
"interface": "PN/IE",
"ip_address": "192.168.0.1",
"close_after": true
}
FILE:save_project.json
{
"action": "save_project"
}
FILE:block_generator.py
"""SCL代码生成器:根据工艺描述和模板生成SCL代码"""
import os
import re
from jinja2 import Environment, FileSystemLoader, Template
from typing import Dict, Any, List
class SCLGenerator:
def __init__(self, template_dir: str = None):
if template_dir is None:
template_dir = os.path.join(os.path.dirname(__file__), 'templates')
self.env = Environment(loader=FileSystemLoader(template_dir))
def generate(self, block_type: str, name: str, description: str,
parameters: Dict[str, Any] = None) -> str:
"""
生成SCL代码
:param block_type: "OB", "FC", "FB", "DB"
:param name: 块名称
:param description: 工艺描述
:param parameters: 用户提供的额外参数(如变量列表)
"""
# 解析描述,提取变量和控制逻辑
analysis = self._analyze_description(description)
# 加载模板
template = self.env.get_template(f"{block_type.lower()}_template.txt")
# 渲染
code = template.render(
block_name=name,
description=description,
variables=analysis.get("variables", []),
logic=analysis.get("logic", ""),
parameters=parameters or {}
)
return code
def _analyze_description(self, desc: str) -> dict:
"""
简单关键词分析,提取变量和逻辑框架
"""
result = {"variables": [], "logic": ""}
# 提取输入/输出变量(简单模式)
input_pattern = r"输入[::]\s*(\w+)"
output_pattern = r"输出[::]\s*(\w+)"
motor_pattern = r"电机\s*(\w+)"
pump_pattern = r"泵\s*(\w+)"
valve_pattern = r"阀\s*(\w+)"
inputs = re.findall(input_pattern, desc)
outputs = re.findall(output_pattern, desc)
motors = re.findall(motor_pattern, desc)
pumps = re.findall(pump_pattern, desc)
valves = re.findall(valve_pattern, desc)
for inp in inputs:
result["variables"].append({"name": inp, "type": "Bool", "direction": "input"})
for out in outputs:
result["variables"].append({"name": out, "type": "Bool", "direction": "output"})
for motor in motors:
result["variables"].append({"name": f"Motor_{motor}_Run", "type": "Bool", "direction": "output"})
for pump in pumps:
result["variables"].append({"name": f"Pump_{pump}_Run", "type": "Bool", "direction": "output"})
for valve in valves:
result["variables"].append({"name": f"Valve_{valve}_Open", "type": "Bool", "direction": "output"})
# 简单逻辑生成
if "启动" in desc or "start" in desc.lower():
result["logic"] = "IF #Start THEN #Run := TRUE; END_IF;"
elif "停止" in desc or "stop" in desc.lower():
result["logic"] = "IF #Stop THEN #Run := FALSE; END_IF;"
elif "控制" in desc or "control" in desc.lower():
result["logic"] = "IF #Auto THEN #Output := #Input; ELSE #Output := FALSE; END_IF;"
else:
result["logic"] = "// 用户逻辑占位符"
return result
FILE:download.json
{
"action": "download",
"plc_name": "PLC_1",
"interface": "PN/IE",
"ip_address": "192.168.0.1",
"password": "PLC_Password"
}
FILE:actions.py
"""OpenClaw操作实现函数"""
import os
import tempfile
import logging
from typing import Dict, Any, List
from .utils import ensure_directory, create_temp_file
from .tia_core import TiaPortal
from .hardware import HardwareCatalog
from .block_generator import SCLGenerator
logger = logging.getLogger(__name__)
# 全局TIA实例
_tia: TiaPortal = None
_scl_gen = SCLGenerator()
def get_tia(params: Dict[str, Any]) -> TiaPortal:
"""获取或创建TIA Portal实例"""
global _tia
if _tia is None:
tia_version = params.get("tia_version", "V18")
mode = params.get("tia_mode", "WithUserInterface")
_tia = TiaPortal(tia_version, mode)
_tia.start()
return _tia
def close_tia():
"""关闭TIA Portal实例"""
global _tia
if _tia:
_tia.close()
_tia = None
# ---------- 操作函数 ----------
def action_create_project(params: Dict[str, Any]) -> Dict[str, Any]:
"""创建新项目"""
tia = get_tia(params)
path = params.get("path")
name = params.get("name")
author = params.get("author", "")
comment = params.get("comment", "")
if not path or not name:
return {"success": False, "message": "缺少 path 或 name 参数"}
if not ensure_directory(path):
return {"success": False, "message": f"无法创建目录: {path}"}
try:
proj = tia.create_project(path, name, author, comment)
return {
"success": True,
"message": f"项目创建成功: {name}",
"project_path": proj.Path.FullName
}
except Exception as e:
logger.exception("创建项目失败")
return {"success": False, "message": f"创建项目失败: {str(e)}"}
def action_add_plc(params: Dict[str, Any]) -> Dict[str, Any]:
"""添加PLC设备"""
tia = get_tia(params)
if not tia.project:
return {"success": False, "message": "没有打开的项目,请先创建或打开项目"}
device_name = params.get("device_name", "PLC_1")
cpu_model = params.get("cpu_model", "CPU 1511-1 PN")
family = params.get("family", "S7-1500")
firmware = params.get("firmware", "V4.0")
# 从硬件目录获取路径
if family == "S7-1200":
catalog = HardwareCatalog.CPU_S7_1200.get(cpu_model)
else:
catalog = HardwareCatalog.CPU_S7_1500.get(cpu_model)
if not catalog:
return {"success": False, "message": f"未知CPU型号: {cpu_model}"}
try:
device = tia.add_plc_device(device_name, catalog, firmware)
return {
"success": True,
"message": f"PLC设备添加成功: {device_name}",
"device_name": device.Name
}
except Exception as e:
logger.exception("添加PLC失败")
return {"success": False, "message": f"添加PLC失败: {str(e)}"}
def action_create_block(params: Dict[str, Any]) -> Dict[str, Any]:
"""根据描述创建程序块并生成SCL代码"""
tia = get_tia(params)
if not tia.project:
return {"success": False, "message": "没有打开的项目"}
plc_name = params.get("plc_name")
block_type = params.get("block_type") # "OB","FB","FC","DB"
block_name = params.get("block_name")
description = params.get("description", "")
number = params.get("number", 0)
language = params.get("language", "SCL")
if not plc_name or not block_type or not block_name:
return {"success": False, "message": "缺少 plc_name, block_type 或 block_name 参数"}
try:
plc_software = tia.get_plc_software(plc_name)
# 生成SCL代码
scl_code = _scl_gen.generate(block_type, block_name, description)
# 创建块
block = tia.create_block(plc_software, block_type, block_name, number, language, scl_code)
return {
"success": True,
"message": f"{block_type}块创建成功: {block_name}",
"block_name": block.Name,
"block_number": block.Number
}
except Exception as e:
logger.exception("创建块失败")
return {"success": False, "message": f"创建块失败: {str(e)}"}
def action_compile_software(params: Dict[str, Any]) -> Dict[str, Any]:
"""编译PLC软件"""
tia = get_tia(params)
if not tia.project:
return {"success": False, "message": "没有打开的项目"}
plc_name = params.get("plc_name")
if not plc_name:
return {"success": False, "message": "缺少 plc_name 参数"}
try:
plc_software = tia.get_plc_software(plc_name)
success, msgs = tia.compile_plc(plc_software)
if success:
return {"success": True, "message": "编译成功", "messages": msgs}
else:
return {"success": False, "message": "编译失败", "messages": msgs}
except Exception as e:
return {"success": False, "message": f"编译异常: {str(e)}"}
def action_download(params: Dict[str, Any]) -> Dict[str, Any]:
"""下载到PLC"""
tia = get_tia(params)
if not tia.project:
return {"success": False, "message": "没有打开的项目"}
plc_name = params.get("plc_name")
interface = params.get("interface", "PN/IE")
ip_address = params.get("ip_address")
password = params.get("password")
if not plc_name:
return {"success": False, "message": "缺少 plc_name 参数"}
try:
plc_software = tia.get_plc_software(plc_name)
success, result = tia.download_to_plc(plc_software, interface, ip_address, password)
if success:
return {"success": True, "message": result}
else:
return {"success": False, "message": "下载失败", "details": result}
except Exception as e:
logger.exception("下载失败")
return {"success": False, "message": f"下载异常: {str(e)}"}
def action_save_project(params: Dict[str, Any]) -> Dict[str, Any]:
"""保存项目"""
tia = get_tia(params)
if not tia.project:
return {"success": False, "message": "没有打开的项目"}
try:
tia.save_project()
return {"success": True, "message": "项目保存成功"}
except Exception as e:
return {"success": False, "message": f"保存失败: {str(e)}"}
def action_close_project(params: Dict[str, Any]) -> Dict[str, Any]:
"""关闭项目"""
tia = get_tia(params)
if not tia.project:
return {"success": False, "message": "没有打开的项目"}
try:
tia.close_project()
return {"success": True, "message": "项目已关闭"}
except Exception as e:
return {"success": False, "message": f"关闭失败: {str(e)}"}
def action_full_automation(params: Dict[str, Any]) -> Dict[str, Any]:
"""完整自动化流程"""
steps = [
("创建项目", action_create_project),
("添加PLC", action_add_plc),
("创建程序块", action_create_block),
("编译软件", action_compile_software),
("下载", action_download)
]
results = []
for step_name, step_func in steps:
# 为每个步骤传入相同参数(但可根据需要调整)
res = step_func(params)
results.append({"step": step_name, "success": res["success"], "message": res.get("message")})
if not res["success"]:
break
return {
"success": all(r["success"] for r in results),
"steps": results
}
FILE:create_project.json
{
"action": "create_project",
"path": "C:\\TIA_Projects",
"name": "MyProject",
"author": "OpenClaw",
"comment": "自动创建的项目"
}
FILE:hardware.py
"""硬件配置助手:提供常用PLC型号和模块的目录路径"""
class HardwareCatalog:
"""西门子硬件目录路径常量"""
# S7-1200 CPU
CPU_S7_1200 = {
"CPU 1211C": "SIMATIC.PLC.S71200.CPU 1211C",
"CPU 1212C": "SIMATIC.PLC.S71200.CPU 1212C",
"CPU 1214C": "SIMATIC.PLC.S71200.CPU 1214C",
"CPU 1215C": "SIMATIC.PLC.S71200.CPU 1215C",
"CPU 1217C": "SIMATIC.PLC.S71200.CPU 1217C",
}
# S7-1500 CPU
CPU_S7_1500 = {
"CPU 1511-1 PN": "SIMATIC.PLC.S71500.CPU 1511-1 PN",
"CPU 1513-1 PN": "SIMATIC.PLC.S71500.CPU 1513-1 PN",
"CPU 1515-2 PN": "SIMATIC.PLC.S71500.CPU 1515-2 PN",
"CPU 1516-3 PN/DP": "SIMATIC.PLC.S71500.CPU 1516-3 PN/DP",
"CPU 1518-4 PN/DP": "SIMATIC.PLC.S71500.CPU 1518-4 PN/DP",
}
# 数字量输入模块
DI_MODULES = {
"DI 16x24VDC": "SIMATIC.DI.S71200.DI 16x24VDC",
"DI 32x24VDC": "SIMATIC.DI.S71200.DI 32x24VDC",
"DI 16x230VAC": "SIMATIC.DI.S71200.DI 16x230VAC",
}
# 数字量输出模块
DO_MODULES = {
"DO 16x24VDC": "SIMATIC.DO.S71200.DO 16x24VDC",
"DO 32x24VDC": "SIMATIC.DO.S71200.DO 32x24VDC",
"DO 16xRelay": "SIMATIC.DO.S71200.DO 16xRelay",
}
# 模拟量输入模块
AI_MODULES = {
"AI 4x13bit": "SIMATIC.AI.S71200.AI 4x13bit",
"AI 8x13bit": "SIMATIC.AI.S71200.AI 8x13bit",
}
# 模拟量输出模块
AO_MODULES = {
"AO 2x14bit": "SIMATIC.AO.S71200.AO 2x14bit",
"AO 4x14bit": "SIMATIC.AO.S71200.AO 4x14bit",
}
FILE:OpenClaw workflow.yaml
- skill: tia-openness-complete-skill
params:
action: create_project
path: "C:\\TIA_Projects"
name: "DemoProject"
- skill: tia-openness-complete-skill
params:
action: add_plc
device_name: "PLC_1"
cpu_model: "CPU 1511-1 PN"
- skill: tia-openness-complete-skill
params:
action: create_block
plc_name: "PLC_1"
block_type: "FB"
block_name: "MotorControl"
description: "电机控制,输入启动停止,输出运行"
- skill: tia-openness-complete-skill
params:
action: compile_software
plc_name: "PLC_1"
- skill: tia-openness-complete-skill
params:
action: download
plc_name: "PLC_1"
interface: "PN/IE"
ip_address: "192.168.0.1"
close_after: true
FILE:compile_software.json
{
"action": "compile_software",
"plc_name": "PLC_1"
}
FILE:requirements.txt
pythonnet>=3.0.0
jinja2>=3.0.0
FILE:close_project.json
{
"action": "close_project"
}
FILE:add_plc.json
{
"action": "add_plc",
"device_name": "PLC_1",
"cpu_model": "CPU 1511-1 PN",
"family": "S7-1500",
"firmware": "V4.0"
}
FILE:utils.py
"""辅助工具函数"""
import os
import winreg
import tempfile
from typing import Optional
def get_openness_dll_path(tia_version: str = "V18") -> tuple:
"""
从注册表获取Openness DLL路径
返回 (engineering_dll_path, hmi_dll_path)
"""
try:
key_path = f"SOFTWARE\\Siemens\\Automation\\Openness\\{tia_version}\\PublicAPI\\{tia_version}.0.0"
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path, 0, winreg.KEY_READ) as key:
eng_dll, _ = winreg.QueryValueEx(key, "Siemens.Engineering")
hmi_dll, _ = winreg.QueryValueEx(key, "Siemens.Engineering.Hmi")
return eng_dll, hmi_dll
except Exception as e:
raise RuntimeError(f"无法从注册表获取Openness DLL路径: {e}")
def ensure_directory(path: str) -> bool:
"""确保目录存在,如果不存在则创建"""
try:
os.makedirs(path, exist_ok=True)
return True
except Exception:
return False
def create_temp_file(content: str, suffix: str = ".tmp") -> str:
"""创建临时文件并写入内容"""
fd, path = tempfile.mkstemp(suffix=suffix, text=True)
with os.fdopen(fd, 'w', encoding='utf-8') as f:
f.write(content)
return path
def parse_bool(value) -> bool:
"""将各种输入转换为布尔值"""
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ('true', '1', 'yes', 'on')
return bool(value)
FILE:tia_core.py
"""TIA Openness核心封装,基于pythonnet调用.NET API"""
import clr
import sys
from typing import Optional, List, Dict, Any
import logging
from .utils import get_openness_dll_path
logger = logging.getLogger(__name__)
class TiaPortal:
"""TIA Portal实例封装"""
def __init__(self, tia_version: str = "V18", mode: str = "WithUserInterface"):
"""
初始化TIA Portal连接
:param tia_version: TIA版本,如 "V16","V17","V18"
:param mode: 启动模式,"WithUserInterface" 或 "WithoutUserInterface"
"""
self.tia_version = tia_version
self.mode = mode
self._portal = None
self._project = None
self._load_assemblies()
def _load_assemblies(self):
"""加载Openness程序集"""
try:
eng_dll, hmi_dll = get_openness_dll_path(self.tia_version)
clr.AddReference(eng_dll)
clr.AddReference(hmi_dll)
from Siemens.Engineering import TiaPortal as TiaPortalNet, TiaPortalMode
from Siemens.Engineering import TiaPortalProcess
from Siemens.Engineering.HW import Device, DeviceItem
from Siemens.Engineering.SW.Blocks import PlcBlock, PlcBlockType, PlcProgrammingLanguage, ICompilable
from Siemens.Engineering.SW.Tags import PlcTagTable, PlcTag
from Siemens.Engineering.SW.Types import PlcType
from Siemens.Engineering.Compiler import CompilerResultState
from Siemens.Engineering.Online import OnlineProvider
from Siemens.Engineering.Download import DownloadProvider, DownloadOptions
from Siemens.Engineering.Connection import ConnectionConfiguration
from Siemens.Engineering.HW.Features import SoftwareContainer
self.TiaPortalNet = TiaPortalNet
self.TiaPortalMode = TiaPortalMode
self.TiaPortalProcess = TiaPortalProcess
self.Device = Device
self.DeviceItem = DeviceItem
self.PlcBlock = PlcBlock
self.PlcBlockType = PlcBlockType
self.PlcProgrammingLanguage = PlcProgrammingLanguage
self.ICompilable = ICompilable
self.PlcTagTable = PlcTagTable
self.PlcTag = PlcTag
self.PlcType = PlcType
self.CompilerResultState = CompilerResultState
self.OnlineProvider = OnlineProvider
self.DownloadProvider = DownloadProvider
self.DownloadOptions = DownloadOptions
self.ConnectionConfiguration = ConnectionConfiguration
self.SoftwareContainer = SoftwareContainer
logger.info(f"成功加载TIA Openness {self.tia_version} 程序集")
except Exception as e:
logger.error(f"加载Openness程序集失败: {e}")
raise
def start(self):
"""启动或附加到TIA Portal"""
processes = self.TiaPortalProcess.GetProcesses()
if len(processes) > 0:
self._portal = processes[0].Attach()
logger.info("已附加到运行中的TIA Portal进程")
else:
mode_map = {
"WithUserInterface": self.TiaPortalMode.WithUserInterface,
"WithoutUserInterface": self.TiaPortalMode.WithoutUserInterface
}
self._portal = self.TiaPortalNet(mode_map.get(self.mode, self.TiaPortalMode.WithUserInterface))
logger.info("已启动新的TIA Portal实例")
def close(self):
"""关闭TIA Portal连接"""
if self._portal:
self._portal.Dispose()
self._portal = None
self._project = None
logger.info("TIA Portal连接已关闭")
@property
def portal(self):
if self._portal is None:
self.start()
return self._portal
@property
def project(self):
return self._project
# ---------- 项目操作 ----------
def create_project(self, path: str, name: str, author: str = "", comment: str = ""):
"""创建新项目"""
from System.IO import DirectoryInfo
from Siemens.Engineering import ProjectInfo
proj_info = ProjectInfo()
proj_info.Name = name
proj_info.Path = DirectoryInfo(path)
proj_info.Author = author
proj_info.Comment = comment
self._project = self.portal.Projects.Create(proj_info)
logger.info(f"项目创建成功: {path}\\{name}")
return self._project
def open_project(self, project_path: str, user: Optional[str] = None, password: Optional[str] = None):
"""打开现有项目,支持UMAC认证"""
from System.IO import FileInfo
from System import Security
def handle_auth(sender, args):
if user and password:
ss = Security.SecureString()
for ch in password:
ss.AppendChar(ch)
creds = args.UmacCredentials
creds.Type = self._get_enum("UmacUserType", "Project")
creds.Name = user
creds.SetPassword(ss)
if user and password:
self.portal.Authentication += handle_auth
file_info = FileInfo(project_path)
self._project = self.portal.Projects.Open(file_info)
logger.info(f"项目打开成功: {project_path}")
return self._project
def save_project(self):
"""保存项目"""
if self._project:
self._project.Save()
logger.info("项目已保存")
return True
return False
def close_project(self):
"""关闭项目(不保存)"""
if self._project:
self._project.Close()
self._project = None
logger.info("项目已关闭")
return True
return False
# ---------- 硬件操作 ----------
def add_plc_device(self, device_name: str, catalog_path: str, firmware: str = "V4.0"):
"""
添加PLC设备
:param device_name: 设备名称,如 "PLC_1"
:param catalog_path: 硬件目录路径,如 "SIMATIC.PLC.S71500.CPU 1511-1 PN"
:param firmware: 固件版本,如 "V4.0"
:return: 设备对象
"""
identifier = f"{catalog_path}/{firmware}"
device_info = self.portal.HardwareCatalog.Find(identifier)
if device_info is None:
raise Exception(f"未找到硬件设备: {identifier}")
device = self._project.Devices.CreateWithItem(device_info, device_name)
logger.info(f"添加PLC设备: {device_name} ({catalog_path})")
return device
def get_plc_software(self, device_name: str):
"""获取PLC软件对象"""
for device in self._project.Devices:
if device.Name == device_name:
for item in device.DeviceItems:
if "PLCSoftware" in item.Classification.ToString():
sc = item.GetService(self.SoftwareContainer)
if sc:
return sc.Software
raise Exception(f"未找到PLC软件: {device_name}")
# ---------- 程序块操作 ----------
def create_block(self, plc_software, block_type: str, name: str,
number: int = 0, language: str = "SCL", code: str = None):
"""
创建程序块
:param block_type: "OB", "FB", "FC", "DB"
:param name: 块名称
:param number: 编号(0表示自动)
:param language: 编程语言(SCL/LAD/FBD)
:param code: SCL代码内容
"""
type_map = {
"OB": self.PlcBlockType.OB,
"FB": self.PlcBlockType.FB,
"FC": self.PlcBlockType.FC,
"DB": self.PlcBlockType.GlobalDB
}
lang_map = {
"SCL": self.PlcProgrammingLanguage.SCL,
"LAD": self.PlcProgrammingLanguage.LAD,
"FBD": self.PlcProgrammingLanguage.FBD
}
block_group = plc_software.BlockGroup
if number > 0:
block = block_group.Blocks.CreateBlock(type_map[block_type], lang_map[language], name, number)
else:
block = block_group.Blocks.CreateBlock(type_map[block_type], lang_map[language], name)
if code:
block.Code.SetText(code)
logger.info(f"创建块: {block_type} {name} (编号: {number if number else '自动'})")
return block
# ---------- 编译 ----------
def compile_plc(self, plc_software) -> tuple:
"""编译PLC软件,返回 (成功标志, 消息列表)"""
compile_service = plc_software.GetService(self.ICompilable)
result = compile_service.Compile()
msgs = [msg.Text for msg in result.Messages]
if result.State == self.CompilerResultState.Success:
logger.info("编译成功")
return True, msgs
else:
logger.error(f"编译失败: {result.State}")
return False, msgs
# ---------- 下载 ----------
def download_to_plc(self, plc_software, interface: str = "PN/IE",
ip_address: str = None, password: Optional[str] = None):
"""
下载到PLC
:param interface: 接口类型,如 "PN/IE"
:param ip_address: PLC的IP地址(如果与项目中配置不同)
:param password: PLC保护密码
"""
from Siemens.Engineering.Download.Configurations import DownloadPasswordConfiguration
from System import Security
# 获取下载服务
device_item = plc_software.Parent.Parent # 获取PLC的DeviceItem
download_provider = device_item.GetService(self.DownloadProvider)
if download_provider is None:
raise Exception("无法获取下载服务")
# 配置连接
conn_config = download_provider.Configuration
mode = conn_config.Modes.Find(interface)
if mode is None:
raise Exception(f"未找到接口模式: {interface}")
pc_interface = mode.PcInterfaces[0] # 简化:取第一个
if ip_address:
# 使用指定IP
target_config = pc_interface.Addresses.Create(ip_address)
else:
target_config = pc_interface.TargetInterfaces[0]
# 设置下载选项
options = self.DownloadOptions.Hardware | self.DownloadOptions.Software
# 定义回调处理密码等
def pre_download(config):
if isinstance(config, DownloadPasswordConfiguration) and password:
ss = Security.SecureString()
for ch in password:
ss.AppendChar(ch)
config.SetPassword(ss)
result = download_provider.Download(target_config, pre_download, None, options)
if result.State == self.CompilerResultState.Success:
logger.info("下载成功")
return True, "下载成功"
else:
msgs = [msg.Message for msg in result.Messages]
return False, msgs
# ---------- 辅助 ----------
def _get_enum(self, enum_type_name: str, value_name: str):
"""获取.NET枚举值"""
try:
from Siemens.Engineering import UmacUserType
if enum_type_name == "UmacUserType":
return getattr(UmacUserType, value_name)
except:
pass
raise ValueError(f"无法获取枚举 {enum_type_name}.{value_name}")
FILE:create_block.json
{
"action": "create_block",
"plc_name": "PLC_1",
"block_type": "FB",
"block_name": "MotorControl",
"description": "电机控制功能块,输入启动、停止,输出运行",
"number": 10
}
FILE:templates/fc_template.txt
FUNCTION "{{ block_name }}" : Void
// {{ description }}
VAR_INPUT
{% for var in variables if var.direction == 'input' %}
{{ var.name }} : {{ var.type }};
{% endfor %}
END_VAR
VAR_OUTPUT
{% for var in variables if var.direction == 'output' %}
{{ var.name }} : {{ var.type }};
{% endfor %}
END_VAR
BEGIN
{{ logic }}
END_FUNCTION
FILE:templates/ob_template.txt
ORGANIZATION_BLOCK "{{ block_name }}"
// {{ description }}
BEGIN
// 主程序循环
{{ logic }}
// 调用功能块示例
// "MotorControl"(Start := #Start, Stop := #Stop, Run => #MotorRun);
END_ORGANIZATION_BLOCK
FILE:templates/db_template.txt
DATA_BLOCK "{{ block_name }}"
{ S7_Optimized_Access := 'TRUE' }
VERSION : 0.1
NON_RETAIN
VAR
{% for var in variables %}
{{ var.name }} : {{ var.type }};
{% endfor %}
END_VAR
BEGIN
END_DATA_BLOCK
FILE:templates/fb_template.txt
FUNCTION_BLOCK "{{ block_name }}"
// {{ description }}
VAR_INPUT
{% for var in variables if var.direction == 'input' %}
{{ var.name }} : {{ var.type }};
{% endfor %}
END_VAR
VAR_OUTPUT
{% for var in variables if var.direction == 'output' %}
{{ var.name }} : {{ var.type }};
{% endfor %}
END_VAR
VAR
// 内部变量
END_VAR
BEGIN
{{ logic }}
END_FUNCTION_BLOCK