Dify工具使用全场景:rookie_text2data (插件篇·第1期)

内容分享3周前发布
2 9 0

今天我升级dify 1.1.0 版本,在插件市场里下载了一rookie_text2data,今天把它分享给大家如何使用?

注意:dify 1.0.0 后要从插件要从市场、github、本地三种方式下载,但市场下载要科学上网,不然打不开,这点不太友善,不过其它功能算是正常。

Dify工具使用全场景:rookie_text2data (插件篇·第1期)

Dify工具使用全场景:rookie_text2data (插件篇·第1期)

上面的图片信息,是我科学上网从市场里下载后再导入到本dify的插入。如果谁有要这里的插入,请留言我可以单独发你们。

接下来我先给大家介绍一rookie_text2data:

一、简介

Dify 作为一个功能强劲的人工智能应用开发平台,旗下的 rookie_text2data 插件在其生态体系中独具特色。这一开源组件极大地革新了数据库数据获取方式,让开发者仅通过自然语言指令,就能便捷地从数据库中调取数据。它显著降低了数据库操作的技术难度,即便是非专业的数据库开发人员,也能够高效地与数据库进行交互。rookie_text2data 插件的诞生,为数据处理和应用开发领域带来了全新的便捷途径,有力推动了相关工作的高效开展。

Github地址:
https://github.com/jaguarliuu/rookie_text2data

二、核心功能

  1. 自然语言解析:当用户输入如 “查询上个月销售额超过 100 万的产品” 这类日常语言描述的查询需求时,插件能够凭借先进的算法精准剖析其中的语义和意图。它就像一位精通语言的专家,能将普通的语言转化为计算机可理解的指令。
  2. 数据库适配:无论是广泛使用的 MySQL,还是功能强劲的 PostgreSQL 等常见数据库类型,rookie_text2data 都能完美兼容。无论数据库结构多么复杂,它都能依据自然语言指令,智能生成与之对应的数据库查询语句,实现无缝对接。
  3. 结果呈现:插件从数据库获取数据后,会以清晰直观的格式呈现给用户,表格形式是最常见的一种。这种方式方便用户查看数据,并且能够轻松进行后续的数据处理工作。

三、技术优势

  1. 低代码需求:以往复杂的 SQL 语句编写工作在 rookie_text2data 插件面前变得不再必要。这一特性极大地提升了开发效率,显著缩短了开发时间,同时也降低了开发成本,让开发工作变得更加轻松高效。
  2. 准确性高:依托先进的自然语言处理技术,插件对用户指令的理解准确率极高。这就确保了获取到的数据与用户的预期高度契合,有效避免了因理解偏差导致的数据获取错误。
  3. 灵活性强:该插件能够灵活适应不同的数据库结构以及复杂多变的业务场景查询需求。无论是简单的小型数据库,还是结构复杂的大型数据库,亦或是多样化的业务查询,它都能应对自如,展现出强劲的通用性。

四、使用场景

  1. 数据分析:企业分析师借助 rookie_text2data 插件,只需通过自然语言就能快速获取所需数据,进而开展销售数据分析、用户行为分析等工作,为企业决策提供有力的数据支持。

更换图片

  1. 业务运营:业务人员可以随时通过自然语言查询订单信息、库存情况等关键业务数据,及时掌握业务动态,从而对运营流程进行优化,提升业务运营效率。

更换图片

  1. 应用开发:开发者在构建应用时,利用该插件能够实现前端用户通过自然语言与后端数据库的顺畅交互,极大地提升了应用的易用性,为用户带来更便捷的使用体验。

更换图片

五、如何安装下载 rookie_text2data

Dify工具使用全场景:rookie_text2data (插件篇·第1期)

直接从市场中市场,然后导入到dify中,在工具插件里就可以直接看到了。

六、如何使用

  1. 配置数据库连接:在使用插件之前,需要在 Dify 平台中准确配置与目标数据库的连接信息。这些信息包括数据库地址、端口、用户名以及密码等,只有配置正确,插件才能与数据库建立有效连接。

Dify工具使用全场景:rookie_text2data (插件篇·第1期)

  1. 输入自然语言查询:在插件提供的输入框内,用户输入想要查询的自然语言指令,列如 “查询最新一条文章信息表记录”,简洁明了地表达查询需求。

Dify工具使用全场景:rookie_text2data (插件篇·第1期)

我的表结构如下:

