能否基于pypandoc做一个官方的文本生成函数

现在的文本生成并下载功能是一个刚需,一般生成的文件,只能通过复制黏贴的形式下载,要么是md格式要么是复制到word里面严重变形。能否官方出一个文本生成并下载的函数呢。

通过我的摸索,我们找到一个比较好的解决方案,使用 pypandoc 这个开源工具,可以将md文件转存为word文件保存。同时可以套用已经设定好的模板保存。也就是说,通过ai生成的md格式文本,可以直接按照提前设定好的模板格式生成文件word文件下载。




但是问题是,我自己建的服务器,稳定性不好,经常出现链接超时的问题。
请求官方能否出一个类似的函数,可以直接生成想要的文件。

dify中有类似的工具,比如生成excel、word或者其他文件下载。

你的想法很好,可以到 github 提交一下 issue。

在issue提交结果被吞了,网络问题。 但是确实需要这个模块或者功能,做成插件就好了

工具中设置代码如下,命名 markdown转docx :

import requests
import json
def getPost(raw_text):
    # 定义要传递的 RAW TEXT 文字(即LLM输出内容)
    # 发送 POST 请求,并传入 RAW TEXT
    url = 'http://xxx.xxx.xxx.xxx:8090/office/word/convert'#这个是md_to_doc_server.py的flask service 链接地址,详见md_to_doc_server.py文件。
    try:
        response = requests.post(url, data=raw_text, timeout=(3,10))#发送POST请求到flask service,由对方将RAW TEXT转化为word文档
    except requests.exceptions.RequestException as e:
        return str(e)
    result = response.json()['download_url']
    return "[--->>>>> 点这里下载包含本次对话内容的word文档]("+result+")"
    #return result

设置单独的python应用,文件名md_to_docx_server.py 代码如下


from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse
import os
import time
import logging
from pydantic import BaseModel
from docx import Document as DocxDocument
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import datetime
import re

# 服务器配置
SERVER_PORT = 8090  # 默认端口,可在此修改

app = FastAPI(title="Markdown to Word Converter")

# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MarkdownContent(BaseModel):
    content: str

# 获取当前程序运行目录并创建必要的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
temp_dir = os.path.join(current_dir, 'temp')
output_dir = os.path.join(current_dir, 'output')
os.makedirs(temp_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)

def str2timestamp(date_str, format='%Y-%m-%d'):
    """将日期字符串转换为时间戳"""
    try:
        tp = time.strptime(date_str, format)
        return int(time.mktime(tp))
    except ValueError:
        return 0

def cleanup_old_files():
    """清理2天前的docx文件"""
    logger.info("开始清理过期文件...")
    
    # 计算2天前的日期和时间戳
    five_days_ago = datetime.datetime.now() - datetime.timedelta(days=2)
    standard_timestamp = str2timestamp(five_days_ago.strftime('%Y-%m-%d'))
    
    deleted_count = 0
    current_time = datetime.datetime.now()
    
    # 检查当前时间是否在19点到23点之间
    if current_time.hour >= 19 and current_time.hour < 23:
        if os.path.exists(output_dir):
            for filename in os.listdir(output_dir):
                if filename.endswith('.docx'):
                    file_path = os.path.join(output_dir, filename)
                    
                    # 获取文件修改时间
                    file_mtime = os.path.getmtime(file_path)
                    file_date = datetime.datetime.fromtimestamp(file_mtime)
                    
                    # 如果文件修改时间早于2天前,则删除
                    if file_mtime < standard_timestamp:
                        try:
                            os.remove(file_path)
                            deleted_count += 1
                            logger.info(f"已删除过期文件: {filename}")
                        except Exception as e:
                            logger.error(f"删除文件失败 {filename}: {str(e)}")
    
    logger.info(f"文件清理完成,共删除 {deleted_count} 个文件")

import re

