项目概述
因 GitLab 的 Webhook 消息格式与企业微信不兼容,无法直接转发到企业微信。GitLab to WeChat Work Webhook Forwarder 是一个轻量级服务,用于将 GitLab 的 Webhook 事件转发至企业微信,实现自动化通知。支持多种事件类型(如流水线、合并请求、代码推送等),并提供消息格式化、去重机制和 Docker 部署支持。
直接将 GitLab Webhook 转发到企业微信时,企业微信返回的错误示例:
{"errcode":40008,"errmsg":"invalid message type, hint: [1762826916378852572159192], from 192.168.xx.xx, more info at https://open.work.weixin.qq.com/devtool/query?e=40008"}
核心功能
事件支持
流水线(pipeline)、构建任务(build)、合并请求(merge_request)、代码推送(push)、标签推送(tag_push)、议题(issue)、评论(note)、Wiki 事件(wiki_page)、部署(deployment)
消息处理
智能格式化:自动截断长文本、结构化展示关键信息(仓库、分支、触发人等)
去重机制:基于事件内容和类型哈希,默认缓存 60 秒,避免重复通知
错误处理:请求超时保护(10 秒)、异常捕获及日志记录
目录结构
app.py:服务源码
Dockerfile:镜像构建文件
requirements.txt:Python 依赖列表
部署指南
环境要求
Python 3.11+ 或 Docker
配置步骤
设置企业微信 Webhook 地址为环境变量:
export WECHAT_WEBHOOK_URL="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
安装依赖:
pip install -r requirements.txt
运行方式
本地启动:
python app.py # 开发模式
gunicorn -w 4 -b 0.0.0.0:5000 app:app # 生产环境
Docker 部署:
docker build -t gitlab-wechat-webhook .
docker run -d --name gitlab-wechat-webhook -p 5000:5000 -e WECHAT_WEBHOOK_URL="YOUR_URL" gitlab-wechat-webhook
GitLab 配置
进入项目设置 → Webhooks,填写以下信息:
URL:http://your-server:5000/webhook
触发事件:勾选需要转发的事件类型
消息示例
流水线成功
✅ **Gitlab 流水线运行成功**
【代码库】group/project
【分支】main
【提交信息】feat: add new feature
【触发人】username
【详情】https://gitlab.com/group/project/-/pipelines/123
合并请求创建
🆕 **Gitlab 合并请求创建**
【标题】Add new feature
【分支】feature-branch → main
【作者】username
【链接】https://gitlab.com/group/project/-/merge_requests/456
高级配置
缓存时间:修改 app.py 中的 CACHE_TTL_SECONDS
日志级别:调整 logging.basicConfig(level=logging.INFO)
故障排查
企业微信未收到消息:检查 WECHAT_WEBHOOK_URL 是否有效,查看容器日志(docker logs gitlab-wechat-webhook)
GitLab 未触发:确认 Webhook URL 可访问,检查防火墙设置
扩展开发
添加新事件:在 webhook() 函数中新增判断逻辑,编写对应的 format_xxx() 函数
自定义消息:修改现有 format_xxx() 函数的内容结构
源码:
app.py
import os
import logging
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
# ====== Configuration ======
# URL that send_to_wecom() posts to; the module refuses to load without it.
WECHAT_WEBHOOK_URL = os.getenv("WECHAT_WEBHOOK_URL")
if not WECHAT_WEBHOOK_URL:
    raise EnvironmentError("WECHAT_WEBHOOK_URL environment variable is required")

# Notification key -> time it was last recorded (used by is_duplicate_notification).
NOTIFICATION_CACHE = {}
# Default de-duplication window, in seconds.
CACHE_TTL_SECONDS = 60

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
def truncate(text, length=50):
    """Shorten *text* to at most *length* characters, appending "..." when cut.

    Args:
        text: Value to shorten. ``None`` is treated as an empty string and any
            other non-string value is converted with ``str()`` first, so a
            null field in a payload cannot crash message formatting.
        length: Number of characters kept before the ellipsis.

    Returns:
        The text unchanged when it already fits, otherwise its first *length*
        characters followed by ``"..."``.
    """
    if text is None:
        return ""
    if not isinstance(text, str):
        text = str(text)
    if len(text) > length:
        return text[:length] + "..."
    return text
