MaxKB v2+MinerU:通过API实现PDF⽂档解析并存储⾄知识库

MinerU是一款开源的高质量数据提取工具,能够将PDF文档转换为Markdown和JSON格式。2025年6月13日,MinerU发布了v2.0版本,相较于v1.0版本实现了架构和功能的全面重构与升级。在优化代码结构和交互方式的同时,v2.0版本还集成了小参数量、高性能多模态文档解析模型,能够实现端到端的高速、高精度文档理解。实际测试表明,新版本对复杂图表的解析效果较上一版本有明显提升,目前已经能够满足90%以上的复杂文档解析需求。

值得一提的是,MinerU出色的PDF文档解析能力特别适合与MaxKB开源项目配合使用。通过"MinerU+MaxKB"的组合方案,用户不仅能够获得高质量的文档解析效果,还能显著提升知识库问答系统的性能。为方便用户集成,MinerU项目现已提供API对接服务。接下来,本文将详细介绍如何通过MinerU在线API实现与MaxKB的对接。

一、实现方法

当用户提供文件地址后,系统会将该地址赋值给file_url变量,并作为参数传递给MinerU文件解析服务。MinerU在完成文件解析后,会返回一个任务ID(task_id)。系统会将其传入MinerU的查询接口,当检测到任务处理完成时,自动获取结果文件的下载链接(full_url)。随后,系统执行文件下载操作,将结果文件保存到MaxKB容器的/opt/maxkb/download目录下。最后,系统会自动完成文件上传和智能分段处理,将内容存储到知识库中。

二、MaxKB函数创建

我们需要在MaxKB的函数库中创建四个核心功能函数,其用途分别为:

  1. 调用MinerU单个文件解析

  2. 从MinerU获取任务结果

  3. 通过URL链接下载文件至服务器

  4. 将解析后的ZIP文件上传至知识库

  • MinerU单个文件解析函数:负责调用MinerU的单文件解析服务,通过传入PDF文档的在线地址来创建解析任务,并返回对应的task_id;
import requests

def create_task(file_url):

    url = 'https://mineru.net/api/v4/extract/task'
    
    token = '*自己申请的token*'
    
    header = {
    
    'Content-Type': 'application/json',
    
    'Authorization': f'Bearer {token      }'
    
      }

    data = {
    
    'url': file_url,
    
    'is_ocr': True, #是否启动 ocr 功能,默认 false
    
    'enable_formula': True, #是否开启公式识别,默认 true
    
    'enable_table': True, #是否开启表格识别,默认 true
    
    'language': "ch", #指定文档语言,默认 ch,可以设置为auto
    
    'model_version': "v2", #mineru模型版本,两个选项:v1、v2,默认v1。
    
    }

    res = requests.post(url,headers=header,json=data,timeout=30)
    
    res_data = res.json()
    
    task_id_data = res_data["data"]["task_id"]
    
    return task_id_data

  • MinerU获取任务结果函数:用于查询任务状态,通过传入task_id获取解析结果,成功后将返回ZIP格式解析文件的下载地址;
import time

import requests

def querybyid(task_id,max_retries=100,retry_interval=5):

    url = f'https://mineru.net/api/v4/extract/task/{task_id}'
    
    token = '*自己申请的token*'
    
    header = {
    
    'Content-Type': 'application/json',
    
    'Authorization': f'Bearer {token}'
    
    }
    
    retries = 0
    
    while retries < max_retries:

            try:

                res = requests.get(url, headers=header, timeout=5)
                
                res.  raise_for_status() # 检查请求是否成功
                
                data = res.json()
                
                if "data" in data and "full_zip_url" in data["data"] and data["data"]["full_zip_url"]:
    
                    return data["data"]["full_zip_url"]
    
                else:
    
                    print(f"full_zip_url 为空,正在等待任务完成。已重试 {retries + 1} 次,共 {max_retries} 次。")
                    
                    time.sleep(retry_interval)
                    
                    retries += 1
            
            except requests.exceptions.RequestException as e:

                print(f"请求失败,错误信息:{e}。正在重试...")
                
                time.sleep(retry_interval)
                
                retries += 1
      
    raise Exception(f"在 {max_retries} 次重试后,仍未获取到有效的 full_zip_url。")

  • 文件下载函数:根据提供的ZIP文件下载链接,将文件保存至容器内的/opt/maxkb/download目录。需要注意的是,MaxKB默认使用sandbox用户运行,需确保该用户对/opt/maxkb/download目录有读写权限;
docker exec -it maxkb bash             --启动maxkb
cd /opt/maxkb                          --进入目录
mkdir download                         --创建文件夹
chown sandbox download/                --用户授权
import os