def convert_markdown_to_docx(markdown_content: str, output_path: str):
    """将Markdown内容转换为Word文档"""
    from docx.shared import Pt
    from docx.enum.style import WD_STYLE_TYPE
    
    doc = DocxDocument()
    
    # 设置默认字体为宋体
    for style in doc.styles:
        if style.type == WD_STYLE_TYPE.PARAGRAPH or style.type == WD_STYLE_TYPE.CHARACTER:
            if hasattr(style.font, 'name'):
                style.font.name = '宋体'
                # 解决Windows与Mac字体不一致问题
                style.font.name_ascii = 'Times New Roman'
                style.font.name_east_asia = '宋体'
    
    # 预编译正则表达式
    bold_pattern = re.compile(r'\*\*(.*?)\*\*|__(.*?)__')
    italic_pattern = re.compile(r'\*(.*?)\*|_(.*?)_')
    code_pattern = re.compile(r'`(.*?)`')
    link_pattern = re.compile(r'\[(.*?)\]\((.*?)\)')
    heading_pattern = re.compile(r'^(#{1,6})\s+(.*)$')
    
    lines = markdown_content.split('\n')
    i = 0
    
    while i < len(lines):
        line = lines[i].strip()
        i += 1
        
        if not line:
            continue
            
        # 处理标题
        heading_match = heading_pattern.match(line)
        if heading_match:
            hashes, title_text = heading_match.groups()
            level = len(hashes)
            if level == 1:
                # 一级标题
                doc.add_heading(title_text, level=0)
            elif level == 2:
                # 二级标题
                doc.add_heading(title_text, level=1)
            elif level == 3:
                # 三级标题
                doc.add_heading(title_text, level=2)
            else:
                # 四级及以上标题
                doc.add_heading(title_text, level=2)
            continue
        
        # 处理列表
        if line.startswith('- ') or line.startswith('* '):
            # 无序列表
            list_items = []
            # 收集所有连续的列表项
            while i <= len(lines):
                list_items.append(line[2:].strip())
                if i < len(lines):
                    next_line = lines[i].strip()
                    if next_line.startswith('- ') or next_line.startswith('* '):
                        line = next_line
                        i += 1
                    else:
                        break
                else:
                    break
            
            # 添加无序列表
            for item in list_items:
                para = doc.add_paragraph(style='List Bullet')
                # 处理行内格式
                add_formatted_text(para, item, bold_pattern, italic_pattern, code_pattern, link_pattern)
            continue
        
        # 处理数字列表
        if line and line[0].isdigit() and '.' in line[:3]:
            # 有序列表
            list_items = []
            # 收集所有连续的列表项
            while i <= len(lines):
                # 提取列表文本
                list_text = line.split('.', 1)[1].strip() if '.' in line else line
                list_items.append(list_text)
                if i < len(lines):
                    next_line = lines[i].strip()
                    if next_line and next_line[0].isdigit() and '.' in next_line[:3]:
                        line = next_line
                        i += 1
                    else:
                        break
                else:
                    break
            
            # 添加有序列表
            for item in list_items:
                para = doc.add_paragraph(style='List Number')
                # 处理行内格式
                add_formatted_text(para, item, bold_pattern, italic_pattern, code_pattern, link_pattern)
            continue
        
        # 处理代码块
        if line.startswith('```'):
            # 收集代码块内容
            code_content = []
            while i < len(lines):
                code_line = lines[i]
                i += 1
                if code_line.strip() == '```':
                    break
                code_content.append(code_line)
            
            # 添加代码块
            para = doc.add_paragraph()
            run = para.add_run('\n'.join(code_content))
            run.font.name = 'Courier New'
            run.font.name_ascii = 'Courier New'
            run.font.size = Pt(10)
            continue
        
        # 处理引用
        if line.startswith('>'):
            # 收集引用内容
            quote_content = []
            while i <= len(lines):
                if line.startswith('>'):
                    quote_content.append(line[1:].strip())
                if i < len(lines):
                    next_line = lines[i].strip()
                    if next_line.startswith('>') or not next_line:
                        line = next_line
                        i += 1
                    else:
                        break
                else:
                    break
            
            # 添加引用
            para = doc.add_paragraph('\n'.join(quote_content), style='Intense Quote')
            continue
        
        # 处理表格
        if line.startswith('|') and '|' in line:
            # 收集表格内容
            table_lines = []
            while i <= len(lines):
                table_lines.append(line)
                if i < len(lines):
                    next_line = lines[i].strip()
                    if next_line.startswith('|') or '|' in next_line:
                        line = next_line
                        i += 1
                    else:
                        break
                else:
                    break
            
            # 简单处理表格,作为普通文本添加
            para = doc.add_paragraph('\n'.join(table_lines))
            continue
        
        # 处理分隔线
        if line.startswith('---') or line.startswith('***'):
            para = doc.add_paragraph()
            para.add_run('-' * 50)
            continue
        
        # 处理普通段落
        para = doc.add_paragraph()
        add_formatted_text(para, line, bold_pattern, italic_pattern, code_pattern, link_pattern)
    
    doc.save(output_path)

