@clawhub-yukirang-1780f2120b
Analyze uploaded bank customer data to segment and profile customers by assets, transactions, and behavior, outputting clusters, statistics, and visual charts.
---
name: customer-segmentation
description: Financial customer segmentation analysis Skill. Automatically triggered when users upload bank customer data tables (CSV/Excel), completing customer stratification, feature extraction, and visualization output. Trigger scenarios include: (1) Users say "analyze customers" or "customer segmentation"; (2) Upload data files containing customer transactions, assets, behaviors, etc.; (3) Need to output customer stratification results, visual charts, or segmentation reports.
---
# Customer Segmentation Skill
Financial customer segmentation analysis: Stratify customers based on assets, transaction behaviors, activity levels, and other dimensions, outputting actionable segmentation results and visualizations.
## Workflow
### Step 1 — Data Loading and Cleaning
Read user-uploaded CSV or Excel files, automatically identifying column names.
Priority fields to retain:
- `customer_id` / `客户ID` — Unique customer identifier
- `age` / `年龄`
- `gender` / `性别`
- `balance` / `资产余额`
- `txn_amount` / `交易金额`
- `txn_count` / `交易次数`
- `last_date` / `最近交易日期`
- `product_count` / `持有产品数`
- `branch` / `网点`
Missing value handling:
- Numeric: Fill with median
- Categorical: Fill with mode
- Columns with >30% missing: Delete and notify user
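```python
import pandas as pd

# Pick the reader from the file extension (CSV or Excel)
if file_path.lower().endswith(('.xls', '.xlsx')):
    df = pd.read_excel(file_path)
else:
    df = pd.read_csv(file_path)
df.columns = df.columns.str.strip().str.lower()
```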
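The missing-value rules above can be sketched as follows. This is a minimal illustration, not the skill's shipped implementation: the helper name `handle_missing`, the `max_missing` parameter, and the toy data are assumptions introduced here.

```python
import pandas as pd


def handle_missing(df: pd.DataFrame, max_missing: float = 0.30):
    """Drop sparse columns, then fill numeric gaps with the median
    and categorical gaps with the mode."""
    missing_ratio = df.isna().mean()
    dropped = missing_ratio[missing_ratio > max_missing].index.tolist()
    out = df.drop(columns=dropped)
    for col in out.columns:
        if out[col].isna().sum() == 0:
            continue
        if pd.api.types.is_numeric_dtype(out[col]):
            out[col] = out[col].fillna(out[col].median())
        else:
            mode = out[col].mode(dropna=True)
            if not mode.empty:
                out[col] = out[col].fillna(mode.iloc[0])
    return out, dropped


# Hypothetical toy data for illustration only
demo = pd.DataFrame({
    "balance": [100.0, None, 300.0, 500.0],
    "gender": ["F", "M", None, "M"],
    "branch": [None, None, "A", None],  # 75% missing, so the column is dropped
})
cleaned, dropped_cols = handle_missing(demo)
print(dropped_cols)
print(cleaned)
```

Returning the list of dropped columns makes it straightforward to notify the user which fields were removed.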
### Step 2 — Feature Engineering
Build RFM + extended features:
| Feature | Description |
|---------|-------------|
| Recency | Days since last transaction (smaller = more active) |
| Frequency | Transaction frequency (number of transactions in specified period) |
| Monetary | Transaction amount (total amount in specified period) |
| Tenure | Customer duration (months) |
| Product_Depth | Number of products held |
| Age | Customer age |
Data standardization: Use `StandardScaler` (Z-score) to normalize all numeric features.
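As a rough sketch of how the two date-derived features in the table might be built: the helper `add_time_features`, the fixed `as_of` analysis date, and the `open_date` column are assumptions for illustration, and a month is approximated as 30 days.

```python
import pandas as pd


def add_time_features(df, as_of):
    """Derive Recency (days) and Tenure (months) relative to a fixed analysis date."""
    out = df.copy()
    as_of = pd.Timestamp(as_of)
    last = pd.to_datetime(out["last_date"], errors="coerce")
    opened = pd.to_datetime(out["open_date"], errors="coerce")
    # Negative values (dates after the analysis date) are clipped to zero
    out["recency"] = (as_of - last).dt.days.clip(lower=0)
    out["tenure"] = ((as_of - opened).dt.days / 30).clip(lower=0).round(1)
    return out


# Hypothetical toy data for illustration only
demo = pd.DataFrame({
    "customer_id": [1, 2],
    "last_date": ["2024-06-20", "2024-01-01"],
    "open_date": ["2023-06-30", "2020-06-30"],
})
features_demo = add_time_features(demo, "2024-06-30")
print(features_demo[["customer_id", "recency", "tenure"]])
```

Passing an explicit analysis date rather than using today's date keeps the features reproducible across reruns.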
### Step 3 — Clustering Analysis
Use **K-Means** algorithm, automatically determine K value (Elbow Method, SSE inflection point).
```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_scaled = scaler.fit_transform(features)

# Elbow method: SSE always falls as k grows, so look for the sharpest bend
sse = {}
for k in range(2, 10):
    km = KMeans(n_clusters=k, random_state=42, n_init=10)
    km.fit(X_scaled)
    sse[k] = km.inertia_

ks = sorted(sse)
# The largest second difference of the SSE curve marks the elbow
optimal_k = ks[int(np.argmax(np.diff([sse[k] for k in ks], n=2))) + 1]
```
K=5 can also be fixed based on business needs (high/medium-high/medium/medium-low/low value customers).
### Step 4 — Segment Profiling
Output core statistics for each cluster:
```
Cluster 0 (High-Value Customers): Avg. assets 850k, Avg. transaction frequency 28/month, Gender distribution 62% male
Cluster 1 (Potential Customers): Avg. assets 320k, clearly younger age profile
...
```
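Per-cluster statistics like those above could be computed with a grouped aggregation. The sketch below uses hypothetical toy data and assumes a `cluster` column has already been assigned in Step 3; the output column names are illustrative.

```python
import pandas as pd

# Hypothetical clustered data; in practice `cluster` comes from Step 3
df = pd.DataFrame({
    "cluster": [0, 0, 1, 1, 1],
    "balance": [900_000, 800_000, 300_000, 340_000, 320_000],
    "txn_count": [30, 26, 8, 10, 12],
    "age": [52, 48, 29, 31, 27],
    "gender": ["M", "F", "M", "M", "F"],
})

# One row per cluster: size, averages, and share of male customers
profile = df.groupby("cluster").agg(
    customers=("balance", "size"),
    avg_balance=("balance", "mean"),
    avg_txn_count=("txn_count", "mean"),
    avg_age=("age", "mean"),
    male_share=("gender", lambda s: (s == "M").mean()),
)
print(profile.round(2))
```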
Recommended label system (five categories):
- 🌟 High-Value Customers (VIP)
- ⬆️ Potential Customers
- 🟢 Stable Customers
- 🔄 Active Transaction Customers
- ⚠️ Dormant/Churn Warning Customers
### Step 5 — Visualization
Generate the following charts (saved as PNG):
1. **Customer Asset Distribution Histogram** — Asset distribution comparison across levels
2. **Radar Chart** — Feature comparison across segments
3. **Heatmap** — Cluster feature mean matrix
4. **Scatter Plot** — Customer distribution with assets × transaction frequency as coordinates
```python
import matplotlib
matplotlib.use('Agg')  # headless backend; select it before importing pyplot
import matplotlib.pyplot as plt
import seaborn as sns

plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'SimHei']
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Asset distribution: one histogram series per cluster
groups = {c: g['balance'] for c, g in df.groupby('cluster')}
axes[0].hist(list(groups.values()), bins=30, label=[f'C{c}' for c in groups])
axes[0].set_title('Customer Balance Distribution by Cluster')
axes[0].legend()

# Heatmap of per-cluster feature means (numeric columns only)
cluster_means = df.groupby('cluster').mean(numeric_only=True)
sns.heatmap(cluster_means.T, annot=True, fmt='.1f', ax=axes[1])
axes[1].set_title('Cluster Feature Heatmap')

plt.tight_layout()
plt.savefig(output_path, dpi=150)
```
### Step 6 — Output Results
Output content:
1. Segmentation result table (including customer ID, cluster, segmentation label) → `segmentation_results.csv`
2. Cluster feature statistics → `cluster_summary.csv`
3. Visualization charts → `segmentation_charts.png`
4. Analysis summary (Markdown format) → `segmentation_report.md`
For detailed clustering and parameter documentation:
- RFM model explanation: Refer to `references/rfm-guide.md`
- Clustering parameter explanation: Refer to `references/clustering-guide.md`
FILE:scripts/segment.py
#!/usr/bin/env python3
"""
Customer segmentation analysis script
Usage: python segment.py <customer_data.csv> [output_directory]
"""
import sys
import os
import json
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, davies_bouldin_score
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# ── Label System ──────────────────────────────────────────────
CLUSTER_LABELS = {
    0: ("🌟 High-Value Customers", "VIP / High-net-worth customers, key maintenance"),
    1: ("⬆️ Potential Customers", "Growth-oriented, conversion potential"),
    2: ("🟢 Stable Customers", "Medium-low frequency, low risk"),
    3: ("🔄 Active Transaction Customers", "Frequent transactions, high commission contribution"),
    4: ("⚠️ Churn Warning", "Decreasing activity, needs activation"),
}
FALLBACK_LABELS = {
    i: (f"Customer Group {i}", "") for i in range(100)
}
# ── Feature Engineering ─────────────────────────────────────────────
FEATURE_COLS = [
    'recency',        # Smaller = more active
    'frequency',      # More = better
    'monetary',       # Higher = better
    'balance',        # Asset balance
    'tenure',         # Customer duration (months)
    'product_depth',  # Number of products held
    'age',            # Age
]
def load_and_clean(path):
    df = pd.read_csv(path)
    df.columns = df.columns.str.strip()
    raw_cols = list(df.columns)
    # Auto-detect column mappings
    col_map = {}
    for col in raw_cols:
        cl = col.lower()
        if 'id' in cl: col_map[col] = 'customer_id'
        elif 'balance' in cl: col_map[col] = 'balance'
        elif 'amount' in cl and 'txn' in cl: col_map[col] = 'monetary'
        elif 'count' in cl and 'txn' in cl: col_map[col] = 'frequency'
        elif 'date' in cl and 'last' in cl: col_map[col] = 'last_date'
        elif 'date' in cl and 'open' in cl: col_map[col] = 'open_date'
        elif 'product' in cl and 'count' in cl: col_map[col] = 'product_depth'
        elif 'age' in cl: col_map[col] = 'age'
        elif 'gender' in cl: col_map[col] = 'gender'
        elif 'txn' in cl: col_map[col] = 'monetary'
    df.rename(columns=col_map, inplace=True)
    today = pd.Timestamp.today()
    # RFM
    if 'last_date' in df.columns:
        df['last_date'] = pd.to_datetime(df['last_date'], errors='coerce')
        df['recency'] = (today - df['last_date']).dt.days.clip(lower=0)
    else:
        df['recency'] = df.get('recency', 180)
    if 'frequency' not in df.columns:
        df['frequency'] = 1
    df['frequency'] = pd.to_numeric(df['frequency'], errors='coerce').fillna(1)
    if 'monetary' not in df.columns:
        df['monetary'] = df.get('balance', 0)
    df['monetary'] = pd.to_numeric(df['monetary'], errors='coerce').fillna(0)
    if 'balance' not in df.columns:
        df['balance'] = df['monetary']
    df['balance'] = pd.to_numeric(df['balance'], errors='coerce').fillna(0)
    if 'open_date' in df.columns:
        df['open_date'] = pd.to_datetime(df['open_date'], errors='coerce')
        df['tenure'] = ((today - df['open_date']).dt.days / 30).clip(lower=0)
    else:
        df['tenure'] = 12  # Default
    if 'product_depth' not in df.columns:
        df['product_depth'] = 1
    df['product_depth'] = pd.to_numeric(df['product_depth'], errors='coerce').fillna(1)
    if 'age' not in df.columns:
        df['age'] = 40
    df['age'] = pd.to_numeric(df['age'], errors='coerce').fillna(40)
    # Fill missing values for numeric columns
    num_cols = ['recency', 'frequency', 'monetary', 'balance', 'tenure', 'product_depth', 'age']
    for c in num_cols:
        if c in df.columns:
            df[c] = df[c].fillna(df[c].median())
    return df
def build_features(df):
    feats = df[FEATURE_COLS].copy()
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(feats)
    return X_scaled, feats, scaler

def find_optimal_k(X_scaled, max_k=8):
    scores = {}
    for k in range(2, max_k + 1):
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_scaled)
        scores[k] = {
            'sse': km.inertia_,
            'sil': silhouette_score(X_scaled, labels),
            'db': davies_bouldin_score(X_scaled, labels),
        }
    best_k = max(scores, key=lambda k: scores[k]['sil'])
    return best_k, scores

def cluster(df, X_scaled, n_clusters=5):
    km = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    df['cluster'] = km.fit_predict(X_scaled)
    return km, df
def label_clusters(df):
    """Sort by balance median and label clusters"""
    med = df.groupby('cluster')['balance'].median().sort_values(ascending=False)
    rank = {c: i for i, c in enumerate(med.index)}
    df['cluster_rank'] = df['cluster'].map(rank)
    n = df['cluster'].nunique()
    # Named labels for the first ranks; generic fallback labels for any rank beyond them
    labels = [CLUSTER_LABELS.get(i, FALLBACK_LABELS[i]) for i in range(n)]
    label_map = {orig: labels[rank[orig]] for orig in rank}
    df['segment_label'] = df['cluster'].map(lambda c: label_map.get(c, (f"Group{c}", ""))[0])
    df['segment_desc'] = df['cluster'].map(lambda c: label_map.get(c, ("", ""))[1])
    return df
def make_charts(df, feats, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, 'segmentation_charts.png')
    n_clusters = df['cluster'].nunique()
    fig, axes = plt.subplots(2, 2, figsize=(14, 11))
    # 1. Asset distribution
    colors = plt.cm.Set2(np.linspace(0, 1, n_clusters))
    for c in sorted(df['cluster'].unique()):
        sub = df[df['cluster'] == c]['balance'].dropna()
        axes[0, 0].hist(sub, bins=25, alpha=0.6, label=f'C{c}', color=colors[c])
    axes[0, 0].set_xlabel('Balance (Asset)')
    axes[0, 0].set_ylabel('Count')
    axes[0, 0].set_title('Asset Distribution by Cluster')
    axes[0, 0].legend(fontsize=8)
    axes[0, 0].ticklabel_format(style='sci', axis='y', scilimits=(0, 0))
    # 2. Cluster feature radar chart
    cluster_means = feats.groupby(df['cluster']).mean()
    cluster_means_norm = (cluster_means - cluster_means.min()) / (cluster_means.max() - cluster_means.min() + 1e-9)
    cats = list(cluster_means.columns)
    N = len(cats)
    angles = [n / float(N) * 2 * np.pi for n in range(N)]
    angles += angles[:1]
    # Swap the Cartesian placeholder for a polar axes in the same grid slot
    axes[0, 1].remove()
    ax_radar = fig.add_subplot(2, 2, 2, polar=True)
    for c in sorted(df['cluster'].unique()):
        vals = cluster_means_norm.loc[c].tolist()
        vals += vals[:1]  # Repeat the first value to close the polygon
        ax_radar.plot(angles, vals, 'o-', linewidth=2, label=f'C{c}', color=colors[c])
        ax_radar.fill(angles, vals, alpha=0.1, color=colors[c])
    ax_radar.set_xticks(angles[:-1])
    ax_radar.set_xticklabels(cats, fontsize=8)
    ax_radar.set_title('Cluster Feature Radar', pad=20)
    ax_radar.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1), fontsize=7)
    # 3. Heatmap
    im = axes[1, 0].imshow(cluster_means_norm.T, cmap='YlOrRd', aspect='auto')
    axes[1, 0].set_xticks(range(n_clusters))
    axes[1, 0].set_xticklabels([f'C{i}' for i in range(n_clusters)])
    axes[1, 0].set_yticks(range(len(cats)))
    axes[1, 0].set_yticklabels(cats, fontsize=8)
    axes[1, 0].set_title('Cluster Feature Heatmap (Normalized)')
    plt.colorbar(im, ax=axes[1, 0], shrink=0.8)
    # 4. Scatter plot
    for c in sorted(df['cluster'].unique()):
        sub = df[df['cluster'] == c]
        axes[1, 1].scatter(sub['frequency'], sub['balance'],
                           alpha=0.5, s=10, label=f'C{c}', color=colors[c])
    axes[1, 1].set_xlabel('Frequency (txn count)')
    axes[1, 1].set_ylabel('Balance')
    axes[1, 1].set_title('Frequency vs Balance by Cluster')
    axes[1, 1].legend(fontsize=7)
    axes[1, 1].ticklabel_format(style='sci', axis='y', scilimits=(0, 0))
    plt.suptitle('Customer Segmentation Analysis', fontsize=14, fontweight='bold', y=1.01)
    plt.tight_layout()
    plt.savefig(path, dpi=150, bbox_inches='tight')
    plt.close()
    return path
def summarize(df, scores, best_k, output_dir):
    summary = df.groupby(['cluster', 'segment_label']).agg({
        'balance': ['mean', 'median', 'count'],
        'frequency': 'mean',
        'monetary': 'mean',
        'recency': 'mean',
        'tenure': 'mean',
        'product_depth': 'mean',
        'age': 'mean',
    }).round(2)
    summary.columns = ['_'.join(c) for c in summary.columns]
    report = f"""# Customer Segmentation Analysis Report
## Basic Information
- Analysis Date: {pd.Timestamp.today().strftime('%Y-%m-%d')}
- Total Customers: {len(df)}
- Optimal Number of Clusters: {best_k} (Silhouette Score: {scores[best_k]['sil']:.3f})
## Cluster Overview
| Cluster | Label | Count | Avg. Assets | Median Assets | Avg. Transaction Frequency | Avg. Recency (days) |
|---------|-------|-------|-------------|---------------|----------------------------|---------------------|
"""
    for c in sorted(df['cluster'].unique()):
        row = df[df['cluster'] == c]
        label = row['segment_label'].iloc[0]
        report += f"| C{c} | {label} | {len(row)} | {row['balance'].mean():.0f} | {row['balance'].median():.0f} | {row['frequency'].mean():.1f} | {row['recency'].mean():.0f} |\n"
    report += f"""
## Cluster Feature Radar / Heatmap
- For detailed clustering parameter explanation: `references/clustering-guide.md`
- For RFM model explanation: `references/rfm-guide.md`
## Cluster Strategy Recommendations
"""
    for c in sorted(df['cluster'].unique()):
        label = df[df['cluster']==c]['segment_label'].iloc[0]
        desc = df[df['cluster']==c]['segment_desc'].iloc[0]
        avg_bal = df[df['cluster']==c]['balance'].mean()
        report += f"**C{c} {label}**: {desc} (Avg. Assets {avg_bal:.0f})\n"
    rpt_path = os.path.join(output_dir, 'segmentation_report.md')
    with open(rpt_path, 'w', encoding='utf-8') as f:
        f.write(report)
    return rpt_path, summary
def main():
    input_path = sys.argv[1] if len(sys.argv) > 1 else 'customers.csv'
    output_dir = sys.argv[2] if len(sys.argv) > 2 else 'output'
    os.makedirs(output_dir, exist_ok=True)
    print(f"[1/5] Loading data: {input_path}")
    df = load_and_clean(input_path)
    print(f" Loaded {len(df)} rows, columns: {list(df.columns)}")
    print(f"[2/5] Building features...")
    X_scaled, feats, scaler = build_features(df)
    print(f"[3/5] Finding optimal K...")
    best_k, scores = find_optimal_k(X_scaled)
    print(f" Optimal K={best_k} sil={scores[best_k]['sil']:.3f} db={scores[best_k]['db']:.3f}")
    print(f"[4/5] Clustering into {best_k} groups...")
    km, df = cluster(df, X_scaled, n_clusters=best_k)
    df = label_clusters(df)
    print(f"[5/5] Generating charts and report...")
    chart_path = make_charts(df, feats, output_dir)
    rpt_path, summary = summarize(df, scores, best_k, output_dir)
    # Save results
    out_csv = os.path.join(output_dir, 'segmentation_results.csv')
    df.to_csv(out_csv, index=False, encoding='utf-8-sig')
    sum_csv = os.path.join(output_dir, 'cluster_summary.csv')
    summary.to_csv(sum_csv, encoding='utf-8-sig')
    print(f"""
✅ Segmentation completed!
Output files:
📊 Segmentation results → {out_csv}
📋 Cluster summary → {sum_csv}
📈 Visualization charts → {chart_path}
📝 Analysis report → {rpt_path}
Cluster counts:
{df['segment_label'].value_counts().to_string()}
""")
    return df, scores, best_k

if __name__ == '__main__':
    main()
FILE:references/rfm-guide.md
# RFM Model Reference
RFM (Recency, Frequency, Monetary) is the most classic methodology for customer value analysis.
## Three Dimensions
| Dimension | Meaning | Calculation Method | Direction |
|-----------|---------|-------------------|-----------|
| Recency (R) | Days since last transaction | (Analysis date - Last transaction date).days | Smaller is better |
| Frequency (F) | Transaction frequency | Number of transactions in the period | Larger is better |
| Monetary (M) | Transaction amount | Sum of transaction amounts in the period | Larger is better |
## Score Calculation (5-point scale)
Divide each dimension into 5 tiers by quantiles:
- R: Most recent 20% of customers (smallest recency) → 5 points, least recent 20% → 1 point
- F: Highest 20% frequency → 5 points, lowest 20% → 1 point
- M: Highest 20% amount → 5 points, lowest 20% → 1 point
## Composite Score
```
RFM_Score = R × 100 + F × 10 + M
```
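The quantile scoring and composite formula above might be implemented along these lines. This is a sketch under assumptions: the helper `rfm_scores`, the column names `recency`, `frequency`, and `monetary`, and the toy data are illustrative, and ties are broken by row order through `rank(method="first")` so that five bins always exist.

```python
import pandas as pd


def rfm_scores(df):
    """Score R, F, M on a 1-5 scale by quintile and combine into a three-digit code."""
    out = df.copy()
    # rank(method="first") breaks ties so qcut always gets five distinct bin edges.
    # Recency: smaller is better, so the smallest values receive 5 points.
    out["R"] = pd.qcut(out["recency"].rank(method="first"), 5, labels=[5, 4, 3, 2, 1]).astype(int)
    out["F"] = pd.qcut(out["frequency"].rank(method="first"), 5, labels=[1, 2, 3, 4, 5]).astype(int)
    out["M"] = pd.qcut(out["monetary"].rank(method="first"), 5, labels=[1, 2, 3, 4, 5]).astype(int)
    out["RFM_Score"] = out["R"] * 100 + out["F"] * 10 + out["M"]
    return out


# Hypothetical toy data: row 0 is the most recent, most frequent, highest spender
demo = pd.DataFrame({
    "recency": [1, 5, 10, 20, 30, 45, 60, 90, 120, 200],
    "frequency": [50, 40, 35, 30, 25, 20, 15, 10, 5, 1],
    "monetary": [9000, 8000, 7000, 6000, 5000, 4000, 3000, 2000, 1000, 100],
})
scored = rfm_scores(demo)
print(scored[["R", "F", "M", "RFM_Score"]])
```

Because each digit of the composite carries one dimension, the score behaves as a code to look up, not a quantity to average.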
| Composite Score | Customer Type | Recommended Strategy |
|-----------------|---------------|----------------------|
| 555 | Key Retention | VIP one-on-one maintenance |
| 554-545 | High-Value Customers | Value-added services |
| 535-425 | Potential Customers | Targeted marketing |
| 414-324 | Churn Risk | Retention activities |
| <224 | Low-Value Customers | Cost reduction |
## Extended Dimensions (Banking Scenario)
For banking scenarios, the following dimensions can be added to RFM:
| Extended Dimension | Field | Description |
|--------------------|--------|-------------|
| Tenure (T) | account_open_date | Customer duration |
| Product Depth (P) | product_count | Number of products held |
| Channel (C) | channel_touch | Channel touchpoints |
| Risk (R2) | risk_score | Risk rating |
## Banking Data Considerations
1. **Asset balance** (balance) is more stable than Monetary: Prefer balance instead of M
2. **Customer segmentation should be hierarchical**: First stratify by assets, then refine by behavior
3. **Cross-selling opportunities**: Analyze product ownership rates by segment to identify cross-selling targets
4. **Compliance requirements**: Segmentation results must not be used for discriminatory pricing, and customer data must be de-identified (masked) before use
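The "stratify by assets first, then refine by behavior" idea in point 2 could look roughly like the sketch below. The tier thresholds, tier names, and the helper `two_level_segments` are hypothetical; real cut-offs are a business decision.

```python
import pandas as pd

# Hypothetical asset tiers (currency units); real thresholds are a business decision
TIER_BINS = [0, 50_000, 500_000, float("inf")]
TIER_NAMES = ["mass", "affluent", "high_net_worth"]


def two_level_segments(df):
    """Level 1: asset tier from balance. Level 2: activity vs. the tier's median frequency."""
    out = df.copy()
    out["asset_tier"] = pd.cut(out["balance"], bins=TIER_BINS, labels=TIER_NAMES, right=False)
    tier_median = out.groupby("asset_tier", observed=True)["frequency"].transform("median")
    out["activity"] = (out["frequency"] >= tier_median).map({True: "active", False: "quiet"})
    out["segment"] = out["asset_tier"].astype(str) + "/" + out["activity"]
    return out


# Hypothetical toy data for illustration only
demo = pd.DataFrame({
    "balance": [10_000, 20_000, 100_000, 300_000, 900_000],
    "frequency": [2, 10, 5, 1, 8],
})
segmented = two_level_segments(demo)
print(segmented[["balance", "frequency", "segment"]])
```

Comparing each customer with the median of their own tier keeps high-balance customers from being judged against mass-market activity levels.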
FILE:references/clustering-guide.md
# Clustering Analysis Parameter Reference
## Algorithm Selection
| Algorithm | Applicable Scenario | Advantages | Disadvantages |
|-----------|---------------------|------------|---------------|
| K-Means | Large datasets, approximately spherical clusters | Fast, interpretable | Sensitive to noise/outliers |
| DBSCAN | Non-convex clusters, noise detection | No need to specify K, noise-resistant | Parameter-sensitive, slow on large data |
| Hierarchical Clustering | Small data, interpretable hierarchy | No need to specify K, dendrogram possible | O(n²) complexity |
| GMM | Clusters with different sizes/densities | Soft clustering (probabilistic) | Need to specify K, slow |
**K-Means is recommended for bank customer segmentation** (large customer volume, efficient and interpretable).
## K-Means Parameters
```python
from sklearn.cluster import KMeans
km = KMeans(
    n_clusters=5,      # Number of clusters (recommended 5-6)
    init='k-means++',  # Initialization method (recommended, better default)
    n_init=10,         # Number of runs with different initializations, take best
    max_iter=300,      # Maximum number of iterations
    random_state=42    # Random seed (ensure reproducibility)
)
```
## Optimal K Selection
### Elbow Method
```python
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

sse = {}
for k in range(2, 11):
    km = KMeans(n_clusters=k, random_state=42, n_init=10)
    km.fit(X_scaled)
    sse[k] = km.inertia_  # SSE (sum of squared errors within clusters)

plt.plot(list(sse.keys()), list(sse.values()), 'bo-')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('SSE')
plt.title('Elbow Method for Optimal k')
plt.savefig('elbow_plot.png')
```
Find the "elbow" inflection point (where the curve flattens).
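Reading the elbow off a plot is subjective, so one simple heuristic is to pick the k with the largest second difference of the SSE curve. The helper `elbow_k` and the SSE values below are hypothetical, and the sketch assumes SSE was computed for consecutive values of k; it is one heuristic among several, not a definitive rule.

```python
def elbow_k(sse):
    """Return the k whose SSE curve bends most sharply (largest second difference)."""
    ks = sorted(sse)
    if len(ks) < 3:
        raise ValueError("need SSE values for at least three k")
    best_k, best_bend = None, float("-inf")
    for prev_k, k, next_k in zip(ks, ks[1:], ks[2:]):
        # Second difference: how much the drop in SSE slows down at k
        bend = sse[prev_k] - 2 * sse[k] + sse[next_k]
        if bend > best_bend:
            best_k, best_bend = k, bend
    return best_k


# Hypothetical SSE values: steep drops up to k=4, then the curve flattens
sse_demo = {2: 1000.0, 3: 600.0, 4: 250.0, 5: 220.0, 6: 200.0, 7: 185.0}
print(elbow_k(sse_demo))
```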
### Silhouette Score
```python
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

for k in range(2, 11):
    km = KMeans(n_clusters=k, random_state=42, n_init=10)
    labels = km.fit_predict(X_scaled)
    score = silhouette_score(X_scaled, labels)
    print(f"k={k}: silhouette={score:.3f}")
```
Silhouette scores range from -1 to 1, and values closer to 1 are better. A common rule is to select the largest K whose score exceeds 0.4.
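The selection rule can be expressed as a small helper. The function name `pick_k`, the fallback to the best-scoring K when nothing clears the threshold, and the example scores are assumptions for illustration.

```python
def pick_k(sil_scores, threshold=0.4):
    """Largest k whose silhouette exceeds the threshold; otherwise the best-scoring k."""
    passing = [k for k, s in sil_scores.items() if s > threshold]
    if passing:
        return max(passing)
    # Assumed fallback: nothing clears the bar, so take the highest silhouette
    return max(sil_scores, key=sil_scores.get)


# Hypothetical silhouette scores keyed by k
sil_demo = {2: 0.52, 3: 0.47, 4: 0.43, 5: 0.31, 6: 0.28}
print(pick_k(sil_demo))
print(pick_k({2: 0.30, 3: 0.35, 4: 0.20}))
```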
## Feature Preprocessing
```python
from sklearn.preprocessing import StandardScaler, MinMaxScaler
# Z-score standardization (recommended; less outlier-sensitive than MinMax, though not outlier-proof)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# MinMax scaling (effective for bounded features like conversion rates)
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)
```
**Important:** K-Means is sensitive to feature scale, so all features must be standardized.
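A small numeric illustration of why scale matters, using hypothetical customers with a balance column and an age column. The Z-score here is computed by hand with the population standard deviation, which is what `StandardScaler` uses.

```python
import numpy as np

# Hypothetical customers: columns are balance (currency units) and age (years)
X = np.array([
    [100_000.0, 25.0],
    [100_500.0, 70.0],   # similar balance, very different age
    [110_000.0, 26.0],   # different balance, similar age
])


def dist(a, b):
    """Euclidean distance, the metric K-Means minimizes."""
    return float(np.linalg.norm(a - b))


# Raw scale: balance dominates; the 45-year age gap barely registers
raw_01, raw_02 = dist(X[0], X[1]), dist(X[0], X[2])

# Z-score: subtract the column mean, divide by the column standard deviation
Z = (X - X.mean(axis=0)) / X.std(axis=0)
z_01, z_02 = dist(Z[0], Z[1]), dist(Z[0], Z[2])

print(f"raw:    d(0,1)={raw_01:.1f}  d(0,2)={raw_02:.1f}")
print(f"scaled: d(0,1)={z_01:.2f}  d(0,2)={z_02:.2f}")
```

On the raw data the two distances differ by more than an order of magnitude purely because of units; after standardization both features contribute comparably.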
## Segmentation Result Evaluation
| Metric | Calculation Method | Target Value |
|--------|-------------------|--------------|
| Within-Cluster SSE | km.inertia_ | Smaller is better |
| Silhouette Score | silhouette_score | >0.4 is acceptable |
| Davies-Bouldin | davies_bouldin_score | Smaller is better (<1 ideal) |
| Calinski-Harabasz | calinski_harabasz_score | Larger is better |
## Common Issues
**Q: What if some clusters are too large/small?**
→ This may be a feature-selection issue; try adding features or subdividing large clusters further (two-level clustering).
**Q: Segmentation results are unstable?**
→ Increase `n_init` to 20-50, or initialize K-Means with hierarchical clustering results.
**Q: Outliers interfering?**
→ Remove or flag outliers before clustering (DBSCAN or IQR filtering).
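As a sketch of the IQR option, the helper below flags values outside the conventional 1.5 × IQR fences. The function name `flag_iqr_outliers` and the sample balances are hypothetical.

```python
import pandas as pd


def flag_iqr_outliers(series, k=1.5):
    """Boolean mask: True where a value lies outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    return (series < q1 - k * iqr) | (series > q3 + k * iqr)


# Hypothetical balances with one extreme value
balances = pd.Series([10, 12, 11, 13, 12, 14, 11, 500])
mask = flag_iqr_outliers(balances)
print(balances[mask].tolist())
```

Flagging instead of deleting keeps the outliers available for separate review, which matters when extreme balances are exactly the high-value customers of interest.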
---
name: customer-segmentation
description: 金融客户分群分析 Skill。当用户上传银行客户数据表格(CSV/Excel)时自动触发,完成客户分层、特征提取和可视化输出。触发场景包括:(1)用户说"分析客户"或"客户分群";(2)上传了包含客户交易、资产、行为等字段的数据文件;(3)需要输出客户分层结果、可视化图表或分群报告。
---
# Customer Segmentation Skill
金融客户分群分析:将客户按资产、交易行为、活跃度等维度进行分层,输出可操作的分群结果与可视化。
## 工作流程
### Step 1 — 数据加载与清洗
读取用户上传的 CSV 或 Excel 文件,自动识别列名。
优先保留字段:
- `customer_id` / `客户ID` — 客户唯一标识
- `age` / `年龄`
- `gender` / `性别`
- `balance` / `资产余额`
- `txn_amount` / `交易金额`
- `txn_count` / `交易次数`
- `last_date` / `最近交易日期`
- `product_count` / `持有产品数`
- `branch` / `网点`
缺失值处理:
- 数值型:用中位数填充
- 类别型:用众数填充
- 超过 30% 缺失的列:删除该列并提示用户
```python
import pandas as pd
df = pd.read_csv(file_path)
df.columns = df.columns.str.strip().str.lower()
```
### Step 2 — 特征工程
构建 RFM + 扩展特征:
| 特征 | 说明 |
|------|------|
| Recency | 距今天数(越小越活跃)|
| Frequency | 交易频率(指定周期内交易次数)|
| Monetary | 交易金额(指定周期内总金额)|
| Tenure | 客户持有时长(月)|
| Product_Depth | 持有产品数量 |
| Age | 客户年龄 |
数据标准化:使用 `StandardScaler`(Z-score)归一化所有数值型特征。
### Step 3 — 聚类分析
使用 **K-Means** 算法,自动确定 K 值(肘部法则 Elbow Method,SSE 拐点)。
```python
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(features)
# 肘部法则找最优K
sse = {}
for k in range(2, 10):
km = KMeans(n_clusters=k, random_state=42, n_init=10)
km.fit(X_scaled)
sse[k] = km.inertia_
optimal_k = min(sse, key=sse.get) # 简单取SSE最小的k
```
也可根据业务需求固定 K=5(高/中高/中/中低/低价值客户)。
### Step 4 — 分群画像
输出每个簇的核心统计量:
```
簇 0(高价值客户):平均资产 85万,平均交易频次 28次/月,性别分布男62%
簇 1(潜力客户):平均资产 32万,年轻化趋势明显
...
```
推荐标签体系(五类):
- 🌟 高价值客户(VIP)
- ⬆️ 潜力客户
- 🟢 稳定客户
- 🔄 活跃交易客户
- ⚠️ 沉睡/流失预警客户
### Step 5 — 可视化
生成以下图表(保存为 PNG):
1. **客户资产分布直方图** — 各层级资产分布对比
2. **雷达图** — 各分群特征对比
3. **热力图** — 分群特征均值矩阵
4. **散点图** — 以资产×交易频次为坐标的客户分布
```python
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'SimHei']
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 资产分布
axes[0].hist([g['balance'] for _, g in df.groupby('cluster')], bins=30, label=[f'C{i}' for i in range(k)])
axes[0].set_title('Customer Balance Distribution by Cluster')
# 热力图
import seaborn as sns
sns.heatmap(cluster_means.T, annot=True, fmt='.1f', ax=axes[1])
axes[1].set_title('Cluster Feature Heatmap')
plt.tight_layout()
plt.savefig(output_path, dpi=150)
```
### Step 6 — 输出结果
输出内容:
1. 分群结果表(含客户ID、所属簇、分群标签)→ `segmentation_results.csv`
2. 分群特征统计 → `cluster_summary.csv`
3. 可视化图表 → `segmentation_charts.png`
4. 分析摘要(Markdown格式)→ `segmentation_report.md`
详细聚类和参数文档见:
- RFM 模型说明:参考 `references/rfm-guide.md`
- 聚类参数说明:参考 `references/clustering-guide.md`
FILE:references/rfm-guide.md
# RFM 模型参考
RFM(Recency, Frequency, Monetary)是客户价值分析最经典的方法论。
## 三个维度
| 维度 | 含义 | 计算方式 | 方向 |
|------|------|----------|------|
| Recency (R) | 最近一次交易距今时间 | (分析日 - 最近交易日).days | 越小越好 |
| Frequency (F) | 交易频次 | 周期内交易次数之和 | 越大越好 |
| Monetary (M) | 交易金额 | 周期内交易金额之和 | 越大越好 |
## 分值计算(5分制)
对每个维度按分位数分成5档:
- R:最近交易在最近20%天内 → 5分,最远20% → 1分
- F:交易频次最高20% → 5分,最低20% → 1分
- M:交易金额最高20% → 5分,最低20% → 1分
## 综合得分
```
RFM_Score = R × 100 + F × 10 + M
```
| 综合得分 | 客户类型 | 建议策略 |
|----------|----------|----------|
| 555 | 重点保持 | VIP一对一维护 |
| 554-545 | 高价值客户 | 增值服务 |
| 535-425 | 潜力客户 | 定向营销 |
| 414-324 | 流失风险 | 挽留活动 |
| <224 | 低价值客户 | 降低成本 |
## 扩展维度(银行场景)
银行场景可在 RFM 基础上增加:
| 扩展维度 | 字段 | 说明 |
|----------|------|------|
| Tenure (T) | account_open_date | 客户持有时长 |
| Product Depth (P) | product_count | 持有产品数量 |
| Channel (C) | channel_touch | 渠道触达次数 |
| Risk (R2) | risk_score | 风险评级 |
## 银行数据注意事项
1. **资产余额**(balance)比 Monetary 更稳定:优先用余额替代 M
2. **客户分群应分层**:先用资产分层,再用行为细化
3. **产品交叉销售机会**:分群后分析各簇产品持有率,找交叉销售目标
4. **合规要求**:分群结果不可用于歧视性定价,需脱敏处理
FILE:references/clustering-guide.md
# 聚类分析参数参考
## 算法选择
| 算法 | 适用场景 | 优点 | 缺点 |
|------|----------|------|------|
| K-Means | 大数据集,簇近似球形 | 快速、可解释 | 对噪声/离群点敏感 |
| DBSCAN | 非凸簇,噪声检测 | 无需指定K,抗噪声 | 参数敏感,大数据慢 |
| 层次聚类 | 小数据,可解释层次 | 无需指定K,可画树状图 | O(n²)复杂度 |
| GMM | 簇大小/密度不同 | 软聚类(概率) | 需指定K,速度慢 |
**推荐银行客户分群用 K-Means**(大客户量,高效易解释)。
## K-Means 参数
```python
from sklearn.cluster import KMeans
km = KMeans(
n_clusters=5, # 聚类数(建议5-6)
init='k-means++', # 初始化方式(推荐,默认更优)
n_init=10, # 用不同初始化运行次数,取最优
max_iter=300, # 最大迭代次数
random_state=42 # 随机种子(保证可复现)
)
```
## 最优K的选择
### 肘部法则(Elbow Method)
```python
import matplotlib.pyplot as plt
sse = {}
for k in range(2, 11):
km = KMeans(n_clusters=k, random_state=42, n_init=10)
km.fit(X_scaled)
sse[k] = km.inertia_ # SSE(簇内误差平方和)
plt.plot(list(sse.keys()), list(sse.values()), 'bo-')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('SSE')
plt.title('Elbow Method for Optimal k')
plt.savefig('elbow_plot.png')
```
找"肘部"拐点(曲线变缓处)。
### 轮廓系数(Silhouette Score)
```python
from sklearn.metrics import silhouette_score
for k in range(2, 11):
km = KMeans(n_clusters=k, random_state=42, n_init=10)
labels = km.fit_predict(X_scaled)
score = silhouette_score(X_scaled, labels)
print(f"k={k}: silhouette={score:.3f}")
```
轮廓系数范围 [-1, 1],越接近1越好。通常选 >0.4 的最大K。
## 特征预处理
```python
from sklearn.preprocessing import StandardScaler, MinMaxScaler
# Z-score 标准化(推荐,对异常值相对稳健)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# MinMax 缩放(对有界特征如转化率有效)
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)
```
**重要:** K-Means 对量纲敏感,所有特征必须标准化。
## 分群结果评估
| 指标 | 计算方式 | 目标值 |
|------|----------|--------|
| 簇内 SSE | km.inertia_ | 越小越好 |
| 轮廓系数 | silhouette_score | >0.4 为可接受 |
| Davies-Bouldin | davies_bouldin_score | 越小越好(<1理想)|
| Calinski-Harabasz | calinski_harabasz_score | 越大越好 |
## 常见问题
**Q: 某些簇太大/太小怎么办?**
→ 可能是特征选择问题,尝试增加特征或对大类再细分(两层分群)。
**Q: 分群结果不稳定?**
→ 增加 `n_init` 至 20-50,或用层次聚类结果初始化 K-Means。
**Q: 离群点干扰?**
→ 聚类前先删除或标记离群点(DBSCAN 或 IQR 过滤)。
FILE:scripts/segment.py
#!/usr/bin/env python3
"""
客户分群分析脚本
用法: python segment.py <客户数据.csv> [输出目录]
"""
import sys
import os
import json
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, davies_bouldin_score
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# ── 标签体系 ──────────────────────────────────────────────
CLUSTER_LABELS = {
0: ("🌟 高价值客户", "VIP / 高净值客户,重点维护"),
1: ("⬆️ 潜力客户", "成长型,有转化潜力"),
2: ("🟢 稳定客户", "中低频,低风险"),
3: ("🔄 活跃交易客户", "交易频繁,佣金贡献高"),
4: ("⚠️ 流失预警", "活跃度下降,需激活"),
}
FALLBACK_LABELS = {
i: (f"客户群体 {i}", "") for i in range(100)
}
# ── 特征工程 ─────────────────────────────────────────────
FEATURE_COLS = [
'recency', # 越活跃越小
'frequency', # 越多越好
'monetary', # 越高越好
'balance', # 资产余额
'tenure', # 持有时长(月)
'product_depth',# 持有产品数
'age', # 年龄
]
def load_and_clean(path):
df = pd.read_csv(path)
df.columns = df.columns.str.strip()
raw_cols = list(df.columns)
# 自动识别列名映射
col_map = {}
for col in raw_cols:
cl = col.lower()
if 'id' in cl: col_map[col] = 'customer_id'
elif 'balance' in cl: col_map[col] = 'balance'
elif 'amount' in cl and 'txn' in cl: col_map[col] = 'monetary'
elif 'count' in cl and 'txn' in cl: col_map[col] = 'frequency'
elif 'date' in cl and 'last' in cl: col_map[col] = 'last_date'
elif 'date' in cl and 'open' in cl: col_map[col] = 'open_date'
elif 'product' in cl and 'count' in cl: col_map[col] = 'product_depth'
elif 'age' in cl: col_map[col] = 'age'
elif 'gender' in cl: col_map[col] = 'gender'
elif 'txn' in cl: col_map[col] = 'monetary'
df.rename(columns=col_map, inplace=True)
today = pd.Timestamp.today()
# RFM
if 'last_date' in df.columns:
df['last_date'] = pd.to_datetime(df['last_date'], errors='coerce')
df['recency'] = (today - df['last_date']).dt.days.clip(lower=0)
else:
df['recency'] = df.get('recency', 180)
if 'frequency' not in df.columns:
df['frequency'] = 1
df['frequency'] = pd.to_numeric(df['frequency'], errors='coerce').fillna(1)
if 'monetary' not in df.columns:
df['monetary'] = df.get('balance', 0)
df['monetary'] = pd.to_numeric(df['monetary'], errors='coerce').fillna(0)
if 'balance' not in df.columns:
df['balance'] = df['monetary']
df['balance'] = pd.to_numeric(df['balance'], errors='coerce').fillna(0)
if 'open_date' in df.columns:
df['open_date'] = pd.to_datetime(df['open_date'], errors='coerce')
df['tenure'] = ((today - df['open_date']).dt.days / 30).clip(lower=0)
else:
df['tenure'] = 12 # 默认
if 'product_depth' not in df.columns:
df['product_depth'] = 1
df['product_depth'] = pd.to_numeric(df['product_depth'], errors='coerce').fillna(1)
if 'age' not in df.columns:
df['age'] = 40
df['age'] = pd.to_numeric(df['age'], errors='coerce').fillna(40)
# 数值列缺失值填充
num_cols = ['recency', 'frequency', 'monetary', 'balance', 'tenure', 'product_depth', 'age']
for c in num_cols:
if c in df.columns:
df[c] = df[c].fillna(df[c].median())
return df
def build_features(df):
feats = df[FEATURE_COLS].copy()
scaler = StandardScaler()
X_scaled = scaler.fit_transform(feats)
return X_scaled, feats, scaler
def find_optimal_k(X_scaled, max_k=8):
scores = {}
for k in range(2, max_k + 1):
km = KMeans(n_clusters=k, random_state=42, n_init=10)
labels = km.fit_predict(X_scaled)
scores[k] = {
'sse': km.inertia_,
'sil': silhouette_score(X_scaled, labels),
'db': davies_bouldin_score(X_scaled, labels),
}
best_k = max(scores, key=lambda k: scores[k]['sil'])
return best_k, scores
def cluster(df, X_scaled, n_clusters=5):
km = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
df['cluster'] = km.fit_predict(X_scaled)
return km, df
def label_clusters(df):
"""按 balance 中位数排序,给簇打标签"""
med = df.groupby('cluster')['balance'].median().sort_values(ascending=False)
rank = {c: i for i, c in enumerate(med.index)}
df['cluster_rank'] = df['cluster'].map(rank)
n = df['cluster'].nunique()
labels = list(CLUSTER_LABELS.values())[:n] + list(FALLBACK_LABELS.values())[n:]
label_map = {orig: labels[rank[orig]] for orig in rank}
df['segment_label'] = df['cluster'].map(lambda c: label_map.get(c, (f"群体{c}", ""))[0])
df['segment_desc'] = df['cluster'].map(lambda c: label_map.get(c, ("", ""))[1])
return df
def make_charts(df, feats, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, 'segmentation_charts.png')
    n_clusters = df['cluster'].nunique()
    fig, axes = plt.subplots(2, 2, figsize=(14, 11))
    # 1. Asset distribution
    colors = plt.cm.Set2(np.linspace(0, 1, n_clusters))
    for c in sorted(df['cluster'].unique()):
        sub = df[df['cluster'] == c]['balance'].dropna()
        axes[0,0].hist(sub, bins=25, alpha=0.6, label=f'C{c}', color=colors[c])
    axes[0,0].set_xlabel('Balance (Asset)')
    axes[0,0].set_ylabel('Count')
    axes[0,0].set_title('Asset Distribution by Cluster')
    axes[0,0].legend(fontsize=8)
    axes[0,0].ticklabel_format(style='sci', axis='y', scilimits=(0,0))
    # 2. Cluster feature radar chart
    cluster_means = feats.groupby(df['cluster']).mean()
    cluster_means_norm = (cluster_means - cluster_means.min()) / (cluster_means.max() - cluster_means.min() + 1e-9)
    cats = list(cluster_means.columns)
    N = len(cats)
    angles = [n / float(N) * 2 * np.pi for n in range(N)]
    angles += angles[:1]
    # Swap the Cartesian axes in the top-right slot for a polar one
    axes[0,1].remove()
    ax_radar = fig.add_subplot(2, 2, 2, polar=True)
    for c in sorted(df['cluster'].unique()):
        # Repeat the first value so the polygon closes; .iloc avoids positional [] on a labeled Series
        vals = list(cluster_means_norm.loc[c]) + [cluster_means_norm.loc[c].iloc[0]]
        ax_radar.plot(angles, vals, 'o-', linewidth=2, label=f'C{c}', color=colors[c])
        ax_radar.fill(angles, vals, alpha=0.1, color=colors[c])
    ax_radar.set_xticks(angles[:-1])
    ax_radar.set_xticklabels(cats, fontsize=8)
    ax_radar.set_title('Cluster Feature Radar', pad=20)
    ax_radar.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1), fontsize=7)
    # 3. Heatmap
    im = axes[1,0].imshow(cluster_means_norm.T, cmap='YlOrRd', aspect='auto')
    axes[1,0].set_xticks(range(n_clusters))
    axes[1,0].set_xticklabels([f'C{i}' for i in range(n_clusters)])
    axes[1,0].set_yticks(range(len(cats)))
    axes[1,0].set_yticklabels(cats, fontsize=8)
    axes[1,0].set_title('Cluster Feature Heatmap (Normalized)')
    plt.colorbar(im, ax=axes[1,0], shrink=0.8)
    # 4. Scatter plot
    for c in sorted(df['cluster'].unique()):
        sub = df[df['cluster'] == c]
        axes[1,1].scatter(sub['frequency'], sub['balance'],
                          alpha=0.5, s=10, label=f'C{c}', color=colors[c])
    axes[1,1].set_xlabel('Frequency (txn count)')
    axes[1,1].set_ylabel('Balance')
    axes[1,1].set_title('Frequency vs Balance by Cluster')
    axes[1,1].legend(fontsize=7)
    axes[1,1].ticklabel_format(style='sci', axis='y', scilimits=(0,0))
    plt.suptitle('Customer Segmentation Analysis', fontsize=14, fontweight='bold', y=1.01)
    plt.tight_layout()
    plt.savefig(path, dpi=150, bbox_inches='tight')
    plt.close(fig)
    return path
def summarize(df, scores, best_k, output_dir):
    summary = df.groupby(['cluster', 'segment_label']).agg({
        'balance': ['mean', 'median', 'count'],
        'frequency': 'mean',
        'monetary': 'mean',
        'recency': 'mean',
        'tenure': 'mean',
        'product_depth': 'mean',
        'age': 'mean',
    }).round(2)
    summary.columns = ['_'.join(c) for c in summary.columns]
    report = f"""# 客户分群分析报告
## 基本信息
- 分析时间: {pd.Timestamp.today().strftime('%Y-%m-%d')}
- 客户总数: {len(df)}
- 最优分群数: {best_k}(轮廓系数: {scores[best_k]['sil']:.3f})
## 各分群概况
| 簇 | 标签 | 人数 | 平均资产 | 中位资产 | 平均交易频次 | 平均最近交易(天) |
|----|------|------|----------|----------|--------------|----------------|
"""
    for c in sorted(df['cluster'].unique()):
        row = df[df['cluster'] == c]
        label = row['segment_label'].iloc[0]
        report += f"| C{c} | {label} | {len(row)} | {row['balance'].mean():.0f} | {row['balance'].median():.0f} | {row['frequency'].mean():.1f} | {row['recency'].mean():.0f} |\n"
    report += f"""
## 分群特征雷达图 / 热力图
- 详细聚类参数说明见: `references/clustering-guide.md`
- RFM模型说明见: `references/rfm-guide.md`
## 分群策略建议
"""
    for c in sorted(df['cluster'].unique()):
        label = df[df['cluster']==c]['segment_label'].iloc[0]
        desc = df[df['cluster']==c]['segment_desc'].iloc[0]
        avg_bal = df[df['cluster']==c]['balance'].mean()
        report += f"**C{c} {label}**:{desc}(平均资产 {avg_bal:.0f})\n"
    rpt_path = os.path.join(output_dir, 'segmentation_report.md')
    with open(rpt_path, 'w', encoding='utf-8') as f:
        f.write(report)
    return rpt_path, summary
def main():
    input_path = sys.argv[1] if len(sys.argv) > 1 else 'customers.csv'
    output_dir = sys.argv[2] if len(sys.argv) > 2 else 'output'
    os.makedirs(output_dir, exist_ok=True)
    print(f"[1/5] Loading data: {input_path}")
    df = load_and_clean(input_path)
    print(f" Loaded {len(df)} rows, columns: {list(df.columns)}")
    print("[2/5] Building features...")
    X_scaled, feats, scaler = build_features(df)
    print("[3/5] Finding optimal K...")
    best_k, scores = find_optimal_k(X_scaled)
    print(f" Optimal K={best_k} sil={scores[best_k]['sil']:.3f} db={scores[best_k]['db']:.3f}")
    print(f"[4/5] Clustering into {best_k} groups...")
    km, df = cluster(df, X_scaled, n_clusters=best_k)
    df = label_clusters(df)
    print("[5/5] Generating charts and report...")
    chart_path = make_charts(df, feats, output_dir)
    rpt_path, summary = summarize(df, scores, best_k, output_dir)
    # Save results (utf-8-sig so Excel opens the Chinese labels correctly)
    out_csv = os.path.join(output_dir, 'segmentation_results.csv')
    df.to_csv(out_csv, index=False, encoding='utf-8-sig')
    sum_csv = os.path.join(output_dir, 'cluster_summary.csv')
    summary.to_csv(sum_csv, encoding='utf-8-sig')
    print(f"""
✅ 分群完成!
输出文件:
📊 分群结果 → {out_csv}
📋 分群汇总 → {sum_csv}
📈 可视化图表 → {chart_path}
📝 分析报告 → {rpt_path}
各分群人数:
{df['segment_label'].value_counts().to_string()}
""")
    return df, scores, best_k
if __name__ == '__main__':
    main()
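Corporate public-sentiment credit risk scan. When the user provides a list of company names, automatically fetch each company's latest public coverage (news, announcements, regulatory information) and combine it with a risk scoring model to output a structured credit risk report. Trigger scenarios: (1) the user says "scan risk" (扫描风险), "sentiment analysis" (舆情分析), or "credit risk rating" (信用风险评级); (2) the user provides a company list that needs batch evaluation; (3) the user uploads a CSV/TXT file containing company names; (4) scheduled re-check reminder...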
---
name: risk-sentiment-scanner
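description: Corporate public-sentiment credit risk scan. When the user provides a list of company names, automatically fetch each company's latest public coverage (news, announcements, regulatory information) and combine it with a risk scoring model to output a structured credit risk report. Trigger scenarios: (1) the user says "scan risk" (扫描风险), "sentiment analysis" (舆情分析), or "credit risk rating" (信用风险评级); (2) the user provides a company list that needs batch evaluation; (3) the user uploads a CSV/TXT file containing company names; (4) triggered automatically on scheduled re-check reminders. Data sources: public web search (Sina Finance 新浪财经, Securities Times 证券时报, 21财经, Tencent News 腾讯新闻, etc.).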
---
# Risk Sentiment Scanner
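Collect public coverage of target companies, rate their credit risk with an LLM, and output a structured JSON report.
## Workflow
### Step 1 — Receive the Company List
Three input methods are supported:
**Method A (provided directly in conversation):**
```
蚂蚁集团
贵州茅台
碧桂园
```
**Method B (file upload):**
Read the user-uploaded `.txt` or `.csv` file, one company name per line, and de-duplicate automatically.
**Method C (cron-scheduled trigger):**
Read the company list from `memory/risk-watchlist.md` (which the user can maintain in advance).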
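The three input methods all reduce to the same task: turning raw text into a clean list of names. A minimal sketch of that step follows; `parseCompanyList` is a hypothetical helper, not part of the Skill, and it assumes that anything after a `#` on a line is a note to ignore (as in the watchlist format).

```javascript
// Hypothetical helper: turn raw text (chat message, uploaded file, or
// watchlist) into a de-duplicated list of company names, one per line.
function parseCompanyList(rawText) {
  const seen = new Set();
  const names = [];
  for (const line of rawText.split(/\r?\n/)) {
    // Assumption: text after '#' is a note, so only the part before it is kept.
    const name = line.split('#')[0].trim();
    if (name && !seen.has(name)) {
      seen.add(name);
      names.push(name);
    }
  }
  return names;
}

console.log(parseCompanyList('蚂蚁集团\n贵州茅台\n\n蚂蚁集团\n碧桂园 # 房地产'));
// → [ '蚂蚁集团', '贵州茅台', '碧桂园' ]
```

Keeping first-seen order means the report lists companies in the order the user supplied them.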
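### Step 2 — Fetch Public Coverage
For each company, run the following searches:
```bash
# Search coverage from the last 6 months (multiple keyword combinations)
company name + "风险"                  # risk
company name + "违规 / 处罚 / 调查"     # violation / penalty / investigation
company name + "债务 / 重组 / 违约"     # debt / restructuring / default
company name + "监管 / 合规"            # regulation / compliance
company name + "经营 / 财报 / 转型"     # operations / financial reports / transformation
```
Use the `batch_web_search` tool, fetching at most **8 search results** per company.
**Source priority:**
1. Securities Times, 证券时报 (stcn.com)
2. 21财经 (21jingji.com)
3. Sina Finance, 新浪财经 (finance.sina.com.cn)
4. Tencent News, 腾讯新闻 (news.qq.com)
5. East Money, 东方财富网 (eastmoney.com)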
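The query combinations and source ranking above might be sketched as follows. This does not call `batch_web_search`; its real signature is not documented here. The result shape `{ url, title }` and the helper names `buildQueries` and `rankBySource` are assumptions for illustration.

```javascript
// Keyword groups and domains copied from this step.
const KEYWORD_GROUPS = ['风险', '违规 / 处罚 / 调查', '债务 / 重组 / 违约', '监管 / 合规', '经营 / 财报 / 转型'];
const SOURCE_PRIORITY = ['stcn.com', '21jingji.com', 'finance.sina.com.cn', 'news.qq.com', 'eastmoney.com'];

// One query string per keyword group.
function buildQueries(company) {
  return KEYWORD_GROUPS.map(keywords => `${company} ${keywords}`);
}

// Assumed result shape: { url: string, title: string }.
// Preferred domains come first; unknown domains keep their relative order at the end.
function rankBySource(results, limit = 8) {
  const rank = url => {
    const index = SOURCE_PRIORITY.findIndex(domain => url.includes(domain));
    return index === -1 ? SOURCE_PRIORITY.length : index;
  };
  // slice() copies first so the caller's array is not reordered.
  return results.slice().sort((a, b) => rank(a.url) - rank(b.url)).slice(0, limit);
}

console.log(buildQueries('碧桂园'));
```

Applying the limit after sorting means the 8 kept results favor the preferred sources rather than whatever the search returned first.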
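### Step 3 — Extract Article Text
For each company, take the **3 most recent relevant URLs** from the search results and extract the article text with `extract_content_from_websites`.
Extraction strategy:
- Read at most 2,000 characters per article (truncate the remainder)
- Keep: title, date, risk-related paragraphs
- Discard: ads, comment sections, navigation content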
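The selection and truncation rules in this step can be sketched as below. The article shape `{ url, publishedAt, body }` and the helper names are assumptions; relevance filtering is left out because the Skill leaves that judgment to the agent.

```javascript
// Assumed article shape: { url, publishedAt (ISO date string), body }.
// Returns the `count` most recent articles, newest first.
function pickLatest(articles, count = 3) {
  return articles
    .slice() // copy so the input array is left untouched
    .sort((a, b) => Date.parse(b.publishedAt) - Date.parse(a.publishedAt))
    .slice(0, count);
}

// Keep at most `maxChars` characters and drop the rest of the article.
function truncateBody(body, maxChars = 2000) {
  // Array.from splits by code point, so a surrogate pair is never cut in half.
  const chars = Array.from(body);
  return chars.length > maxChars ? chars.slice(0, maxChars).join('') : body;
}
```

The sort assumes every `publishedAt` parses as a valid date; entries without a usable date would need to be filtered out first.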
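### Step 4 — LLM Risk Rating
Send the cleaned coverage text to the LLM and have it output the structure below (each company is scored independently).
Enumerated values stay in Chinese because `scripts/scan.js` emits them verbatim: 低风险 / 中低风险 / 中高风险 / 高风险 mean low / medium-low / medium-high / high risk; 上升 / 稳定 / 下降 mean rising / stable / falling; 月度 / 季度 / 半年度 mean monthly / quarterly / semiannual.
```json
{
  "company": "company name",
  "risk_level": "R1-低风险 | R2-中低风险 | R3-中高风险 | R4-高风险",
  "risk_score": 0-100,
  "risk_trend": "上升 | 稳定 | 下降",
  "key_positive_factors": ["positive factor 1", "positive factor 2"],
  "key_negative_factors": ["risk factor 1", "risk factor 2"],
  "red_flags": ["signal needing close attention 1", "signal 2"],
  "news_summary": "summary of the last 6 months of coverage (within 100 characters)",
  "recommended_action": "business cooperation recommendation",
  "review_frequency": "月度 | 季度 | 半年度",
  "data_sources": ["source 1 URL", "source 2 URL"],
  "last_updated": "YYYY-MM-DD"
}
```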
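Because the rating comes back from an LLM, it may be worth checking its shape before it enters the report. The sketch below is one possible check, not something the Skill prescribes; `validateRating` is a hypothetical helper and it covers only fields whose format is unambiguous in the structure above.

```javascript
// Hypothetical validator: returns the names of fields that look wrong,
// or an empty array when the rating matches the expected shape.
function validateRating(rating) {
  const problems = [];
  if (typeof rating.company !== 'string' || rating.company.trim() === '') problems.push('company');
  // Only the R1..R4 prefix is checked, so the label text after the dash may vary.
  if (!/^R[1-4]-/.test(rating.risk_level || '')) problems.push('risk_level');
  if (!Number.isFinite(rating.risk_score) || rating.risk_score < 0 || rating.risk_score > 100) {
    problems.push('risk_score');
  }
  for (const key of ['key_positive_factors', 'key_negative_factors', 'red_flags', 'data_sources']) {
    if (!Array.isArray(rating[key])) problems.push(key);
  }
  if (!/^\d{4}-\d{2}-\d{2}$/.test(rating.last_updated || '')) problems.push('last_updated');
  return problems;
}
```

A non-empty result could be used to ask the LLM to regenerate the rating for that company.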
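### Step 5 — Risk Scoring Model (Reference)
| Dimension | Weight | Scoring logic |
|-----------|--------|---------------|
| Regulatory / penalties | 30% | Penalty record: +25 points; major violation of law: +40 points |
| Financial pressure | 25% | Debt restructuring in progress: +30 points; default record: +50 points |
| Corporate governance | 20% | Executive under investigation / anti-corruption case: +25 points; persistent turmoil: +40 points |
| Coverage sentiment | 15% | Negative reports exceed 60% of mainstream coverage: +20 points |
| Operating condition | 10% | Sustained decline in revenue or profit: +15 points |
**Final level:**
- R1 (0–25 points): low risk, coverage is positive
- R2 (26–50 points): medium-low risk, manageable risk exposure
- R3 (51–75 points): medium-high risk, debt-servicing capacity under pressure
- R4 (76–100 points): high risk, business sustainability in doubt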
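The table gives weights and point values but does not spell out how they combine. The sketch below shows one possible reading, assuming each dimension is first scored on a 0 to 100 scale and then weighted; `weightedScore` and `toLevel` are hypothetical names, and `scripts/scan.js` applies its own variant.

```javascript
// Weights in percent, copied from the table above.
const WEIGHTS_PCT = { regulatory: 30, financial: 25, governance: 20, sentiment: 15, operation: 10 };

// Assumption: every dimension is scored 0-100 before weighting.
// Missing dimensions count as 0; out-of-range values are clamped.
function weightedScore(dimensionScores) {
  let total = 0;
  for (const [dimension, pct] of Object.entries(WEIGHTS_PCT)) {
    const raw = Number(dimensionScores[dimension]) || 0;
    const clamped = Math.min(100, Math.max(0, raw));
    total += clamped * pct;
  }
  return Math.round(total / 100);
}

// Band boundaries follow the final-level list above.
function toLevel(score) {
  if (score <= 25) return 'R1';
  if (score <= 50) return 'R2';
  if (score <= 75) return 'R3';
  return 'R4';
}

const exampleScore = weightedScore({ regulatory: 40, financial: 80, governance: 25 });
console.log(exampleScore, toLevel(exampleScore)); // 37 R2
```

In the example, (40 × 30 + 80 × 25 + 25 × 20) / 100 = 37, which falls in the R2 band.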
### Step 6 — Output and Delivery
**Output content (JSON format):**
```json
{
  "report_date": "2026-03-18",
  "total_companies": 3,
  "summary": {
    "R1_count": 0,
    "R2_count": 1,
    "R3_count": 1,
    "R4_count": 1,
    "high_risk_companies": ["碧桂园"]
  },
  "companies": [ /* full rating JSON for each company */ ]
}
```
**Output location (per user preference):**
- Default: output directly in the current conversation
- Optional: save to `memory/risk-reports/YYYY-MM-DD.md`
- Optional: push to a Feishu document (through the Feishu Skill interface)
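The report envelope in this step can be assembled from the per-company ratings along these lines. `buildReport` is a hypothetical helper; it assumes each rating's `risk_level` begins with `R1` to `R4` and that `high_risk_companies` lists only R4 entries, as `scripts/scan.js` does.

```javascript
// Hypothetical helper: wrap per-company ratings in the report structure.
function buildReport(ratings, reportDate) {
  const counts = { R1_count: 0, R2_count: 0, R3_count: 0, R4_count: 0 };
  for (const rating of ratings) {
    // 'R4-高风险' -> 'R4_count'; unknown prefixes are ignored.
    const key = `${rating.risk_level.slice(0, 2)}_count`;
    if (key in counts) counts[key] += 1;
  }
  return {
    report_date: reportDate,
    total_companies: ratings.length,
    summary: {
      ...counts,
      high_risk_companies: ratings
        .filter(rating => rating.risk_level.startsWith('R4'))
        .map(rating => rating.company),
    },
    companies: ratings,
  };
}
```

Deriving the counts and the high-risk list from the same array keeps `R4_count` and `high_risk_companies` in agreement.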
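### Step 7 — Alerts
If the scan results include any **R4-level company**, automatically print a red alert at the top of the report:
```
🚨 [High-Risk Alert] 碧桂园 — R4 (score 78)
Trigger: the USD 17.7 billion debt restructuring repaid only 2% of principal; net assets are extremely thin
Recommended action: avoid new exposure; downgrade existing business
```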
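An alert like the one above could be rendered from the rating objects roughly as follows. `formatAlerts` is a hypothetical helper, the wording is illustrative, and using `red_flags` as the trigger text is an assumption; the Skill does not say which field supplies it.

```javascript
// Hypothetical helper: one alert string per R4 company, empty array if none.
function formatAlerts(ratings) {
  return ratings
    .filter(rating => rating.risk_level.startsWith('R4'))
    .map(rating => {
      // Assumption: the trigger line is built from the rating's red_flags.
      const flags = rating.red_flags || [];
      const reason = flags.length > 0 ? flags.join('; ') : 'n/a';
      return [
        `🚨 [High-Risk Alert] ${rating.company} - R4 (score ${rating.risk_score})`,
        `Trigger: ${reason}`,
        `Recommended action: ${rating.recommended_action}`,
      ].join('\n');
    });
}
```

The returned strings can be joined and placed above the JSON report when the array is non-empty.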
---
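## Maintaining the Company Watchlist
Users can manage the list of tracked companies in the following ways:
1. **Conversation update**: tell me directly "add company XXX to the watchlist"
2. **File maintenance**: edit `memory/risk-watchlist.md`, one company name plus a note per line
Example format:
```
# Risk watchlist
蚂蚁集团 # Tech/finance, industry leader
贵州茅台 # Baijiu (liquor), R3, under observation
碧桂园 # Real estate, R4, winding down existing exposure
宁德时代 # New energy, quarterly re-check
```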
---
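## Notes
- This Skill relies on **public information** only and does not constitute a formal credit rating
- Risk scores are LLM judgments provided for reference and should not be the sole basis for a decision
- Before major decisions involving investment, lending, and the like, consult a professional financial advisor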
FILE:scripts/scan.js
#!/usr/bin/env node
/**
 * risk-sentiment-scanner core script
 * Input: list of company names (string, one per line)
 * Output: structured JSON risk report
 *
 * Usage:
 *   node scan.js "蚂蚁集团\n贵州茅台\n碧桂园"
 *   node scan.js --file companies.txt
 */
import { readFileSync, writeFileSync, mkdirSync, existsSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
const __dirname = dirname(fileURLToPath(import.meta.url));
const OUTPUT_DIR = join(__dirname, '../reports');
mkdirSync(OUTPUT_DIR, { recursive: true });
// ── Risk scoring model ─────────────────────────────────
const WEIGHTS = {
  regulatory: 0.30, // regulatory / penalties
  financial: 0.25,  // financial pressure
  governance: 0.20, // corporate governance
  sentiment: 0.15,  // coverage sentiment
  operation: 0.10,  // operating condition
};
// Keywords stay in Chinese because they are matched against Chinese news text.
const RISK_KEYWORDS = {
  regulatory: ['处罚', '罚款', '违规', '违法', '被查', '监管', '整改', '关停', '清算', '行政处罚'],
  financial: ['债务', '重组', '违约', '逾期', '偿债', '资金链', '清盘', '破产', '支付危机', '资不抵债'],
  governance: ['被查', '落马', '反腐', '双规', '留置', '违纪', '涉嫌', '高管被', '内控', '治理风险'],
  negative: ['暴跌', '蒸发', '腰斩', '亏损', '暴雷', '跑路', '失信', '限高', '被执行'],
  positive: ['增长', '盈利', '扩张', '突破', '创新', '获奖', '合作', '稳健', '改善', '转型成功'],
};
function scoreCompany(text) {
  const t = text.toLowerCase();
  const scores = { regulatory: 0, financial: 0, governance: 0, sentiment: 0, operation: 0 };
  // Regulatory / penalties
  RISK_KEYWORDS.regulatory.forEach(kw => { if (t.includes(kw)) scores.regulatory += 25; });
  if (t.includes('相互宝')) scores.regulatory += 10;
  if (t.includes('常态化监管')) scores.regulatory -= 15; // mitigating signal
  // Financial pressure
  RISK_KEYWORDS.financial.forEach(kw => { if (t.includes(kw)) scores.financial += 30; });
  if (t.includes('重组生效')) scores.financial += 10; // restructuring taking effect does not mean the risk is resolved
  if (t.includes('2%') && t.includes('本金')) scores.financial += 20; // low recovery rate
  if (t.includes('净资产约100亿')) scores.financial += 25;
  // Corporate governance
  RISK_KEYWORDS.governance.forEach(kw => { if (t.includes(kw)) scores.governance += 28; });
  if (t.includes('三任')) scores.governance += 15; // persistent turmoil
  // Coverage sentiment
  RISK_KEYWORDS.negative.forEach(kw => { if (t.includes(kw)) scores.sentiment += 18; });
  RISK_KEYWORDS.positive.forEach(kw => { if (t.includes(kw)) scores.sentiment -= 10; });
  // Operating condition
  if (t.includes('营收增速新低') || t.includes('失速')) scores.operation += 20;
  if (t.includes('个位数') && t.includes('目标')) scores.operation += 15;
  if (t.includes('价格倒挂')) scores.operation += 15;
  if (t.includes('市值蒸发逾万亿')) scores.operation += 20;
  if (t.includes('海外营收') && !t.includes('下滑')) scores.operation -= 10; // international expansion counts as positive
  // Apply the weights and clamp each weighted dimension to 0-100, then sum
  const final = {};
  for (const [k, v] of Object.entries(scores)) {
    final[k] = Math.min(100, Math.max(0, WEIGHTS[k] * v));
  }
  const total = Object.values(final).reduce((a, b) => a + b, 0);
  return total;
}
function level(score) {
  if (score <= 25) return 'R1-低风险';
  if (score <= 50) return 'R2-中低风险';
  if (score <= 75) return 'R3-中高风险';
  return 'R4-高风险';
}
function action(score) {
  if (score <= 25) return '可合作,建议季度级复检';
  if (score <= 50) return '可合作,重点关注合规持续性,季度复检';
  if (score <= 75) return '谨慎合作,需强担保,月度复检';
  return '不建议新增敞口,存量业务降级处理,回避';
}
function frequency(score) {
  if (score >= 75) return '月度';
  if (score >= 50) return '季度';
  return '半年度';
}
function extractFactors(text, keywords) {
  const found = [];
  keywords.forEach(kw => { if (text.includes(kw)) found.push(kw); });
  return [...new Set(found)];
}
// ── Simplified coverage summary ───────────────────────
function summarize(text, maxLen = 120) {
  // Collapse runs of whitespace, then truncate
  const clean = text.replace(/\s+/g, ' ').trim();
  return clean.length > maxLen ? clean.slice(0, maxLen) + '…' : clean;
}
// ── Main analysis function ────────────────────────────
function analyzeCompany(name, newsTexts) {
  const combined = newsTexts.join('。');
  const score = Math.round(scoreCompany(combined));
  const lev = level(score);
  const act = action(score);
  const freq = frequency(score);
  const neg = extractFactors(combined, [...RISK_KEYWORDS.regulatory, ...RISK_KEYWORDS.financial, ...RISK_KEYWORDS.governance, ...RISK_KEYWORDS.negative]);
  const pos = extractFactors(combined, RISK_KEYWORDS.positive);
  // Trend: rising when negatives clearly outnumber positives, falling when positives lead
  const trend = neg.length > pos.length + 3 ? '上升' : neg.length < pos.length ? '下降' : '稳定';
  // Red flags: the first 3 risk keywords found, in keyword-list order
  const allRisks = [...RISK_KEYWORDS.regulatory, ...RISK_KEYWORDS.financial, ...RISK_KEYWORDS.governance];
  const redFlags = allRisks.filter(kw => combined.includes(kw)).slice(0, 3);
  return {
    company: name,
    risk_level: lev,
    risk_score: score,
    risk_trend: trend,
    key_positive_factors: pos.slice(0, 4),
    key_negative_factors: neg.slice(0, 4),
    red_flags: redFlags,
    news_summary: summarize(combined),
    recommended_action: act,
    review_frequency: freq,
    last_updated: new Date().toISOString().slice(0, 10),
  };
}
// ── Entry point ───────────────────────────────────────
const args = process.argv.slice(2);
// Split raw text into company names. Accepts real newlines and the literal
// "\n" sequence a shell passes through, strips "# note" comments, drops
// blank lines, and de-duplicates.
function parseNames(content) {
  const names = content
    .split(/\r?\n|\\n/)
    .map(l => l.split('#')[0].trim())
    .filter(Boolean);
  return [...new Set(names)];
}
async function main() {
  let companies = [];
  if (args[0] === '--file' && args[1]) {
    companies = parseNames(readFileSync(args[1], 'utf8'));
  } else if (args.length > 0) {
    companies = parseNames(args.join(' '));
  } else {
    // Fall back to the watchlist
    const watchlistPath = join(__dirname, '../../memory/risk-watchlist.md');
    if (existsSync(watchlistPath)) {
      companies = parseNames(readFileSync(watchlistPath, 'utf8'));
    } else {
      console.error('❌ 未提供企业列表,请通过参数或 memory/risk-watchlist.md 提供');
      process.exit(1);
    }
  }
  console.log(`\n🔍 开始扫描 ${companies.length} 家企业...\n`);
  const results = companies.map(name => {
    console.log(` ⏳ ${name}`);
    // Note: at run time, coverage data must be fetched through
    // batch_web_search + extract_content_from_websites.
    // This returns a placeholder structure; the real calls are made by the agent in the session.
    return analyzeCompany(name, []);
  });
  // Tally companies per level
  const r1 = results.filter(r => r.risk_level.startsWith('R1')).length;
  const r2 = results.filter(r => r.risk_level.startsWith('R2')).length;
  const r3 = results.filter(r => r.risk_level.startsWith('R3')).length;
  const r4 = results.filter(r => r.risk_level.startsWith('R4')).length;
  const date = new Date().toISOString().slice(0, 10);
  const report = {
    report_date: date,
    total_companies: companies.length,
    summary: { R1_count: r1, R2_count: r2, R3_count: r3, R4_count: r4,
      high_risk_companies: results.filter(r => r.risk_level.startsWith('R4')).map(r => r.company) },
    companies: results,
  };
  // Save the report
  const outPath = join(OUTPUT_DIR, `risk-report-${date}.json`);
  writeFileSync(outPath, JSON.stringify(report, null, 2), 'utf8');
  console.log(`\n✅ 报告已保存 → ${outPath}`);
  console.log(JSON.stringify(report.summary, null, 2));
  return report;
}
main().catch(console.error);