def get_trigger_user(data):
    """Return the best available name of the user who triggered the event.

    The nested ``user`` object is consulted first (``username``, then
    ``name``). If that yields nothing, the flat ``user_username`` and
    ``user_name`` keys are tried; GitLab's push and tag-push payloads carry
    the user in that flat form (see the GitLab webhook events reference).

    Args:
        data: Decoded webhook payload (a dict).

    Returns:
        The first non-empty name found, or ``"unknown"``.
    """
    user = data.get("user")
    if not isinstance(user, dict):
        # "user" may be absent or explicitly null in the payload.
        user = {}
    return (
        user.get("username")
        or user.get("name")
        or data.get("user_username")
        or data.get("user_name")
        or "unknown"
    )
def is_duplicate_notification(key, ttl=CACHE_TTL_SECONDS):
    """Report whether *key* was already recorded within the last *ttl* seconds.

    Generic de-duplication: *key* may be any hashable object. A key that is
    not a duplicate is recorded with the current time as a side effect.

    Entries older than *ttl* are evicted on every call so that
    ``NOTIFICATION_CACHE`` cannot grow without bound. Eviction uses the *ttl*
    of the current call.

    Args:
        key: Hashable identifier of the notification.
        ttl: De-duplication window in seconds.

    Returns:
        ``True`` if *key* was recorded less than *ttl* seconds ago,
        ``False`` otherwise (and the key is recorded).
    """
    now = datetime.now()
    window = timedelta(seconds=ttl)

    # Snapshot the items first so eviction never mutates the dict mid-iteration.
    expired = [k for k, seen in list(NOTIFICATION_CACHE.items()) if now - seen >= window]
    for stale_key in expired:
        NOTIFICATION_CACHE.pop(stale_key, None)

    # Anything still cached is, by construction, younger than the window.
    if key in NOTIFICATION_CACHE:
        return True
    NOTIFICATION_CACHE[key] = now
    return False
def send_to_wecom(msg):
    """Post *msg* to the configured WeCom webhook as a ``text`` message.

    Delivery is best-effort: every failure is logged and none is raised.
    An empty or ``None`` message is ignored.

    Args:
        msg: Message text to send.

    Returns:
        None.
    """
    if not msg:
        return
    try:
        import requests

        payload = {"msgtype": "text", "text": {"content": msg}}
        resp = requests.post(WECHAT_WEBHOOK_URL, json=payload, timeout=10)
        if resp.status_code != 200:
            logger.error(f"WeCom send failed: {resp.text}")
            return

        # WeCom reports API-level failures (e.g. errcode 40008) with HTTP 200
        # and a non-zero "errcode" in the JSON body, so the status code alone
        # does not prove the message was accepted.
        try:
            body = resp.json()
        except ValueError:
            body = None  # body is not JSON; fall back to trusting the status code
        errcode = body.get("errcode", 0) if isinstance(body, dict) else 0
        if errcode != 0:
            logger.error(f"WeCom send failed: {resp.text}")
        else:
            logger.info("Sent to WeCom successfully")
    except Exception as e:
        logger.exception(f"Failed to send to WeCom: {e}")
# ==================== 格式化函数 ====================
def format_pipeline(data):
    """Build the notification text for a pipeline event.

    A pipeline whose ``source`` is ``"parent_pipeline"`` is handled by
    ``handle_child_pipeline``; every other pipeline goes to
    ``handle_parent_pipeline``.

    Args:
        data: Decoded webhook payload; must contain ``object_attributes``.

    Returns:
        Whatever the selected handler returns (message text or ``None``).

    Raises:
        KeyError: If ``object_attributes`` is missing from *data*.
    """
    attributes = data["object_attributes"]
    is_child = attributes.get("source", "unknown") == "parent_pipeline"
    handler = handle_child_pipeline if is_child else handle_parent_pipeline
    return handler(data)