import requests

from urllib.parse import urlparse

def download_file(download_url, save_dir='/opt/maxkb/download'):

    os.makedirs(save_dir, exist_ok=True)

    #获取文件名
    parsed_url = urlparse(download_url)
    
    filename = os.path.basename(parsed_url.path)
    
    save_path = os.path.join(save_dir, filename) # 文件下载后保存的目录,需要默认用户对此目录有读写权限
    
    #下载文件
    try:

        response = requests.get(download_url, stream=True)
        
        response.raise_for_status() # 检查请求是否成功
        
        total_size = int(response.headers.get('content-length', 0))
        
        block_size = 1024 # 1KB
        
        progress = 0
        
        print(f"开始下载 {filename} 到 {save_dir}")
        
        with open(save_path, 'wb') as f:

            for data in response.iter_content(block_size):

                f.write(data)

                progress += len(data)

                #打印下载进度
                print(f"下载进度: {progress / total_size * 100:.2f}%", end='\r')

        print(f"\n下载完成: {save_path}")

        return save_path

    except requests.exceptions.RequestException as e:

        print(f"下载失败: {e}")
        
        return None

  • ZIP文件上传至知识库:通过MaxKB API将服务器上的ZIP解析文件上传至知识库存储。
import os
import json
import logging
import requests

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

def initialize(file_path, file_name=None, source_file_id=None):
    config = {
        'authorization_apikey': '*API 密钥*',
        'split_url':  'http://*IP地址*:8080/admin/api/workspace/default/knowledge/*知识库Id*/document/split?with_filter=true',
        'upload_url': 'http://*IP地址*:8080/admin/api/workspace/default/knowledge/*知识库Id*/document/batch_create',
        'file_path':  file_path,
        'file_name':  file_name or os.path.basename(file_path),
        'source_file_id': source_file_id
    }
    return config

def upload_file(config):
    headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {config["authorization_apikey"]}'
    }
    try:
        files = {'file': open(config["file_path"], 'rb')}
        response = requests.post(config["split_url"], headers=headers, files=files)
        response.raise_for_status()
        response_data = response.json()
        map_content = {}
        if response_data.get("data"):
            first_item = response_data["data"][0]
            if first_item.get("content"):
                first_content_item = first_item["content"][0]
                map_content[first_content_item.get("title", "")] = first_content_item.get("content", "")
        return map_content
    except requests.exceptions.RequestException as e:
        logging.error(f"文件分段上传失败: {e}")
        return {}
    except Exception as e:
        logging.error(f"处理文件内容时出错: {e}")
        return {}


def send_post_request(config, map_content):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f'Bearer {config["authorization_apikey"]}'
    }

    paragraphs = [
        {
            "title": k,
            "content": v,
            "is_active": True
        }
        for k, v in map_content.items()
    ]

    payload = [
        {
            "name": config["file_name"],
            "paragraphs": paragraphs,
            "source_file_id": config["source_file_id"]
        }
    ]

    response = requests.put(config["upload_url"], headers=headers, data=json.dumps(payload))
    try:
        response.raise_for_status()
        logging.info("上传文件响应: %s", response.text)
        return response
    except requests.exceptions.RequestException as e:
        logging.error("上传文件失败: %s", e)
        return False


def main(file_path, file_name=None, source_file_id=None):
    config = initialize(file_path, file_name, source_file_id)
    map_content = upload_file(config)
    if not map_content:
        return "文件分段上传失败或内容为空,程序终止"
    if not send_post_request(config, map_content):
        return "文件上传失败,程序终止"
    return "文件已上传成功,并保存在知识库中"

三、创建应用

在上述四个函数创建完成后,我们可以在MaxKB中尝试创建高级应用。输入或提取上传文件的链接后,按照前文顺序依次添加MinerU单个文件解析函数节点→从MinerU获取任务结果函数节点→下载文件函数节点→文件上传函数节点。

小助手提示“文件上传成功”,即可回到知识库页面,在目标知识库中看到新上传的文档。

总结来说,MinerU v2.0是一款开源、高性能的PDF文档解析工具,具备强大的多模态处理能力。通过MaxKB与MinerU的深度联动,可以基于函数调用构建清晰高效的 “文件地址→解析→下载→上传” 自动化流程,无缝衔接原始文档与结构化知识库的构建。 “MinerU+MaxKB”的组合方案,不仅可以显著提升了文档解析的精度与效率,更能大幅增强知识库问答系统的能力与效果。

2 个赞

从哪个AI 弄来的代码,经过测试了吗?

1 个赞

