Connecting GitLab Webhooks to WeCom (Fixing "invalid message type")


Project Overview

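GitLab's webhook payload format is incompatible with WeCom (企业微信, WeChat Work), so GitLab events cannot be posted directly to a WeCom webhook. GitLab to WeChat Work Webhook Forwarder is a lightweight service that forwards GitLab webhook events to WeCom for automated notifications. It supports multiple event types (pipelines, merge requests, code pushes, and others) and provides message formatting, de-duplication, and Docker deployment support.

Pointing GitLab straight at the WeCom webhook returns an error such as: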



{"errcode":40008,"errmsg":"invalid message type, hint: [1762826916378852572159192], from 192.168.xx.xx, more info at https://open.work.weixin.qq.com/devtool/query?e=40008"}
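As the error message suggests, WeCom expects a JSON body that carries a recognised `msgtype` field, which a GitLab event payload does not have. The following is a minimal sketch of the conversion, not the project's actual code; the helper name `to_wecom_text` and the trimmed-down push event are assumptions made for illustration.

```python
import json


def to_wecom_text(event: dict) -> dict:
    """Wrap a one-line summary of a GitLab event in a WeCom text message."""
    kind = event.get("object_kind", "unknown")
    project = event.get("project", {}).get("path_with_namespace", "unknown")
    summary = f"GitLab event: {kind} in {project}"
    # The "msgtype" key is what a raw GitLab payload lacks.
    return {"msgtype": "text", "text": {"content": summary}}


# Trimmed-down stand-in for a GitLab push event (illustrative only).
gitlab_event = {
    "object_kind": "push",
    "project": {"path_with_namespace": "group/project"},
}
wecom_body = to_wecom_text(gitlab_event)
print(json.dumps(wecom_body, ensure_ascii=False))
```

The forwarder described in this post does the same thing on a larger scale: it picks a formatter per event type and wraps the resulting text in this envelope.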

Core Features

Event Support

Pipeline (pipeline)
Build job (build)
Merge request (merge_request)
Code push (push)
Tag push (tag_push)
Issue (issue)
Comment (note)
Wiki event (wiki_page)
Deployment (deployment)
Emoji reaction (emoji)

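Message Processing

Smart formatting: automatically truncates long text and presents key information (repository, branch, triggering user, and so on) in a structured layout
De-duplication: keyed on the event type and a hash of the message content; entries are cached for 60 seconds by default to avoid repeated notifications
Error handling: request timeout protection (10 seconds), exception capture, and logging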
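The truncation and de-duplication behaviour described above can be sketched as follows. This is an illustrative variant, not the code from app.py: the `DedupCache` class, its injectable clock, and the use of `hashlib` are assumptions chosen to make the logic easy to test.

```python
import hashlib
import time


def truncate(text, length=50):
    """Shorten text to `length` characters and mark the cut with an ellipsis."""
    return text[:length] + "..." if len(text) > length else text


class DedupCache:
    """Remember message keys for `ttl` seconds; the clock is injectable for tests."""

    def __init__(self, ttl=60.0, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._seen = {}

    def is_duplicate(self, kind, content):
        now = self.clock()
        # Drop expired entries so the cache does not grow without bound.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.ttl}
        key = (kind, hashlib.sha256(content.encode("utf-8")).hexdigest())
        if key in self._seen:
            return True
        self._seen[key] = now
        return False


cache = DedupCache(ttl=60)
print(cache.is_duplicate("push", "hello"))  # first sighting
print(cache.is_duplicate("push", "hello"))  # repeated within the TTL
```

Like the in-memory dictionary in app.py, this cache lives inside a single process, so when several gunicorn workers are running, each worker de-duplicates independently.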


Directory Structure

The project consists of the files reproduced in full later in this post:

app.py
Dockerfile
requirements.txt

Deployment Guide

Requirements

Python 3.11+ or Docker

Configuration Steps

Set the WeCom webhook URL as an environment variable:


export WECHAT_WEBHOOK_URL="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"

Install the dependencies:


pip install -r requirements.txt

Running the Service

Local Startup



python app.py  # development mode
gunicorn -w 4 -b 0.0.0.0:5000 app:app  # production

Docker Deployment



docker build -t gitlab-wechat-webhook .
docker run -d -p 5000:5000 -e WECHAT_WEBHOOK_URL="YOUR_URL" gitlab-wechat-webhook


GitLab Configuration

Go to the project's Settings → Webhooks and fill in the following:
URL: http://your-server:5000/webhook
Trigger events: select the event types you want forwarded
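Before relying on GitLab to deliver events, it can help to post a hand-made event to the endpoint. The sketch below only builds such a request; the helper name `build_test_request` and the sample field values are assumptions, and the payload is a heavily reduced stand-in for a real push event.

```python
import json
import urllib.request


def build_test_request(url):
    """Build (but do not send) a POST that mimics a minimal GitLab push event."""
    event = {
        "object_kind": "push",
        "ref": "refs/heads/main",
        "user_username": "username",
        "project": {"path_with_namespace": "group/project"},
        "commits": [{"message": "feat: add new feature"}],
    }
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-Gitlab-Event": "Push Hook"},
    )


req = build_test_request("http://your-server:5000/webhook")
# To actually send it once the service is running:
# urllib.request.urlopen(req, timeout=10)
```

If the service is reachable, the commented-out call should produce a `{"status": "processed"}` response and a message in the WeCom group.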


Message Examples

Pipeline Succeeded



✅ **GitLab pipeline succeeded**
【Repository】group/project
【Branch】main
【Commit message】feat: add new feature
【Triggered by】username
【Details】https://gitlab.com/group/project/-/pipelines/123

Merge Request Created



🆕 **GitLab merge request created**
【Title】Add new feature
【Branch】feature-branch → main
【Author】username
【Link】https://gitlab.com/group/project/-/merge_requests/456


Advanced Configuration

Cache duration: change CACHE_TTL_SECONDS in app.py
Log level: adjust logging.basicConfig(level=logging.INFO)
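Both settings are hard-coded in app.py. One possible way to make them adjustable without editing the file is to read them from the environment, as in the sketch below. The `load_settings` helper and the `LOG_LEVEL` variable are hypothetical, and reading `CACHE_TTL_SECONDS` from the environment is likewise an assumption rather than something the project does today.

```python
import logging
import os

# Only these level names are accepted; anything else falls back to INFO.
LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
}


def load_settings(env=os.environ):
    """Return (cache_ttl_seconds, log_level) from a mapping of environment variables."""
    ttl = int(env.get("CACHE_TTL_SECONDS", "60"))
    level = LEVELS.get(env.get("LOG_LEVEL", "INFO").upper(), logging.INFO)
    return ttl, level


ttl, level = load_settings()
logging.basicConfig(level=level)
```

With this in place, a container could be started with `-e CACHE_TTL_SECONDS=300 -e LOG_LEVEL=debug` in addition to `WECHAT_WEBHOOK_URL`.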


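Troubleshooting

WeCom receives no messages: check that WECHAT_WEBHOOK_URL is valid and inspect the container logs (docker logs CONTAINER, using the container ID or name reported by docker ps)
GitLab does not trigger the webhook: confirm that the webhook URL is reachable from GitLab and check the firewall settings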
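When checking the logs, note that WeCom reports API failures such as the 40008 error in the JSON response body, so the HTTP status alone may not reveal a problem. The helper below is a small sketch for interpreting that body; the name `check_wecom_response` is an assumption and it is not part of app.py.

```python
import json


def check_wecom_response(body_text):
    """Return (ok, detail) for the text returned by the WeCom webhook."""
    try:
        body = json.loads(body_text)
    except json.JSONDecodeError:
        return False, f"non-JSON response: {body_text[:80]}"
    if not isinstance(body, dict):
        return False, f"unexpected response: {body_text[:80]}"
    errcode = body.get("errcode")
    if errcode == 0:
        return True, "ok"
    return False, f"errcode={errcode}: {body.get('errmsg', '')}"


ok, detail = check_wecom_response('{"errcode":40008,"errmsg":"invalid message type"}')
print(ok, detail)
```

Logging the returned detail next to the event type makes it easier to tell a bad webhook key from a malformed message.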


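Extension Development

Adding a new event: add a branch for it in webhook() and write the matching format_xxx() function
Customizing messages: modify the content structure of the existing format_xxx() functions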
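The extension steps above might look like the following sketch. The `format_release` function, the payload fields it reads, and the `FORMATTERS` lookup table are assumptions for illustration; app.py itself uses an if/elif chain in webhook() rather than a table.

```python
def format_release(data):
    """Hypothetical formatter for a release event; the field names are assumptions."""
    project = data.get("project", {}).get("path_with_namespace", "Unknown")
    lines = [
        "📦 **New release**",
        f"【Repository】{project}",
        f"【Tag】{data.get('tag', 'unknown')}",
        f"【Link】{data.get('url', '#')}",
    ]
    return "\n".join(lines)


# Map each object_kind to its formatter; new events only need a new entry.
FORMATTERS = {"release": format_release}


def dispatch(data):
    """Return the formatted message, or None for unsupported event types."""
    formatter = FORMATTERS.get(data.get("object_kind"))
    return formatter(data) if formatter else None


sample = {
    "object_kind": "release",
    "tag": "v1.0.0",
    "url": "https://gitlab.com/group/project/-/releases/v1.0.0",
    "project": {"path_with_namespace": "group/project"},
}
print(dispatch(sample))
```

A lookup table keeps the routing in one place, at the cost of departing from the explicit branches the original code uses.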


Source code: app.py



import os
import logging
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
 
# ====== Configuration ======
WECHAT_WEBHOOK_URL = os.environ.get("WECHAT_WEBHOOK_URL")
if not WECHAT_WEBHOOK_URL:
    raise EnvironmentError("WECHAT_WEBHOOK_URL environment variable is required")
 
NOTIFICATION_CACHE = {}
CACHE_TTL_SECONDS = 60
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
app = Flask(__name__)
 
def truncate(text, length=50):
    if len(text) > length:
        return text[:length] + "..."
    return text
 
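def get_trigger_user(data):
    user = data.get("user") or {}
    # Push and tag events carry flat user_username / user_name keys
    # instead of a nested "user" object, so fall back to those.
    return (user.get("username") or user.get("name")
            or data.get("user_username") or data.get("user_name") or "unknown")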
 
def is_duplicate_notification(key, ttl=CACHE_TTL_SECONDS):
    """Generic de-duplication: key can be any hashable object."""
    now = datetime.now()
    if key in NOTIFICATION_CACHE:
        if now - NOTIFICATION_CACHE[key] < timedelta(seconds=ttl):
            return True
    NOTIFICATION_CACHE[key] = now
    return False
 
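def send_to_wecom(msg):
    if not msg:
        return
    try:
        import requests
        # With msgtype "text" the ** markers are shown literally;
        # WeCom renders them only for msgtype "markdown".
        payload = {"msgtype": "text", "text": {"content": msg}}
        resp = requests.post(WECHAT_WEBHOOK_URL, json=payload, timeout=10)
        # WeCom reports API errors (such as 40008) in the JSON body,
        # so check errcode as well as the HTTP status.
        if resp.status_code != 200 or resp.json().get("errcode") != 0:
            logger.error(f"WeCom send failed: {resp.text}")
        else:
            logger.info("Sent to WeCom successfully")
    except Exception as e:
        logger.exception(f"Failed to send to WeCom: {e}")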
 
# ==================== Formatting functions ====================
 
def format_pipeline(data):
    pipeline = data["object_attributes"]
    source = pipeline.get("source", "unknown")
    if source == "parent_pipeline":
        return handle_child_pipeline(data)
    else:
        return handle_parent_pipeline(data)
 
def format_job(data):
    build = data.get("build", {}) or data  # tolerate nested and flat payload layouts
    # GitLab job events use flat build_status / build_stage / build_name keys,
    # so fall back to them when the short keys are missing.
    status = (build.get("status") or build.get("build_status") or "unknown").lower()
    stage = build.get("stage") or build.get("build_stage") or "unknown"
    job_name = build.get("name") or build.get("build_name") or "unknown"
    ref = build.get("ref", "unknown")
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    user = get_trigger_user(data)
    build_url = build.get("url", "#")
    commit = data.get("commit", {})
    commit_title = (commit.get("title") or commit.get("message") or "").strip()

    # Keys are GitLab's own status strings, which is what the payload contains.
    emoji = {"success": "✅", "failed": "❌", "canceled": "⏹️", "running": "▶️", "pending": "⏳"}.get(status, "ℹ️")
    lines = []
    lines.append(f"{emoji} **GitLab pipeline job {status.upper()}**")
    lines.append(f"【Job】{job_name} ({stage})")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Branch】{ref}")
    if commit_title:
        lines.append(f"【Commit message】{truncate(commit_title)}")
    lines.append(f"【Triggered by】{user}")
    lines.append(f"【Details】{build_url}")
    return "\n".join(lines)
 
def format_merge_request(data):
    mr = data.get("object_attributes", {})
    action = mr.get("action", "open")
    title = mr.get("title", "No title")
    source_branch = mr.get("source_branch")
    target_branch = mr.get("target_branch")
    url = mr.get("url", "#")
    author = mr.get("author_username") or get_trigger_user(data)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
 
    # Keys are the action values GitLab sends for merge request events.
    emoji = {"open": "🆕", "update": "🔄", "merge": "🎉", "close": "🔒"}.get(action, "📝")
    lines = []
    lines.append(f"{emoji} **GitLab merge request {action.upper()}**")
    lines.append(f"【Title】{truncate(title)}")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Branch】{source_branch} → {target_branch}")
    lines.append(f"【Author】{author}")
    lines.append(f"【Link】{url}")
    return "\n".join(lines)
 
def format_push(data):
    commits = data.get("commits", [])
    total_commits = len(commits)
    ref = data.get("ref", "")
    branch = ref.replace("refs/heads/", "", 1)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    user = get_trigger_user(data)
    compare_url = data.get("compare_url", "#")
 
    lines = []
    lines.append(f"📦 **{user} pushed new commits**")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Branch】{branch}")
    lines.append(f"【Commit count】{total_commits} commits")
    lines.append(f"【Pushed by】{user}")
    lines.append(f"【Compare】{compare_url}")
    if commits:
        latest = commits[-1]
        lines.append(f"【Latest commit】{truncate(latest.get('message', ''))}")
    return "\n".join(lines)
 
def format_tag(data):
    ref = data.get("ref", "")
    tag = ref.replace("refs/tags/", "", 1)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    user = get_trigger_user(data)
    commits = data.get("commits", [])
    message = ""
    if commits:
        message = commits[0].get("message", "")
 
    lines = []
    lines.append(f"🏷️ **{user} created a new tag**")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Tag】{tag}")
    lines.append(f"【Created by】{user}")
    if message:
        lines.append(f"【Message】{truncate(message)}")
    return "\n".join(lines)
 
def format_issue(data):
    issue = data.get("object_attributes", {})
    action = issue.get("action", "open")
    title = issue.get("title", "No title")
    url = issue.get("url", "#")
    author = issue.get("author_username") or get_trigger_user(data)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
 
    emoji = {"open": "❗", "close": "✅", "reopen": "🔄"}.get(action, "📝")
    lines = []
    lines.append(f"{emoji} **Issue {action.upper()}**")
    lines.append(f"【Title】{truncate(title)}")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Author】{author}")
    lines.append(f"【Link】{url}")
    return "\n".join(lines)
 
def format_note(data):
    note = data.get("object_attributes", {})
    noteable_type = note.get("noteable_type", "Unknown")
    note_body = note.get("note", "")
    url = note.get("url", "#")
    author = get_trigger_user(data)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
 
    lines = []
    lines.append(f"💬 **{author} added a comment**")
    lines.append(f"【Type】{noteable_type}")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Content】{truncate(note_body)}")
    lines.append(f"【Link】{url}")
    return "\n".join(lines)
 
def format_wiki_page(data):
    wiki = data.get("object_attributes", {})
    action = wiki.get("action", "create")
    title = wiki.get("title", "Unknown")
    url = wiki.get("url", "#")
    author = get_trigger_user(data)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
 
    lines = []
    lines.append("📘 **Wiki page updated**")
    lines.append(f"【Action】{action}")
    lines.append(f"【Page】{title}")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Edited by】{author}")
    lines.append(f"【Link】{url}")
    return "\n".join(lines)
 
def format_deployment(data):
    # GitLab deployment events carry these fields at the top level of the payload;
    # a nested "deployment" object is still accepted if present.
    deployment = data.get("deployment") or data
    status = deployment.get("status", "unknown")
    environment = deployment.get("environment", "unknown")
    deployable = deployment.get("deployable", {})
    job_name = deployable.get("name", "unknown")
    ref = deployment.get("ref", "unknown")
    user = get_trigger_user(data)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
    url = deployment.get("url") or deployment.get("deployable_url") or "#"

    emoji = {"success": "🚀", "failed": "💥", "canceled": "⏹️"}.get(status, "⚙️")
    lines = []
    lines.append(f"{emoji} **Deployment {status.upper()}**")
    lines.append(f"【Environment】{environment}")
    lines.append(f"【Job】{job_name}")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Branch/Tag】{ref}")
    lines.append(f"【Operator】{user}")
    lines.append(f"【Details】{url}")
    return "\n".join(lines)
 
def format_emoji_event(data):
    import re
    user = get_trigger_user(data)
    project = data.get("project", {})
    path_with_namespace = project.get("path_with_namespace", "Unknown")
 
    event_type = data.get("event_type", "award")
    obj_attr = data.get("object_attributes", {})
    emoji_name = obj_attr.get("name", "unknown")
    awardable_type = obj_attr.get("awardable_type", "Unknown")
    awarded_on_url = obj_attr.get("awarded_on_url", "#")
 
    # Emoji mapping
    EMOJI_MAP = {
        "thumbsup": "👍",
        "thumbsdown": "👎",
        "heart": "❤️",
        "laugh": "😄",
        "confused": "😕",
        "rocket": "🚀",
        "eyes": "👀",
        "tada": "🎉",
    }
    emoji_icon = EMOJI_MAP.get(emoji_name, f":{emoji_name}:")
 
    # Action wording
    action = "added" if event_type == "award" else "removed"

    # Human-readable target type
    type_map = {
        "MergeRequest": "merge request",
        "Issue": "issue",
        "Commit": "commit"
    }
    target_type_cn = type_map.get(awardable_type, awardable_type)

    # Extract the ID from the URL (e.g. /merge_requests/3 → !3; /issues/42 → #42)
    target_id = "unknown"
    if awardable_type == "MergeRequest":
        match = re.search(r"/merge_requests/(\d+)", awarded_on_url)
        if match:
            target_id = f"!{match.group(1)}"
    elif awardable_type == "Issue":
        match = re.search(r"/issues/(\d+)", awarded_on_url)
        if match:
            target_id = f"#{match.group(1)}"
    elif awardable_type == "Commit":
        # Commits are usually identified by a short hash, which the URL may not
        # expose clearly, so the ID is simply left empty here
        target_id = ""

    # Build the title
    if awardable_type == "Commit":
        title = f"{user} {action} {emoji_icon} on a commit"
    else:
        title = f"{user} {action} an emoji: {emoji_icon}"

    lines = []
    lines.append(f"**{title}**")
    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Emoji name】{emoji_name}")
    lines.append(f"【Link】{awarded_on_url}")

    return "\n".join(lines)
 
 
# ==================== Handlers (original logic retained) ====================
 
def handle_parent_pipeline(data):
    pipeline = data["object_attributes"]
    status = pipeline.get("status", "unknown").lower()
    before_sha = pipeline.get("before_sha", "")
    finished_at = pipeline.get("finished_at")
    ref = pipeline.get("ref", "unknown")
    path_with_namespace = data["project"].get("path_with_namespace", "Unknown")
    trigger_user = get_trigger_user(data)
    build_url = pipeline.get("url", "#")
    commit = data.get("commit", {})
    commit_title = (commit.get("title") or commit.get("message") or "").strip()
 
    is_new_pipeline = (before_sha == "0000000000000000000000000000000000000000")
    is_pending = status == "pending" and finished_at is None

    if is_new_pipeline and is_pending:
        lines = []
        lines.append("🔔 **GitLab pipeline created**")
        lines.append(f"【Repository】{path_with_namespace}")
        lines.append(f"【Branch/PR/Tag】{ref}")
        if commit_title:
            lines.append(f"【Commit message】{truncate(commit_title)}")
        lines.append(f"【Triggered by】{trigger_user}")
        lines.append(f"【Job details】{build_url}")
        return "\n".join(lines)
 
 
    if status == "canceled" and finished_at is not None:
        lines = []
        lines.append("⏹️ **GitLab pipeline canceled**")
        lines.append(f"【Repository】{path_with_namespace}")
        lines.append(f"【Branch/PR/Tag】{ref}")
        if commit_title:
            lines.append(f"【Commit message】{truncate(commit_title)}")
        lines.append(f"【Triggered by】{trigger_user}")
        lines.append(f"【Job details】{build_url}")
        return "\n".join(lines)
 
    if status == "failed" and finished_at is not None:
        builds = data.get("builds", [])
        for job in builds:
            if (
                job.get("stage") == ".pre"
                and job.get("name") == "pre"
                and job.get("status") == "failed"
            ):
                lines = []
                lines.append("❌ **GitLab pipeline failed**")
                lines.append(f"【Repository】{path_with_namespace}")
                lines.append(f"【Branch/PR/Tag】{ref}")
                if commit_title:
                    lines.append(f"【Commit message】{truncate(commit_title)}")
                lines.append(f"【Triggered by】{trigger_user}")
                lines.append("【Failed stage】pre job failed")
                lines.append(f"【Job details】{build_url}")
                return "\n".join(lines)
    if status == "success" and finished_at is not None:
        lines = []
        lines.append("✅ **GitLab pipeline succeeded**")
        lines.append(f"【Repository】{path_with_namespace}")
        lines.append(f"【Branch/PR/Tag】{ref}")
        if commit_title:
            lines.append(f"【Commit message】{truncate(commit_title)}")
        lines.append(f"【Triggered by】{trigger_user}")
        lines.append(f"【Job details】{build_url}")
        return "\n".join(lines)
 
    return None
 
def handle_child_pipeline(data):
    pipeline = data["object_attributes"]
    status = pipeline.get("status", "unknown").lower()
 
    if status != "failed":
        return None
 
    ref = pipeline.get("ref", "unknown")
    path_with_namespace = data["project"].get("path_with_namespace", "Unknown")
    trigger_user = get_trigger_user(data)
    build_url = pipeline.get("url", "#")
    commit = data.get("commit", {})
    commit_title = (commit.get("title") or commit.get("message") or "").strip()
 
    lines = []
    if status == "failed":
        builds = data.get("builds", [])
        failed_stage = None
        for job in builds:
            if job.get("status") == "failed":
                failed_stage = job.get("stage", "unknown")
                break
        lines.append("❌ **Pipeline failed**")
        if failed_stage:
            lines.append(f"【Failed stage】{failed_stage} stage failed")
        else:
            lines.append("【Failed stage】unknown stage failed")

    lines.append(f"【Repository】{path_with_namespace}")
    lines.append(f"【Branch/PR/Tag】{ref}")
    if commit_title:
        lines.append(f"【Commit message】{truncate(commit_title)}")
    lines.append(f"【Triggered by】{trigger_user}")
    lines.append(f"【Job details】{build_url}")
    return "\n".join(lines)
 
# ==================== Webhook route ====================
 
@app.route('/webhook', methods=['POST'])
def webhook():
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "No JSON data"}), 400
 
        object_kind = data.get("object_kind")
        logger.info(f"Received event: {object_kind}")
 
        content = None
 
        if object_kind == "pipeline":
            content = format_pipeline(data)
        elif object_kind == "build":
            content = format_job(data)
        elif object_kind == "merge_request":
            content = format_merge_request(data)
        elif object_kind == "push":
            content = format_push(data)
        elif object_kind == "tag_push":
            content = format_tag(data)
        elif object_kind == "issue":
            content = format_issue(data)
        elif object_kind == "note":
            content = format_note(data)
        elif object_kind == "wiki_page":
            content = format_wiki_page(data)
        elif object_kind == "deployment":
            content = format_deployment(data)
        elif object_kind == "emoji":
            content = format_emoji_event(data)
        else:
            logger.info(f"Ignored unsupported event: {object_kind}")
            return jsonify({"status": "ignored"}), 200
 
        if content:
            # Optional: use a different de-duplication key per event type
            cache_key = (object_kind, str(hash(content))[:16])  # simple de-duplication
            if not is_duplicate_notification(cache_key):
                logger.info(f"Sending to WeCom:\n{content}")
                send_to_wecom(content)
            else:
                logger.info("Duplicate notification, skipped.")
        else:
            logger.info("No content to notify.")
 
        return jsonify({"status": "processed"}), 200
 
    except Exception as e:
        logger.exception("Unexpected error in webhook")
        return jsonify({"error": str(e)}), 500
 
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Dockerfile



FROM python:3.11-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
 
 
COPY app.py .
 
ENV FLASK_APP=app.py
ENV FLASK_RUN_HOST=0.0.0.0
# FLASK_ENV was removed in Flask 2.3 and has no effect with Flask 3.0.3, so it is not set.
 
EXPOSE 5000
 
CMD ["flask", "run"]

requirements.txt



Flask==3.0.3
requests==2.32.3
gunicorn==22.0.0