def format_job(data):
    """Build the notification text for a job (``object_kind == "build"``) event.

    Fields are read from a nested ``build`` object when one is present,
    otherwise from the payload itself. The flat ``build_status`` /
    ``build_stage`` / ``build_name`` / ``build_id`` keys that GitLab job
    events use are accepted as fallbacks.

    Args:
        data: Decoded webhook payload (a dict).

    Returns:
        The multi-line message text.
    """
    # Job status -> (emoji, Chinese label shown in the headline). The map is
    # keyed by the status value so that the lookup can actually match.
    status_labels = {
        "success": ("✅", "执行成功"),
        "failed": ("❌", "执行失败"),
        "canceled": ("⏹️", "被取消"),
        "running": ("▶️", "正在运行中"),
        "pending": ("⏳", "正在等待中"),
    }

    build = data.get("build") or data  # tolerate nested and flat payloads
    status = str(build.get("status") or data.get("build_status") or "unknown").lower()
    stage = build.get("stage") or data.get("build_stage") or "unknown"
    job_name = build.get("name") or data.get("build_name") or "unknown"
    ref = build.get("ref") or data.get("ref") or "unknown"

    project = data.get("project") or {}
    path_with_namespace = (
        project.get("path_with_namespace") or data.get("project_name") or "Unknown"
    )
    user = get_trigger_user(data)

    build_url = build.get("url")
    if not build_url:
        # NOTE(review): assumes the "<project web_url>/-/jobs/<id>" URL layout
        # when the payload carries no explicit job URL; confirm against the
        # GitLab instance in use.
        build_id = build.get("id") or data.get("build_id")
        web_url = project.get("web_url")
        build_url = f"{web_url}/-/jobs/{build_id}" if web_url and build_id else "#"

    commit = data.get("commit") or {}
    commit_title = (commit.get("title") or commit.get("message") or "").strip()

    # Unknown statuses keep the generic icon and show the raw status upper-cased.
    emoji, label = status_labels.get(status, ("ℹ️", status.upper()))

    lines = [
        f"{emoji} **Gitlab流水线任务{label}**",
        f"【任务】{job_name} ({stage})",
        f"【代码库】{path_with_namespace}",
        f"【分支】{ref}",
    ]
    if commit_title:
        lines.append(f"【提交信息】{truncate(commit_title)}")
    lines.append(f"【触发人】{user}")
    lines.append(f"【详情】{build_url}")
    return "\n".join(lines)
def format_merge_request(data):
    """Build the notification text for a merge request event.

    Args:
        data: Decoded webhook payload (a dict). Details are read from
            ``object_attributes`` and ``project``.

    Returns:
        The multi-line message text.
    """
    # Action -> (emoji, Chinese label shown in the headline). The map is keyed
    # by the action value so that the lookup can actually match.
    action_labels = {
        "open": ("🆕", "创建成功"),
        "update": ("🔄", "更新通知"),
        "merge": ("🎉", "合并成功通知"),
        "close": ("🔒", "被关闭"),
    }

    mr = data.get("object_attributes") or {}
    action = str(mr.get("action") or "open")
    title = mr.get("title") or "No title"
    source_branch = mr.get("source_branch")
    target_branch = mr.get("target_branch")
    url = mr.get("url") or "#"
    author = mr.get("author_username") or get_trigger_user(data)
    project = data.get("project") or {}
    path_with_namespace = project.get("path_with_namespace", "Unknown")

    # Other actions keep the generic icon and show the raw action upper-cased.
    emoji, label = action_labels.get(action, ("📝", action.upper()))

    lines = [
        f"{emoji} **Gitlab流水线合并请求{label}**",
        f"【标题】{truncate(title)}",
        f"【代码库】{path_with_namespace}",
        f"【分支】{source_branch} → {target_branch}",
        f"【作者】{author}",
        f"【链接】{url}",
    ]
    return "\n".join(lines)