Dify工具使用全场景:rookie_text2data (插件篇·第1期)

  1. 获取数据结果:用户点击执行按钮后,插件会迅速对指令进行解析,生成相应的数据库查询,并将查询结果呈目前结果展示区域。用户在此区域能够清晰地查看数据,获取所需信息。
{
  "text": "",
  "files": [],
  "json": [
    {
      "data": [
        {
          "columns": [
            "编号",
            "文章标题",
            "文章内容",
            "分类",
            "文章状态",
            "创建时间",
            "更新时间"
          ],
          "data": [
            {
              "分类": null,
              "创建时间": "2025-03-19T23:43:31.237000",
              "文章内容": "嘿这里是酷睿。今天我动手做了一个很酷的东西,手势传感器。而且它特别的地方就是能够穿透物体,无需接触就能隔空检测到手并识别手的动作。

它不仅能识别基本的上下左右滑动手势,还能通过画圈圈的方式连续调节数值。甚至可以实时追踪手的三d空间位置,前后左右的移动对应x y坐标,上下移动对应z坐标。为什么要做这么一个东西呢?故事要从几年前说起,我试过将手势传感器集成到我的作品里,虽然效果还可以,但注意看这里需要挖个洞才能让传感器正常工作。

这是由于市面上常见的手势传感器,它的工作原理就像一台像素很低的相机,它依靠光来检测。如果我拿东西把它挡住,它就看不见东西了。就像我拿东西把你挡住一样,你也看不见我对你比划什么。

于是我一直在寻找有没有一种可以穿透物体的手势传感器,直到我发现了这个它利用电场感应理论上可以穿透非金属物体。要制作它只需要芯片和p c b。第一我参考芯片的应用设计指南绘制原理图,确定p c b大小和形状后,不知t x和r x电极,它们用于产生和检测电场变化,从而感应到手p c b有了,接着就只需要贴装芯片和几个电阻电容,这样硬件就完成了。
****
做好了要怎么使用呢?要让它工作还需要写入固件。我们需要一个u s b转iphone c硬件。虽然市面上有许多这类模块,但官方的软件只认他们自己的模块,所以还需要一个烧录器,先给单片机写入固件,固件写好后连上电脑就能正常通讯。

实则使用效果已经很不错了,要进一步微调的话,也可以设置一些参数和阈值,还可以进行校准。在这基础上,我想到一些有趣的应用,所以我写了个程序用来追踪手的位置,从而在空中写字a。c. 离。

好。我一共做了两个连在一起,可以在空中操控游戏。发挥创意还能达到类似体感交互的效果。

就是。像这个p c b板,我是在嘉立创下单制作的,这次我上了四层板。由于按照设计指南,如果想要让芯片和传感区域在同一个位置的话,最好用上四层板,这样信号才比较好像佳丽创能够支持四层板的免费打样。

# 不错
",
              "文章标题": "可隔空穿透!我 DIY 了一个独特的手势传感器",
              "文章状态": "1",
              "更新时间": "2025-03-19T23:44:51.505000",
              "编号": 13
            }
          ],
          "sql": "SELECT 
    `id` AS 编号,
    `title` AS 文章标题,
    `content` AS 文章内容,
    `category` AS 分类,
    `status` AS 文章状态,
    `created_at` AS 创建时间,
    `updated_at` AS 更新时间
FROM 
    `articles`
ORDER BY 
    `created_at` DESC
LIMIT 1;",
          "status": "success"
        },
        {
          "message": "数据库连接已关闭",
          "status": "info"
        }
      ],
      "status": "success"
    }
  ]
}