def add_formatted_text(para, text, bold_pattern, italic_pattern, code_pattern, link_pattern):
    """向段落中添加带有格式的文本"""
    from docx.shared import Pt
    
    # 处理文本中的各种格式
    
    # 先处理粗体
    pos = 0
    for match in bold_pattern.finditer(text):
        # 添加匹配前的文本
        if match.start() > pos:
            run = para.add_run(text[pos:match.start()])
            run.font.name = '宋体'
        # 添加粗体文本
        bold_text = match.group(1) or match.group(2)
        run = para.add_run(bold_text)
        run.font.name = '宋体'
        run.bold = True
        pos = match.end()
    
    # 更新文本为处理粗体后的剩余部分
    text = text[pos:]
    pos = 0
    
    # 处理斜体
    for match in italic_pattern.finditer(text):
        # 添加匹配前的文本
        if match.start() > pos:
            run = para.add_run(text[pos:match.start()])
            run.font.name = '宋体'
        # 添加斜体文本
        italic_text = match.group(1) or match.group(2)
        run = para.add_run(italic_text)
        run.font.name = '宋体'
        run.italic = True
        pos = match.end()
    
    # 更新文本为处理斜体后的剩余部分
    text = text[pos:]
    pos = 0
    
    # 处理行内代码
    for match in code_pattern.finditer(text):
        # 添加匹配前的文本
        if match.start() > pos:
            run = para.add_run(text[pos:match.start()])
            run.font.name = '宋体'
        # 添加代码文本
        code_text = match.group(1)
        run = para.add_run(code_text)
        run.font.name = 'Courier New'
        run.font.size = Pt(10)
        pos = match.end()
    
    # 更新文本为处理代码后的剩余部分
    text = text[pos:]
    pos = 0
    
    # 处理链接
    for match in link_pattern.finditer(text):
        # 添加匹配前的文本
        if match.start() > pos:
            run = para.add_run(text[pos:match.start()])
            run.font.name = '宋体'
        # 添加链接文本
        link_text = match.group(1)
        link_url = match.group(2)
        run = para.add_run(link_text)
        run.font.name = '宋体'
        run.underline = True
        run = para.add_run(f" ({link_url})")
        run.font.name = '宋体'
        pos = match.end()
    
    # 添加剩余文本
    if pos < len(text):
        run = para.add_run(text[pos:])
        run.font.name = '宋体'

def init_scheduler():
    """初始化定时任务调度器"""
    scheduler = BackgroundScheduler(timezone='Asia/Shanghai')
    
    # 每天22点开始执行清理任务,每分钟检查一次直到23点
    scheduler.add_job(
        cleanup_old_files,
        trigger=CronTrigger(hour=22, minute='*'),
        id='cleanup_job'
    )
    
    return scheduler

@app.on_event("startup")
async def startup_event():
    """应用启动时初始化定时任务"""
    scheduler = init_scheduler()
    scheduler.start()
    logger.info("定时任务调度器已启动,将在每天22:00-23:00期间执行文件清理")

@app.post("/office/word/convert")
async def convert_md_to_docx(request: Request):
    logger.info('Received request for /convert')
    
    # 在转换前先执行清理操作(如果当前时间在22-23点之间)
    current_time = datetime.datetime.now()
    if current_time.hour >= 22 and current_time.hour < 23:
        cleanup_old_files()
    
    try:
        content = await request.body()
        logger.info(f'Received content length: {len(content)} bytes')
        
        if not content:
            logger.error('No content part in the request')
            return JSONResponse(content={"error": "No content part"}, status_code=400)

        content = content.decode('utf-8')
        logger.info(f'Content preview: {content[:100]}...')
        
        if content == '':
            logger.error('No content provided')
            return JSONResponse(content={"error": "No content provided"}, status_code=400)

        # 从请求的内容中读取
        mdfile_name = str(int(time.time())) + ".md"
        md_file_path = os.path.join(temp_dir, mdfile_name)
        
        with open(md_file_path, 'w', encoding='utf-8') as f:
            f.write(content)

        # 将Markdown文件转换为Word文档
        file_name = str(int(time.time())) + ".docx"
        output_path = os.path.join(output_dir, file_name)
        
        # 读取Markdown文件内容
        with open(md_file_path, 'r', encoding='utf-8') as f:
            markdown_content = f.read()
        
        # 调用转换函数
        convert_markdown_to_docx(markdown_content, output_path)
        
        # 清理临时文件
        if os.path.exists(md_file_path):
            os.remove(md_file_path)
        
        # 返回文件的下载链接
        base_url = str(request.base_url)
        download_url = base_url + 'office/word/download/' + os.path.basename(output_path)
        logger.info(f'Conversion successful: {download_url}')
        return JSONResponse(content={"download_url": download_url}, status_code=200)
        
    except Exception as e:
        logger.error(f'Conversion failed: {str(e)}', exc_info=True)
        # 清理临时文件
        if 'md_file_path' in locals() and os.path.exists(md_file_path):
            os.remove(md_file_path)
        return JSONResponse(content={"error": f"Conversion failed: {str(e)}"}, status_code=500)

@app.get("/office/word/download/{filename}")
async def download_file(filename: str):
    file_path = os.path.join(output_dir, filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path, filename=filename)

if __name__ == "__main__":
    import uvicorn
    print(f"Starting server on port {SERVER_PORT}...")
    uvicorn.run(app, host="0.0.0.0", port=SERVER_PORT)

依赖 requirements.txt

代码如下:

fastapi==0.104.1
uvicorn==0.24.0
python-docx==1.1.0
apscheduler==3.10.4

可以直接用,也可以让大模型优化一下再部署

您好,感谢您的建议,MaxKB 工具自行在 Github 上面 Pull Request 即可,欢迎您提 PR。
https://github.com/1Panel-dev/MaxKB-toolstore/blob/main/如何提交工具.md