最后的函数用不了。。。。。

最后一个函数需要修改API密钥、IP和知识库ID哈

本地部署的mineru也能通过API的方式调用吗?

可以的,需要修改一下函数中的url

这个是 MaxKB v2 版本的方案,请确认你的版本,并确认函数中的环境参数都根据你的环境进行了替换。

最后一个函数需要修改API密钥、IP和知识库ID,是不是指的专业版的maxkb?社区版的没有api key管理。

是的,开源版只能调用会话相关的 API。

1 个赞

那知识库里的概览里的api文档和base url地址及api key和最后一个函数库里的参数有关系吗?社区版的也能实现maxkb v2+mineru的解析高级编排吗?

没有关系。社区版没有api key,无法实现。

好的,谢谢

第一个函数调试出现“‘NoneType’ object is not subscriptable”是什么问题?

你没给输入?

AI生成的工具代码,刚调试通,表单收集pdf文档,工具接收后解析返回md文本,比较简单。

import json
import re
import ast
import tempfile
import os
import time
import requests
from typing import Dict, Any

def execute(**kwargs) -> str:
    """
    MaxKB 自定义工具:通过 MinerU 后端 API 解析单个 PDF,直接返回 Markdown 文本
    参数:
        file_input: 列表,包含一个字典,例如:
            [{'name': 'xxx.pdf', 'file_id': '019da9f9-...', 'size': 4041631, 'uid': ..., 'status': 'success'}]
        config: 可选配置,例如:
            {"server_url": "http://10.21.8.30:8080", "url_prefix": "http://10.21.8.28:8080/admin", ...}
    返回:
        该 PDF 对应的 Markdown 文本字符串
    """
    # ========== 1. 提取输入参数 ==========
    file_input_raw = kwargs.get("file_input")
    config_raw = kwargs.get("config", {})

    # ========== 2. 默认配置 ==========