如果想研究的更深,可以看一下源码:

 def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:
        model_info= tool_parameters.get('model')
        # 获取连接参数
        _, conn_params = self._build_mysql_dsn(
            self.runtime.credentials['db_url'],
            self.runtime.credentials['db_password']
        )
        # 获取元数据
        metadata = self._get_metadata(conn_params)

        response = self.session.model.llm.invoke(
            model_config=LLMModelConfig(
                provider=model_info.get('provider'),
                model=model_info.get('model'),
                mode=model_info.get('mode'),
                completion_params=model_info.get('completion_params')
            ),
            prompt_messages=[
                SystemPromptMessage(
                    content=f"""你是一位资深数据库工程师兼SQL优化专家,拥有10年以上DBA经验。请根据提供的数据库元数据DDL和自然语言需求描述,生成符合企业级标准的优化SQL语句。

                            ## 系统要求:
                            1. 必须严格嵌入提供的DDL元数据{metadata},禁止使用任何未声明的表或字段
                            2. 仅返回SELECT语句,禁止包含INSERT/UPDATE/DELETE等DML操作
                            3. 必须使用LIMIT语句进行结果限制,防止数据泄露风险
                            5. 如果用户提出了具体的数据数量,则Limit用户的查询数量,否则Limit 100
                            4. 所有字段必须使用反引号包裹,符合MySQL标识符规范

                            ## 优化原则:
                            1. 采用覆盖索引策略,确保查询命中至少2个索引
                            2. 避免SELECT *,仅返回需求中的必要字段
                            3. 对日期字段使用CURDATE()等函数时需标注时间范围
                            4. 多表关联必须使用INNER JOIN,禁止LEFT/RIGHT JOIN

                            ## 验证机制:
                            1. 生成后自动检查表是否存在,若不存在则抛出错误
                            2. 条件字段值必须存在于目标表中,否则提示字段不存在
                            3. 自动生成EXPLAIN计划,确保type列显示为ref或eq_ref
                            4. 仅返回生成的SQL语句,禁止返回注释、DDL、描述等与SQL无关内容
                            5. 禁止使用任何转义字符(如''或")
                            6. 禁止在开头和结尾使用``` ```包裹SQL语句

                            ## 输出规范:
                                SELECT 
                                    `order_id` AS 订单编号,
                                    `amount` * 1.05 AS 含税金额
                                FROM 
                                    `orders` o
                                INNER JOIN 
                                    `customers` c ON o.customer_id = c.id
                                WHERE 
                                    o.status = 'paid' 
                                    AND c.region = 'Asia'
                                    AND o.created_at BETWEEN '2025-01-01' AND CURDATE()
                                LIMIT 100;
                    """
                ),
                UserPromptMessage(
                    content=f"用户想要查询的数据需求为:{tool_parameters.get('query')}"
                    
                )
            ],
            stream=False
        )

        excute_sql = response.message.content

        # 执行SQL
        result = self._execute_sql_generator(excute_sql, conn_params)

        yield self.create_json_message({
            "status": "success",
            "data": result
        })

    def _build_mysql_dsn(self, db_url: str, db_password: str) -> tuple[str, dict[str, Any]]:
        """
        将数据库URL和密码拼接为完整DSN,并返回解析后的连接参数
        
        参数:
            db_url (str): 格式示例 mysql://user@host:port/database
            db_password (str): 数据库密码(明文)
        
        返回:
            tuple: (完整DSN, 解析后的连接参数字典)
        
        异常:
            ValueError: 当URL格式无效时抛出
        """
        # 基础解析验证
        parsed = urlparse(db_url)
        if parsed.scheme != 'mysql':
            raise ValueError("仅支持mysql协议,当前协议:{}".format(parsed.scheme))

        # 解析用户名和主机信息
        username = parsed.username or 'root'
        password = quote(db_password, safe='')  # 处理特殊字符
        hostname = parsed.hostname or 'localhost'
        port = parsed.port or 3306

        # 处理数据库路径
        database = parsed.path.lstrip('/')
        if not database:
            database = 'test'

        # 构建新的netloc
        auth_part = f"{username}:{password}"
        netloc = f"{auth_part}@{hostname}"
        if parsed.port:
            netloc += f":{port}"

        # 生成完整DSN
        full_dsn = urlunparse((
            parsed.scheme,
            netloc,
            parsed.path,
            parsed.params,
            parsed.query,
            parsed.fragment
        ))

        # 生成连接参数字典(类型安全)
        connection_params = {
            'host': hostname,
            'port': port,
            'user': unquote(username),
            'password': unquote(db_password),  # 注意这里返回明文用于连接
            'database': database,
            'charset': 'utf8mb4',
            'connect_timeout': 5
        }

        return full_dsn, connection_params

    def _get_metadata(self, conn_params: dict[str, Any]) -> dict[str, Any]:
        """
        获取数据库元数据(表结构信息)
        
        返回结构示例:
        {
            "tables": [
                {
                    "name": "users",
                    "columns": [
                        {"name": "id", "type": "int", "comment": "主键ID"},
                        {"name": "name", "type": "varchar(255)", "comment": "用户名"}
                    ]
                }
            ]
        }
        """
        metadata = {"tables": []}
        
        try:
            with pymysql.connect(
                host=conn_params['host'],
                port=conn_params['port'],
                user=conn_params['user'],
                password=conn_params['password'],
                database=conn_params['database'],
                charset=conn_params['charset'],
                cursorclass=DictCursor
            ) as conn:
                with conn.cursor() as cursor:
                    # 获取所有表信息
                    cursor.execute("""
                        SELECT TABLE_NAME AS table_name,
                        TABLE_COMMENT AS table_comment
                        FROM INFORMATION_SCHEMA.TABLES
                        WHERE TABLE_SCHEMA = DATABASE()
                        AND TABLE_TYPE = 'BASE TABLE'
                    """)
                    tables = cursor.fetchall()

                    # 获取每个表的列信息
                    for table in tables:
                        cursor.execute("""
                            SELECT COLUMN_NAME AS name,
                                COLUMN_TYPE AS type,
                                COLUMN_COMMENT AS comment,
                                IS_NULLABLE AS nullable,
                                COLUMN_KEY AS key_type
                            FROM INFORMATION_SCHEMA.COLUMNS
                            WHERE TABLE_SCHEMA = DATABASE()
                            AND TABLE_NAME = %s
                            ORDER BY ORDINAL_POSITION
                        """, (table['table_name'],))
                        
                        columns = []
                        for col in cursor.fetchall():
                            columns.append({
                                "name": col['name'],
                                "type": col['type'],
                                "comment": col['comment'] or "",
                                "nullable": col['nullable'] == 'YES',
                                "primary_key": col['key_type'] == 'PRI'
                            })
                        
                        metadata['tables'].append({
                            "name": table['table_name'],
                            "comment": table['table_comment'] or "",
                            "columns": columns
                        })
        
        except pymysql.Error as e:
            code, msg = e.args
            error_map = {
                1142: ("权限不足,无法访问元数据表", 403),
                1045: ("数据库认证失败", 401),
                2003: ("无法连接数据库服务器", 503)
            }
            error_info = error_map.get(code, (f"数据库错误: {msg}", 500))
            raise RuntimeError(f"{error_info[0]} (错误码: {code})") from e
            
        return metadata
    
    def _execute_sql_generator(self,sql: str, conn_params: dict[str, Any]) -> Generator[dict[str, Any], None, None]:
        """
        基于PyMySQL的SQL执行生成器函数
        :param sql: 待执行的SQL语句
        :param conn_params: 数据库连接参数字典
        :yield: 返回包含执行状态和数据的字典
        """
        extracted_sql = self._extract_sql_from_text(sql)
        if not extracted_sql:
            # 如果无法提取(未匹配到代码块),直接使用原始字符串
            processed_sql = sql.strip()
        else:
            processed_sql = extracted_sql
        
        connection = None
        try:
            # 建立数据库连接
            connection = pymysql.connect(
                host=conn_params['host'],
                user=conn_params['user'],
                password=conn_params['password'],
                database=conn_params['database'],
                charset='utf8mb4',  # 必须指定字符集
                cursorclass=DictCursor  # 返回字典类型结果
            )
            with connection.cursor() as cursor:
                # 执行SQL语句
                cursor.execute(processed_sql)
                
                # 获取列名和数据(引用[2,5](@ref))
                columns = [desc[0] for desc in cursor.description]
                results = cursor.fetchall()
                
                # 生成结果集
                yield {
                    "status": "success",
                    "sql": processed_sql,
                    "columns": columns,
                    "data": results
                }
                
        except pymysql.MySQLError as e:
            # 捕获数据库特定错误
            yield {
                "status": "error",
                "excute_sql": processed_sql,
                "message": f"Database error: {str(e)}"
            }
        except Exception as ex:
            # 捕获其他异常
            yield {
                "status": "error",
                "excute_sql": processed_sql,
                "message": f"Execution error: {str(ex)}"
            }
        finally:
            # 资源释放
            if connection and connection.open:
                connection.close()
                yield {
                    "status": "info",
                    "message": "数据库连接已关闭"
                }



    def _extract_sql_from_text(self, text: str) -> str:
        """提取 ``的```sql...```代码块中的SQL内容(若存在)"""
        pattern = r'```sqls*(.*?)s*```'
        match = re.search(pattern, text, flags=re.DOTALL)
        if match:
            return match.group(1).strip()
        return ""
© 版权声明

相关文章

9 条评论

您必须登录才能参与评论!
立即登录
  • 头像
    苏家小黎 读者

    怎么解决数据权限的问题?

    无记录
  • 头像
    夜独哈哈和 读者

    可以传递一个密钥

    无记录
  • 头像
    不开心的小莉 读者

    不用指定表吗?我授权成功了,查询的时候数据库连接已关闭

    无记录
  • 头像
    读者

    不需要,表的注释要全,能让自然语言理解

    无记录
  • 头像
    落日余晖 读者

    数据库连接更新入库没找到,只有第一次输入

    无记录
  • 头像
    读者

    更新入库你得自己写一段代码,这个只查询的

    无记录
  • 头像
    Bllzban- 投稿者

    还是要改数据库,费劲

    无记录
  • 头像
    小金鱼 读者

    干货满满,赞一个

    无记录
  • 头像
    清婧小轩 读者

    收藏了,感谢分享

    无记录