def format_push(data):
    """Build the notification text for a push event.

    Args:
        data: Decoded webhook payload (a dict).

    Returns:
        The multi-line message text.
    """
    commits = data.get("commits") or []
    # Prefer the payload's own total: GitLab documents "total_commits_count"
    # for push events, and the "commits" list may hold fewer entries than
    # were actually pushed.
    total_commits = data.get("total_commits_count")
    if total_commits is None:
        total_commits = len(commits)

    ref = data.get("ref") or ""
    branch = ref.removeprefix("refs/heads/")  # strip only a leading prefix
    project = data.get("project") or {}
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    user = get_trigger_user(data)
    compare_url = data.get("compare_url") or "#"

    lines = [
        # The f prefix was missing, which printed a literal "{user}".
        f"📦 **{user}推送了一个新的提交**",
        f"【代码库】{path_with_namespace}",
        f"【分支】{branch}",
        f"【提交数】{total_commits} commits",
        f"【推送人】{user}",
        f"【对比】{compare_url}",
    ]
    if commits:
        latest = commits[-1]
        lines.append(f"【最新提交】{truncate(latest.get('message', ''))}")
    return "\n".join(lines)
def format_tag(data):
    """Build the notification text for a tag push event.

    Args:
        data: Decoded webhook payload (a dict).

    Returns:
        The multi-line message text. The message of the first entry in
        ``commits``, when present, is included as the annotation line.
    """
    ref = data.get("ref") or ""
    tag = ref.removeprefix("refs/tags/")  # strip only a leading prefix
    project = data.get("project") or {}
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    user = get_trigger_user(data)

    commits = data.get("commits") or []
    message = commits[0].get("message", "") if commits else ""

    lines = [
        # The f prefix was missing, which printed a literal "{user}".
        f"🏷️ **{user}创建了一个新标签**",
        f"【代码库】{path_with_namespace}",
        f"【标签】{tag}",
        f"【创建人】{user}",
    ]
    if message:
        lines.append(f"【注释】{truncate(message)}")
    return "\n".join(lines)
def format_issue(data):
    """Build the notification text for an issue event.

    Args:
        data: Decoded webhook payload (a dict). Details are read from
            ``object_attributes`` and ``project``.

    Returns:
        The multi-line message text.
    """
    attrs = data.get("object_attributes", {})
    action = attrs.get("action", "open")
    title = attrs.get("title", "No title")
    link = attrs.get("url", "#")
    author = attrs.get("author_username") or get_trigger_user(data)
    repo_path = data.get("project", {}).get("path_with_namespace", "Unknown")

    icons = {"open": "❗", "close": "✅", "reopen": "🔄"}
    icon = icons.get(action, "📝")

    return "\n".join(
        [
            f"{icon} **Issue {action.upper()}**",
            f"【标题】{truncate(title)}",
            f"【代码库】{repo_path}",
            f"【作者】{author}",
            f"【链接】{link}",
        ]
    )
def format_note(data):
    """Build the notification text for a comment (note) event.

    Args:
        data: Decoded webhook payload (a dict). Details are read from
            ``object_attributes`` and ``project``.

    Returns:
        The multi-line message text.
    """
    note = data.get("object_attributes") or {}
    noteable_type = note.get("noteable_type", "Unknown")
    note_body = note.get("note", "")
    url = note.get("url") or "#"
    author = get_trigger_user(data)
    project = data.get("project") or {}
    path_with_namespace = project.get("path_with_namespace", "Unknown")

    lines = [
        # The f prefix was missing, which printed a literal "{author}".
        f"💬 **{author}添加了一个评论**",
        f"【类型】{noteable_type}",
        f"【代码库】{path_with_namespace}",
        f"【内容】{truncate(note_body)}",
        f"【链接】{url}",
    ]
    return "\n".join(lines)
def format_wiki_page(data):
    """Build the notification text for a wiki page event.

    Args:
        data: Decoded webhook payload (a dict). Details are read from
            ``object_attributes`` and ``project``.

    Returns:
        The multi-line message text.
    """
    page = data.get("object_attributes", {})
    operation = page.get("action", "create")
    page_title = page.get("title", "Unknown")
    link = page.get("url", "#")
    editor = get_trigger_user(data)
    repo_path = data.get("project", {}).get("path_with_namespace", "Unknown")

    return "\n".join(
        [
            "📘 **Wiki 页面更新**",
            f"【操作】{operation}",
            f"【页面】{page_title}",
            f"【代码库】{repo_path}",
            f"【编辑人】{editor}",
            f"【链接】{link}",
        ]
    )