defaults = {
    # MinerU 服务端地址(对应界面:Server URL 输入框)
    "server_url": "http://10.21.8.30:8080",
    # 后台管理接口前缀(内部管理API,非界面展示项)
    "url_prefix": "http://10.21.8.28:8080/admin",
    # 解析后端引擎(对应界面:Backend 下拉框,可选 hybrid-auto-engine/pipeline/vlm-auto-engine)
    "backend": "hybrid-auto-engine",
    # 解析模式:自动识别(固定值,无需手动修改)
    "parse_method": "auto",
    # 启用行内公式识别(对应界面 hybrid 后端:Enable inline formula recognition 复选框)
    "formula_enable": False,
    # 启用表格识别(对应界面:Enable table recognition 复选框)
    "table_enable": True,
    # OCR 语言列表(对应界面:OCR Language 下拉框,ch=中文/英文/繁体)
    "lang_list": ["ch"],
    # 转换起始页码(对应界面:Max convert pages 最小值,从第0页开始)
    "start_page_id": 0,
    # 转换结束页码(对应界面:Max convert pages 最大值,默认全部页面)
    "end_page_id": 99999,
    # 返回解析后的 Markdown 内容(界面展示:Markdown rendering / Markdown text)
    "return_md": True,
    # 返回中间处理 JSON 数据(内部调试用,不展示给用户)
    "return_middle_json": False,
    # 返回模型原始输出(内部调试用)
    "return_model_output": False,
    # 返回内容结构化列表(内部数据格式)
    "return_content_list": False,
    # 返回提取的图片文件(界面不默认开启,按需启用)
    "return_images": False,
    # 返回格式为 ZIP 压缩包(False=直接返回文件/文本,True=打包压缩)
    "response_format_zip": False,
    # 单个文件处理超时时间(单位:秒,默认5分钟)
    "timeout_per_file": 300,
    # 结果文件输出目录(界面生成的 Convert result 保存路径)
    "output_dir": "./output"
}

    # 解析用户自定义配置
    if isinstance(config_raw, str):
        try:
            config_raw = parse_input(config_raw)
        except:
            config_raw = {}
    cfg = {**defaults, **config_raw} if isinstance(config_raw, dict) else defaults

    # 必要配置检查
    if not cfg.get("server_url") or not cfg.get("url_prefix"):
        return "配置缺失: server_url 或 url_prefix"

    # ========== 3. 鲁棒解析函数 ==========
    def parse_input(raw):
        if isinstance(raw, (dict, list)):
            return raw
        if not isinstance(raw, str):
            raise ValueError(f"不支持的输入类型: {type(raw)}")
        text = raw.strip()
        text = re.sub(r'^\s*```(?:json)?\s*', '', text, flags=re.IGNORECASE)
        text = re.sub(r'\s*```\s*$', '', text).strip()
        start = text.find('{') if '{' in text else text.find('[')
        end = text.rfind('}') if '}' in text else text.rfind(']')
        if start != -1 and end != -1:
            text = text[start:end+1]
        try:
            return ast.literal_eval(text)
        except:
            text = re.sub(r"'([^']+)'", r'"\1"', text)
            return json.loads(text)

    # ========== 4. 展平文件列表(只取第一个) ==========
    def flatten_file_list(obj):
        result = []
        if isinstance(obj, (list, tuple)):
            for item in obj:
                result.extend(flatten_file_list(item))
        elif isinstance(obj, dict):
            if "name" in obj or "file_id" in obj:
                result.append(obj)
            else:
                for v in obj.values():
                    result.extend(flatten_file_list(v))
        return result

    # ========== 5. 解析 file_input ==========
    try:
        if file_input_raw is None:
            return "错误: file_input 为空"
        parsed = parse_input(file_input_raw)
        file_list = flatten_file_list(parsed)
    except Exception as e:
        return f"解析 file_input 失败: {str(e)}"

    if not file_list:
        return "错误: file_input 中未找到任何文件"

    # 只取第一个文件对象
    file_obj = file_list[0]
    if not isinstance(file_obj, dict):
        return f"错误: 文件对象格式不正确,应为字典,实际为 {type(file_obj)}"

    file_name = file_obj.get("name", "")
    file_id = file_obj.get("file_id", "")
    if not file_name or not file_id:
        return "错误: 文件对象缺少 name 或 file_id 字段"

    # 构造下载 URL
    base_prefix = cfg["url_prefix"].rstrip('/')
    download_url = f"{base_prefix}/oss/file/{file_id}"

    tmp_path = None
    try:
        # 下载文件到临时文件
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp_path = tmp.name

        dl_retries = 3
        for attempt in range(dl_retries):
            try:
                r = requests.get(download_url, timeout=30)
                if r.status_code == 200:
                    with open(tmp_path, "wb") as f:
                        f.write(r.content)
                    break
                else:
                    raise Exception(f"下载失败 HTTP {r.status_code}")
            except Exception as e:
                if attempt == dl_retries - 1:
                    raise
                time.sleep(2)

        # ========== 6. 调用后端 API(内联函数,确保参数正确) ==========
        def call_backend(file_path: str, orig_name: str) -> str:
            base_url = cfg["server_url"].rstrip('/')
            url = f"{base_url}/file_parse"
            with open(file_path, 'rb') as f:
                files = [('files', (orig_name, f, 'application/pdf'))]
                data = {
                    'backend': cfg.get('backend', 'hybrid-auto-engine'),
                    'parse_method': cfg.get('parse_method', 'auto'),
                    'formula_enable': str(cfg.get('formula_enable', True)).lower(),
                    'table_enable': str(cfg.get('table_enable', True)).lower(),
                    'lang_list': cfg.get('lang_list', ['ch']),
                    'start_page_id': cfg.get('start_page_id', 0),
                    'end_page_id': cfg.get('end_page_id', 99999),
                    'return_md': str(cfg.get('return_md', True)).lower(),
                    'return_middle_json': str(cfg.get('return_middle_json', False)).lower(),
                    'return_model_output': str(cfg.get('return_model_output', False)).lower(),
                    'return_content_list': str(cfg.get('return_content_list', False)).lower(),
                    'return_images': str(cfg.get('return_images', False)).lower(),
                    'response_format_zip': str(cfg.get('response_format_zip', False)).lower(),
                    'output_dir': cfg.get('output_dir', './output')
                }
                resp = requests.post(url, files=files, data=data, timeout=cfg['timeout_per_file'])
            if resp.status_code != 200:
                raise Exception(f"后端错误 (HTTP {resp.status_code}): {resp.text[:500]}")
            content_type = resp.headers.get('Content-Type', '')
            if 'application/json' in content_type:
                result = resp.json()
                if isinstance(result, str):
                    return result
                if 'markdown' in result:
                    return result['markdown']
                if 'data' in result:
                    return result['data']
                if 'results' in result and isinstance(result['results'], dict):
                    for file_value in result['results'].values():
                        if isinstance(file_value, dict) and 'md_content' in file_value:
                            return file_value['md_content']
                        elif isinstance(file_value, str):
                            return file_value
                return json.dumps(result, ensure_ascii=False)
            else:
                return resp.text

        md_text = call_backend(tmp_path, file_name)
        return md_text

    except Exception as e:
        return f"解析失败: {str(e)}"
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)