@clawhub-hioneowner-e94a128ac5
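DingTalk operations assistant: look up people and departments, send messages, schedule meetings, manage approvals, query calendar events, and work with knowledge base documents.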
---
name: ding-skills
description: 钉钉操作助手。当用户提到以下任何场景时必须使用此技能:查人(查下某某、搜一下某人、找一下谁谁)、查部门、查手机号、查工号、约会议(预约会议、创建会议、安排会议、开个会)、发消息(给某人发消息、群里发个通知)、查审批(我的审批、待审批、审批状态)、发起审批、同意/拒绝审批、查日程、创建日程、查员工数、查离职、开视频会议、知识库(查知识库、创建文档、搜索文档、覆写文档)。Use when user mentions anything about DingTalk: looking up people, searching users/departments, scheduling meetings, creating conferences, sending messages, managing approvals, checking calendar events, querying employee info, knowledge base operations (list workspaces, create/search/overwrite documents), or any DingTalk-related operations.
---
# Ding Skills
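A full-featured DingTalk skill set: user management, department management, messaging, OA approvals, video conferences, calendar management, and knowledge base documents.
## Prerequisites
- The environment variables `DINGTALK_APP_KEY` and `DINGTALK_APP_SECRET` are set
- A DingTalk app has been created and granted the API permissions it needs
## Environment variables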
```bash
export DINGTALK_APP_KEY="<your-app-key>"
export DINGTALK_APP_SECRET="<your-app-secret>"
export DINGTALK_ROBOT_CODE="<your-robot-code>" # optional; used when sending messages
```
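## Important: common workflows (required reading)
Most DingTalk APIs take a `userId` or `unionId`, but users usually give only a person's name. **Whenever a name appears, look the person up first, then perform the operation.**
### Workflow 1: schedule a meeting or create a video conference by name
When the user says "set up a meeting with 张三 and 李四" or "schedule a meeting, attendees: 张三, 李四":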
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: repeat steps 1-2 for every attendee
Step 4: python scripts/create_schedule_conference.py "<title>" "<organizer unionId>" "<start time>" "<end time>" "<attendee unionId1,unionId2>" "[location]"
```
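The four steps above can be chained from Python. The sketch below is one way to do it, assuming each script prints a single JSON object on stdout, that `search_user.py` returns `userIds`, and that `get_user.py` returns `user.unionid`, as in the sample outputs in the feature list. `run_script`, `name_to_union_id`, and `schedule_meeting` are hypothetical helper names, not part of the skill.

```python
import json
import subprocess
import sys


def run_script(name, *args):
    """Run scripts/<name>.py and parse the JSON it prints on stdout."""
    proc = subprocess.run(
        [sys.executable, f"scripts/{name}.py", *args],
        capture_output=True,
        text=True,
    )
    return json.loads(proc.stdout)


def name_to_union_id(name, run=run_script):
    """Steps 1-2: name -> userId -> unionId."""
    found = run("search_user", name)
    user_ids = found.get("userIds", [])
    if len(user_ids) != 1:
        # Zero or several matches: ask the user instead of guessing.
        raise LookupError(f"{name!r} matched {len(user_ids)} users")
    detail = run("get_user", user_ids[0])
    return detail["user"]["unionid"]


def schedule_meeting(title, organizer, attendees, start, end, run=run_script):
    """Steps 3-4: resolve every person, then create the meeting."""
    organizer_id = name_to_union_id(organizer, run)
    attendee_ids = [name_to_union_id(n, run) for n in attendees]
    return run(
        "create_schedule_conference",
        title, organizer_id, start, end, ",".join(attendee_ids),
    )
```

Passing `run` as a parameter keeps the flow testable without DingTalk credentials: a stub that returns canned dictionaries can stand in for the real scripts.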
### Workflow 2: send a message by name
When the user says "send 张三 a message":
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/send_user_message.py "<userId>" "<message>"
```
Note: robotCode is read from the environment variable `DINGTALK_ROBOT_CODE` by default; it can also be passed explicitly as the third argument.
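The lookup order for robotCode can be sketched as follows. This mirrors the `get_robot_code` helper in `scripts/dingtalk_client.py`, except that it raises an exception instead of printing an error object and exiting; `resolve_robot_code` and its `env` parameter are hypothetical names used only for illustration.

```python
import os


def resolve_robot_code(arg_value=None, env=os.environ):
    """Return the robotCode: the explicit argument wins, then DINGTALK_ROBOT_CODE."""
    code = arg_value or env.get("DINGTALK_ROBOT_CODE")
    if not code:
        raise ValueError(
            "robotCode missing: pass it as the third argument "
            "or set DINGTALK_ROBOT_CODE"
        )
    return code
```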
### Workflow 3: check approvals by name
When the user says "check 张三's pending approvals":
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/list_user_todo_approvals.py "<userId>"
```
### Workflow 4: check calendar events by name
When the user says "check 张三's schedule for today":
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: python scripts/list_events.py "<unionId>" [--time-min "<start time>"] [--time-max "<end time>"]
```
### Workflow 5: create a document in a knowledge base
When the user says "create a document in the knowledge base":
```
Step 1: python scripts/search_user.py "张三" → returns the operator's userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: python scripts/list_workspaces.py "<unionId>" → returns workspaceId
Step 4: python scripts/create_doc.py "<workspaceId>" "<document name>" "<unionId>"
```
### Workflow 6: search knowledge base documents and get their links
When the user says "find 《周报》 in the knowledge base for me":
```
Step 1: python scripts/search_user.py "张三" → returns the operator's userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: python scripts/search_doc.py "<unionId>" "周报" → returns the document links
```
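### General rules
- **The user gives a name** → call `search_user.py` first to get the userId
- **APIs that need a unionId** (calendar, conference, and knowledge base) → then call `get_user.py` to turn the userId into a unionId
- **APIs that need a userId** (messages, approvals, departments) → use the `search_user.py` result directly
- **Lookups for several users can run in parallel** to save time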
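The last rule can be illustrated with a thread pool. This is a minimal sketch: `resolve_all` is a hypothetical helper, and `lookup` stands for any callable that maps one name to an ID (for example, a function that shells out to `search_user.py`). Whether DingTalk rate limits make a small `max_workers` advisable is left as an assumption to check against your own app's quota.

```python
from concurrent.futures import ThreadPoolExecutor


def resolve_all(names, lookup, max_workers=4):
    """Resolve several names concurrently and return {name: id}."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as the input names.
        results = list(pool.map(lookup, names))
    return dict(zip(names, results))
```

If any single lookup raises, the exception surfaces when the results are collected, so one failed name stops the whole batch rather than being dropped silently.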
## Features
### 1. Search users (search-user)
Searches users by name and returns the list of matching userIds.
```bash
python scripts/search_user.py "<keyword>"
```
Output:
```json
{
"success": true,
"keyword": "张三",
"totalCount": 2,
"hasMore": false,
"userIds": ["123456789", "987654321"]
}
```
### 2. Get user details (get-user)
Returns detailed information about the specified user.
```bash
python scripts/get_user.py "<userId>"
```
Output:
```json
{
"success": true,
"user": {
"userid": "user001",
"name": "张三",
"mobile": "138****1234",
"dept_id_list": [12345],
"unionid": "xxxxx"
}
}
```
### 3. Look up a user by mobile number (get-user-by-mobile)
```bash
python scripts/get_user_by_mobile.py "<mobile>"
```
Output:
```json
{ "success": true, "mobile": "13800138000", "userId": "user001" }
```
### 4. Look up a user by unionid (get-user-by-unionid)
```bash
python scripts/get_user_by_unionid.py "<unionid>"
```
Output:
```json
{ "success": true, "unionid": "xxxxx", "userId": "user001" }
```
### 5. Get the employee count (get-user-count)
```bash
python scripts/get_user_count.py [--onlyActive]
```
Output:
```json
{ "success": true, "onlyActive": false, "count": 150 }
```
### 6. Get a user's pending approval count (get-user-todo-count)
```bash
python scripts/get_user_todo_count.py "<userId>"
```
Output:
```json
{ "success": true, "userId": "user001", "count": 5 }
```
### 7. List users who have not logged in (list-inactive-users)
```bash
python scripts/list_inactive_users.py "<queryDate>" [--deptIds "id1,id2"] [--offset 0] [--size 100]
```
`queryDate` format: `yyyyMMdd`
Output:
```json
{ "success": true, "queryDate": "20240115", "userIds": ["user001"], "hasMore": false }
```
### 8. List resignation records (list-resigned-users)
```bash
python scripts/list_resigned_users.py "<startTime>" ["<endTime>"] [--nextToken "xxx"] [--maxResults 100]
```
`startTime` and `endTime` format: ISO 8601
Output:
```json
{
"success": true,
"startTime": "2024-01-01T00:00:00+08:00",
"records": [{ "userId": "user001", "name": "张三", "leaveTime": "2024-01-15T10:00:00Z" }]
}
```
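An ISO 8601 value with an explicit offset, like the `startTime` in the sample output, can be produced with the standard library. This is a small sketch; `iso8601` is a hypothetical helper, and UTC+8 is used only because the sample uses it.

```python
from datetime import datetime, timedelta, timezone

CST = timezone(timedelta(hours=8))  # UTC+8, matching the sample output


def iso8601(year, month, day, hour=0, minute=0, tz=CST):
    """Format a wall-clock time as ISO 8601 with its UTC offset."""
    return datetime(year, month, day, hour, minute, tzinfo=tz).isoformat()


start_time = iso8601(2024, 1, 1)  # "2024-01-01T00:00:00+08:00"
```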
### 9. Search departments (search-department)
```bash
python scripts/search_department.py "<keyword>"
```
Output:
```json
{ "success": true, "keyword": "技术部", "totalCount": 2, "departmentIds": [12345, 67890] }
```
### 10. Get department details (get-department)
```bash
python scripts/get_department.py "<deptId>"
```
Output:
```json
{ "success": true, "department": { "deptId": 12345, "name": "技术部", "parentId": 1 } }
```
### 11. List sub-departments (list-sub-departments)
The root department has deptId = 1.
```bash
python scripts/list_sub_departments.py "<deptId>"
```
Output:
```json
{ "success": true, "deptId": 1, "subDepartmentIds": [12345, 67890] }
```
### 12. List department users (list-department-users)
Pages through the results automatically and returns every user (summary fields only).
```bash
python scripts/list_department_users.py "<deptId>"
```
Output:
```json
{
"success": true,
"deptId": 12345,
"users": [{ "userId": "user001", "name": "张三" }, { "userId": "user002", "name": "李四" }]
}
```
### 13. List department user details (list-department-user-details)
Returns one page per call; `--cursor` and `--size` control paging.
```bash
python scripts/list_department_user_details.py "<deptId>" [--cursor 0] [--size 100]
```
Output:
```json
{ "success": true, "deptId": 12345, "users": [...], "hasMore": true, "nextCursor": 100 }
```
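Because this command returns one page at a time, a caller has to follow `hasMore` and `nextCursor` until the list is exhausted. The loop below is a sketch of that pattern; `iter_department_users` is a hypothetical helper, and `fetch_page` is an assumed callable that runs the script for one `(deptId, cursor, size)` and returns the parsed JSON.

```python
def iter_department_users(fetch_page, dept_id, size=100):
    """Yield every user in a department by following nextCursor."""
    cursor = 0
    while True:
        page = fetch_page(dept_id, cursor, size)
        if not page.get("success"):
            raise RuntimeError(page.get("error"))
        yield from page.get("users", [])
        if not page.get("hasMore"):
            break
        # The next request starts where this page ended.
        cursor = page["nextCursor"]
```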
### 14. List department user IDs (list-department-user-ids)
```bash
python scripts/list_department_user_ids.py "<deptId>"
```
Output:
```json
{ "success": true, "deptId": 12345, "userIds": ["user001", "user002"] }
```
### 15. Get a department's parent chain (list-department-parents)
```bash
python scripts/list_department_parents.py "<deptId>"
```
Output:
```json
{ "success": true, "deptId": 12345, "parentIdList": [12345, 67890, 1] }
```
### 16. Get the parent chain of a user's departments (list-user-parent-departments)
```bash
python scripts/list_user_parent_departments.py "<userId>"
```
Output:
```json
{ "success": true, "userId": "user001", "parentIdList": [12345, 1] }
```
### 17. List the robots in a group (get-bot-list)
```bash
python scripts/get_bot_list.py "<openConversationId>"
```
Output:
```json
{
"success": true,
"openConversationId": "cid",
"botList": [{ "robotCode": "code", "robotName": "name" }]
}
```
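### 18. Send a group message as the robot (send-group-message)
robotCode is read from the environment variable `DINGTALK_ROBOT_CODE` by default; it can also be passed explicitly as the third argument.
```bash
python scripts/send_group_message.py "<openConversationId>" "<message>" ["<robotCode>"]
```
Output: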
```json
{ "success": true, "openConversationId": "cid", "robotCode": "code", "processQueryKey": "key", "message": "消息内容" }
```
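### 19. Send a direct message as the robot (send-user-message)
robotCode is read from the environment variable `DINGTALK_ROBOT_CODE` by default; it can also be passed explicitly as the third argument.
```bash
python scripts/send_user_message.py "<userId>" "<message>" ["<robotCode>"]
```
Output: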
```json
{ "success": true, "userId": "user001", "robotCode": "code", "processQueryKey": "key", "message": "消息内容" }
```
### 20. List approval instance IDs (list-approval-instance-ids)
```bash
python scripts/list_approval_instance_ids.py "<processCode>" --startTime <timestamp> --endTime <timestamp> [--size 20] [--nextToken "xxx"]
```
Output:
```json
{ "success": true, "processCode": "PROC-XXX", "instanceIds": ["id1", "id2"], "totalCount": 2, "hasMore": false }
```
### 21. Get approval instance details (get-approval-instance)
```bash
python scripts/get_approval_instance.py "<instanceId>"
```
Output:
```json
{
"success": true,
"instanceId": "xxx-123",
"instance": {
"processInstanceId": "xxx-123",
"title": "请假申请",
"status": "COMPLETED",
"formComponentValues": [...],
"tasks": [...]
}
}
```
### 22. List approvals a user initiated (list-user-initiated-approvals)
```bash
python scripts/list_user_initiated_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]
```
Output:
```json
{ "success": true, "userId": "user001", "instances": [...], "totalCount": 5, "hasMore": false }
```
### 23. List approvals a user was copied on (list-user-cc-approvals)
```bash
python scripts/list_user_cc_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]
```
### 24. List a user's pending approvals (list-user-todo-approvals)
```bash
python scripts/list_user_todo_approvals.py "<userId>" [--maxResults 20]
```
Output:
```json
{ "success": true, "userId": "user001", "instances": [...], "totalCount": 3, "hasMore": false }
```
### 25. List approvals a user has already handled (list-user-done-approvals)
```bash
python scripts/list_user_done_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]
```
### 26. Start an approval instance (create-approval-instance)
```bash
python scripts/create_approval_instance.py "<processCode>" "<originatorUserId>" "<deptId>" '<formValuesJson>' [--ccList "user1,user2"]
```
`formValuesJson` example: `'[{"name":"标题","value":"请假申请"}]'`
Output:
```json
{ "success": true, "processCode": "PROC-XXX", "originatorUserId": "user001", "instanceId": "xxx-new" }
```
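Hand-writing `formValuesJson` is error-prone once values contain quotes or non-ASCII text. The sketch below builds it with `json.dumps`; `form_values_json` is a hypothetical helper, and the `name`/`value` keys follow the example above.

```python
import json


def form_values_json(fields):
    """Serialize {label: value} into the JSON array create-approval-instance expects."""
    return json.dumps(
        [{"name": name, "value": value} for name, value in fields.items()],
        ensure_ascii=False,  # keep Chinese labels readable instead of \uXXXX escapes
    )


payload = form_values_json({"标题": "请假申请"})
```

When the script is launched through `subprocess.run` with an argument list, `payload` can be passed as a single list element and needs no shell quoting.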
### 27. Revoke an approval instance (terminate-approval-instance)
```bash
python scripts/terminate_approval_instance.py "<instanceId>" "<operatingUserId>" ["<remark>"]
```
Output:
```json
{ "success": true, "instanceId": "xxx-123", "message": "审批实例已撤销" }
```
### 28. Execute an approval task (execute-approval-task)
Approves or rejects an approval task.
```bash
python scripts/execute_approval_task.py "<instanceId>" "<userId>" "<agree|refuse>" [--taskId "xxx"] [--remark "<approval comment>"]
```
Output:
```json
{ "success": true, "instanceId": "xxx-123", "userId": "user001", "action": "agree", "message": "已同意审批" }
```
### 29. Transfer an approval task (transfer-approval-task)
```bash
python scripts/transfer_approval_task.py "<instanceId>" "<userId>" "<transferToUserId>" [--taskId "xxx"] [--remark "<reason for transfer>"]
```
Output:
```json
{ "success": true, "instanceId": "xxx-123", "userId": "user001", "transferToUserId": "user002", "message": "审批任务已转交" }
```
### 30. Add an approval comment (add-approval-comment)
```bash
python scripts/add_approval_comment.py "<instanceId>" "<commentUserId>" "<comment text>"
```
Output:
```json
{ "success": true, "instanceId": "xxx-123", "userId": "user001", "message": "评论已添加" }
```
### 31. Create an instant video conference (create-video-conference)
Creates a video conference immediately and invites the attendees.
```bash
python scripts/create_video_conference.py "<title>" "<organizer unionId>" "[invitee unionId1,unionId2]"
```
Output:
```json
{ "success": true, "title": "测试会议", "conferenceId": "xxx", "conferencePassword": "123456" }
```
### 32. Close a video conference (close-video-conference)
```bash
python scripts/close_video_conference.py "<conferenceId>" "<operator unionId>"
```
Output:
```json
{ "success": true, "conferenceId": "xxx", "message": "视频会议已关闭" }
```
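### 33. Create a scheduled meeting (create-schedule-conference)
Creates the meeting through the calendar API and attaches a DingTalk video conference automatically; the event appears in the DingTalk calendar.
```bash
python scripts/create_schedule_conference.py "<title>" "<creator unionId>" "<start time>" "<end time>" "[attendee unionId1,unionId2]" "[location]"
```
Time format: `"2026-03-16 14:00"` or ISO 8601. Times given without a UTC offset are treated as UTC+8.
Output: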
```json
{
"success": true,
"title": "周会",
"eventId": "NXZCUEtxOGZMN3JpcDQ3ZE45UVRFdz09",
"onlineMeetingUrl": "dingtalk://...",
"conferenceId": "xxx",
"startTime": "2026-03-16T14:00:00+08:00",
"endTime": "2026-03-16T15:00:00+08:00",
"attendeeCount": 2
}
```
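The relationship between the input time and the `startTime` in the output can be sketched as follows. This mirrors the `parse_time` function in `scripts/create_schedule_conference.py`; `normalize_time` is a hypothetical name used here so the example stands alone.

```python
from datetime import datetime

FORMATS = ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S")


def normalize_time(text):
    """Turn a naive local time into ISO 8601 with a +08:00 offset."""
    for fmt in FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00"
    # Anything else (for example a value that already carries an offset)
    # is passed through unchanged.
    return text
```

One consequence of the pass-through branch is that a malformed time is not rejected locally; it reaches the API as-is.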
### 34. Cancel a scheduled meeting (cancel-schedule-conference)
```bash
python scripts/cancel_schedule_conference.py "<scheduleConferenceId>" "<creator unionId>"
```
Output:
```json
{ "success": true, "scheduleConferenceId": "xxx", "message": "预约会议已取消" }
```
### 35. List calendar events (list-events)
```bash
python scripts/list_events.py "<user unionId>" [--time-min "2026-03-01 00:00"] [--time-max "2026-03-31 23:59"]
```
Output:
```json
{
"success": true,
"totalCount": 5,
"events": [{ "id": "eventId", "summary": "周会", "start": {...}, "end": {...} }]
}
```
### 36. Get calendar event details (get-event)
```bash
python scripts/get_event.py "<user unionId>" "<eventId>"
```
Output:
```json
{
"success": true,
"event": { "id": "eventId", "summary": "周会", "attendees": [...], "onlineMeetingInfo": {...} }
}
```
### 37. Delete a calendar event (delete-event)
```bash
python scripts/delete_event.py "<user unionId>" "<eventId>" [--push-notification]
```
Output:
```json
{ "success": true, "eventId": "xxx", "message": "日程已删除" }
```
### 38. Add event attendees (add-event-attendee)
```bash
python scripts/add_event_attendee.py "<user unionId>" "<eventId>" "<attendee unionId1,unionId2>"
```
Output:
```json
{ "success": true, "eventId": "xxx", "addedCount": 2, "message": "已添加 2 位参与者" }
```
### 39. Remove event attendees (remove-event-attendee)
```bash
python scripts/remove_event_attendee.py "<user unionId>" "<eventId>" "<attendee unionId1,unionId2>"
```
Output:
```json
{ "success": true, "eventId": "xxx", "removedCount": 1, "message": "已移除 1 位参与者" }
```
### 40. List knowledge bases (list-workspaces)
Returns every knowledge base the user can access.
```bash
python scripts/list_workspaces.py "<operator unionId>"
```
Output:
```json
{
"success": true,
"totalCount": 2,
"workspaces": [
{ "workspaceId": "xxx", "name": "技术部知识库", "type": "TEAM", "url": "https://...", "rootNodeId": "yyy" }
]
}
```
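### 41. Create a knowledge base document (create-doc)
Creates a new document in the specified knowledge base.
```bash
python scripts/create_doc.py "<workspaceId>" "<document name>" "<operator unionId>" ["<docType>"]
```
`docType` values: `alidoc` (DingTalk document, the default), `alisheet` (spreadsheet), `alinote` (note)
Output: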
```json
{
"success": true,
"name": "周报",
"docType": "alidoc",
"workspaceId": "xxx",
"nodeId": "yyy",
"docKey": "zzz",
"url": "https://..."
}
```
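### 42. Search knowledge base documents (search-doc)
Searches knowledge base documents by a keyword in the document name and returns their links.
```bash
python scripts/search_doc.py "<operator unionId>" "<name keyword>" ["<workspaceId>"]
```
When `workspaceId` is omitted, all knowledge bases are searched.
Output: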
```json
{
"success": true,
"keyword": "周报",
"totalCount": 3,
"documents": [
{ "name": "3月第2周周报", "nodeId": "xxx", "url": "https://...", "category": "ALIDOC", "workspaceName": "技术部知识库" }
]
}
```
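### 43. Overwrite document content (overwrite-doc)
Overwrites the entire content of a knowledge base document (a full replacement, not an append).
```bash
python scripts/overwrite_doc.py "<workspaceId>" "<nodeId>" "<operator unionId>" "<content>"
```
Output: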
```json
{ "success": true, "workspaceId": "xxx", "nodeId": "yyy", "message": "文档内容已覆写" }
```
## Error handling
On failure, every script returns the same format:
```json
{
"success": false,
"error": {
"code": "ERROR_CODE",
"message": "错误描述"
}
}
```
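Common error codes:
- `MISSING_CREDENTIALS` - `DINGTALK_APP_KEY` or `DINGTALK_APP_SECRET` is not set
- `MISSING_ROBOT_CODE` - no robotCode was passed and `DINGTALK_ROBOT_CODE` is not set
- `INVALID_ARGS` - required arguments are missing
- `INVALID_JSON` - `formValuesJson` is not valid JSON (create-approval-instance)
- `INVALID_RESULT` - the approval action is neither `agree` nor `refuse` (execute-approval-task)
- `UNKNOWN_ERROR` - an exception that did not come from the DingTalk API; API failures carry the error code returned by DingTalk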
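A caller can turn this envelope into an exception with a few lines. The sketch below assumes the script's stdout has already been captured as text; `ScriptError` and `parse_result` are hypothetical names.

```python
import json


class ScriptError(Exception):
    """Raised when a script reports success: false."""

    def __init__(self, code, message):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message


def parse_result(stdout_text):
    """Parse a script's JSON output and raise on the error envelope."""
    data = json.loads(stdout_text)
    if not data.get("success"):
        err = data.get("error", {})
        raise ScriptError(err.get("code", "UNKNOWN_ERROR"), err.get("message", ""))
    return data
```

Checking `success` rather than the process exit code keeps the handling uniform, since the error object carries the specific code.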
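## Important notes
- `userId` is the user's ID inside the organization; `unionId` is a globally unique identifier
- Conference, calendar, and knowledge base APIs use `unionId`, which get-user returns
- The root department's deptId is 1
- A knowledge base `workspaceId` comes from list-workspaces; a `nodeId` comes from search-doc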
FILE:package.json
{
"name": "ding-skills",
"version": "2.2.3",
"description": "钉钉全功能技能集 - 用户管理、部门管理、消息发送、OA审批、会议与日程管理、知识库文档管理",
"scripts": {
"search-user": "python scripts/search_user.py",
"get-user": "python scripts/get_user.py",
"get-user-by-mobile": "python scripts/get_user_by_mobile.py",
"get-user-by-unionid": "python scripts/get_user_by_unionid.py",
"get-user-count": "python scripts/get_user_count.py",
"get-user-todo-count": "python scripts/get_user_todo_count.py",
"list-inactive-users": "python scripts/list_inactive_users.py",
"list-resigned-users": "python scripts/list_resigned_users.py",
"search-department": "python scripts/search_department.py",
"get-department": "python scripts/get_department.py",
"list-sub-departments": "python scripts/list_sub_departments.py",
"list-department-users": "python scripts/list_department_users.py",
"list-department-user-details": "python scripts/list_department_user_details.py",
"list-department-user-ids": "python scripts/list_department_user_ids.py",
"list-department-parents": "python scripts/list_department_parents.py",
"list-user-parent-departments": "python scripts/list_user_parent_departments.py",
"get-bot-list": "python scripts/get_bot_list.py",
"send-group-message": "python scripts/send_group_message.py",
"send-user-message": "python scripts/send_user_message.py",
"list-approval-instance-ids": "python scripts/list_approval_instance_ids.py",
"get-approval-instance": "python scripts/get_approval_instance.py",
"list-user-initiated-approvals": "python scripts/list_user_initiated_approvals.py",
"list-user-cc-approvals": "python scripts/list_user_cc_approvals.py",
"list-user-todo-approvals": "python scripts/list_user_todo_approvals.py",
"list-user-done-approvals": "python scripts/list_user_done_approvals.py",
"create-approval-instance": "python scripts/create_approval_instance.py",
"terminate-approval-instance": "python scripts/terminate_approval_instance.py",
"execute-approval-task": "python scripts/execute_approval_task.py",
"transfer-approval-task": "python scripts/transfer_approval_task.py",
"add-approval-comment": "python scripts/add_approval_comment.py",
"create-video-conference": "python scripts/create_video_conference.py",
"close-video-conference": "python scripts/close_video_conference.py",
"create-schedule-conference": "python scripts/create_schedule_conference.py",
"cancel-schedule-conference": "python scripts/cancel_schedule_conference.py",
"list-events": "python scripts/list_events.py",
"get-event": "python scripts/get_event.py",
"delete-event": "python scripts/delete_event.py",
"add-event-attendee": "python scripts/add_event_attendee.py",
"remove-event-attendee": "python scripts/remove_event_attendee.py",
"list-workspaces": "python scripts/list_workspaces.py",
"create-doc": "python scripts/create_doc.py",
"search-doc": "python scripts/search_doc.py",
"overwrite-doc": "python scripts/overwrite_doc.py"
},
"keywords": [
"dingtalk",
"api",
"skill",
"meeting",
"calendar",
"approval",
"knowledge-base",
"wiki"
],
"license": "MIT"
}
FILE:scripts/__init__.py
FILE:scripts/add_approval_comment.py
"""Add a comment to an approval instance.

Usage: python scripts/add_approval_comment.py "<instanceId>" "<commentUserId>" "<comment text>"
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def main():
    # Drop the optional --debug flag before counting positional arguments.
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 3:
        output({"success": False, "error": {"code": "INVALID_ARGS",
                "message": "用法: python scripts/add_approval_comment.py \"<instanceId>\" \"<commentUserId>\" \"<评论内容>\""}})
        sys.exit(1)
    instance_id = args[0]
    comment_user_id = args[1]
    text = args[2]
    try:
        token = get_access_token()
        print("正在添加审批评论...", file=sys.stderr)
        api_request("POST", "/workflow/processInstances/comments", token, json_body={
            "processInstanceId": instance_id,
            "commentUserId": comment_user_id,
            "text": text,
        })
        output({"success": True, "instanceId": instance_id, "userId": comment_user_id, "message": "评论已添加"})
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/add_event_attendee.py
"""Add attendees to a calendar event.

Usage: python scripts/add_event_attendee.py "<user unionId>" "<eventId>" "<attendee unionId1,unionId2,...>"
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def main():
    if len(sys.argv) < 4:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "用法: python scripts/add_event_attendee.py \"<用户unionId>\" \"<eventId>\" \"<参与者unionId1,unionId2,...>\"",
            }
        })
        sys.exit(1)
    union_id = sys.argv[1]
    event_id = sys.argv[2]
    # Skip blanks so input such as "a, b," does not yield attendees with empty IDs.
    attendee_ids = [a.strip() for a in sys.argv[3].split(",") if a.strip()]
    try:
        token = get_access_token()
        print("正在添加参与者...", file=sys.stderr)
        body = {
            "attendeesToAdd": [{"id": aid, "isOptional": False} for aid in attendee_ids],
        }
        api_request(
            "POST",
            f"/calendar/users/{union_id}/calendars/primary/events/{event_id}/attendees",
            token,
            json_body=body,
        )
        output({
            "success": True,
            "eventId": event_id,
            "addedCount": len(attendee_ids),
            "message": f"已添加 {len(attendee_ids)} 位参与者",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/cancel_schedule_conference.py
"""Cancel a scheduled meeting.

Usage: python scripts/cancel_schedule_conference.py "<scheduleConferenceId>" "<creator unionId>"
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def main():
    if len(sys.argv) < 3:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "用法: python scripts/cancel_schedule_conference.py \"<scheduleConferenceId>\" \"<创建人unionId>\"",
            }
        })
        sys.exit(1)
    schedule_conference_id = sys.argv[1]
    creator_union_id = sys.argv[2]
    try:
        token = get_access_token()
        print("正在取消预约会议...", file=sys.stderr)
        body = {
            "scheduleConferenceId": schedule_conference_id,
            "creatorUnionId": creator_union_id,
        }
        api_request("POST", "/conference/scheduleConferences/cancel", token, json_body=body)
        output({
            "success": True,
            "scheduleConferenceId": schedule_conference_id,
            "message": "预约会议已取消",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/close_video_conference.py
"""Close a video conference.

Usage: python scripts/close_video_conference.py "<conferenceId>" "<operator unionId>"
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def main():
    if len(sys.argv) < 3:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "用法: python scripts/close_video_conference.py \"<conferenceId>\" \"<操作人unionId>\"",
            }
        })
        sys.exit(1)
    conference_id = sys.argv[1]
    union_id = sys.argv[2]
    try:
        token = get_access_token()
        print("正在关闭视频会议...", file=sys.stderr)
        # Assumption: the close endpoint reads unionId from the query string.
        # Bodies on DELETE requests are often dropped, so it is sent as a
        # query parameter rather than as JSON.
        api_request("DELETE", f"/conference/videoConferences/{conference_id}", token,
                    params={"unionId": union_id})
        output({
            "success": True,
            "conferenceId": conference_id,
            "message": "视频会议已关闭",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/create_approval_instance.py
"""Start an approval instance.

Usage: python scripts/create_approval_instance.py "<processCode>" "<originatorUserId>" "<deptId>" '<formValuesJson>' [--ccList "user1,user2"]
formValuesJson example: '[{"name":"标题","value":"请假申请"}]'
"""
import sys
import os
import json as json_mod

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def parse_args(argv):
    """Split argv into the four positional arguments and the optional --ccList."""
    args = {"process_code": None, "originator_user_id": None, "dept_id": None, "form_values_json": None, "cc_list": None}
    positional = []
    i = 1
    while i < len(argv):
        if argv[i] == "--ccList" and i + 1 < len(argv):
            args["cc_list"] = argv[i + 1]
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--"):
            positional.append(argv[i])
            i += 1
        else:
            # Unknown flags are ignored.
            i += 1
    if len(positional) >= 1:
        args["process_code"] = positional[0]
    if len(positional) >= 2:
        args["originator_user_id"] = positional[1]
    if len(positional) >= 3:
        args["dept_id"] = positional[2]
    if len(positional) >= 4:
        args["form_values_json"] = positional[3]
    return args


def main():
    args = parse_args(sys.argv)
    if not all([args["process_code"], args["originator_user_id"], args["dept_id"], args["form_values_json"]]):
        output({"success": False, "error": {"code": "INVALID_ARGS",
                "message": "用法: python scripts/create_approval_instance.py \"<processCode>\" \"<originatorUserId>\" \"<deptId>\" '<formValuesJson>' [--ccList \"user1,user2\"]"}})
        sys.exit(1)
    try:
        form_values = json_mod.loads(args["form_values_json"])
    except json_mod.JSONDecodeError:
        output({"success": False, "error": {"code": "INVALID_JSON", "message": "formValuesJson 参数不是有效的 JSON 字符串"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("正在发起审批实例...", file=sys.stderr)
        body = {
            "processCode": args["process_code"],
            "originatorUserId": args["originator_user_id"],
            "deptId": args["dept_id"],
            "formComponentValues": form_values,
        }
        if args["cc_list"]:
            # Assumption: the v1.0 endpoint expects ccList as an array of
            # userIds, so the comma-separated flag value is split here.
            body["ccList"] = [u.strip() for u in args["cc_list"].split(",") if u.strip()]
        result = api_request("POST", "/workflow/processInstances", token, json_body=body)
        output({
            "success": True,
            "processCode": args["process_code"],
            "originatorUserId": args["originator_user_id"],
            "instanceId": result.get("result", {}).get("processInstanceId", ""),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/create_doc.py
"""Create a document in a knowledge base.

Usage: python scripts/create_doc.py "<workspaceId>" "<document name>" "<operator unionId>" ["<docType>"]
docType defaults to alidoc (DingTalk document); options: alidoc, alinote, alisheet
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def main():
    # Drop the optional --debug flag before counting positional arguments.
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 3:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/create_doc.py \"<workspaceId>\" \"<文档名>\" \"<操作人unionId>\" [\"<docType>\"]"}})
        sys.exit(1)
    workspace_id = args[0]
    doc_name = args[1]
    operator_id = args[2]
    doc_type = args[3] if len(args) >= 4 else "alidoc"
    try:
        token = get_access_token()
        print(f"正在创建文档: {doc_name}...", file=sys.stderr)
        result = api_request("POST", f"/doc/workspaces/{workspace_id}/docs", token, json_body={
            "name": doc_name,
            "docType": doc_type,
            "operatorId": operator_id,
        })
        output({
            "success": True,
            "name": doc_name,
            "docType": doc_type,
            "workspaceId": result.get("workspaceId", workspace_id),
            "nodeId": result.get("nodeId"),
            "docKey": result.get("docKey"),
            "url": result.get("url"),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/create_schedule_conference.py
"""Create a scheduled meeting (a calendar event with a video conference, created through the calendar API).

Usage: python scripts/create_schedule_conference.py "<title>" "<creator unionId>" "<start time>" "<end time>" "[attendee unionId1,unionId2,...]" "[location]"
Time format: "2026-03-16 14:00" or ISO 8601
"""
import sys
import os
from datetime import datetime, timezone, timedelta

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output

CST = timezone(timedelta(hours=8))


def parse_time(time_str):
    """Normalize a naive local time to ISO 8601 with a +08:00 offset.

    Strings that match none of the formats are returned unchanged, so a value
    that already carries its own offset passes through as-is.
    """
    for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
        try:
            dt = datetime.strptime(time_str, fmt)
            dt = dt.replace(tzinfo=CST)
            return dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
        except ValueError:
            continue
    return time_str


def main():
    if len(sys.argv) < 5:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "用法: python scripts/create_schedule_conference.py \"<主题>\" \"<创建人unionId>\" \"<开始时间>\" \"<结束时间>\" \"[参会人unionId1,unionId2,...]\" \"[会议地点]\"",
            }
        })
        sys.exit(1)
    title = sys.argv[1]
    creator_union_id = sys.argv[2]
    start_time = parse_time(sys.argv[3])
    end_time = parse_time(sys.argv[4])
    # Skip blanks so an empty "" placeholder (used when only a location is
    # given) does not create an attendee with an empty ID.
    invite_union_ids = [u.strip() for u in sys.argv[5].split(",") if u.strip()] if len(sys.argv) > 5 else []
    location = sys.argv[6] if len(sys.argv) > 6 else ""
    try:
        token = get_access_token()
        print("正在创建预约会议...", file=sys.stderr)
        attendees = [{"id": uid, "isOptional": False} for uid in invite_union_ids]
        body = {
            "summary": title,
            "start": {"dateTime": start_time, "timeZone": "Asia/Shanghai"},
            "end": {"dateTime": end_time, "timeZone": "Asia/Shanghai"},
            "isAllDay": False,
            "onlineMeetingInfo": {"type": "dingtalk"},
            "attendees": attendees,
        }
        if location:
            body["location"] = {"displayName": location}
        result = api_request(
            "POST",
            f"/calendar/users/{creator_union_id}/calendars/primary/events",
            token,
            json_body=body,
        )
        online_info = result.get("onlineMeetingInfo", {})
        resp_data = {
            "success": True,
            "title": title,
            "eventId": result.get("id"),
            "onlineMeetingUrl": online_info.get("extraInfo", {}).get("url", ""),
            "conferenceId": online_info.get("conferenceId", ""),
            "startTime": start_time,
            "endTime": end_time,
            "attendeeCount": len(invite_union_ids),
        }
        if location:
            resp_data["location"] = location
        output(resp_data)
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/create_video_conference.py
"""Create an instant video conference.

Usage: python scripts/create_video_conference.py "<title>" "<organizer unionId>" "[invitee unionId1,unionId2,...]"
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def main():
    if len(sys.argv) < 3:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "用法: python scripts/create_video_conference.py \"<会议主题>\" \"<发起人unionId>\" \"[邀请人unionId1,unionId2,...]\"",
            }
        })
        sys.exit(1)
    conf_title = sys.argv[1]
    # The request field is named userId, but the value passed here is the
    # organizer's unionId, as documented in the usage line above.
    user_id = sys.argv[2]
    # Skip blanks so "a, b," does not produce invitees with empty IDs.
    invite_user_ids = [u.strip() for u in sys.argv[3].split(",") if u.strip()] if len(sys.argv) > 3 else []
    try:
        token = get_access_token()
        print("正在创建视频会议...", file=sys.stderr)
        body = {
            "userId": user_id,
            "confTitle": conf_title,
            "inviteCaller": True,
            "inviteUserIds": invite_user_ids,
        }
        result = api_request("POST", "/conference/videoConferences", token, json_body=body)
        output({
            "success": True,
            "title": conf_title,
            "conferenceId": result.get("conferenceId"),
            "conferencePassword": result.get("conferencePassword"),
            "hostPassword": result.get("hostPassword"),
            "phoneNumbers": result.get("phoneNumbers"),
            "externalLinkUrl": result.get("externalLinkUrl"),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/delete_event.py
"""Delete a calendar event.

Usage: python scripts/delete_event.py "<user unionId>" "<eventId>" [--push-notification]
"""
import sys
import os

sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output


def main():
    if len(sys.argv) < 3:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "用法: python scripts/delete_event.py \"<用户unionId>\" \"<eventId>\" [--push-notification]",
            }
        })
        sys.exit(1)
    union_id = sys.argv[1]
    event_id = sys.argv[2]
    push_notification = "--push-notification" in sys.argv
    try:
        token = get_access_token()
        print("正在删除日程...", file=sys.stderr)
        params = {}
        if push_notification:
            params["pushNotification"] = "true"
        api_request(
            "DELETE",
            f"/calendar/users/{union_id}/calendars/primary/events/{event_id}",
            token,
            params=params,
        )
        output({
            "success": True,
            "eventId": event_id,
            "message": "日程已删除",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/dingtalk_client.py
"""钉钉 API 客户端 - 封装认证和 HTTP 请求,支持 OpenAPI 和 TOP API"""
import os
import sys
import json
import requests
API_BASE = "https://api.dingtalk.com"
OPEN_API_BASE = f"{API_BASE}/v1.0"
TOP_API_BASE = "https://oapi.dingtalk.com"
def get_credentials():
app_key = os.environ.get("DINGTALK_APP_KEY")
app_secret = os.environ.get("DINGTALK_APP_SECRET")
if not app_key or not app_secret:
print(json.dumps({
"success": False,
"error": {
"code": "MISSING_CREDENTIALS",
"message": "缺少钉钉应用凭证,请设置环境变量 DINGTALK_APP_KEY 和 DINGTALK_APP_SECRET",
}
}, ensure_ascii=False, indent=2))
sys.exit(1)
return app_key, app_secret
def get_access_token():
app_key, app_secret = get_credentials()
print("正在获取 access_token...", file=sys.stderr)
resp = requests.post(f"{OPEN_API_BASE}/oauth2/accessToken", json={
"appKey": app_key,
"appSecret": app_secret,
})
data = resp.json()
token = data.get("accessToken")
if not token:
print(json.dumps({
"success": False,
"error": {
"code": data.get("code", "UNKNOWN"),
"message": data.get("message", "获取 access_token 失败"),
}
}, ensure_ascii=False, indent=2))
sys.exit(1)
print("access_token 获取成功", file=sys.stderr)
return token
def api_request(method, path, token, json_body=None, params=None, api_version="v1.0"):
"""OpenAPI 请求 (api.dingtalk.com),通过 header 传递 token"""
headers = {"x-acs-dingtalk-access-token": token}
url = f"{API_BASE}/{api_version}{path}"
resp = requests.request(method, url, headers=headers, json=json_body, params=params)
if resp.status_code >= 400:
data = resp.json() if resp.text else {}
raise ApiError(
code=data.get("code", f"HTTP_{resp.status_code}"),
message=data.get("message", resp.text),
request_id=data.get("requestid"),
)
if resp.status_code == 204 or not resp.text:
return {}
return resp.json()
def top_api_request(method, path, token, json_body=None):
"""TOP API 请求 (oapi.dingtalk.com),通过 query 参数传递 token,响应格式带 errcode/result"""
url = f"{TOP_API_BASE}{path}"
params = {"access_token": token}
resp = requests.request(method, url, params=params, json=json_body)
if resp.status_code >= 400:
data = resp.json() if resp.text else {}
raise ApiError(
code=data.get("code", f"HTTP_{resp.status_code}"),
message=data.get("message", resp.text),
)
data = resp.json()
if data.get("errcode", 0) != 0:
raise ApiError(
code=str(data.get("errcode")),
message=data.get("errmsg", "未知错误"),
)
return data
class ApiError(Exception):
def __init__(self, code, message, request_id=None):
self.code = code
self.api_message = message
self.request_id = request_id
super().__init__(f"{code}: {message}")
def handle_error(e):
    if isinstance(e, ApiError):
        print(json.dumps({
            "success": False,
            "error": {
                "code": e.code,
                "message": e.api_message,
                "requestId": e.request_id,
            }
        }, ensure_ascii=False, indent=2))
    else:
        print(json.dumps({
            "success": False,
            "error": {
                "code": "UNKNOWN_ERROR",
                "message": str(e),
            }
        }, ensure_ascii=False, indent=2))
def get_robot_code(arg_value=None):
    """Get robotCode: prefer the passed-in argument, then fall back to the DINGTALK_ROBOT_CODE environment variable."""
    code = arg_value or os.environ.get("DINGTALK_ROBOT_CODE")
    if not code:
        print(json.dumps({
            "success": False,
            "error": {
                "code": "MISSING_ROBOT_CODE",
                "message": "Missing robotCode; pass it as a command-line argument or set the DINGTALK_ROBOT_CODE environment variable",
            }
        }, ensure_ascii=False, indent=2))
        sys.exit(1)
    return code
def output(data):
    print(json.dumps(data, ensure_ascii=False, indent=2))
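The two request helpers above handle two different response envelopes: OpenAPI calls signal failure through the HTTP status, while TOP API calls report it through `errcode`/`errmsg` in the body. The sketch below isolates the second check so it can be tried without network access. `TopApiError`, `unwrap_top_response`, and both sample payloads are hypothetical names and invented data, not part of the skill.

```python
class TopApiError(Exception):
    """Hypothetical stand-in for the ApiError class defined above."""

    def __init__(self, code, message):
        self.code = code
        self.api_message = message
        super().__init__(f"{code}: {message}")


def unwrap_top_response(data):
    """Return the `result` field of a TOP API payload, or raise on a non-zero errcode."""
    if data.get("errcode", 0) != 0:
        raise TopApiError(str(data.get("errcode")), data.get("errmsg", "unknown error"))
    return data.get("result", {})


# Invented sample payloads shaped like the errcode/errmsg/result envelope.
ok_payload = {"errcode": 0, "errmsg": "ok", "result": {"userid": "u123"}}
bad_payload = {"errcode": 12345, "errmsg": "sample failure"}

print(unwrap_top_response(ok_payload))
try:
    unwrap_top_response(bad_payload)
except TopApiError as exc:
    print(exc)
```

Keeping the envelope check in one place lets every script read `result` without repeating the `errcode` test.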
FILE:scripts/execute_approval_task.py
"""Execute an approval task (agree/refuse).
Usage: python scripts/execute_approval_task.py "<instanceId>" "<userId>" "<agree|refuse>" [--taskId "xxx"] [--remark "approval comment"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"instance_id": None, "user_id": None, "result": None, "task_id": None, "remark": None}
positional = []
i = 1
while i < len(argv):
if argv[i] == "--taskId" and i + 1 < len(argv):
args["task_id"] = argv[i + 1]
i += 2
elif argv[i] == "--remark" and i + 1 < len(argv):
args["remark"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--"):
positional.append(argv[i])
i += 1
else:
i += 1
if len(positional) >= 1:
args["instance_id"] = positional[0]
if len(positional) >= 2:
args["user_id"] = positional[1]
if len(positional) >= 3:
args["result"] = positional[2]
return args
def main():
    args = parse_args(sys.argv)
    if not all([args["instance_id"], args["user_id"], args["result"]]):
        output({"success": False, "error": {"code": "INVALID_ARGS",
                "message": "Usage: python scripts/execute_approval_task.py \"<instanceId>\" \"<userId>\" \"<agree|refuse>\" [--taskId \"xxx\"] [--remark \"approval comment\"]"}})
        sys.exit(1)
    if args["result"] not in ("agree", "refuse"):
        output({"success": False, "error": {"code": "INVALID_RESULT", "message": "The approval result must be agree or refuse"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Executing approval task...", file=sys.stderr)
        body = {
            "processInstanceId": args["instance_id"],
            "actionerUserId": args["user_id"],
            "result": args["result"],
            "remark": args["remark"] or "",
        }
        if args["task_id"]:
            body["taskId"] = args["task_id"]
        result = api_request("POST", "/workflow/processInstances/tasks/execute", token, json_body=body)
        # `result` may come back as a bare boolean or as an object with a `success` field.
        raw = result.get("result")
        success = raw.get("success", False) if isinstance(raw, dict) else bool(raw)
        if success:
            message = "Approval agreed" if args["result"] == "agree" else "Approval refused"
        else:
            # Do not claim success when the API did not confirm it.
            message = "Approval task was not executed"
        output({
            "success": success,
            "instanceId": args["instance_id"],
            "userId": args["user_id"],
            "action": args["result"],
            "message": message,
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
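The hand-written `parse_args` loop in this script could also be expressed with the standard-library `argparse` module. The following is only a sketch of that alternative, not the skill's actual parser; `build_parser` and the sample argument values are made up for illustration.

```python
import argparse


def build_parser():
    """Declare the same positionals and flags that the script's usage string lists."""
    parser = argparse.ArgumentParser(prog="execute_approval_task.py")
    parser.add_argument("instance_id")
    parser.add_argument("user_id")
    parser.add_argument("result", choices=["agree", "refuse"])
    parser.add_argument("--taskId", dest="task_id", default=None)
    parser.add_argument("--remark", default="")
    parser.add_argument("--debug", action="store_true")
    return parser


# Invented sample arguments.
args = build_parser().parse_args(["inst-001", "user-001", "agree", "--remark", "looks good"])
print(args.instance_id, args.result, args.remark)
```

One trade-off: on invalid input `argparse` prints a usage message to stderr and exits with status 2, whereas the script emits a JSON error object on stdout, so callers that parse the JSON would need that behavior preserved.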
FILE:scripts/get_approval_instance.py
"""Get approval instance details.
Usage: python scripts/get_approval_instance.py "<instanceId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_approval_instance.py \"<instanceId>\""}})
sys.exit(1)
instance_id = args[0]
try:
token = get_access_token()
print("正在查询审批实例详情...", file=sys.stderr)
result = api_request("GET", "/workflow/processInstances", token, params={"processInstanceId": instance_id})
output({"success": True, "instanceId": instance_id, "instance": result.get("result", {})})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_bot_list.py
"""List the robots in a group.
Usage: python scripts/get_bot_list.py "<openConversationId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_bot_list.py \"<openConversationId>\""}})
sys.exit(1)
open_conversation_id = args[0]
try:
token = get_access_token()
print("正在获取群内机器人列表...", file=sys.stderr)
result = api_request("POST", "/robot/getBotListInGroup", token, json_body={
"openConversationId": open_conversation_id,
})
output({
"success": True,
"openConversationId": open_conversation_id,
"botList": result.get("chatbotInstanceVOList", []),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_department.py
"""Get department details.
Usage: python scripts/get_department.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/get_department.py \"<deptId>\""}})
        sys.exit(1)
    try:
        dept_id = int(args[0])
    except ValueError:
        # A non-numeric deptId would otherwise crash with an uncaught traceback.
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "deptId must be an integer"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Fetching department details...", file=sys.stderr)
        result = top_api_request("POST", "/topapi/v2/department/get", token, json_body={"dept_id": dept_id})
        output({"success": True, "department": result.get("result", {})})
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/get_event.py
"""Get event details.
Usage: python scripts/get_event.py "<user unionId>" "<eventId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 3:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/get_event.py \"<用户unionId>\" \"<eventId>\"",
}
})
sys.exit(1)
union_id = sys.argv[1]
event_id = sys.argv[2]
try:
token = get_access_token()
print("正在查询日程详情...", file=sys.stderr)
result = api_request(
"GET",
f"/calendar/users/{union_id}/calendars/primary/events/{event_id}",
token,
)
output({
"success": True,
"event": {
"id": result.get("id"),
"summary": result.get("summary"),
"description": result.get("description"),
"start": result.get("start"),
"end": result.get("end"),
"status": result.get("status"),
"isAllDay": result.get("isAllDay"),
"location": result.get("location"),
"organizer": result.get("organizer"),
"attendees": result.get("attendees"),
"onlineMeetingInfo": result.get("onlineMeetingInfo"),
"recurrence": result.get("recurrence"),
"createTime": result.get("createTime"),
"updateTime": result.get("updateTime"),
},
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user.py
"""Get user details.
Usage: python scripts/get_user.py "<userId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
if len(sys.argv) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_user.py \"<userId>\""}})
sys.exit(1)
user_id = sys.argv[1]
try:
token = get_access_token()
print("正在查询用户详情...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/user/get", token, json_body={
"userid": user_id, "language": "zh_CN",
})
output({"success": True, "user": result.get("result", {})})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_by_mobile.py
"""Look up a user by mobile number.
Usage: python scripts/get_user_by_mobile.py "<mobile>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
if len(sys.argv) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_user_by_mobile.py \"<手机号>\""}})
sys.exit(1)
mobile = sys.argv[1]
try:
token = get_access_token()
print("正在根据手机号查询用户...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/user/getbymobile", token, json_body={"mobile": mobile})
output({"success": True, "mobile": mobile, "userId": result.get("result", {}).get("userid")})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_by_unionid.py
"""Look up a user's userId by unionid.
Usage: python scripts/get_user_by_unionid.py "<unionid>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
if len(sys.argv) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_user_by_unionid.py \"<unionid>\""}})
sys.exit(1)
unionid = sys.argv[1]
try:
token = get_access_token()
print("正在根据 unionid 查询用户...", file=sys.stderr)
result = top_api_request("POST", "/topapi/user/getbyunionid", token, json_body={"unionid": unionid})
output({"success": True, "unionid": unionid, "userId": result.get("result", {}).get("userid")})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_count.py
"""Get the employee count.
Usage: python scripts/get_user_count.py [--onlyActive]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
only_active = "--onlyActive" in sys.argv
try:
token = get_access_token()
print("正在获取员工人数...", file=sys.stderr)
result = top_api_request("POST", "/topapi/user/count", token, json_body={"only_active": only_active})
output({"success": True, "onlyActive": only_active, "count": result.get("result", {}).get("count", 0)})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_todo_count.py
"""Get the number of a user's pending approvals.
Usage: python scripts/get_user_todo_count.py "<userId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/get_user_todo_count.py \"<userId>\""}})
        sys.exit(1)
    user_id = args[0]
    try:
        token = get_access_token()
        print("Fetching the user's pending approval count...", file=sys.stderr)
        result = api_request("GET", "/workflow/processes/todoTasks/numbers", token, params={"userId": user_id})
        # `result` may be a bare number or an object with a `count` field; accept both.
        raw = result.get("result")
        count = raw.get("count", 0) if isinstance(raw, dict) else (raw or 0)
        output({"success": True, "userId": user_id, "count": count})
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/list_approval_instance_ids.py
"""List approval instance IDs.
Usage: python scripts/list_approval_instance_ids.py "<processCode>" --startTime <timestamp> --endTime <timestamp> [--size 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"process_code": None, "start_time": 0, "end_time": 0, "size": 20, "next_token": None}
i = 1
while i < len(argv):
if argv[i] == "--startTime" and i + 1 < len(argv):
args["start_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--endTime" and i + 1 < len(argv):
args["end_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--size" and i + 1 < len(argv):
args["size"] = int(argv[i + 1])
i += 2
elif argv[i] == "--nextToken" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["process_code"] is None:
args["process_code"] = argv[i]
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if not args["process_code"] or not args["start_time"] or not args["end_time"]:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/list_approval_instance_ids.py \"<processCode>\" --startTime <timestamp> --endTime <timestamp> [--size 20] [--nextToken \"xxx\"]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在查询审批实例ID列表...", file=sys.stderr)
body = {
"processCode": args["process_code"],
"startTime": args["start_time"],
"endTime": args["end_time"],
"size": args["size"],
}
if args["next_token"]:
body["nextToken"] = args["next_token"]
result = api_request("POST", "/workflow/processInstances/ids", token, json_body=body)
r = result.get("result", {})
instance_ids = r.get("list", [])
output({
"success": True,
"processCode": args["process_code"],
"instanceIds": instance_ids,
"totalCount": len(instance_ids),
"hasMore": bool(r.get("nextToken")),
"nextToken": r.get("nextToken"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
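The usage string above only says `<timestamp>` for `--startTime` and `--endTime`. The sketch below assumes the endpoint expects millisecond Unix timestamps and that times are entered in UTC+8; both are assumptions to verify against the API documentation. `to_epoch_ms` is a hypothetical helper, not part of the skill.

```python
from datetime import datetime, timedelta, timezone

CST = timezone(timedelta(hours=8))  # assumption: times are entered in UTC+8


def to_epoch_ms(time_str, tz=CST):
    """Convert 'YYYY-MM-DD HH:MM' in the given timezone to a millisecond Unix timestamp."""
    dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M").replace(tzinfo=tz)
    return int(dt.timestamp() * 1000)


start_ms = to_epoch_ms("2026-03-01 00:00")
end_ms = to_epoch_ms("2026-03-02 00:00")
print(start_ms, end_ms, end_ms - start_ms)
```

The two values could then be passed as `--startTime` and `--endTime`.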
FILE:scripts/list_department_parents.py
"""Get a department's chain of parent departments.
Usage: python scripts/list_department_parents.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_department_parents.py \"<deptId>\""}})
        sys.exit(1)
    try:
        dept_id = int(args[0])
    except ValueError:
        # A non-numeric deptId would otherwise crash with an uncaught traceback.
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "deptId must be an integer"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Fetching the department's parent chain...", file=sys.stderr)
        result = top_api_request("POST", "/topapi/v2/department/listparentbydept", token, json_body={"dept_id": dept_id})
        output({"success": True, "deptId": dept_id, "parentIdList": result.get("result", {}).get("parent_id_list", [])})
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/list_department_user_details.py
"""Get detailed info for the users in a department (paginated).
Usage: python scripts/list_department_user_details.py "<deptId>" [--cursor 0] [--size 100]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def parse_args(argv):
args = {"dept_id": None, "cursor": 0, "size": 100}
i = 1
while i < len(argv):
if argv[i] == "--cursor" and i + 1 < len(argv):
args["cursor"] = int(argv[i + 1])
i += 2
elif argv[i] == "--size" and i + 1 < len(argv):
args["size"] = int(argv[i + 1])
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["dept_id"] is None:
args["dept_id"] = int(argv[i])
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if args["dept_id"] is None:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_department_user_details.py \"<deptId>\" [--cursor 0] [--size 100]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在获取部门用户详情...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/user/list", token, json_body={
"dept_id": args["dept_id"], "cursor": args["cursor"], "size": args["size"],
})
r = result.get("result", {})
output({
"success": True,
"deptId": args["dept_id"],
"users": r.get("list", []),
"hasMore": r.get("has_more", False),
"nextCursor": r.get("next_cursor"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_department_user_ids.py
"""List the user IDs in a department.
Usage: python scripts/list_department_user_ids.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_department_user_ids.py \"<deptId>\""}})
        sys.exit(1)
    try:
        dept_id = int(args[0])
    except ValueError:
        # A non-numeric deptId would otherwise crash with an uncaught traceback.
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "deptId must be an integer"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Fetching the department's user ID list...", file=sys.stderr)
        result = top_api_request("POST", "/topapi/user/listid", token, json_body={"dept_id": dept_id})
        output({"success": True, "deptId": dept_id, "userIds": result.get("result", {}).get("userid_list", [])})
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/list_department_users.py
"""List the users in a department (brief info, auto-paginated).
Usage: python scripts/list_department_users.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_department_users.py \"<deptId>\""}})
        sys.exit(1)
    try:
        dept_id = int(args[0])
    except ValueError:
        # A non-numeric deptId would otherwise crash with an uncaught traceback.
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "deptId must be an integer"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Fetching the department's user list...", file=sys.stderr)
        all_users = []
        cursor = 0
        has_more = True
        while has_more:
            result = top_api_request("POST", "/topapi/v2/user/list", token, json_body={
                "dept_id": dept_id, "cursor": cursor, "size": 100,
            })
            r = result.get("result", {})
            for u in (r.get("list") or []):
                all_users.append({"userId": u.get("userid"), "name": u.get("name")})
            next_cursor = r.get("next_cursor")
            # Stop if more data is reported but no cursor is given, to avoid looping forever.
            has_more = bool(r.get("has_more")) and next_cursor is not None
            if has_more:
                cursor = next_cursor
        output({"success": True, "deptId": dept_id, "users": all_users})
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
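The cursor loop in this script follows the `has_more`/`next_cursor` fields of each page. As a network-free sketch of the same pattern, the helper below takes a page-fetching callable and adds a page cap as a safety net. `collect_all`, `max_pages`, and the in-memory `PAGES` data are hypothetical and invented for illustration.

```python
def collect_all(fetch_page, max_pages=1000):
    """Follow has_more/next_cursor until the listing is exhausted or the cap is hit."""
    items, cursor = [], 0
    for _ in range(max_pages):
        page = fetch_page(cursor)
        items.extend(page.get("list") or [])
        next_cursor = page.get("next_cursor")
        if not page.get("has_more") or next_cursor is None:
            break
        cursor = next_cursor
    return items


# Invented in-memory pages standing in for API responses.
PAGES = {
    0: {"list": [{"userid": "u1"}, {"userid": "u2"}], "has_more": True, "next_cursor": 2},
    2: {"list": [{"userid": "u3"}], "has_more": False},
}

users = collect_all(lambda cursor: PAGES[cursor])
print([u["userid"] for u in users])
```

Passing the fetcher in as a callable keeps the loop testable without credentials.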
FILE:scripts/list_events.py
"""List calendar events.
Usage: python scripts/list_events.py "<user unionId>" [--time-min "2026-03-01 00:00"] [--time-max "2026-03-31 23:59"] [--max-results 50] [--next-token "xxx"]
"""
import sys
import os
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
CST = timezone(timedelta(hours=8))
def parse_time_to_iso(time_str):
for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(time_str, fmt)
dt = dt.replace(tzinfo=CST)
return dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
except ValueError:
continue
return time_str
def parse_args(argv):
    args = {"union_id": None, "time_min": None, "time_max": None, "max_results": None, "next_token": None}
    positional = []
    i = 1
    while i < len(argv):
        if argv[i] == "--time-min" and i + 1 < len(argv):
            args["time_min"] = parse_time_to_iso(argv[i + 1])
            i += 2
        elif argv[i] == "--time-max" and i + 1 < len(argv):
            args["time_max"] = parse_time_to_iso(argv[i + 1])
            i += 2
        elif argv[i] == "--max-results" and i + 1 < len(argv):
            args["max_results"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--next-token" and i + 1 < len(argv):
            args["next_token"] = argv[i + 1]
            i += 2
        elif argv[i].startswith("--"):
            # Skip --debug and any other unrecognized flag instead of treating it as the unionId.
            i += 1
        else:
            positional.append(argv[i])
            i += 1
    if positional:
        args["union_id"] = positional[0]
    return args
def main():
args = parse_args(sys.argv)
if not args["union_id"]:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/list_events.py \"<用户unionId>\" [--time-min \"...\"] [--time-max \"...\"] [--max-results 50]",
}
})
sys.exit(1)
try:
token = get_access_token()
print("正在查询日程列表...", file=sys.stderr)
params = {}
if args["time_min"]:
params["timeMin"] = args["time_min"]
if args["time_max"]:
params["timeMax"] = args["time_max"]
if args["max_results"]:
params["maxResults"] = args["max_results"]
if args["next_token"]:
params["nextToken"] = args["next_token"]
result = api_request(
"GET",
f"/calendar/users/{args['union_id']}/calendars/primary/events",
token,
params=params,
)
events = result.get("events", [])
output({
"success": True,
"totalCount": len(events),
"nextToken": result.get("nextToken"),
"events": [{
"id": e.get("id"),
"summary": e.get("summary"),
"start": e.get("start"),
"end": e.get("end"),
"status": e.get("status"),
"isAllDay": e.get("isAllDay"),
"onlineMeetingInfo": e.get("onlineMeetingInfo"),
} for e in events],
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
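The `--time-min` and `--time-max` values are normalized by `parse_time_to_iso` before they are sent. The standalone copy below follows the same logic as the function in this script (try each format, then append a fixed `+08:00` offset) and shows what it produces for a few inputs. The sample inputs are invented.

```python
from datetime import datetime

FORMATS = ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S")


def parse_time_to_iso(time_str):
    """Try each accepted format, then render the time with a fixed +08:00 offset."""
    for fmt in FORMATS:
        try:
            dt = datetime.strptime(time_str, fmt)
        except ValueError:
            continue
        return dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
    return time_str  # unrecognized input is passed through unchanged


print(parse_time_to_iso("2026-03-01 00:00"))     # 2026-03-01T00:00:00+08:00
print(parse_time_to_iso("2026-03-31T23:59:59"))  # 2026-03-31T23:59:59+08:00
print(parse_time_to_iso("tomorrow"))             # tomorrow
```

Because unrecognized strings pass through unchanged, a value that already carries an offset, such as `2026-03-01T00:00:00+08:00`, is sent as-is.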
FILE:scripts/list_inactive_users.py
"""List employees who have not logged in to DingTalk.
Usage: python scripts/list_inactive_users.py "<queryDate>" [--deptIds "id1,id2,..."] [--offset 0] [--size 100]
queryDate format: yyyyMMdd
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def parse_args(argv):
args = {"query_date": None, "dept_ids": [], "offset": 0, "size": 100}
i = 1
while i < len(argv):
if argv[i] == "--deptIds" and i + 1 < len(argv):
args["dept_ids"] = [int(x.strip()) for x in argv[i + 1].split(",") if x.strip()]
i += 2
elif argv[i] == "--offset" and i + 1 < len(argv):
args["offset"] = int(argv[i + 1])
i += 2
elif argv[i] == "--size" and i + 1 < len(argv):
args["size"] = int(argv[i + 1])
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["query_date"] is None:
args["query_date"] = argv[i]
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if not args["query_date"]:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_inactive_users.py \"<queryDate>\" [--deptIds \"id1,id2\"] [--offset 0] [--size 100]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在获取未登录用户列表...", file=sys.stderr)
body = {"is_active": False, "query_date": args["query_date"], "offset": args["offset"], "size": args["size"]}
if args["dept_ids"]:
body["dept_ids"] = args["dept_ids"]
result = top_api_request("POST", "/topapi/inactive/user/v2/get", token, json_body=body)
r = result.get("result", {})
output({
"success": True,
"queryDate": args["query_date"],
"userIds": r.get("list", []),
"hasMore": r.get("has_more", False),
"nextOffset": (args["offset"] + args["size"]) if r.get("has_more") else None,
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_resigned_users.py
"""List resignation records.
Usage: python scripts/list_resigned_users.py "<startTime>" ["<endTime>"] [--nextToken "xxx"] [--maxResults 100]
startTime/endTime format: ISO 8601 (e.g. 2024-01-15T00:00:00+08:00)
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"start_time": None, "end_time": None, "next_token": None, "max_results": 100}
positional = []
i = 1
while i < len(argv):
if argv[i] == "--nextToken" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
elif argv[i] == "--maxResults" and i + 1 < len(argv):
args["max_results"] = int(argv[i + 1])
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--"):
positional.append(argv[i])
i += 1
else:
i += 1
if len(positional) >= 1:
args["start_time"] = positional[0]
if len(positional) >= 2:
args["end_time"] = positional[1]
return args
def main():
args = parse_args(sys.argv)
if not args["start_time"]:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_resigned_users.py \"<startTime>\" [\"<endTime>\"] [--nextToken \"xxx\"] [--maxResults 100]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在查询离职记录...", file=sys.stderr)
params = {"startTime": args["start_time"], "maxResults": str(args["max_results"])}
if args["end_time"]:
params["endTime"] = args["end_time"]
if args["next_token"]:
params["nextToken"] = args["next_token"]
result = api_request("GET", "/contact/empLeaveRecords", token, params=params)
output({
"success": True,
"startTime": args["start_time"],
"endTime": args["end_time"],
"records": result.get("records", []),
"nextToken": result.get("nextToken"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_sub_departments.py
"""List sub-departments.
Usage: python scripts/list_sub_departments.py "<deptId>"
The root department has deptId = 1
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_sub_departments.py \"<deptId>\"; the root department is 1"}})
        sys.exit(1)
    try:
        dept_id = int(args[0])
    except ValueError:
        # A non-numeric deptId would otherwise crash with an uncaught traceback.
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "deptId must be an integer"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Fetching the sub-department list...", file=sys.stderr)
        result = top_api_request("POST", "/topapi/v2/department/listsub", token, json_body={"dept_id": dept_id})
        sub_ids = [d.get("dept_id") for d in (result.get("result") or [])]
        output({"success": True, "deptId": dept_id, "subDepartmentIds": sub_ids})
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/list_user_cc_approvals.py
"""List approval instances CC'd to a user.
Usage: python scripts/list_user_cc_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"user_id": None, "start_time": None, "end_time": None, "max_results": 20, "next_token": None}
i = 1
while i < len(argv):
if argv[i] == "--startTime" and i + 1 < len(argv):
args["start_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--endTime" and i + 1 < len(argv):
args["end_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--maxResults" and i + 1 < len(argv):
args["max_results"] = int(argv[i + 1])
i += 2
elif argv[i] == "--nextToken" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["user_id"] is None:
args["user_id"] = argv[i]
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if not args["user_id"]:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/list_user_cc_approvals.py \"<userId>\" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在查询用户抄送的审批...", file=sys.stderr)
body = {"userId": args["user_id"], "maxResults": args["max_results"]}
if args["start_time"]:
body["startTime"] = args["start_time"]
if args["end_time"]:
body["endTime"] = args["end_time"]
if args["next_token"]:
body["nextToken"] = args["next_token"]
result = api_request("POST", "/workflow/processInstances/userCc", token, json_body=body)
instances = result.get("result", {}).get("list", [])
output({
"success": True,
"userId": args["user_id"],
"instances": instances,
"totalCount": len(instances),
"hasMore": bool(result.get("result", {}).get("nextToken")),
"nextToken": result.get("result", {}).get("nextToken"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_user_done_approvals.py
"""List approval instances a user has already processed.
Usage: python scripts/list_user_done_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"user_id": None, "start_time": None, "end_time": None, "max_results": 20, "next_token": None}
i = 1
while i < len(argv):
if argv[i] == "--startTime" and i + 1 < len(argv):
args["start_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--endTime" and i + 1 < len(argv):
args["end_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--maxResults" and i + 1 < len(argv):
args["max_results"] = int(argv[i + 1])
i += 2
elif argv[i] == "--nextToken" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["user_id"] is None:
args["user_id"] = argv[i]
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if not args["user_id"]:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/list_user_done_approvals.py \"<userId>\" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在查询用户已审批实例...", file=sys.stderr)
body = {"userId": args["user_id"], "maxResults": args["max_results"]}
if args["start_time"]:
body["startTime"] = args["start_time"]
if args["end_time"]:
body["endTime"] = args["end_time"]
if args["next_token"]:
body["nextToken"] = args["next_token"]
result = api_request("POST", "/workflow/processInstances/userDone", token, json_body=body)
instances = result.get("result", {}).get("list", [])
output({
"success": True,
"userId": args["user_id"],
"instances": instances,
"totalCount": len(instances),
"hasMore": bool(result.get("result", {}).get("nextToken")),
"nextToken": result.get("result", {}).get("nextToken"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_user_initiated_approvals.py
"""List approval instances initiated by a user.
Usage: python scripts/list_user_initiated_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"user_id": None, "start_time": None, "end_time": None, "max_results": 20, "next_token": None}
i = 1
while i < len(argv):
if argv[i] == "--startTime" and i + 1 < len(argv):
args["start_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--endTime" and i + 1 < len(argv):
args["end_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--maxResults" and i + 1 < len(argv):
args["max_results"] = int(argv[i + 1])
i += 2
elif argv[i] == "--nextToken" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["user_id"] is None:
args["user_id"] = argv[i]
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if not args["user_id"]:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/list_user_initiated_approvals.py \"<userId>\" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在查询用户发起的审批...", file=sys.stderr)
body = {"userId": args["user_id"], "maxResults": args["max_results"]}
if args["start_time"]:
body["startTime"] = args["start_time"]
if args["end_time"]:
body["endTime"] = args["end_time"]
if args["next_token"]:
body["nextToken"] = args["next_token"]
result = api_request("POST", "/workflow/processInstances/userStarted", token, json_body=body)
instances = result.get("result", {}).get("list", [])
output({
"success": True,
"userId": args["user_id"],
"instances": instances,
"totalCount": len(instances),
"hasMore": bool(result.get("result", {}).get("nextToken")),
"nextToken": result.get("result", {}).get("nextToken"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_user_parent_departments.py
"""Get the parent-department chain of the departments a user belongs to.
Usage: python scripts/list_user_parent_departments.py "<userId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_user_parent_departments.py \"<userId>\""}})
sys.exit(1)
user_id = args[0]
try:
token = get_access_token()
print("正在获取用户所属部门父部门链...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/department/listparentbyuser", token, json_body={"userid": user_id})
output({"success": True, "userId": user_id, "parentIdList": result.get("result", {}).get("parent_list", [])})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_user_todo_approvals.py
"""List a user's pending approval instances.
Usage: python scripts/list_user_todo_approvals.py "<userId>" [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"user_id": None, "max_results": 20, "next_token": None}
i = 1
while i < len(argv):
if argv[i] == "--maxResults" and i + 1 < len(argv):
args["max_results"] = int(argv[i + 1])
i += 2
elif argv[i] == "--nextToken" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["user_id"] is None:
args["user_id"] = argv[i]
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if not args["user_id"]:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/list_user_todo_approvals.py \"<userId>\" [--maxResults 20] [--nextToken \"xxx\"]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在查询用户待审批实例...", file=sys.stderr)
body = {"userId": args["user_id"], "maxResults": args["max_results"]}
if args["next_token"]:
body["nextToken"] = args["next_token"]
result = api_request("POST", "/workflow/processInstances/userTodo", token, json_body=body)
instances = result.get("result", {}).get("list", [])
output({
"success": True,
"userId": args["user_id"],
"instances": instances,
"totalCount": len(instances),
"hasMore": bool(result.get("result", {}).get("nextToken")),
"nextToken": result.get("result", {}).get("nextToken"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_workspaces.py
"""获取知识库列表
用法: python scripts/list_workspaces.py "<操作人unionId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_workspaces.py \"<操作人unionId>\""}})
sys.exit(1)
operator_id = args[0]
try:
token = get_access_token()
print("正在获取知识库列表...", file=sys.stderr)
workspaces = []
next_token = None
while True:
params = {"operatorId": operator_id, "maxResults": 30}
if next_token:
params["nextToken"] = next_token
result = api_request("GET", "/wiki/workspaces", token, params=params, api_version="v2.0")
ws = result.get("workspace")
if ws:
if isinstance(ws, list):
workspaces.extend(ws)
else:
workspaces.append(ws)
next_token = result.get("nextToken")
if not next_token:
break
output({
"success": True,
"totalCount": len(workspaces),
"workspaces": [{"workspaceId": w.get("workspaceId"), "name": w.get("name"), "type": w.get("type"), "url": w.get("url"), "rootNodeId": w.get("rootNodeId")} for w in workspaces],
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/overwrite_doc.py
"""覆写知识库文档内容(全量替换)
用法: python scripts/overwrite_doc.py "<workspaceId>" "<nodeId>" "<操作人unionId>" "<内容>"
注意:此操作会完全替换文档原有内容
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 4:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/overwrite_doc.py \"<workspaceId>\" \"<nodeId>\" \"<操作人unionId>\" \"<内容>\""}})
sys.exit(1)
workspace_id = args[0]
node_id = args[1]
operator_id = args[2]
content = args[3]
try:
token = get_access_token()
print("正在覆写文档内容...", file=sys.stderr)
result = api_request("PUT", f"/doc/workspaces/{workspace_id}/docs/{node_id}/contents", token, json_body={
"operatorId": operator_id,
"content": content,
})
output({
"success": True,
"workspaceId": workspace_id,
"nodeId": node_id,
"message": "文档内容已覆写",
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/remove_event_attendee.py
"""移除日程参与者
用法: python scripts/remove_event_attendee.py "<用户unionId>" "<eventId>" "<参与者unionId1,unionId2,...>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    # Ignore the optional --debug flag, as the other scripts in this package do.
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 3:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "用法: python scripts/remove_event_attendee.py \"<用户unionId>\" \"<eventId>\" \"<参与者unionId1,unionId2,...>\"",
            }
        })
        sys.exit(1)
    union_id = args[0]
    event_id = args[1]
    # Drop blanks so that "a, b," does not yield padded or empty IDs.
    attendee_ids = [aid.strip() for aid in args[2].split(",") if aid.strip()]
    try:
        token = get_access_token()
        print("正在移除参与者...", file=sys.stderr)
        body = {
            "attendeesToRemove": [{"id": aid} for aid in attendee_ids],
        }
        api_request(
            "POST",
            f"/calendar/users/{union_id}/calendars/primary/events/{event_id}/attendees/batchRemove",
            token,
            json_body=body,
        )
        output({
            "success": True,
            "eventId": event_id,
            "removedCount": len(attendee_ids),
            "message": f"已移除 {len(attendee_ids)} 位参与者",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/search_department.py
"""搜索部门
用法: python scripts/search_department.py "<搜索关键词>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/search_department.py \"<搜索关键词>\""}})
sys.exit(1)
keyword = args[0]
try:
token = get_access_token()
print("正在搜索部门...", file=sys.stderr)
result = api_request("POST", "/contact/departments/search", token, json_body={
"queryWord": keyword, "offset": 0, "size": 20,
})
output({
"success": True,
"keyword": keyword,
"totalCount": result.get("totalCount", 0),
"hasMore": result.get("hasMore", False),
"departmentIds": result.get("list", []),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/search_doc.py
"""根据文档名搜索知识库文档,返回文档链接
用法: python scripts/search_doc.py "<操作人unionId>" "<文档名关键词>" ["<workspaceId>"]
如果不指定 workspaceId,会搜索所有知识库
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def iter_workspaces(token, operator_id):
    """Yield every workspace visible to the operator, following nextToken pages."""
    next_token = None
    while True:
        params = {"operatorId": operator_id, "maxResults": 30}
        if next_token:
            params["nextToken"] = next_token
        result = api_request("GET", "/wiki/workspaces", token, params=params, api_version="v2.0")
        ws = result.get("workspace")
        if ws:
            # The response may carry a single object or a list.
            yield from (ws if isinstance(ws, list) else [ws])
        next_token = result.get("nextToken")
        if not next_token:
            break


def search_nodes(token, parent_node_id, operator_id, keyword):
    """Recursively walk child nodes and collect those whose name contains the keyword (case-insensitive)."""
    matched = []
    next_token = None
    while True:
        params = {"parentNodeId": parent_node_id, "operatorId": operator_id, "maxResults": 50}
        if next_token:
            params["nextToken"] = next_token
        result = api_request("GET", "/wiki/nodes", token, params=params, api_version="v2.0")
        node = result.get("node")
        if node:
            nodes = node if isinstance(node, list) else [node]
            for n in nodes:
                name = n.get("name") or ""
                if keyword.lower() in name.lower():
                    matched.append(n)
                if n.get("hasChildren"):
                    matched.extend(search_nodes(token, n["nodeId"], operator_id, keyword))
        next_token = result.get("nextToken")
        if not next_token:
            break
    return matched


def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 2:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/search_doc.py \"<操作人unionId>\" \"<文档名关键词>\" [\"<workspaceId>\"]"}})
        sys.exit(1)
    operator_id = args[0]
    keyword = args[1]
    workspace_id = args[2] if len(args) >= 3 else None
    try:
        token = get_access_token()
        print(f"正在搜索文档: {keyword}...", file=sys.stderr)
        # Page through all workspaces; when a workspaceId is given, keep only that one.
        # (Previously only the first page was checked, so a workspace on a later page was never found.)
        root_nodes = [
            (w.get("rootNodeId"), w.get("name"))
            for w in iter_workspaces(token, operator_id)
            if workspace_id is None or w.get("workspaceId") == workspace_id
        ]
        all_matched = []
        for root_node_id, ws_name in root_nodes:
            if root_node_id:
                matched = search_nodes(token, root_node_id, operator_id, keyword)
                for m in matched:
                    m["workspaceName"] = ws_name
                all_matched.extend(matched)
        output({
            "success": True,
            "keyword": keyword,
            "totalCount": len(all_matched),
            "documents": [{
                "name": d.get("name"),
                "nodeId": d.get("nodeId"),
                "url": d.get("url"),
                "category": d.get("category"),
                "workspaceName": d.get("workspaceName"),
                "creatorId": d.get("creatorId"),
                "modifiedTime": d.get("modifiedTime"),
            } for d in all_matched],
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/search_user.py
"""搜索用户
用法: python scripts/search_user.py "<搜索关键词>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/search_user.py \"<搜索关键词>\""}})
sys.exit(1)
keyword = sys.argv[1]
try:
token = get_access_token()
print("正在搜索用户...", file=sys.stderr)
result = api_request("POST", "/contact/users/search", token, json_body={
"queryWord": keyword, "offset": 0, "size": 20,
})
output({
"success": True,
"keyword": keyword,
"totalCount": result.get("totalCount", 0),
"hasMore": result.get("hasMore", False),
"userIds": result.get("list", []),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/send_group_message.py
"""机器人发送群消息
用法: python scripts/send_group_message.py "<openConversationId>" "<消息内容>" ["<robotCode>"]
robotCode 可通过命令行参数传入,也可设置环境变量 DINGTALK_ROBOT_CODE
"""
import sys
import os
import json as json_mod
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output, get_robot_code
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 2:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/send_group_message.py \"<openConversationId>\" \"<消息内容>\" [\"<robotCode>\"]"}})
        sys.exit(1)
    open_conversation_id = args[0]
    message = args[1]
    robot_code = get_robot_code(args[2] if len(args) >= 3 else None)
    try:
        token = get_access_token()
        print("正在发送群消息...", file=sys.stderr)
        # Group messages go to /robot/groupMessages/send; the /robot/oToMessages/* paths are for one-to-one chats.
        result = api_request("POST", "/robot/groupMessages/send", token, json_body={
            "openConversationId": open_conversation_id,
            "robotCode": robot_code,
            "msgKey": "sampleText",
            # msgParam must be a JSON string, not a nested object.
            "msgParam": json_mod.dumps({"content": message}),
        })
        output({
            "success": True,
            "openConversationId": open_conversation_id,
            "robotCode": robot_code,
            "processQueryKey": result.get("processQueryKey"),
            "message": message,
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/send_user_message.py
"""机器人发送单聊消息
用法: python scripts/send_user_message.py "<userId>" "<消息内容>" ["<robotCode>"]
robotCode 可通过命令行参数传入,也可设置环境变量 DINGTALK_ROBOT_CODE
"""
import sys
import os
import json as json_mod
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output, get_robot_code
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/send_user_message.py \"<userId>\" \"<消息内容>\" [\"<robotCode>\"]"}})
sys.exit(1)
user_id = args[0]
message = args[1]
robot_code = get_robot_code(args[2] if len(args) >= 3 else None)
try:
token = get_access_token()
print("正在发送单聊消息...", file=sys.stderr)
result = api_request("POST", "/robot/oToMessages/batchSend", token, json_body={
"robotCode": robot_code,
"userIds": [user_id],
"msgKey": "sampleText",
"msgParam": json_mod.dumps({"content": message}),
})
output({
"success": True,
"userId": user_id,
"robotCode": robot_code,
"processQueryKey": result.get("processQueryKey"),
"flowControlledStaffIdList": result.get("flowControlledStaffIdList", []),
"invalidStaffIdList": result.get("invalidStaffIdList", []),
"message": message,
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/terminate_approval_instance.py
"""撤销审批实例
用法: python scripts/terminate_approval_instance.py "<instanceId>" "<operatingUserId>" ["<remark>"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/terminate_approval_instance.py \"<instanceId>\" \"<operatingUserId>\" [\"<remark>\"]"}})
sys.exit(1)
instance_id = args[0]
operating_user_id = args[1]
remark = args[2] if len(args) > 2 else ""
try:
token = get_access_token()
print("正在撤销审批实例...", file=sys.stderr)
body = {
"processInstanceId": instance_id,
"operatingUserId": operating_user_id,
"remark": remark,
}
api_request("POST", "/workflow/processInstances/terminate", token, json_body=body)
output({"success": True, "instanceId": instance_id, "message": "审批实例已撤销"})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/transfer_approval_task.py
"""转交审批任务
用法: python scripts/transfer_approval_task.py "<instanceId>" "<userId>" "<transferToUserId>" [--taskId "xxx"] [--remark "转交原因"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def parse_args(argv):
args = {"instance_id": None, "user_id": None, "transfer_to": None, "task_id": None, "remark": None}
positional = []
i = 1
while i < len(argv):
if argv[i] == "--taskId" and i + 1 < len(argv):
args["task_id"] = argv[i + 1]
i += 2
elif argv[i] == "--remark" and i + 1 < len(argv):
args["remark"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--"):
positional.append(argv[i])
i += 1
else:
i += 1
if len(positional) >= 1:
args["instance_id"] = positional[0]
if len(positional) >= 2:
args["user_id"] = positional[1]
if len(positional) >= 3:
args["transfer_to"] = positional[2]
return args
def main():
args = parse_args(sys.argv)
if not all([args["instance_id"], args["user_id"], args["transfer_to"]]):
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/transfer_approval_task.py \"<instanceId>\" \"<userId>\" \"<transferToUserId>\" [--taskId \"xxx\"] [--remark \"转交原因\"]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在转交审批任务...", file=sys.stderr)
body = {
"process_instance_id": args["instance_id"],
"userid": args["user_id"],
"transfer_to_userid": args["transfer_to"],
"remark": args["remark"] or "转交审批任务",
}
if args["task_id"]:
body["task_id"] = args["task_id"]
top_api_request("POST", "/topapi/process/workrecord/task/transfer", token, json_body=body)
output({
"success": True,
"instanceId": args["instance_id"],
"userId": args["user_id"],
"transferToUserId": args["transfer_to"],
"message": "审批任务已转交",
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
---
name: ding-skills
description: 钉钉操作助手(DingTalk)。当用户通过钉钉对话、或提到以下任何场景时必须使用此技能:查人(查下某某、搜一下某人、找一下谁谁)、查部门、查手机号、查工号、约会议(预约会议、创建会议、安排会议、开个会)、发消息(给某人发消息、群里发个通知)、查审批(我的审批、待审批、审批状态)、发起审批、同意/拒绝审批、查日程、创建日程、查员工数、查离职、开视频会议、钉钉知识库(查钉钉知识库、在钉钉创建文档、搜索钉钉文档、覆写钉钉文档)。注意:当用户从钉钉渠道发送消息时,"知识库"默认指钉钉知识库,不是飞书知识库。Use when user mentions anything about DingTalk or when the message comes from a DingTalk channel: looking up people, searching users/departments, scheduling meetings, creating conferences, sending messages, managing approvals, checking calendar events, querying employee info, DingTalk knowledge base operations (list workspaces, create/search/overwrite documents). When user is on DingTalk channel, "知识库" means DingTalk knowledge base, NOT Feishu.
---
# Ding Skills
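DingTalk all-in-one skill set: user management, department management, messaging, OA approvals, video conferences, calendar management, and knowledge base documents.
## Prerequisites
- The environment variables `DINGTALK_APP_KEY` and `DINGTALK_APP_SECRET` are set
- A DingTalk app has been created and granted the required API permissions
## Environment variables
```bash
export DINGTALK_APP_KEY="<your-app-key>"
export DINGTALK_APP_SECRET="<your-app-secret>"
export DINGTALK_ROBOT_CODE="<your-robot-code>" # optional, used when sending messages
```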
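## Important: common workflows (required reading)
Most DingTalk APIs need a `userId` or `unionId`, but users usually give only a person's name. **When a name appears, look the person up first, then perform the operation.**
### Workflow 1: Schedule a meeting / create a video conference by name
When the user says "帮我和张三、李四开个会" (set up a meeting with 张三 and 李四) or "预约一个会议,参会人:张三、李四" (schedule a meeting, attendees: 张三 and 李四):
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: Repeat steps 1-2 for every attendee
Step 4: python scripts/create_schedule_conference.py "<subject>" "<organizer unionId>" "<start time>" "<end time>" "<attendee unionId1,unionId2>" "[location]"
```
### Workflow 2: Send a message by name
When the user says "给张三发个消息" (send 张三 a message):
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/send_user_message.py "<userId>" "<message text>"
```
Note: robotCode is read automatically from the environment variable DINGTALK_ROBOT_CODE; it can also be passed manually as the third argument.
### Workflow 3: Check approvals by name
When the user says "查下张三的待审批" (check 张三's pending approvals):
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/list_user_todo_approvals.py "<userId>"
```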
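### Workflow 4: Check a calendar by name
When the user says "查下张三今天的日程" (check 张三's schedule for today):
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: python scripts/list_events.py "<unionId>" [--time-min "<start time>"] [--time-max "<end time>"]
```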
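### Workflow 5: Create a document in the knowledge base
When the user says "在知识库里创建一个文档" (create a document in the knowledge base):
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: python scripts/list_workspaces.py "<unionId>" → returns workspaceId
Step 4: python scripts/create_doc.py "<workspaceId>" "<document name>" "<unionId>"
```
### Workflow 6: Search knowledge base documents and get the link
When the user says "帮我找一下知识库里的《周报》" (find the "周报" document in the knowledge base):
```
Step 1: python scripts/search_user.py "张三" → returns userId
Step 2: python scripts/get_user.py "<userId>" → returns unionId
Step 3: python scripts/search_doc.py "<unionId>" "周报" → returns the document link
```
### General rules
- **The user gives a name** → always call `search_user.py` first to get the userId
- **APIs that need a unionId** (calendar, meeting, and knowledge base operations) → then call `get_user.py` to get the unionId from the userId
- **APIs that need a userId** (messaging, approvals, departments) → the result of search_user.py can be used directly
- **Multiple users can be looked up in parallel** to save time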
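The lookup chain in the rules above can be sketched as a small helper. This is a minimal sketch and not part of the skill: `run_script`, `resolve_union_id`, and the injectable `run` parameter are hypothetical names, and it assumes `get_user.py` prints JSON shaped like the get-user sample in this document (`user.unionid`). Refusing to guess when several users match is a design choice of the sketch, not documented behavior.

```python
import json
import subprocess
import sys


def run_script(script, *args):
    """Run one of the skill's scripts and parse the JSON it prints on stdout."""
    proc = subprocess.run(
        [sys.executable, f"scripts/{script}", *args],
        capture_output=True, text=True,
    )
    return json.loads(proc.stdout)


def resolve_union_id(name, run=run_script):
    """Name -> userId (search_user.py) -> unionId (get_user.py)."""
    found = run("search_user.py", name)
    user_ids = found.get("userIds", [])
    if not found.get("success") or not user_ids:
        raise LookupError(f"no user found for {name!r}")
    if len(user_ids) > 1:
        # Several people match the name: ask the user instead of guessing.
        raise LookupError(f"{name!r} is ambiguous: {user_ids}")
    detail = run("get_user.py", user_ids[0])
    return user_ids[0], detail["user"]["unionid"]
```

Passing `run` as a parameter keeps the helper testable without network access or credentials.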
## 功能列表
### 1. 搜索用户 (search-user)
根据姓名搜索用户,返回匹配的 UserId 列表。
```bash
python scripts/search_user.py "<搜索关键词>"
```
输出:
```json
{
"success": true,
"keyword": "张三",
"totalCount": 2,
"hasMore": false,
"userIds": ["123456789", "987654321"]
}
```
### 2. 查询用户详情 (get-user)
获取指定用户的详细信息。
```bash
python scripts/get_user.py "<userId>"
```
输出:
```json
{
"success": true,
"user": {
"userid": "user001",
"name": "张三",
"mobile": "138****1234",
"dept_id_list": [12345],
"unionid": "xxxxx"
}
}
```
### 3. 根据手机号查询用户 (get-user-by-mobile)
```bash
python scripts/get_user_by_mobile.py "<手机号>"
```
输出:
```json
{ "success": true, "mobile": "13800138000", "userId": "user001" }
```
### 4. 根据 unionid 查询用户 (get-user-by-unionid)
```bash
python scripts/get_user_by_unionid.py "<unionid>"
```
输出:
```json
{ "success": true, "unionid": "xxxxx", "userId": "user001" }
```
### 5. 获取员工人数 (get-user-count)
```bash
python scripts/get_user_count.py [--onlyActive]
```
输出:
```json
{ "success": true, "onlyActive": false, "count": 150 }
```
### 6. 获取用户待审批数量 (get-user-todo-count)
```bash
python scripts/get_user_todo_count.py "<userId>"
```
输出:
```json
{ "success": true, "userId": "user001", "count": 5 }
```
### 7. 获取未登录用户列表 (list-inactive-users)
```bash
python scripts/list_inactive_users.py "<queryDate>" [--deptIds "id1,id2"] [--offset 0] [--size 100]
```
queryDate 格式: yyyyMMdd
输出:
```json
{ "success": true, "queryDate": "20240115", "userIds": ["user001"], "hasMore": false }
```
### 8. 查询离职记录列表 (list-resigned-users)
```bash
python scripts/list_resigned_users.py "<startTime>" ["<endTime>"] [--nextToken "xxx"] [--maxResults 100]
```
startTime/endTime 格式: ISO8601
输出:
```json
{
"success": true,
"startTime": "2024-01-01T00:00:00+08:00",
"records": [{ "userId": "user001", "name": "张三", "leaveTime": "2024-01-15T10:00:00Z" }]
}
```
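The two date formats used above (`yyyyMMdd` for list-inactive-users and ISO 8601 for list-resigned-users) can be produced with the standard library. A minimal sketch: the helper names are hypothetical, and the UTC+8 default is an assumption taken from the `+08:00` offset in the sample output.

```python
from datetime import date, datetime, timedelta, timezone

# Assumption: times without an offset are UTC+8, as in the sample above.
CST = timezone(timedelta(hours=8))


def to_query_date(d: date) -> str:
    """Format a date as yyyyMMdd for list_inactive_users.py."""
    return d.strftime("%Y%m%d")


def to_iso8601(dt: datetime) -> str:
    """Format a datetime as ISO 8601 with an explicit offset."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CST)
    return dt.isoformat(timespec="seconds")
```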
### 9. 搜索部门 (search-department)
```bash
python scripts/search_department.py "<搜索关键词>"
```
输出:
```json
{ "success": true, "keyword": "技术部", "totalCount": 2, "hasMore": false, "departmentIds": [12345, 67890] }
```
### 10. 获取部门详情 (get-department)
```bash
python scripts/get_department.py "<deptId>"
```
输出:
```json
{ "success": true, "department": { "deptId": 12345, "name": "技术部", "parentId": 1 } }
```
### 11. 获取子部门列表 (list-sub-departments)
根部门 deptId = 1。
```bash
python scripts/list_sub_departments.py "<deptId>"
```
输出:
```json
{ "success": true, "deptId": 1, "subDepartmentIds": [12345, 67890] }
```
### 12. 获取部门用户列表 (list-department-users)
自动分页获取所有用户(简略信息)。
```bash
python scripts/list_department_users.py "<deptId>"
```
输出:
```json
{
"success": true,
"deptId": 12345,
"users": [{ "userId": "user001", "name": "张三" }, { "userId": "user002", "name": "李四" }]
}
```
### 13. 获取部门用户详情 (list-department-user-details)
分页获取,支持 cursor 和 size。
```bash
python scripts/list_department_user_details.py "<deptId>" [--cursor 0] [--size 100]
```
输出:
```json
{ "success": true, "deptId": 12345, "users": [...], "hasMore": true, "nextCursor": 100 }
```
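Cursor-based paging with `hasMore` and `nextCursor`, as in the sample output above, can be driven by a simple loop. A minimal sketch: `fetch_page` is a hypothetical callable that runs `list_department_user_details.py` with `--cursor` and `--size` and returns the parsed JSON.

```python
def fetch_all_department_users(fetch_page, dept_id, size=100):
    """Collect users from every page until hasMore is false."""
    users = []
    cursor = 0
    while True:
        page = fetch_page(dept_id, cursor, size)
        users.extend(page.get("users", []))
        if not page.get("hasMore"):
            return users
        # Continue from the cursor reported by the previous page.
        cursor = page["nextCursor"]
```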
### 14. 获取部门用户 ID 列表 (list-department-user-ids)
```bash
python scripts/list_department_user_ids.py "<deptId>"
```
输出:
```json
{ "success": true, "deptId": 12345, "userIds": ["user001", "user002"] }
```
### 15. 获取部门父部门链 (list-department-parents)
```bash
python scripts/list_department_parents.py "<deptId>"
```
输出:
```json
{ "success": true, "deptId": 12345, "parentIdList": [12345, 67890, 1] }
```
### 16. 获取用户所属部门父部门链 (list-user-parent-departments)
```bash
python scripts/list_user_parent_departments.py "<userId>"
```
输出:
```json
{ "success": true, "userId": "user001", "parentIdList": [12345, 1] }
```
### 17. 获取群内机器人列表 (get-bot-list)
```bash
python scripts/get_bot_list.py "<openConversationId>"
```
输出:
```json
{
"success": true,
"openConversationId": "cid",
"botList": [{ "robotCode": "code", "robotName": "name" }]
}
```
### 18. 机器人发送群消息 (send-group-message)
robotCode 自动从环境变量 `DINGTALK_ROBOT_CODE` 读取,也可作为第3个参数手动传入。
```bash
python scripts/send_group_message.py "<openConversationId>" "<消息内容>" ["<robotCode>"]
```
输出:
```json
{ "success": true, "openConversationId": "cid", "robotCode": "code", "processQueryKey": "key", "message": "消息内容" }
```
### 19. 机器人发送单聊消息 (send-user-message)
robotCode 自动从环境变量 `DINGTALK_ROBOT_CODE` 读取,也可作为第3个参数手动传入。
```bash
python scripts/send_user_message.py "<userId>" "<消息内容>" ["<robotCode>"]
```
输出:
```json
{ "success": true, "userId": "user001", "robotCode": "code", "processQueryKey": "key", "flowControlledStaffIdList": [], "invalidStaffIdList": [], "message": "消息内容" }
```
### 20. 获取审批实例 ID 列表 (list-approval-instance-ids)
```bash
python scripts/list_approval_instance_ids.py "<processCode>" --startTime <timestamp> --endTime <timestamp> [--size 20] [--nextToken "xxx"]
```
输出:
```json
{ "success": true, "processCode": "PROC-XXX", "instanceIds": ["id1", "id2"], "totalCount": 2, "hasMore": false }
```
### 21. 获取审批实例详情 (get-approval-instance)
```bash
python scripts/get_approval_instance.py "<instanceId>"
```
输出:
```json
{
"success": true,
"instanceId": "xxx-123",
"instance": {
"processInstanceId": "xxx-123",
"title": "请假申请",
"status": "COMPLETED",
"formComponentValues": [...],
"tasks": [...]
}
}
```
### 22. 查询用户发起的审批 (list-user-initiated-approvals)
```bash
python scripts/list_user_initiated_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]
```
输出:
```json
{ "success": true, "userId": "user001", "instances": [...], "totalCount": 5, "hasMore": false }
```
### 23. 查询用户抄送的审批 (list-user-cc-approvals)
```bash
python scripts/list_user_cc_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]
```
### 24. 查询用户待审批实例 (list-user-todo-approvals)
```bash
python scripts/list_user_todo_approvals.py "<userId>" [--maxResults 20] [--nextToken "xxx"]
```
输出:
```json
{ "success": true, "userId": "user001", "instances": [...], "totalCount": 3, "hasMore": false, "nextToken": null }
```
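`list_user_todo_approvals.py` accepts `--nextToken` and echoes a `nextToken` in its output, so all pages can be collected with a loop. A minimal sketch: `todo_command`, `collect_todo_instances`, and the `run` callable (which executes the command and returns the parsed JSON) are hypothetical names.

```python
def todo_command(user_id, max_results=20, next_token=None):
    """Build the argv for one page of pending approvals."""
    cmd = ["python", "scripts/list_user_todo_approvals.py", user_id,
           "--maxResults", str(max_results)]
    if next_token:
        cmd += ["--nextToken", str(next_token)]
    return cmd


def collect_todo_instances(run, user_id, max_results=20):
    """Follow nextToken until the script stops returning one."""
    instances = []
    token = None
    while True:
        page = run(todo_command(user_id, max_results, token))
        instances.extend(page.get("instances", []))
        token = page.get("nextToken")
        if not token:
            return instances
```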
### 25. 查询用户已审批实例 (list-user-done-approvals)
```bash
python scripts/list_user_done_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20]
```
### 26. 发起审批实例 (create-approval-instance)
```bash
python scripts/create_approval_instance.py "<processCode>" "<originatorUserId>" "<deptId>" '<formValuesJson>' [--ccList "user1,user2"]
```
formValuesJson 示例: `'[{"name":"标题","value":"请假申请"}]'`
输出:
```json
{ "success": true, "processCode": "PROC-XXX", "originatorUserId": "user001", "instanceId": "xxx-new" }
```
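Hand-writing `formValuesJson` is easy to get wrong once values contain quotes. A minimal sketch that builds it with `json.dumps` and assembles the argument list from the usage line above; `build_form_values` and `create_approval_command` are hypothetical helper names, and the field names are whatever the approval form defines.

```python
import json


def build_form_values(fields):
    """Turn {"field name": "value"} into the formValuesJson string."""
    return json.dumps(
        [{"name": name, "value": value} for name, value in fields.items()],
        ensure_ascii=False,
    )


def create_approval_command(process_code, originator_user_id, dept_id, fields, cc_list=None):
    """Build the argv for create_approval_instance.py."""
    cmd = ["python", "scripts/create_approval_instance.py",
           process_code, originator_user_id, str(dept_id), build_form_values(fields)]
    if cc_list:
        cmd += ["--ccList", ",".join(cc_list)]
    return cmd
```

Passing the list to `subprocess.run` without `shell=True` avoids shell quoting of the JSON altogether.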
### 27. 撤销审批实例 (terminate-approval-instance)
```bash
python scripts/terminate_approval_instance.py "<instanceId>" "<operatingUserId>" ["<remark>"]
```
输出:
```json
{ "success": true, "instanceId": "xxx-123", "message": "审批实例已撤销" }
```
### 28. 执行审批任务 (execute-approval-task)
同意或拒绝审批任务。
```bash
python scripts/execute_approval_task.py "<instanceId>" "<userId>" "<agree|refuse>" [--taskId "xxx"] [--remark "审批意见"]
```
输出:
```json
{ "success": true, "instanceId": "xxx-123", "userId": "user001", "action": "agree", "message": "已同意审批" }
```
### 29. 转交审批任务 (transfer-approval-task)
```bash
python scripts/transfer_approval_task.py "<instanceId>" "<userId>" "<transferToUserId>" [--taskId "xxx"] [--remark "转交原因"]
```
输出:
```json
{ "success": true, "instanceId": "xxx-123", "userId": "user001", "transferToUserId": "user002", "message": "审批任务已转交" }
```
### 30. 添加审批评论 (add-approval-comment)
```bash
python scripts/add_approval_comment.py "<instanceId>" "<commentUserId>" "<评论内容>"
```
输出:
```json
{ "success": true, "instanceId": "xxx-123", "userId": "user001", "message": "评论已添加" }
```
### 31. 创建即时视频会议 (create-video-conference)
立即创建视频会议并邀请参会人。
```bash
python scripts/create_video_conference.py "<会议主题>" "<发起人unionId>" "[邀请人unionId1,unionId2]"
```
输出:
```json
{ "success": true, "title": "测试会议", "conferenceId": "xxx", "conferencePassword": "123456" }
```
### 32. 关闭视频会议 (close-video-conference)
```bash
python scripts/close_video_conference.py "<conferenceId>" "<操作人unionId>"
```
输出:
```json
{ "success": true, "conferenceId": "xxx", "message": "视频会议已关闭" }
```
### 33. 创建预约会议 (create-schedule-conference)
通过日历 API 创建预约会议,自动关联钉钉视频会议,日程会出现在钉钉日历中。
```bash
python scripts/create_schedule_conference.py "<会议主题>" "<创建人unionId>" "<开始时间>" "<结束时间>" "[参会人unionId1,unionId2]" "[会议地点]"
```
时间格式: `"2026-03-16 14:00"` 或 ISO 8601
输出:
```json
{
"success": true,
"title": "周会",
"eventId": "NXZCUEtxOGZMN3JpcDQ3ZE45UVRFdz09",
"onlineMeetingUrl": "dingtalk://...",
"conferenceId": "xxx",
"startTime": "2026-03-16T14:00:00+08:00",
"endTime": "2026-03-16T15:00:00+08:00",
"attendeeCount": 2
}
```
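The two accepted time formats can be normalised to the offset form shown in the sample output. A minimal sketch: how `create_schedule_conference.py` parses its arguments is not shown here, so `normalize_time` is a hypothetical helper, and the UTC+8 default is an assumption based on the `+08:00` in the sample.

```python
from datetime import datetime, timedelta, timezone

# Assumption: times without an offset are UTC+8, as in the sample output.
CST = timezone(timedelta(hours=8))


def normalize_time(text):
    """Accept "YYYY-MM-DD HH:MM" or ISO 8601 and return ISO 8601 with an offset."""
    try:
        dt = datetime.strptime(text, "%Y-%m-%d %H:%M")
    except ValueError:
        dt = datetime.fromisoformat(text)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=CST)
    return dt.isoformat(timespec="seconds")
```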
### 34. 取消预约会议 (cancel-schedule-conference)
```bash
python scripts/cancel_schedule_conference.py "<scheduleConferenceId>" "<创建人unionId>"
```
输出:
```json
{ "success": true, "scheduleConferenceId": "xxx", "message": "预约会议已取消" }
```
### 35. 查询日程列表 (list-events)
```bash
python scripts/list_events.py "<用户unionId>" [--time-min "2026-03-01 00:00"] [--time-max "2026-03-31 23:59"]
```
输出:
```json
{
"success": true,
"totalCount": 5,
"events": [{ "id": "eventId", "summary": "周会", "start": {...}, "end": {...} }]
}
```
### 36. 查询日程详情 (get-event)
```bash
python scripts/get_event.py "<用户unionId>" "<eventId>"
```
输出:
```json
{
"success": true,
"event": { "id": "eventId", "summary": "周会", "attendees": [...], "onlineMeetingInfo": {...} }
}
```
### 37. 删除日程 (delete-event)
```bash
python scripts/delete_event.py "<用户unionId>" "<eventId>" [--push-notification]
```
输出:
```json
{ "success": true, "eventId": "xxx", "message": "日程已删除" }
```
### 38. 添加日程参与者 (add-event-attendee)
```bash
python scripts/add_event_attendee.py "<用户unionId>" "<eventId>" "<参与者unionId1,unionId2>"
```
输出:
```json
{ "success": true, "eventId": "xxx", "addedCount": 2, "message": "已添加 2 位参与者" }
```
### 39. 移除日程参与者 (remove-event-attendee)
```bash
python scripts/remove_event_attendee.py "<用户unionId>" "<eventId>" "<参与者unionId1,unionId2>"
```
输出:
```json
{ "success": true, "eventId": "xxx", "removedCount": 1, "message": "已移除 1 位参与者" }
```
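The attendee scripts take unionIds as one comma-separated argument, so stray spaces or a trailing comma would end up inside an ID. A minimal sketch of cleaning the list before building the argument; `split_ids` and `join_ids` are hypothetical helper names.

```python
def split_ids(raw):
    """Split "id1, id2," into clean IDs, dropping blanks."""
    return [part.strip() for part in raw.split(",") if part.strip()]


def join_ids(ids):
    """Build the comma-separated argument, skipping duplicates but keeping order."""
    seen = []
    for item in ids:
        item = item.strip()
        if item and item not in seen:
            seen.append(item)
    return ",".join(seen)
```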
### 40. 获取知识库列表 (list-workspaces)
获取用户能访问的所有知识库。
```bash
python scripts/list_workspaces.py "<操作人unionId>"
```
输出:
```json
{
"success": true,
"totalCount": 2,
"workspaces": [
{ "workspaceId": "xxx", "name": "技术部知识库", "type": "TEAM", "url": "https://...", "rootNodeId": "yyy" }
]
}
```
### 41. 创建知识库文档 (create-doc)
在指定知识库中创建新文档。
```bash
python scripts/create_doc.py "<workspaceId>" "<文档名>" "<操作人unionId>" ["<docType>"]
```
docType 可选值:`alidoc`(钉钉文档,默认)、`alisheet`(表格)、`alinote`(笔记)
输出:
```json
{
"success": true,
"name": "周报",
"docType": "alidoc",
"workspaceId": "xxx",
"nodeId": "yyy",
"docKey": "zzz",
"url": "https://..."
}
```
### 42. 搜索知识库文档 (search-doc)
根据文档名关键词搜索知识库文档,返回文档链接。
```bash
python scripts/search_doc.py "<操作人unionId>" "<文档名关键词>" ["<workspaceId>"]
```
不指定 workspaceId 时搜索所有知识库。
输出:
```json
{
"success": true,
"keyword": "周报",
"totalCount": 3,
"documents": [
{ "name": "3月第2周周报", "nodeId": "xxx", "url": "https://...", "category": "ALIDOC", "workspaceName": "技术部知识库" }
]
}
```
### 43. 覆写文档内容 (overwrite-doc)
覆写知识库文档的全部内容(全量替换,非追加)。
```bash
python scripts/overwrite_doc.py "<workspaceId>" "<nodeId>" "<操作人unionId>" "<内容>"
```
输出:
```json
{ "success": true, "workspaceId": "xxx", "nodeId": "yyy", "message": "文档内容已覆写" }
```
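## Error handling
On failure, every script returns the same format:
```json
{
"success": false,
"error": {
"code": "ERROR_CODE",
"message": "error description"
}
}
```
Common error codes:
- `MISSING_CREDENTIALS` - environment variables are not set
- `INVALID_ARGS` - missing arguments
- `UNKNOWN_ERROR` - the API call failed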
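A caller can turn this unified format into an exception so that failures are not silently treated as results. A minimal sketch: `DingSkillError` and `parse_result` are hypothetical names, and the input is assumed to be the JSON a script printed on stdout.

```python
import json


class DingSkillError(RuntimeError):
    """Raised when a script reports success: false."""

    def __init__(self, code, message):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message


def parse_result(stdout_text):
    """Parse a script's JSON output and raise on the unified error format."""
    data = json.loads(stdout_text)
    if not data.get("success"):
        err = data.get("error") or {}
        raise DingSkillError(err.get("code", "UNKNOWN_ERROR"), err.get("message", ""))
    return data
```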
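## Important notes
- `userId` is the user ID inside the organization; `unionId` is a globally unique identifier
- Meeting, calendar, and knowledge base APIs use `unionId`, which can be obtained with get-user
- The root department's deptId is 1
- Get a knowledge base `workspaceId` from list-workspaces and a `nodeId` from search-doc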
FILE:package.json
{
"name": "ding-skills",
"version": "2.3.1",
"description": "钉钉全功能技能集 - 用户管理、部门管理、消息发送、OA审批、会议与日程管理、知识库文档管理",
"scripts": {
"search-user": "python scripts/search_user.py",
"get-user": "python scripts/get_user.py",
"get-user-by-mobile": "python scripts/get_user_by_mobile.py",
"get-user-by-unionid": "python scripts/get_user_by_unionid.py",
"get-user-count": "python scripts/get_user_count.py",
"get-user-todo-count": "python scripts/get_user_todo_count.py",
"list-inactive-users": "python scripts/list_inactive_users.py",
"list-resigned-users": "python scripts/list_resigned_users.py",
"search-department": "python scripts/search_department.py",
"get-department": "python scripts/get_department.py",
"list-sub-departments": "python scripts/list_sub_departments.py",
"list-department-users": "python scripts/list_department_users.py",
"list-department-user-details": "python scripts/list_department_user_details.py",
"list-department-user-ids": "python scripts/list_department_user_ids.py",
"list-department-parents": "python scripts/list_department_parents.py",
"list-user-parent-departments": "python scripts/list_user_parent_departments.py",
"get-bot-list": "python scripts/get_bot_list.py",
"send-group-message": "python scripts/send_group_message.py",
"send-user-message": "python scripts/send_user_message.py",
"list-approval-instance-ids": "python scripts/list_approval_instance_ids.py",
"get-approval-instance": "python scripts/get_approval_instance.py",
"list-user-initiated-approvals": "python scripts/list_user_initiated_approvals.py",
"list-user-cc-approvals": "python scripts/list_user_cc_approvals.py",
"list-user-todo-approvals": "python scripts/list_user_todo_approvals.py",
"list-user-done-approvals": "python scripts/list_user_done_approvals.py",
"create-approval-instance": "python scripts/create_approval_instance.py",
"terminate-approval-instance": "python scripts/terminate_approval_instance.py",
"execute-approval-task": "python scripts/execute_approval_task.py",
"transfer-approval-task": "python scripts/transfer_approval_task.py",
"add-approval-comment": "python scripts/add_approval_comment.py",
"create-video-conference": "python scripts/create_video_conference.py",
"close-video-conference": "python scripts/close_video_conference.py",
"create-schedule-conference": "python scripts/create_schedule_conference.py",
"cancel-schedule-conference": "python scripts/cancel_schedule_conference.py",
"list-events": "python scripts/list_events.py",
"get-event": "python scripts/get_event.py",
"delete-event": "python scripts/delete_event.py",
"add-event-attendee": "python scripts/add_event_attendee.py",
"remove-event-attendee": "python scripts/remove_event_attendee.py",
"list-workspaces": "python scripts/list_workspaces.py",
"create-doc": "python scripts/create_doc.py",
"search-doc": "python scripts/search_doc.py",
"overwrite-doc": "python scripts/overwrite_doc.py"
},
"keywords": [
"dingtalk",
"api",
"skill",
"meeting",
"calendar",
"approval",
"knowledge-base",
"wiki"
],
"license": "MIT"
}
FILE:scripts/__init__.py
FILE:scripts/add_approval_comment.py
"""Add a comment to an approval instance.
Usage: python scripts/add_approval_comment.py "<instanceId>" "<commentUserId>" "<comment text>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 3:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/add_approval_comment.py \"<instanceId>\" \"<commentUserId>\" \"<评论内容>\""}})
sys.exit(1)
instance_id = args[0]
comment_user_id = args[1]
text = args[2]
try:
token = get_access_token()
print("正在添加审批评论...", file=sys.stderr)
api_request("POST", "/workflow/processInstances/comments", token, json_body={
"processInstanceId": instance_id,
"commentUserId": comment_user_id,
"text": text,
})
output({"success": True, "instanceId": instance_id, "userId": comment_user_id, "message": "评论已添加"})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/add_event_attendee.py
"""Add attendees to a calendar event.
Usage: python scripts/add_event_attendee.py "<user unionId>" "<eventId>" "<attendee unionId1,unionId2,...>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 4:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/add_event_attendee.py \"<用户unionId>\" \"<eventId>\" \"<参与者unionId1,unionId2,...>\"",
}
})
sys.exit(1)
union_id = sys.argv[1]
event_id = sys.argv[2]
attendee_ids = sys.argv[3].split(",")
try:
token = get_access_token()
print("正在添加参与者...", file=sys.stderr)
body = {
"attendeesToAdd": [{"id": aid, "isOptional": False} for aid in attendee_ids],
}
api_request(
"POST",
f"/calendar/users/{union_id}/calendars/primary/events/{event_id}/attendees",
token,
json_body=body,
)
output({
"success": True,
"eventId": event_id,
"addedCount": len(attendee_ids),
"message": f"已添加 {len(attendee_ids)} 位参与者",
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/cancel_schedule_conference.py
"""Cancel a scheduled meeting.
Usage: python scripts/cancel_schedule_conference.py "<scheduleConferenceId>" "<creator unionId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 3:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/cancel_schedule_conference.py \"<scheduleConferenceId>\" \"<创建人unionId>\"",
}
})
sys.exit(1)
schedule_conference_id = sys.argv[1]
creator_union_id = sys.argv[2]
try:
token = get_access_token()
print("正在取消预约会议...", file=sys.stderr)
body = {
"scheduleConferenceId": schedule_conference_id,
"creatorUnionId": creator_union_id,
}
api_request("POST", "/conference/scheduleConferences/cancel", token, json_body=body)
output({
"success": True,
"scheduleConferenceId": schedule_conference_id,
"message": "预约会议已取消",
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/close_video_conference.py
"""Close a video conference.
Usage: python scripts/close_video_conference.py "<conferenceId>" "<operator unionId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 3:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/close_video_conference.py \"<conferenceId>\" \"<操作人unionId>\"",
}
})
sys.exit(1)
conference_id = sys.argv[1]
union_id = sys.argv[2]
try:
token = get_access_token()
print("正在关闭视频会议...", file=sys.stderr)
body = {"unionId": union_id}
api_request("DELETE", f"/conference/videoConferences/{conference_id}", token, json_body=body)
output({
"success": True,
"conferenceId": conference_id,
"message": "视频会议已关闭",
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/create_approval_instance.py
"""Start an approval instance.
Usage: python scripts/create_approval_instance.py "<processCode>" "<originatorUserId>" "<deptId>" '<formValuesJson>' [--ccList "user1,user2"]
Example formValuesJson: '[{"name":"标题","value":"请假申请"}]'
"""
import sys
import os
import json as json_mod
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"process_code": None, "originator_user_id": None, "dept_id": None, "form_values_json": None, "cc_list": None}
positional = []
i = 1
while i < len(argv):
if argv[i] == "--ccList" and i + 1 < len(argv):
args["cc_list"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--"):
positional.append(argv[i])
i += 1
else:
i += 1
if len(positional) >= 1:
args["process_code"] = positional[0]
if len(positional) >= 2:
args["originator_user_id"] = positional[1]
if len(positional) >= 3:
args["dept_id"] = positional[2]
if len(positional) >= 4:
args["form_values_json"] = positional[3]
return args
def main():
    args = parse_args(sys.argv)
    if not all([args["process_code"], args["originator_user_id"], args["dept_id"], args["form_values_json"]]):
        output({"success": False, "error": {"code": "INVALID_ARGS",
            "message": "Usage: python scripts/create_approval_instance.py \"<processCode>\" \"<originatorUserId>\" \"<deptId>\" '<formValuesJson>' [--ccList \"user1,user2\"]"}})
        sys.exit(1)
    try:
        form_values = json_mod.loads(args["form_values_json"])
    except json_mod.JSONDecodeError:
        output({"success": False, "error": {"code": "INVALID_JSON", "message": "formValuesJson is not a valid JSON string"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Starting approval instance...", file=sys.stderr)
        body = {
            "processCode": args["process_code"],
            "originatorUserId": args["originator_user_id"],
            "deptId": args["dept_id"],
            "formComponentValues": form_values,
        }
        if args["cc_list"]:
            # --ccList arrives as one comma-separated string; send it as a list of
            # userIds (assumption: the endpoint expects an array, not a string).
            body["ccList"] = [u.strip() for u in args["cc_list"].split(",") if u.strip()]
        result = api_request("POST", "/workflow/processInstances", token, json_body=body)
        # Prefer a top-level "instanceId" (assumed response shape) and fall back to
        # the nested field this script read originally.
        instance_id = result.get("instanceId") or result.get("result", {}).get("processInstanceId", "")
        output({
            "success": True,
            "processCode": args["process_code"],
            "originatorUserId": args["originator_user_id"],
            "instanceId": instance_id,
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
main()
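The `formValuesJson` argument is the easiest part of this command to get wrong, because the JSON has to survive shell quoting. The following is a minimal sketch of building the command line programmatically instead of hand-escaping it. The helper names `build_form_values` and `build_command`, and the sample process code and user ID, are hypothetical and not part of this repository; only the name/value shape comes from the docstring example.

```python
import json
import shlex


def build_form_values(fields):
    """Convert {"label": "value"} into the list-of-dicts shape used by formValuesJson."""
    return [{"name": name, "value": value} for name, value in fields.items()]


def build_command(process_code, originator_user_id, dept_id, fields):
    """Return a shell-safe command line for create_approval_instance.py (hypothetical helper)."""
    payload = json.dumps(build_form_values(fields), ensure_ascii=False)
    parts = [
        "python",
        "scripts/create_approval_instance.py",
        process_code,
        originator_user_id,
        str(dept_id),
        payload,
    ]
    # shlex.quote wraps any argument containing quotes, brackets or spaces.
    return " ".join(shlex.quote(p) for p in parts)


if __name__ == "__main__":
    # Sample values are placeholders, not real identifiers.
    print(build_command("PROC-XXXX", "user001", 1, {"标题": "请假申请"}))
```

Splitting the printed command with `shlex.split` gives back the original JSON string unchanged, which is what the script passes to `json.loads`.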
FILE:scripts/create_doc.py
"""Create a document in a knowledge base.
Usage: python scripts/create_doc.py "<workspaceId>" "<document name>" "<operator unionId>" ["<docType>"]
docType defaults to alidoc (DingTalk document); options: alidoc, alinote, alisheet
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 3:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/create_doc.py \"<workspaceId>\" \"<文档名>\" \"<操作人unionId>\" [\"<docType>\"]"}})
sys.exit(1)
workspace_id = args[0]
doc_name = args[1]
operator_id = args[2]
doc_type = args[3] if len(args) >= 4 else "alidoc"
try:
token = get_access_token()
print(f"正在创建文档: {doc_name}...", file=sys.stderr)
result = api_request("POST", f"/doc/workspaces/{workspace_id}/docs", token, json_body={
"name": doc_name,
"docType": doc_type,
"operatorId": operator_id,
})
output({
"success": True,
"name": doc_name,
"docType": doc_type,
"workspaceId": result.get("workspaceId", workspace_id),
"nodeId": result.get("nodeId"),
"docKey": result.get("docKey"),
"url": result.get("url"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/create_schedule_conference.py
"""Create a scheduled meeting (creates a calendar event with a video conference via the Calendar API).
Usage: python scripts/create_schedule_conference.py "<meeting title>" "<creator unionId>" "<start time>" "<end time>" "[attendee unionId1,unionId2,...]" "[location]"
Time format: "2026-03-16 14:00" or ISO 8601
"""
import sys
import os
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
CST = timezone(timedelta(hours=8))
def parse_time(time_str):
for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(time_str, fmt)
dt = dt.replace(tzinfo=CST)
return dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
except ValueError:
continue
return time_str
def main():
if len(sys.argv) < 5:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/create_schedule_conference.py \"<主题>\" \"<创建人unionId>\" \"<开始时间>\" \"<结束时间>\" \"[参会人unionId1,unionId2,...]\" \"[会议地点]\"",
}
})
sys.exit(1)
title = sys.argv[1]
creator_union_id = sys.argv[2]
start_time = parse_time(sys.argv[3])
end_time = parse_time(sys.argv[4])
invite_union_ids = sys.argv[5].split(",") if len(sys.argv) > 5 else []
location = sys.argv[6] if len(sys.argv) > 6 else ""
try:
token = get_access_token()
print("正在创建预约会议...", file=sys.stderr)
attendees = [{"id": uid, "isOptional": False} for uid in invite_union_ids]
body = {
"summary": title,
"start": {"dateTime": start_time, "timeZone": "Asia/Shanghai"},
"end": {"dateTime": end_time, "timeZone": "Asia/Shanghai"},
"isAllDay": False,
"onlineMeetingInfo": {"type": "dingtalk"},
"attendees": attendees,
}
if location:
body["location"] = {"displayName": location}
result = api_request(
"POST",
f"/calendar/users/{creator_union_id}/calendars/primary/events",
token,
json_body=body,
)
online_info = result.get("onlineMeetingInfo", {})
resp_data = {
"success": True,
"title": title,
"eventId": result.get("id"),
"onlineMeetingUrl": online_info.get("extraInfo", {}).get("url", ""),
"conferenceId": online_info.get("conferenceId", ""),
"startTime": start_time,
"endTime": end_time,
"attendeeCount": len(invite_union_ids),
}
if location:
resp_data["location"] = location
output(resp_data)
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
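The time handling in `parse_time` is worth seeing in isolation: three naive formats are treated as UTC+8 wall-clock time, and anything else is passed through untouched, so a malformed string only fails once the API rejects it. Below is a minimal sketch that reproduces that normalization and adds an optional pre-flight check. The names `normalize` and `is_valid` are hypothetical, and the check via `datetime.fromisoformat` is an added assumption, not something the script does.

```python
from datetime import datetime

# Same three input formats the script accepts.
FORMATS = ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S")


def normalize(time_str):
    """Render a naive timestamp as ISO 8601 with a fixed +08:00 offset."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(time_str, fmt).strftime("%Y-%m-%dT%H:%M:%S+08:00")
        except ValueError:
            continue
    # Unrecognized input is returned as-is, mirroring the script.
    return time_str


def is_valid(time_str):
    """Hypothetical pre-flight check: does the normalized value parse as ISO 8601?"""
    try:
        datetime.fromisoformat(normalize(time_str))
        return True
    except ValueError:
        return False


if __name__ == "__main__":
    for sample in ("2026-03-16 14:00", "2026-03-16T14:00:00+08:00", "tomorrow"):
        print(sample, "->", normalize(sample), is_valid(sample))
```

A caller could run `is_valid` on both times before invoking the script and so report a bad time locally instead of relying on the API's error.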
FILE:scripts/create_video_conference.py
"""Create an instant video conference.
Usage: python scripts/create_video_conference.py "<meeting title>" "<initiator unionId>" "[invitee unionId1,unionId2,...]"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 3:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/create_video_conference.py \"<会议主题>\" \"<发起人unionId>\" \"[邀请人unionId1,unionId2,...]\"",
}
})
sys.exit(1)
conf_title = sys.argv[1]
user_id = sys.argv[2]
invite_user_ids = sys.argv[3].split(",") if len(sys.argv) > 3 else []
try:
token = get_access_token()
print("正在创建视频会议...", file=sys.stderr)
body = {
"userId": user_id,
"confTitle": conf_title,
"inviteCaller": True,
"inviteUserIds": invite_user_ids,
}
result = api_request("POST", "/conference/videoConferences", token, json_body=body)
output({
"success": True,
"title": conf_title,
"conferenceId": result.get("conferenceId"),
"conferencePassword": result.get("conferencePassword"),
"hostPassword": result.get("hostPassword"),
"phoneNumbers": result.get("phoneNumbers"),
"externalLinkUrl": result.get("externalLinkUrl"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/delete_event.py
"""Delete a calendar event.
Usage: python scripts/delete_event.py "<user unionId>" "<eventId>" [--push-notification]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 3:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/delete_event.py \"<用户unionId>\" \"<eventId>\" [--push-notification]",
}
})
sys.exit(1)
union_id = sys.argv[1]
event_id = sys.argv[2]
push_notification = "--push-notification" in sys.argv
try:
token = get_access_token()
print("正在删除日程...", file=sys.stderr)
params = {}
if push_notification:
params["pushNotification"] = "true"
api_request(
"DELETE",
f"/calendar/users/{union_id}/calendars/primary/events/{event_id}",
token,
params=params,
)
output({
"success": True,
"eventId": event_id,
"message": "日程已删除",
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/dingtalk_client.py
"""DingTalk API client: wraps authentication and HTTP requests for both the OpenAPI and the TOP API."""
import os
import sys
import json
import requests
API_BASE = "https://api.dingtalk.com"
OPEN_API_BASE = f"{API_BASE}/v1.0"
TOP_API_BASE = "https://oapi.dingtalk.com"
def get_credentials():
app_key = os.environ.get("DINGTALK_APP_KEY")
app_secret = os.environ.get("DINGTALK_APP_SECRET")
if not app_key or not app_secret:
print(json.dumps({
"success": False,
"error": {
"code": "MISSING_CREDENTIALS",
"message": "缺少钉钉应用凭证,请设置环境变量 DINGTALK_APP_KEY 和 DINGTALK_APP_SECRET",
}
}, ensure_ascii=False, indent=2))
sys.exit(1)
return app_key, app_secret
def get_access_token():
app_key, app_secret = get_credentials()
print("正在获取 access_token...", file=sys.stderr)
resp = requests.post(f"{OPEN_API_BASE}/oauth2/accessToken", json={
"appKey": app_key,
"appSecret": app_secret,
})
data = resp.json()
token = data.get("accessToken")
if not token:
print(json.dumps({
"success": False,
"error": {
"code": data.get("code", "UNKNOWN"),
"message": data.get("message", "获取 access_token 失败"),
}
}, ensure_ascii=False, indent=2))
sys.exit(1)
print("access_token 获取成功", file=sys.stderr)
return token
def api_request(method, path, token, json_body=None, params=None, api_version="v1.0"):
    """OpenAPI request (api.dingtalk.com); the token is passed in a header."""
    headers = {"x-acs-dingtalk-access-token": token}
    url = f"{API_BASE}/{api_version}{path}"
    resp = requests.request(method, url, headers=headers, json=json_body, params=params)
    if resp.status_code >= 400:
        # An error body is not guaranteed to be JSON (for example a gateway
        # error page), so fall back to an empty dict instead of raising here.
        try:
            data = resp.json() if resp.text else {}
        except ValueError:
            data = {}
        raise ApiError(
            code=data.get("code", f"HTTP_{resp.status_code}"),
            message=data.get("message", resp.text),
            request_id=data.get("requestid"),
        )
    if resp.status_code == 204 or not resp.text:
        return {}
    return resp.json()
def top_api_request(method, path, token, json_body=None):
    """TOP API request (oapi.dingtalk.com); the token is passed as a query parameter and responses carry errcode/result."""
    url = f"{TOP_API_BASE}{path}"
    params = {"access_token": token}
    resp = requests.request(method, url, params=params, json=json_body)
    if resp.status_code >= 400:
        # An error body is not guaranteed to be JSON, so guard the parse.
        try:
            data = resp.json() if resp.text else {}
        except ValueError:
            data = {}
        raise ApiError(
            code=data.get("code", f"HTTP_{resp.status_code}"),
            message=data.get("message", resp.text),
        )
    data = resp.json()
    # The TOP API reports failures inside a 200 response via errcode/errmsg.
    if data.get("errcode", 0) != 0:
        raise ApiError(
            code=str(data.get("errcode")),
            message=data.get("errmsg", "unknown error"),
        )
    return data
class ApiError(Exception):
def __init__(self, code, message, request_id=None):
self.code = code
self.api_message = message
self.request_id = request_id
super().__init__(f"{code}: {message}")
def handle_error(e):
if isinstance(e, ApiError):
print(json.dumps({
"success": False,
"error": {
"code": e.code,
"message": e.api_message,
"requestId": e.request_id,
}
}, ensure_ascii=False, indent=2))
else:
print(json.dumps({
"success": False,
"error": {
"code": "UNKNOWN_ERROR",
"message": str(e),
}
}, ensure_ascii=False, indent=2))
def get_robot_code(arg_value=None):
"""获取 robotCode:优先使用传入参数,其次读取环境变量 DINGTALK_ROBOT_CODE"""
code = arg_value or os.environ.get("DINGTALK_ROBOT_CODE")
if not code:
print(json.dumps({
"success": False,
"error": {
"code": "MISSING_ROBOT_CODE",
"message": "缺少 robotCode,请通过命令行参数传入或设置环境变量 DINGTALK_ROBOT_CODE",
}
}, ensure_ascii=False, indent=2))
sys.exit(1)
return code
def output(data):
print(json.dumps(data, ensure_ascii=False, indent=2))
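The client above deals with two error conventions: the OpenAPI signals failure through the HTTP status with `code`/`message` in the body, while the TOP API can return HTTP 200 with a non-zero `errcode` and an `errmsg`. The sketch below restates that decision logic as one pure function so it can be checked without network access. `extract_error` is a hypothetical name, and the sample codes and messages in the demo are made up for illustration.

```python
def extract_error(status_code, data):
    """Return (code, message) for a failed response, or None for a success.

    Mirrors the two checks used by the client: HTTP status first,
    then the TOP-style errcode field.
    """
    if status_code >= 400:
        return str(data.get("code", f"HTTP_{status_code}")), data.get("message", "")
    if data.get("errcode", 0) != 0:
        return str(data["errcode"]), data.get("errmsg", "")
    return None


if __name__ == "__main__":
    # Illustrative payloads only; real codes and messages come from the API.
    print(extract_error(200, {"errcode": 0, "result": {}}))
    print(extract_error(200, {"errcode": 12345, "errmsg": "sample failure"}))
    print(extract_error(403, {"code": "Forbidden.Sample", "message": "sample denial"}))
    print(extract_error(500, {}))
```

Keeping the check free of I/O makes the difference between the two envelopes easy to unit-test.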
FILE:scripts/execute_approval_task.py
"""Execute an approval task (agree/refuse).
Usage: python scripts/execute_approval_task.py "<instanceId>" "<userId>" "<agree|refuse>" [--taskId "xxx"] [--remark "approval remark"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"instance_id": None, "user_id": None, "result": None, "task_id": None, "remark": None}
positional = []
i = 1
while i < len(argv):
if argv[i] == "--taskId" and i + 1 < len(argv):
args["task_id"] = argv[i + 1]
i += 2
elif argv[i] == "--remark" and i + 1 < len(argv):
args["remark"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--"):
positional.append(argv[i])
i += 1
else:
i += 1
if len(positional) >= 1:
args["instance_id"] = positional[0]
if len(positional) >= 2:
args["user_id"] = positional[1]
if len(positional) >= 3:
args["result"] = positional[2]
return args
def main():
args = parse_args(sys.argv)
if not all([args["instance_id"], args["user_id"], args["result"]]):
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/execute_approval_task.py \"<instanceId>\" \"<userId>\" \"<agree|refuse>\" [--taskId \"xxx\"] [--remark \"审批意见\"]"}})
sys.exit(1)
if args["result"] not in ("agree", "refuse"):
output({"success": False, "error": {"code": "INVALID_RESULT", "message": "审批结果必须是 agree(同意)或 refuse(拒绝)"}})
sys.exit(1)
try:
token = get_access_token()
print("正在执行审批任务...", file=sys.stderr)
body = {
"processInstanceId": args["instance_id"],
"actionerUserId": args["user_id"],
"result": args["result"],
"remark": args["remark"] or "",
}
if args["task_id"]:
body["taskId"] = args["task_id"]
result = api_request("POST", "/workflow/processInstances/tasks/execute", token, json_body=body)
output({
"success": result.get("result", {}).get("success", False),
"instanceId": args["instance_id"],
"userId": args["user_id"],
"action": args["result"],
"message": "已同意审批" if args["result"] == "agree" else "已拒绝审批",
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
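Several scripts in this bundle hand-roll the same loop over `sys.argv` to separate positional arguments from `--flag value` pairs. For comparison, here is a minimal sketch of the same interface expressed with the standard-library `argparse`. It is an alternative under stated assumptions, not the repository's implementation, and `build_parser` is a hypothetical name. One behavioral difference: on bad input `argparse` prints to stderr and exits with status 2, rather than emitting the JSON error object these scripts produce.

```python
import argparse


def build_parser():
    """Declarative equivalent of the hand-written parse_args loop (sketch)."""
    parser = argparse.ArgumentParser(prog="execute_approval_task.py")
    parser.add_argument("instance_id")
    parser.add_argument("user_id")
    # choices enforces the agree/refuse validation the script performs by hand.
    parser.add_argument("result", choices=("agree", "refuse"))
    parser.add_argument("--taskId", dest="task_id", default=None)
    parser.add_argument("--remark", default="")
    parser.add_argument("--debug", action="store_true")
    return parser


if __name__ == "__main__":
    # Placeholder identifiers, not real instance or user IDs.
    ns = build_parser().parse_args(["inst-001", "user001", "agree", "--remark", "ok"])
    print(ns.instance_id, ns.user_id, ns.result, ns.task_id, ns.remark)
```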
FILE:scripts/get_approval_instance.py
"""Get approval instance details.
Usage: python scripts/get_approval_instance.py "<instanceId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_approval_instance.py \"<instanceId>\""}})
sys.exit(1)
instance_id = args[0]
try:
token = get_access_token()
print("正在查询审批实例详情...", file=sys.stderr)
result = api_request("GET", "/workflow/processInstances", token, params={"processInstanceId": instance_id})
output({"success": True, "instanceId": instance_id, "instance": result.get("result", {})})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_bot_list.py
"""List the bots in a group.
Usage: python scripts/get_bot_list.py "<openConversationId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_bot_list.py \"<openConversationId>\""}})
sys.exit(1)
open_conversation_id = args[0]
try:
token = get_access_token()
print("正在获取群内机器人列表...", file=sys.stderr)
result = api_request("POST", "/robot/getBotListInGroup", token, json_body={
"openConversationId": open_conversation_id,
})
output({
"success": True,
"openConversationId": open_conversation_id,
"botList": result.get("chatbotInstanceVOList", []),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_department.py
"""Get department details.
Usage: python scripts/get_department.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_department.py \"<deptId>\""}})
sys.exit(1)
dept_id = int(args[0])
try:
token = get_access_token()
print("正在获取部门详情...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/department/get", token, json_body={"dept_id": dept_id})
output({"success": True, "department": result.get("result", {})})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_event.py
"""Get calendar event details.
Usage: python scripts/get_event.py "<user unionId>" "<eventId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
if len(sys.argv) < 3:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/get_event.py \"<用户unionId>\" \"<eventId>\"",
}
})
sys.exit(1)
union_id = sys.argv[1]
event_id = sys.argv[2]
try:
token = get_access_token()
print("正在查询日程详情...", file=sys.stderr)
result = api_request(
"GET",
f"/calendar/users/{union_id}/calendars/primary/events/{event_id}",
token,
)
output({
"success": True,
"event": {
"id": result.get("id"),
"summary": result.get("summary"),
"description": result.get("description"),
"start": result.get("start"),
"end": result.get("end"),
"status": result.get("status"),
"isAllDay": result.get("isAllDay"),
"location": result.get("location"),
"organizer": result.get("organizer"),
"attendees": result.get("attendees"),
"onlineMeetingInfo": result.get("onlineMeetingInfo"),
"recurrence": result.get("recurrence"),
"createTime": result.get("createTime"),
"updateTime": result.get("updateTime"),
},
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user.py
"""Get user details.
Usage: python scripts/get_user.py "<userId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
if len(sys.argv) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_user.py \"<userId>\""}})
sys.exit(1)
user_id = sys.argv[1]
try:
token = get_access_token()
print("正在查询用户详情...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/user/get", token, json_body={
"userid": user_id, "language": "zh_CN",
})
output({"success": True, "user": result.get("result", {})})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_by_mobile.py
"""Look up a user by mobile number.
Usage: python scripts/get_user_by_mobile.py "<mobile number>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
if len(sys.argv) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_user_by_mobile.py \"<手机号>\""}})
sys.exit(1)
mobile = sys.argv[1]
try:
token = get_access_token()
print("正在根据手机号查询用户...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/user/getbymobile", token, json_body={"mobile": mobile})
output({"success": True, "mobile": mobile, "userId": result.get("result", {}).get("userid")})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_by_unionid.py
"""Look up a user's userId by unionid.
Usage: python scripts/get_user_by_unionid.py "<unionid>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
if len(sys.argv) < 2:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_user_by_unionid.py \"<unionid>\""}})
sys.exit(1)
unionid = sys.argv[1]
try:
token = get_access_token()
print("正在根据 unionid 查询用户...", file=sys.stderr)
result = top_api_request("POST", "/topapi/user/getbyunionid", token, json_body={"unionid": unionid})
output({"success": True, "unionid": unionid, "userId": result.get("result", {}).get("userid")})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_count.py
"""Get the employee count.
Usage: python scripts/get_user_count.py [--onlyActive]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
only_active = "--onlyActive" in sys.argv
try:
token = get_access_token()
print("正在获取员工人数...", file=sys.stderr)
result = top_api_request("POST", "/topapi/user/count", token, json_body={"only_active": only_active})
output({"success": True, "onlyActive": only_active, "count": result.get("result", {}).get("count", 0)})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/get_user_todo_count.py
"""Get the number of approvals pending for a user.
Usage: python scripts/get_user_todo_count.py "<userId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/get_user_todo_count.py \"<userId>\""}})
sys.exit(1)
user_id = args[0]
try:
token = get_access_token()
print("正在查询用户待审批数量...", file=sys.stderr)
result = api_request("GET", "/workflow/processes/todoTasks/numbers", token, params={"userId": user_id})
output({"success": True, "userId": user_id, "count": result.get("result", {}).get("count", 0)})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_approval_instance_ids.py
"""List approval instance IDs.
Usage: python scripts/list_approval_instance_ids.py "<processCode>" --startTime <timestamp> --endTime <timestamp> [--size 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
args = {"process_code": None, "start_time": 0, "end_time": 0, "size": 20, "next_token": None}
i = 1
while i < len(argv):
if argv[i] == "--startTime" and i + 1 < len(argv):
args["start_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--endTime" and i + 1 < len(argv):
args["end_time"] = int(argv[i + 1])
i += 2
elif argv[i] == "--size" and i + 1 < len(argv):
args["size"] = int(argv[i + 1])
i += 2
elif argv[i] == "--nextToken" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["process_code"] is None:
args["process_code"] = argv[i]
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if not args["process_code"] or not args["start_time"] or not args["end_time"]:
output({"success": False, "error": {"code": "INVALID_ARGS",
"message": "用法: python scripts/list_approval_instance_ids.py \"<processCode>\" --startTime <timestamp> --endTime <timestamp> [--size 20] [--nextToken \"xxx\"]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在查询审批实例ID列表...", file=sys.stderr)
body = {
"processCode": args["process_code"],
"startTime": args["start_time"],
"endTime": args["end_time"],
"size": args["size"],
}
if args["next_token"]:
body["nextToken"] = args["next_token"]
result = api_request("POST", "/workflow/processInstances/ids", token, json_body=body)
r = result.get("result", {})
instance_ids = r.get("list", [])
output({
"success": True,
"processCode": args["process_code"],
"instanceIds": instance_ids,
"totalCount": len(instance_ids),
"hasMore": bool(r.get("nextToken")),
"nextToken": r.get("nextToken"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
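The `--startTime` and `--endTime` flags take integer timestamps, which are awkward to type by hand. Here is a minimal sketch of converting a readable UTC+8 wall-clock time into an epoch value. It assumes the endpoint expects Unix time in milliseconds, which the script itself does not state, and `to_epoch_ms` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset, matching the CST constant used by the calendar scripts.
CST = timezone(timedelta(hours=8))


def to_epoch_ms(time_str, fmt="%Y-%m-%d %H:%M"):
    """Interpret time_str as UTC+8 wall-clock time and return epoch milliseconds.

    Assumption: the API wants milliseconds; divide by 1000 if it wants seconds.
    """
    dt = datetime.strptime(time_str, fmt).replace(tzinfo=CST)
    return int(dt.timestamp() * 1000)


if __name__ == "__main__":
    start = to_epoch_ms("2026-03-01 00:00")
    end = to_epoch_ms("2026-03-31 23:59")
    print(f"--startTime {start} --endTime {end}")
```

The script treats a value of 0 as "missing", so both flags need a non-zero timestamp.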
FILE:scripts/list_department_parents.py
"""Get a department's chain of parent departments.
Usage: python scripts/list_department_parents.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_department_parents.py \"<deptId>\""}})
sys.exit(1)
dept_id = int(args[0])
try:
token = get_access_token()
print("正在获取部门父部门链...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/department/listparentbydept", token, json_body={"dept_id": dept_id})
output({"success": True, "deptId": dept_id, "parentIdList": result.get("result", {}).get("parent_id_list", [])})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_department_user_details.py
"""List detailed user info for a department (paginated).
Usage: python scripts/list_department_user_details.py "<deptId>" [--cursor 0] [--size 100]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def parse_args(argv):
args = {"dept_id": None, "cursor": 0, "size": 100}
i = 1
while i < len(argv):
if argv[i] == "--cursor" and i + 1 < len(argv):
args["cursor"] = int(argv[i + 1])
i += 2
elif argv[i] == "--size" and i + 1 < len(argv):
args["size"] = int(argv[i + 1])
i += 2
elif argv[i] == "--debug":
i += 1
elif not argv[i].startswith("--") and args["dept_id"] is None:
args["dept_id"] = int(argv[i])
i += 1
else:
i += 1
return args
def main():
args = parse_args(sys.argv)
if args["dept_id"] is None:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_department_user_details.py \"<deptId>\" [--cursor 0] [--size 100]"}})
sys.exit(1)
try:
token = get_access_token()
print("正在获取部门用户详情...", file=sys.stderr)
result = top_api_request("POST", "/topapi/v2/user/list", token, json_body={
"dept_id": args["dept_id"], "cursor": args["cursor"], "size": args["size"],
})
r = result.get("result", {})
output({
"success": True,
"deptId": args["dept_id"],
"users": r.get("list", []),
"hasMore": r.get("has_more", False),
"nextCursor": r.get("next_cursor"),
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_department_user_ids.py
"""List the user IDs in a department.
Usage: python scripts/list_department_user_ids.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
args = [a for a in sys.argv[1:] if a != "--debug"]
if len(args) < 1:
output({"success": False, "error": {"code": "INVALID_ARGS", "message": "用法: python scripts/list_department_user_ids.py \"<deptId>\""}})
sys.exit(1)
dept_id = int(args[0])
try:
token = get_access_token()
print("正在获取部门用户ID列表...", file=sys.stderr)
result = top_api_request("POST", "/topapi/user/listid", token, json_body={"dept_id": dept_id})
output({"success": True, "deptId": dept_id, "userIds": result.get("result", {}).get("userid_list", [])})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_department_users.py
"""List the users in a department (brief info, paginates automatically).
Usage: python scripts/list_department_users.py "<deptId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_department_users.py \"<deptId>\""}})
        sys.exit(1)
    dept_id = int(args[0])
    try:
        token = get_access_token()
        print("Fetching department user list...", file=sys.stderr)
        all_users = []
        cursor = 0
        has_more = True
        while has_more:
            result = top_api_request("POST", "/topapi/v2/user/list", token, json_body={
                "dept_id": dept_id, "cursor": cursor, "size": 100,
            })
            r = result.get("result", {})
            for u in (r.get("list") or []):
                all_users.append({"userId": u.get("userid"), "name": u.get("name")})
            next_cursor = r.get("next_cursor")
            # Stop if the API reports more pages but omits the cursor; resetting
            # to 0 would refetch the first page forever.
            has_more = bool(r.get("has_more")) and next_cursor is not None
            if has_more:
                cursor = next_cursor
        output({"success": True, "deptId": dept_id, "users": all_users})
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
main()
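The paging loop in this script follows a `cursor` / `has_more` / `next_cursor` contract. The sketch below isolates that contract behind a callable so the loop can be exercised without the API. `fetch_all` and `fake_fetch` are hypothetical names, and the fake page source only imitates the field names the script reads. It is not DingTalk's behavior.

```python
def fetch_all(fetch_page, size=100):
    """Collect every item from a cursor-paginated source.

    fetch_page(cursor, size) must return a dict with "list", "has_more"
    and, when more pages exist, "next_cursor".
    """
    items, cursor = [], 0
    while True:
        page = fetch_page(cursor, size)
        items.extend(page.get("list") or [])
        next_cursor = page.get("next_cursor")
        # Stop on the last page, or if the cursor is missing, to avoid looping forever.
        if not page.get("has_more") or next_cursor is None:
            return items
        cursor = next_cursor


def fake_fetch(cursor, size):
    """Stand-in for the API call: serves five fake users in pages."""
    data = [{"userid": f"u{i}"} for i in range(5)]
    end = cursor + size
    page = {"list": data[cursor:end], "has_more": end < len(data)}
    if page["has_more"]:
        page["next_cursor"] = end
    return page


if __name__ == "__main__":
    print([u["userid"] for u in fetch_all(fake_fetch, size=2)])
```

Passing the page fetcher in as a parameter keeps the loop testable and lets one helper serve any endpoint that follows the same contract.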
FILE:scripts/list_events.py
"""List calendar events.
Usage: python scripts/list_events.py "<user unionId>" [--time-min "2026-03-01 00:00"] [--time-max "2026-03-31 23:59"] [--max-results 50] [--next-token "xxx"]
"""
import sys
import os
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
CST = timezone(timedelta(hours=8))
def parse_time_to_iso(time_str):
for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(time_str, fmt)
dt = dt.replace(tzinfo=CST)
return dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
except ValueError:
continue
return time_str
def parse_args(argv):
args = {"union_id": None, "time_min": None, "time_max": None, "max_results": None, "next_token": None}
positional = []
i = 1
while i < len(argv):
if argv[i] == "--time-min" and i + 1 < len(argv):
args["time_min"] = parse_time_to_iso(argv[i + 1])
i += 2
elif argv[i] == "--time-max" and i + 1 < len(argv):
args["time_max"] = parse_time_to_iso(argv[i + 1])
i += 2
elif argv[i] == "--max-results" and i + 1 < len(argv):
args["max_results"] = int(argv[i + 1])
i += 2
elif argv[i] == "--next-token" and i + 1 < len(argv):
args["next_token"] = argv[i + 1]
i += 2
else:
positional.append(argv[i])
i += 1
if positional:
args["union_id"] = positional[0]
return args
def main():
args = parse_args(sys.argv)
if not args["union_id"]:
output({
"success": False,
"error": {
"code": "INVALID_ARGS",
"message": "用法: python scripts/list_events.py \"<用户unionId>\" [--time-min \"...\"] [--time-max \"...\"] [--max-results 50]",
}
})
sys.exit(1)
try:
token = get_access_token()
print("正在查询日程列表...", file=sys.stderr)
params = {}
if args["time_min"]:
params["timeMin"] = args["time_min"]
if args["time_max"]:
params["timeMax"] = args["time_max"]
if args["max_results"]:
params["maxResults"] = args["max_results"]
if args["next_token"]:
params["nextToken"] = args["next_token"]
result = api_request(
"GET",
f"/calendar/users/{args['union_id']}/calendars/primary/events",
token,
params=params,
)
events = result.get("events", [])
output({
"success": True,
"totalCount": len(events),
"nextToken": result.get("nextToken"),
"events": [{
"id": e.get("id"),
"summary": e.get("summary"),
"start": e.get("start"),
"end": e.get("end"),
"status": e.get("status"),
"isAllDay": e.get("isAllDay"),
"onlineMeetingInfo": e.get("onlineMeetingInfo"),
} for e in events],
})
except Exception as e:
handle_error(e)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/list_inactive_users.py
"""List employees who have not logged in to DingTalk.
Usage: python scripts/list_inactive_users.py "<queryDate>" [--deptIds "id1,id2,..."] [--offset 0] [--size 100]
queryDate format: yyyyMMdd
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
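# Illustrative sketch, not part of the original script: one way to build the
# yyyyMMdd string that <queryDate> expects. The helper name format_query_date
# is hypothetical, and nothing else in this file calls it.
from datetime import date
def format_query_date(day):
    """Format a datetime.date as yyyyMMdd."""
    return day.strftime("%Y%m%d")
# Example: format_query_date(date(2026, 3, 1)) returns "20260301".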
def parse_args(argv):
    args = {"query_date": None, "dept_ids": [], "offset": 0, "size": 100}
    i = 1
    while i < len(argv):
        if argv[i] == "--deptIds" and i + 1 < len(argv):
            args["dept_ids"] = [int(x.strip()) for x in argv[i + 1].split(",") if x.strip()]
            i += 2
        elif argv[i] == "--offset" and i + 1 < len(argv):
            args["offset"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--size" and i + 1 < len(argv):
            args["size"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--") and args["query_date"] is None:
            # The first bare argument is the query date.
            args["query_date"] = argv[i]
            i += 1
        else:
            i += 1
    return args
def main():
    args = parse_args(sys.argv)
    if not args["query_date"]:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_inactive_users.py \"<queryDate>\" [--deptIds \"id1,id2\"] [--offset 0] [--size 100]"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Fetching the list of users who have not logged in...", file=sys.stderr)
        body = {"is_active": False, "query_date": args["query_date"], "offset": args["offset"], "size": args["size"]}
        if args["dept_ids"]:
            body["dept_ids"] = args["dept_ids"]
        result = top_api_request("POST", "/topapi/inactive/user/v2/get", token, json_body=body)
        r = result.get("result", {})
        output({
            "success": True,
            "queryDate": args["query_date"],
            "userIds": r.get("list", []),
            "hasMore": r.get("has_more", False),
            # Offset to pass on the next call, or None when this was the last page.
            "nextOffset": (args["offset"] + args["size"]) if r.get("has_more") else None,
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_resigned_users.py
"""List employee resignation records.
Usage: python scripts/list_resigned_users.py "<startTime>" ["<endTime>"] [--nextToken "xxx"] [--maxResults 100]
startTime/endTime format: ISO 8601 (e.g. 2024-01-15T00:00:00+08:00)
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
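# Illustrative sketch, not part of the original script: one way to produce the
# ISO 8601 strings that <startTime>/<endTime> expect. The helper name
# to_iso8601 and the default UTC+8 offset are assumptions taken from the
# example in the docstring; nothing else in this file calls it.
from datetime import datetime, timedelta, timezone
def to_iso8601(year, month, day, hour=0, minute=0, second=0, utc_offset_hours=8):
    """Return an ISO 8601 timestamp with an explicit UTC offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime(year, month, day, hour, minute, second, tzinfo=tz).isoformat()
# Example: to_iso8601(2024, 1, 15) returns "2024-01-15T00:00:00+08:00".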
def parse_args(argv):
    args = {"start_time": None, "end_time": None, "next_token": None, "max_results": 100}
    positional = []
    i = 1
    while i < len(argv):
        if argv[i] == "--nextToken" and i + 1 < len(argv):
            args["next_token"] = argv[i + 1]
            i += 2
        elif argv[i] == "--maxResults" and i + 1 < len(argv):
            args["max_results"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--"):
            positional.append(argv[i])
            i += 1
        else:
            i += 1
    # Positional arguments: startTime, then an optional endTime.
    if len(positional) >= 1:
        args["start_time"] = positional[0]
    if len(positional) >= 2:
        args["end_time"] = positional[1]
    return args
def main():
    args = parse_args(sys.argv)
    if not args["start_time"]:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_resigned_users.py \"<startTime>\" [\"<endTime>\"] [--nextToken \"xxx\"] [--maxResults 100]"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Querying resignation records...", file=sys.stderr)
        params = {"startTime": args["start_time"], "maxResults": str(args["max_results"])}
        if args["end_time"]:
            params["endTime"] = args["end_time"]
        if args["next_token"]:
            params["nextToken"] = args["next_token"]
        result = api_request("GET", "/contact/empLeaveRecords", token, params=params)
        output({
            "success": True,
            "startTime": args["start_time"],
            "endTime": args["end_time"],
            "records": result.get("records", []),
            "nextToken": result.get("nextToken"),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_sub_departments.py
"""List the sub-departments of a department.
Usage: python scripts/list_sub_departments.py "<deptId>"
The root department has deptId = 1
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_sub_departments.py \"<deptId>\" (the root department is 1)"}})
        sys.exit(1)
    dept_id = int(args[0])
    try:
        token = get_access_token()
        print("Fetching sub-department list...", file=sys.stderr)
        result = top_api_request("POST", "/topapi/v2/department/listsub", token, json_body={"dept_id": dept_id})
        sub_ids = [d.get("dept_id") for d in (result.get("result") or [])]
        output({"success": True, "deptId": dept_id, "subDepartmentIds": sub_ids})
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_user_cc_approvals.py
"""List approval instances on which the user was CC'd.
Usage: python scripts/list_user_cc_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
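# Illustrative sketch, not part of the original script: one way to compute the
# integer <ts> values for --startTime/--endTime. It ASSUMES <ts> is a Unix
# timestamp in milliseconds, which the source does not state; the helper name
# to_epoch_millis and the default UTC+8 offset are also hypothetical.
from datetime import datetime, timedelta, timezone
def to_epoch_millis(year, month, day, hour=0, minute=0, utc_offset_hours=8):
    """Return milliseconds since the Unix epoch for a wall-clock time at the given offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return int(datetime(year, month, day, hour, minute, tzinfo=tz).timestamp() * 1000)
# Example: to_epoch_millis(1970, 1, 1, 8) returns 0 (08:00 at UTC+8 is the epoch).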
def parse_args(argv):
    args = {"user_id": None, "start_time": None, "end_time": None, "max_results": 20, "next_token": None}
    i = 1
    while i < len(argv):
        if argv[i] == "--startTime" and i + 1 < len(argv):
            args["start_time"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--endTime" and i + 1 < len(argv):
            args["end_time"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--maxResults" and i + 1 < len(argv):
            args["max_results"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--nextToken" and i + 1 < len(argv):
            args["next_token"] = argv[i + 1]
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--") and args["user_id"] is None:
            # The first bare argument is the userId.
            args["user_id"] = argv[i]
            i += 1
        else:
            i += 1
    return args
def main():
    args = parse_args(sys.argv)
    if not args["user_id"]:
        output({"success": False, "error": {"code": "INVALID_ARGS",
            "message": "Usage: python scripts/list_user_cc_approvals.py \"<userId>\" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken \"xxx\"]"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Querying approvals CC'd to the user...", file=sys.stderr)
        body = {"userId": args["user_id"], "maxResults": args["max_results"]}
        if args["start_time"]:
            body["startTime"] = args["start_time"]
        if args["end_time"]:
            body["endTime"] = args["end_time"]
        if args["next_token"]:
            body["nextToken"] = args["next_token"]
        result = api_request("POST", "/workflow/processInstances/userCc", token, json_body=body)
        instances = result.get("result", {}).get("list", [])
        output({
            "success": True,
            "userId": args["user_id"],
            "instances": instances,
            "totalCount": len(instances),
            # A non-empty nextToken means another page is available.
            "hasMore": bool(result.get("result", {}).get("nextToken")),
            "nextToken": result.get("result", {}).get("nextToken"),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_user_done_approvals.py
"""List approval instances the user has already processed.
Usage: python scripts/list_user_done_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
    args = {"user_id": None, "start_time": None, "end_time": None, "max_results": 20, "next_token": None}
    i = 1
    while i < len(argv):
        if argv[i] == "--startTime" and i + 1 < len(argv):
            args["start_time"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--endTime" and i + 1 < len(argv):
            args["end_time"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--maxResults" and i + 1 < len(argv):
            args["max_results"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--nextToken" and i + 1 < len(argv):
            args["next_token"] = argv[i + 1]
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--") and args["user_id"] is None:
            # The first bare argument is the userId.
            args["user_id"] = argv[i]
            i += 1
        else:
            i += 1
    return args
def main():
    args = parse_args(sys.argv)
    if not args["user_id"]:
        output({"success": False, "error": {"code": "INVALID_ARGS",
            "message": "Usage: python scripts/list_user_done_approvals.py \"<userId>\" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken \"xxx\"]"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Querying approvals the user has processed...", file=sys.stderr)
        body = {"userId": args["user_id"], "maxResults": args["max_results"]}
        if args["start_time"]:
            body["startTime"] = args["start_time"]
        if args["end_time"]:
            body["endTime"] = args["end_time"]
        if args["next_token"]:
            body["nextToken"] = args["next_token"]
        result = api_request("POST", "/workflow/processInstances/userDone", token, json_body=body)
        instances = result.get("result", {}).get("list", [])
        output({
            "success": True,
            "userId": args["user_id"],
            "instances": instances,
            "totalCount": len(instances),
            # A non-empty nextToken means another page is available.
            "hasMore": bool(result.get("result", {}).get("nextToken")),
            "nextToken": result.get("result", {}).get("nextToken"),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_user_initiated_approvals.py
"""List approval instances initiated by the user.
Usage: python scripts/list_user_initiated_approvals.py "<userId>" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
    args = {"user_id": None, "start_time": None, "end_time": None, "max_results": 20, "next_token": None}
    i = 1
    while i < len(argv):
        if argv[i] == "--startTime" and i + 1 < len(argv):
            args["start_time"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--endTime" and i + 1 < len(argv):
            args["end_time"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--maxResults" and i + 1 < len(argv):
            args["max_results"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--nextToken" and i + 1 < len(argv):
            args["next_token"] = argv[i + 1]
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--") and args["user_id"] is None:
            # The first bare argument is the userId.
            args["user_id"] = argv[i]
            i += 1
        else:
            i += 1
    return args
def main():
    args = parse_args(sys.argv)
    if not args["user_id"]:
        output({"success": False, "error": {"code": "INVALID_ARGS",
            "message": "Usage: python scripts/list_user_initiated_approvals.py \"<userId>\" [--startTime <ts>] [--endTime <ts>] [--maxResults 20] [--nextToken \"xxx\"]"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Querying approvals initiated by the user...", file=sys.stderr)
        body = {"userId": args["user_id"], "maxResults": args["max_results"]}
        if args["start_time"]:
            body["startTime"] = args["start_time"]
        if args["end_time"]:
            body["endTime"] = args["end_time"]
        if args["next_token"]:
            body["nextToken"] = args["next_token"]
        result = api_request("POST", "/workflow/processInstances/userStarted", token, json_body=body)
        instances = result.get("result", {}).get("list", [])
        output({
            "success": True,
            "userId": args["user_id"],
            "instances": instances,
            "totalCount": len(instances),
            # A non-empty nextToken means another page is available.
            "hasMore": bool(result.get("result", {}).get("nextToken")),
            "nextToken": result.get("result", {}).get("nextToken"),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_user_parent_departments.py
"""Get the chain of parent departments for the departments a user belongs to.
Usage: python scripts/list_user_parent_departments.py "<userId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_user_parent_departments.py \"<userId>\""}})
        sys.exit(1)
    user_id = args[0]
    try:
        token = get_access_token()
        print("Fetching the user's parent department chain...", file=sys.stderr)
        result = top_api_request("POST", "/topapi/v2/department/listparentbyuser", token, json_body={"userid": user_id})
        output({"success": True, "userId": user_id, "parentIdList": result.get("result", {}).get("parent_list", [])})
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_user_todo_approvals.py
"""List approval instances awaiting the user's approval.
Usage: python scripts/list_user_todo_approvals.py "<userId>" [--maxResults 20] [--nextToken "xxx"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def parse_args(argv):
    args = {"user_id": None, "max_results": 20, "next_token": None}
    i = 1
    while i < len(argv):
        if argv[i] == "--maxResults" and i + 1 < len(argv):
            args["max_results"] = int(argv[i + 1])
            i += 2
        elif argv[i] == "--nextToken" and i + 1 < len(argv):
            args["next_token"] = argv[i + 1]
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--") and args["user_id"] is None:
            # The first bare argument is the userId.
            args["user_id"] = argv[i]
            i += 1
        else:
            i += 1
    return args
def main():
    args = parse_args(sys.argv)
    if not args["user_id"]:
        output({"success": False, "error": {"code": "INVALID_ARGS",
            "message": "Usage: python scripts/list_user_todo_approvals.py \"<userId>\" [--maxResults 20] [--nextToken \"xxx\"]"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Querying the user's pending approvals...", file=sys.stderr)
        body = {"userId": args["user_id"], "maxResults": args["max_results"]}
        if args["next_token"]:
            body["nextToken"] = args["next_token"]
        result = api_request("POST", "/workflow/processInstances/userTodo", token, json_body=body)
        instances = result.get("result", {}).get("list", [])
        output({
            "success": True,
            "userId": args["user_id"],
            "instances": instances,
            "totalCount": len(instances),
            # A non-empty nextToken means another page is available.
            "hasMore": bool(result.get("result", {}).get("nextToken")),
            "nextToken": result.get("result", {}).get("nextToken"),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/list_workspaces.py
"""List knowledge base workspaces.
Usage: python scripts/list_workspaces.py "<operator unionId>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/list_workspaces.py \"<operator unionId>\""}})
        sys.exit(1)
    operator_id = args[0]
    try:
        token = get_access_token()
        print("Fetching knowledge base list...", file=sys.stderr)
        workspaces = []
        next_token = None
        # Keep requesting pages until the API stops returning a nextToken.
        while True:
            params = {"operatorId": operator_id, "maxResults": 30}
            if next_token:
                params["nextToken"] = next_token
            result = api_request("GET", "/wiki/workspaces", token, params=params, api_version="v2.0")
            ws = result.get("workspace")
            if ws:
                # The field may hold a single workspace or a list of them.
                if isinstance(ws, list):
                    workspaces.extend(ws)
                else:
                    workspaces.append(ws)
            next_token = result.get("nextToken")
            if not next_token:
                break
        output({
            "success": True,
            "totalCount": len(workspaces),
            "workspaces": [{"workspaceId": w.get("workspaceId"), "name": w.get("name"), "type": w.get("type"), "url": w.get("url"), "rootNodeId": w.get("rootNodeId")} for w in workspaces],
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/overwrite_doc.py
"""Overwrite the content of a knowledge base document (full replacement).
Usage: python scripts/overwrite_doc.py "<workspaceId>" "<nodeId>" "<operator unionId>" "<content>"
Note: this operation completely replaces the document's existing content.
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 4:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/overwrite_doc.py \"<workspaceId>\" \"<nodeId>\" \"<operator unionId>\" \"<content>\""}})
        sys.exit(1)
    workspace_id = args[0]
    node_id = args[1]
    operator_id = args[2]
    content = args[3]
    try:
        token = get_access_token()
        print("Overwriting document content...", file=sys.stderr)
        # The response body is not needed; a failure raises and is handled below.
        api_request("PUT", f"/doc/workspaces/{workspace_id}/docs/{node_id}/contents", token, json_body={
            "operatorId": operator_id,
            "content": content,
        })
        output({
            "success": True,
            "workspaceId": workspace_id,
            "nodeId": node_id,
            "message": "Document content overwritten",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/remove_event_attendee.py
"""Remove attendees from a calendar event.
Usage: python scripts/remove_event_attendee.py "<user unionId>" "<eventId>" "<attendee unionId1,unionId2,...>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    # Drop --debug so it cannot shift the positional arguments.
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 3:
        output({
            "success": False,
            "error": {
                "code": "INVALID_ARGS",
                "message": "Usage: python scripts/remove_event_attendee.py \"<user unionId>\" \"<eventId>\" \"<attendee unionId1,unionId2,...>\"",
            }
        })
        sys.exit(1)
    union_id = args[0]
    event_id = args[1]
    # Ignore stray whitespace and empty entries such as a trailing comma.
    attendee_ids = [aid.strip() for aid in args[2].split(",") if aid.strip()]
    try:
        token = get_access_token()
        print("Removing attendees...", file=sys.stderr)
        body = {
            "attendeesToRemove": [{"id": aid} for aid in attendee_ids],
        }
        api_request(
            "POST",
            f"/calendar/users/{union_id}/calendars/primary/events/{event_id}/attendees/batchRemove",
            token,
            json_body=body,
        )
        output({
            "success": True,
            "eventId": event_id,
            "removedCount": len(attendee_ids),
            "message": f"Removed {len(attendee_ids)} attendee(s)",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/search_department.py
"""Search for departments.
Usage: python scripts/search_department.py "<search keyword>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/search_department.py \"<search keyword>\""}})
        sys.exit(1)
    keyword = args[0]
    try:
        token = get_access_token()
        print("Searching departments...", file=sys.stderr)
        # Only the first 20 matches are requested; hasMore reports whether more exist.
        result = api_request("POST", "/contact/departments/search", token, json_body={
            "queryWord": keyword, "offset": 0, "size": 20,
        })
        output({
            "success": True,
            "keyword": keyword,
            "totalCount": result.get("totalCount", 0),
            "hasMore": result.get("hasMore", False),
            "departmentIds": result.get("list", []),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/search_doc.py
"""Search knowledge base documents by name and return their links.
Usage: python scripts/search_doc.py "<operator unionId>" "<document name keyword>" ["<workspaceId>"]
If workspaceId is omitted, all knowledge bases are searched.
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def search_nodes(token, parent_node_id, operator_id, keyword):
    """Recursively search child nodes, matching names by case-insensitive substring."""
    matched = []
    next_token = None
    while True:
        params = {"parentNodeId": parent_node_id, "operatorId": operator_id, "maxResults": 50}
        if next_token:
            params["nextToken"] = next_token
        result = api_request("GET", "/wiki/nodes", token, params=params, api_version="v2.0")
        node = result.get("node")
        if node:
            # The field may hold a single node or a list of them.
            nodes = node if isinstance(node, list) else [node]
            for n in nodes:
                name = n.get("name", "")
                if keyword.lower() in name.lower():
                    matched.append(n)
                if n.get("hasChildren"):
                    matched.extend(search_nodes(token, n["nodeId"], operator_id, keyword))
        next_token = result.get("nextToken")
        if not next_token:
            break
    return matched
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 2:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/search_doc.py \"<operator unionId>\" \"<document name keyword>\" [\"<workspaceId>\"]"}})
        sys.exit(1)
    operator_id = args[0]
    keyword = args[1]
    workspace_id = args[2] if len(args) >= 3 else None
    try:
        token = get_access_token()
        print(f"Searching documents: {keyword}...", file=sys.stderr)
        # Collect (rootNodeId, name) for every workspace to search. Paging through
        # all workspaces also finds a requested workspaceId beyond the first page.
        root_nodes = []
        next_token = None
        while True:
            params = {"operatorId": operator_id, "maxResults": 30}
            if next_token:
                params["nextToken"] = next_token
            result = api_request("GET", "/wiki/workspaces", token, params=params, api_version="v2.0")
            ws = result.get("workspace")
            ws_list = ws if isinstance(ws, list) else ([ws] if ws else [])
            for w in ws_list:
                if workspace_id is None or w.get("workspaceId") == workspace_id:
                    root_nodes.append((w.get("rootNodeId"), w.get("name")))
            next_token = result.get("nextToken")
            # Stop at the last page, or as soon as the requested workspace is found.
            if not next_token or (workspace_id and root_nodes):
                break
        all_matched = []
        for root_node_id, ws_name in root_nodes:
            if root_node_id:
                matched = search_nodes(token, root_node_id, operator_id, keyword)
                for m in matched:
                    m["workspaceName"] = ws_name
                all_matched.extend(matched)
        output({
            "success": True,
            "keyword": keyword,
            "totalCount": len(all_matched),
            "documents": [{
                "name": d.get("name"),
                "nodeId": d.get("nodeId"),
                "url": d.get("url"),
                "category": d.get("category"),
                "workspaceName": d.get("workspaceName"),
                "creatorId": d.get("creatorId"),
                "modifiedTime": d.get("modifiedTime"),
            } for d in all_matched],
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/search_user.py
"""Search for users.
Usage: python scripts/search_user.py "<search keyword>"
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    # Drop --debug so it is never used as the search keyword.
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 1:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/search_user.py \"<search keyword>\""}})
        sys.exit(1)
    keyword = args[0]
    try:
        token = get_access_token()
        print("Searching users...", file=sys.stderr)
        # Only the first 20 matches are requested; hasMore reports whether more exist.
        result = api_request("POST", "/contact/users/search", token, json_body={
            "queryWord": keyword, "offset": 0, "size": 20,
        })
        output({
            "success": True,
            "keyword": keyword,
            "totalCount": result.get("totalCount", 0),
            "hasMore": result.get("hasMore", False),
            "userIds": result.get("list", []),
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/send_group_message.py
"""Send a group message as the robot.
Usage: python scripts/send_group_message.py "<openConversationId>" "<message content>" ["<robotCode>"]
robotCode can be passed on the command line or set through the DINGTALK_ROBOT_CODE environment variable.
"""
import sys
import os
import json as json_mod
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output, get_robot_code
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 2:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/send_group_message.py \"<openConversationId>\" \"<message content>\" [\"<robotCode>\"]"}})
        sys.exit(1)
    open_conversation_id = args[0]
    message = args[1]
    robot_code = get_robot_code(args[2] if len(args) >= 3 else None)
    try:
        token = get_access_token()
        print("Sending group message...", file=sys.stderr)
        # Group messages use /robot/groupMessages/send; the oToMessages path is for one-to-one chats.
        result = api_request("POST", "/robot/groupMessages/send", token, json_body={
            "openConversationId": open_conversation_id,
            "robotCode": robot_code,
            "msgKey": "sampleText",
            # msgParam must be a JSON-encoded string, not a nested object.
            "msgParam": json_mod.dumps({"content": message}),
        })
        output({
            "success": True,
            "openConversationId": open_conversation_id,
            "robotCode": robot_code,
            "processQueryKey": result.get("processQueryKey"),
            "message": message,
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/send_user_message.py
"""Send a one-to-one message as the robot.
Usage: python scripts/send_user_message.py "<userId>" "<message content>" ["<robotCode>"]
robotCode can be passed on the command line or set through the DINGTALK_ROBOT_CODE environment variable.
"""
import sys
import os
import json as json_mod
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output, get_robot_code
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 2:
        output({"success": False, "error": {"code": "INVALID_ARGS", "message": "Usage: python scripts/send_user_message.py \"<userId>\" \"<message content>\" [\"<robotCode>\"]"}})
        sys.exit(1)
    user_id = args[0]
    message = args[1]
    robot_code = get_robot_code(args[2] if len(args) >= 3 else None)
    try:
        token = get_access_token()
        print("Sending one-to-one message...", file=sys.stderr)
        result = api_request("POST", "/robot/oToMessages/batchSend", token, json_body={
            "robotCode": robot_code,
            "userIds": [user_id],
            "msgKey": "sampleText",
            # msgParam must be a JSON-encoded string, not a nested object.
            "msgParam": json_mod.dumps({"content": message}),
        })
        output({
            "success": True,
            "userId": user_id,
            "robotCode": robot_code,
            "processQueryKey": result.get("processQueryKey"),
            "flowControlledStaffIdList": result.get("flowControlledStaffIdList", []),
            "invalidStaffIdList": result.get("invalidStaffIdList", []),
            "message": message,
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/terminate_approval_instance.py
"""Revoke (terminate) an approval instance.
Usage: python scripts/terminate_approval_instance.py "<instanceId>" "<operatingUserId>" ["<remark>"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, api_request, handle_error, output
def main():
    args = [a for a in sys.argv[1:] if a != "--debug"]
    if len(args) < 2:
        output({"success": False, "error": {"code": "INVALID_ARGS",
            "message": "Usage: python scripts/terminate_approval_instance.py \"<instanceId>\" \"<operatingUserId>\" [\"<remark>\"]"}})
        sys.exit(1)
    instance_id = args[0]
    operating_user_id = args[1]
    remark = args[2] if len(args) > 2 else ""
    try:
        token = get_access_token()
        print("Revoking approval instance...", file=sys.stderr)
        body = {
            "processInstanceId": instance_id,
            "operatingUserId": operating_user_id,
            "remark": remark,
        }
        api_request("POST", "/workflow/processInstances/terminate", token, json_body=body)
        output({"success": True, "instanceId": instance_id, "message": "Approval instance revoked"})
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()
FILE:scripts/transfer_approval_task.py
"""Transfer an approval task to another user.
Usage: python scripts/transfer_approval_task.py "<instanceId>" "<userId>" "<transferToUserId>" [--taskId "xxx"] [--remark "reason for transfer"]
"""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from dingtalk_client import get_access_token, top_api_request, handle_error, output
def parse_args(argv):
    args = {"instance_id": None, "user_id": None, "transfer_to": None, "task_id": None, "remark": None}
    positional = []
    i = 1
    while i < len(argv):
        if argv[i] == "--taskId" and i + 1 < len(argv):
            args["task_id"] = argv[i + 1]
            i += 2
        elif argv[i] == "--remark" and i + 1 < len(argv):
            args["remark"] = argv[i + 1]
            i += 2
        elif argv[i] == "--debug":
            i += 1
        elif not argv[i].startswith("--"):
            positional.append(argv[i])
            i += 1
        else:
            i += 1
    # Positional arguments: instanceId, userId, transferToUserId.
    if len(positional) >= 1:
        args["instance_id"] = positional[0]
    if len(positional) >= 2:
        args["user_id"] = positional[1]
    if len(positional) >= 3:
        args["transfer_to"] = positional[2]
    return args
def main():
    args = parse_args(sys.argv)
    if not all([args["instance_id"], args["user_id"], args["transfer_to"]]):
        output({"success": False, "error": {"code": "INVALID_ARGS",
            "message": "Usage: python scripts/transfer_approval_task.py \"<instanceId>\" \"<userId>\" \"<transferToUserId>\" [--taskId \"xxx\"] [--remark \"reason for transfer\"]"}})
        sys.exit(1)
    try:
        token = get_access_token()
        print("Transferring approval task...", file=sys.stderr)
        body = {
            "process_instance_id": args["instance_id"],
            "userid": args["user_id"],
            "transfer_to_userid": args["transfer_to"],
            # Fall back to a generic remark when none is supplied.
            "remark": args["remark"] or "Transfer approval task",
        }
        if args["task_id"]:
            body["task_id"] = args["task_id"]
        top_api_request("POST", "/topapi/process/workrecord/task/transfer", token, json_body=body)
        output({
            "success": True,
            "instanceId": args["instance_id"],
            "userId": args["user_id"],
            "transferToUserId": args["transfer_to"],
            "message": "Approval task transferred",
        })
    except Exception as e:
        handle_error(e)
        sys.exit(1)
if __name__ == "__main__":
    main()