def format_deployment(data):
    """Build the notification text for a deployment event.

    Fields are read from a nested ``deployment`` object when one is present.
    Otherwise they are read from the payload itself, because GitLab's
    deployment events carry ``status``, ``environment``, ``ref`` and
    ``deployable_url`` at the top level (see the GitLab webhook events
    reference).

    Args:
        data: Decoded webhook payload (a dict).

    Returns:
        The multi-line message text.
    """
    deployment = data.get("deployment") or data
    status = str(deployment.get("status") or "unknown")
    environment = deployment.get("environment") or "unknown"
    deployable = deployment.get("deployable") or {}
    job_name = deployable.get("name", "unknown")
    ref = deployment.get("ref") or "unknown"
    user = get_trigger_user(data)
    project = data.get("project") or {}
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    url = deployment.get("url") or deployment.get("deployable_url") or "#"

    emoji = {"success": "🚀", "failed": "💥", "canceled": "⏹️"}.get(status, "⚙️")

    lines = [
        f"{emoji} **部署 {status.upper()}**",
        f"【环境】{environment}",
        f"【任务】{job_name}",
        f"【代码库】{path_with_namespace}",
        f"【分支/Tag】{ref}",
        f"【操作人】{user}",
        f"【详情】{url}",
    ]
    return "\n".join(lines)
def format_emoji_event(data):
    """Build the notification text for an emoji (award) event.

    Args:
        data: Decoded webhook payload (a dict). Details are read from
            ``event_type``, ``object_attributes`` and ``project``.

    Returns:
        The multi-line message text, including a line naming the target
        (type plus ``!<iid>`` for merge requests or ``#<iid>`` for issues
        when the number can be extracted from ``awarded_on_url``).
    """
    import re

    user = get_trigger_user(data)
    project = data.get("project") or {}
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    event_type = data.get("event_type", "award")
    obj_attr = data.get("object_attributes") or {}
    emoji_name = obj_attr.get("name", "unknown")
    awardable_type = obj_attr.get("awardable_type", "Unknown")
    awarded_on_url = obj_attr.get("awarded_on_url") or "#"

    # Emoji name -> glyph; unmapped names are shown as ":name:".
    emoji_map = {
        "thumbsup": "👍",
        "thumbsdown": "👎",
        "heart": "❤️",
        "laugh": "😄",
        "confused": "😕",
        "rocket": "🚀",
        "eyes": "👀",
        "tada": "🎉",
    }
    emoji_icon = emoji_map.get(emoji_name, f":{emoji_name}:")

    # Verb shown in the headline: "added" for an award, "removed" otherwise.
    action = "添加了" if event_type == "award" else "取消了"

    # Localised name of the awarded object type.
    type_map = {
        "MergeRequest": "合并请求",
        "Issue": "议题",
        "Commit": "提交",
    }
    target_type_cn = type_map.get(awardable_type, awardable_type)

    # Extract the number from the URL (/merge_requests/3 -> !3, /issues/42 -> #42).
    # The patterns are raw strings with "\d"; the previous "(d+)" matched the
    # letter "d" and therefore never found a number.
    id_patterns = {
        "MergeRequest": (r"/merge_requests/(\d+)", "!"),
        "Issue": (r"/issues/(\d+)", "#"),
    }
    target_id = "未知"
    if awardable_type in id_patterns:
        pattern, prefix = id_patterns[awardable_type]
        match = re.search(pattern, str(awarded_on_url))
        if match:
            target_id = f"{prefix}{match.group(1)}"
    elif awardable_type == "Commit":
        # Commit URLs carry no short numeric id, so only the type is shown.
        target_id = ""

    if awardable_type == "Commit":
        title = f"{user} {action} 一个提交的 {emoji_icon}"
    else:
        title = f"{user} {action}一个表情: {emoji_icon}"

    # The target type and id were computed but never emitted before.
    target = f"{target_type_cn} {target_id}".strip()

    lines = [
        f"**{title}**",
        f"【代码库】{path_with_namespace}",
        f"【对象】{target}",
        f"【表情名称】{emoji_name}",
        f"【详情链接】{awarded_on_url}",
    ]
    return "\n".join(lines)
# ==================== 处理函数(保留原逻辑) ====================
def _parent_pipeline_message(
    header,
    path_with_namespace,
    ref,
    commit_title,
    trigger_user,
    build_url,
    extra_lines=(),
):
    """Assemble the line layout shared by every parent-pipeline notification."""
    lines = [
        header,
        f"【代码库】{path_with_namespace}",
        f"【分支/PR/Tag】{ref}",
    ]
    if commit_title:
        lines.append(f"【提交信息】{truncate(commit_title)}")
    lines.append(f"【触发人】{trigger_user}")
    lines.extend(extra_lines)  # e.g. the failed-stage line
    lines.append(f"【任务详情】{build_url}")
    return "\n".join(lines)


def handle_parent_pipeline(data):
    """Build the notification text for a pipeline that is not a child pipeline.

    A message is produced only for these situations:

    * created: ``before_sha`` is all zeros, status is exactly ``pending`` and
      ``finished_at`` is unset;
    * canceled: status ``canceled`` with ``finished_at`` set;
    * failed: status ``failed`` with ``finished_at`` set, and only when a job
      named ``pre`` in stage ``.pre`` failed;
    * succeeded: status ``success`` with ``finished_at`` set.

    Args:
        data: Decoded webhook payload; must contain ``object_attributes`` and
            ``project``.

    Returns:
        The multi-line message text, or ``None`` when nothing should be sent.

    Raises:
        KeyError: If ``object_attributes`` or ``project`` is missing.
    """
    pipeline = data["object_attributes"]
    status = str(pipeline.get("status") or "unknown").lower()
    before_sha = pipeline.get("before_sha", "")
    finished = pipeline.get("finished_at") is not None
    ref = pipeline.get("ref", "unknown")
    path_with_namespace = data["project"].get("path_with_namespace", "Unknown")
    trigger_user = get_trigger_user(data)
    build_url = pipeline.get("url", "#")
    commit = data.get("commit") or {}
    commit_title = (commit.get("title") or commit.get("message") or "").strip()

    common = (path_with_namespace, ref, commit_title, trigger_user, build_url)

    is_new_pipeline = before_sha == "0000000000000000000000000000000000000000"
    # Exact comparison: the former `status in ("pending")` was a substring
    # test against a plain string, so "" or "pend" also counted as pending.
    if is_new_pipeline and status == "pending" and not finished:
        return _parent_pipeline_message("🔔 **Gitlab流水线创建成功**", *common)

    # Every remaining notification requires a finished pipeline.
    if not finished:
        return None

    if status == "canceled":
        return _parent_pipeline_message("⏹️ **Gitlab流水线已取消**", *common)

    if status == "failed":
        builds = data.get("builds") or []
        pre_job_failed = any(
            job.get("stage") == ".pre"
            and job.get("name") == "pre"
            and job.get("status") == "failed"
            for job in builds
        )
        if pre_job_failed:
            return _parent_pipeline_message(
                "❌ **Gitlab流水线执行失败**",
                *common,
                extra_lines=("【失败阶段】pre job 失败",),
            )
        # Other failures are intentionally not reported here (original logic).
        return None

    if status == "success":
        return _parent_pipeline_message("✅ **Gitlab 流水线运行成功**", *common)

    return None
def handle_child_pipeline(data):
    """Build the notification text for a child pipeline; only failures notify.

    Args:
        data: Decoded webhook payload; must contain ``object_attributes`` and
            ``project``.

    Returns:
        The multi-line failure message, or ``None`` when the status is not
        exactly ``failed``.

    Raises:
        KeyError: If ``object_attributes`` or ``project`` is missing.
    """
    pipeline = data["object_attributes"]
    status = str(pipeline.get("status") or "unknown").lower()
    # Exact comparison: the former `status in ("failed")` was a substring test
    # against a plain string, so "" or "fail" slipped through and produced a
    # message without its headline.
    if status != "failed":
        return None

    ref = pipeline.get("ref", "unknown")
    path_with_namespace = data["project"].get("path_with_namespace", "Unknown")
    trigger_user = get_trigger_user(data)
    build_url = pipeline.get("url", "#")
    commit = data.get("commit") or {}
    commit_title = (commit.get("title") or commit.get("message") or "").strip()

    # Stage of the first failed job, if the payload lists one.
    builds = data.get("builds") or []
    failed_stage = next(
        (job.get("stage", "unknown") for job in builds if job.get("status") == "failed"),
        None,
    )

    lines = ["❌ **流水线失败**"]
    if failed_stage:
        lines.append(f"【失败阶段】{failed_stage} stage 失败")
    else:
        lines.append("【失败阶段】未知阶段失败")
    lines.append(f"【代码库】{path_with_namespace}")
    lines.append(f"【分支/PR/Tag】{ref}")
    if commit_title:
        lines.append(f"【提交信息】{truncate(commit_title)}")
    lines.append(f"【触发人】{trigger_user}")
    lines.append(f"【任务详情】{build_url}")
    return "\n".join(lines)
# ==================== Webhook 路由 ====================
@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive a GitLab webhook, format it and forward it to WeCom.

    Responses:
        400 ``{"error": ...}``       body is missing, not valid JSON, or not a
                                      JSON object
        200 ``{"status": "ignored"}``   ``object_kind`` has no formatter
        200 ``{"status": "processed"}`` event handled (sent, skipped as a
                                      duplicate, or nothing to notify)
        500 ``{"error": ...}``       unexpected failure while handling
    """
    # object_kind -> formatter. Built inside the function so the names are
    # resolved at request time.
    formatters = {
        "pipeline": format_pipeline,
        "build": format_job,
        "merge_request": format_merge_request,
        "push": format_push,
        "tag_push": format_tag,
        "issue": format_issue,
        "note": format_note,
        "wiki_page": format_wiki_page,
        "deployment": format_deployment,
        "emoji": format_emoji_event,
    }
    try:
        # silent=True returns None for a malformed body or wrong content type
        # instead of raising, so such requests get a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON data"}), 400
        if not isinstance(data, dict):
            return jsonify({"error": "JSON object expected"}), 400

        object_kind = data.get("object_kind")
        logger.info(f"Received event: {object_kind}")

        formatter = formatters.get(object_kind) if isinstance(object_kind, str) else None
        if formatter is None:
            logger.info(f"Ignored unsupported event: {object_kind}")
            return jsonify({"status": "ignored"}), 200

        content = formatter(data)
        if content:
            # De-duplicate on the event kind plus the full message text; the
            # tuple is hashable and avoids the collisions of a truncated hash.
            cache_key = (object_kind, content)
            if not is_duplicate_notification(cache_key):
                logger.info(f"Sending to WeCom:\n{content}")
                send_to_wecom(content)
            else:
                logger.info("Duplicate notification, skipped.")
        else:
            logger.info("No content to notify.")
        return jsonify({"status": "processed"}), 200
    except Exception as e:
        logger.exception("Unexpected error in webhook")
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Direct execution (`python app.py`): serve with Flask's built-in server
    # on all interfaces, port 5000.
    app.run(host="0.0.0.0", port=5000)
Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
COPY app.py .
EXPOSE 5000
# Serve with gunicorn (pinned in requirements.txt) rather than Flask's
# development server. The FLASK_* variables were dropped because gunicorn does
# not read them, and FLASK_ENV is no longer honoured by Flask 2.3+.
# NOTE: each worker process keeps its own in-memory de-duplication cache.
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:5000", "app:app"]
requirements.txt
Flask==3.0.3
requests==2.32.3
gunicorn==22.0.0


