MaxKB 高并发支持参数详解

引言

在处理客户咨询时,我们经常遇到关于MaxKB支持的最大并发数的问题,尤其是在举办大型活动或千人会议时。本文将基于实测结果,为您提供MaxKB的并发支持能力,适用于MaxKB版本1.9及以上。

一、测试环境

  • 云服务提供商 :阿里云
  • 服务器配置 :CPU密集型,4核心32GB内存,5M带宽
  • 测试结果 :最大并发支持1500-1800用户

二、关键参数解释与计算公式

2.1 名词解释:

  • Worker数 :Python执行时的工作进程数量。从MaxKB 1.8版本起,默认设置为CPU核心数的一半(向下取整)。
  • 最大Worker数 :服务器能够配置的最大工作进程数。
  • 最大并发数 :理论上支持的最大并发数,超过此值可能导致页面卡死。
  • 实际并发数 :实际运行中的最大并发数。
  • 数据库最大连接数(max_connections) :高并发时需增加数据库连接数以避免报错。

2.2 计算公式:

  • 最大Worker数 = 2 × CPU核心数 + 1
  • 最大并发数 = Worker数 / 最大Worker数 × 200
  • 数据库最大连接数 = Worker数 × 400(向上取整)
  • PostgreSQL内存限制 = 内存值 × 1/4 到 内存值 × 1/2

三、实例计算

3.1 根据服务器配置计算并发数

假设服务器配置为4核心8GB内存:

  1. 最大Worker数 = 2 × 4 + 1 = 9
  2. 最大并发数 = 9 × 200 = 1800(建议实际并发数小于1800)
  3. 数据库最大连接数 = 9 × 400 = 3600(建议设置为4000)
  4. PostgreSQL内存限制 = 8GB × 1/4 到 8GB × 1/2 = 2GB 到 4GB

3.2 根据并发需求计算服务器配置

若需支持3000并发:

  1. 最大Worker数 = 3000 / 200 = 15
  2. CPU核心数 = (15 - 1) / 2 = 7(建议申请8核心)
  3. 内存配置:8核心8GB、8核心16GB、8核心32GB均可

四、步骤设置

4.1 设置 PgSql 内存

在/opt/maxkb目录下找到docker-compose-pgsql.yml文件,按需设置mem_limt。

4.2 设置 Worker 数

在安装部署之后,在 /opt/maxkb 目录下面,找到 docker-compose.yaml 文件,设置worker数。

4.3 设置 max_connections 数

在/opt/maxkb/data找到postgresql.conf文件,设置max_connections。

4.4 重启MaxKB并检查配置

使用以下命令重启MaxKB并检查配置是否成功:

docker-compose  -f docker-compose.yml  -f docker-compose-pgsql.yml up -d  或者 mkctl reload

重启后,使用以下命令检查Worker数是否正确:

ps -ef | grep gun

看到返回信息的worker 数为自己设置的就好。

4.5 检查数据库最大连接数

## 进入到pgsql容器里面,执行查看最大连接数即可验证。
docker exec -it pgsql bash
psql
##  查看最大连接数
show max_connections;
 
 
## 拓展查询:
 
## 查看当前连接数
select count(*) from pg_stat_activity;
## 释放空闲连接
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE state='idle';

image

五、并发脚本验证

package org.example.wpsautoproject;

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.experimental.Accessors;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MaxKbThreadTest {


    //    static String Authorization = "application-bc3b5aff9036f48fc648101f9b03d9bb"; //wlx
    static String Authorization = "application-25f025d67a5ca8cce3e547f7dbef75bc";

    //    static String Authorization_id = "b89897f4-b9f9-11ef-a5d1-0242ac120003";//wlx
    // http://democenter.fit2cloud.cn:28090/api/application/f2de9864-bdaa-11ef-8e75-0242ac120003
    static String Authorization_id = "829c653a-86b0-11ef-a2b7-0242ac120004";//客户

    static String demo_url = "http://democenter.fit2cloud.cn:28090";


    @SneakyThrows
    @Test
    public void apitest1() {
        int concurrentRequests = 10;
        for (int index = 0; index < 5; index++) {
            List<Thread> threads = new CopyOnWriteArrayList<>();
            System.out.println("Starting iteration " + (index + 1));
            for (int i = 0; i < concurrentRequests; i++) {
                Thread t = new Thread(() -> {
                    long startTime = System.currentTimeMillis();
                    try {
                        System.out.println("Thread " + Thread.currentThread().getName() + " is running.");
                        questionAI("人工客服"); // 用户问题
                    } catch (Exception e) {
                        e.printStackTrace(); // 记录异常信息
                    } finally {
                        long endTime = System.currentTimeMillis();
                        System.out.println("Thread " + Thread.currentThread().getName() + " finished in " + (endTime - startTime) + " ms");
                    }
                }, "RequestThread-" + (i + 1)); // 为线程指定名称
                threads.add(t);
            }
            threads.forEach(Thread::start);
            for (Thread t : threads) {
                try {
                    t.join(); // 等待线程完成
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 重新设置中断状态
                    e.printStackTrace(); // 记录异常信息
                }
            }
            System.out.println("All threads completed in iteration " + (index + 1));
            try {
                Thread.sleep(120000); // 等待2分钟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 重新设置中断状态
                e.printStackTrace(); // 记录异常信息
            }
        }
    }


    @Test
    public void test_pool() throws InterruptedException {

        int CONCURRENT_REQUESTS = 50; // 线程数
        int ITERATIONS = 1; // 由于高并发,可能只需要迭代一次
        long WAITING_TIME_MS = 120000; // 2 minutes in milliseconds
        long totalStartTime = System.currentTimeMillis(); // 记录总开始时间
        AtomicInteger totalExecutedThreads = new AtomicInteger(0); // 用于记录总的执行线程数

        for (int index = 0; index < ITERATIONS; index++) {
            ExecutorService executor = Executors.newCachedThreadPool(); // 使用缓存线程池
            System.out.println("Starting iteration " + (index + 1));

            for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
                executor.submit(() -> {
                    long startTime = System.currentTimeMillis();
                    try {
                        System.out.println("Thread " + Thread.currentThread().getName() + " is running.");
                        questionAI("人工客服");
                    } catch (Exception e) {
                        e.printStackTrace(); // 记录异常信息
                    } finally {
                        long endTime = System.currentTimeMillis();
                        System.out.println("Thread " + Thread.currentThread().getName() + " finished in " + (endTime - startTime) + " ms");
                        totalExecutedThreads.incrementAndGet(); // 增加总执行线程数
                    }
                });
            }

            executor.shutdown(); // 启动一次性执行后关闭线程池
            try {
                if (!executor.awaitTermination(WAITING_TIME_MS, TimeUnit.MILLISECONDS)) {
                    System.out.println("Forcing shutdown after waiting for " + WAITING_TIME_MS + "ms");
                    executor.shutdownNow(); // 强制停止
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                executor.shutdownNow();
            }

            System.out.println("All threads completed in iteration " + (index + 1));
        }

        long totalEndTime = System.currentTimeMillis(); // 记录总结束时间
        System.out.println("Total time to execute all threads: " + (totalEndTime - totalStartTime) + " ms");
        System.out.println("Total threads executed: " + totalExecutedThreads.get()); // 打印总执行的线程数

    }

    public void questionAI(String question) {
        // 获取会话ID
        String sessionId = getNewSessionId();

        System.out.println("sessionId==>" + sessionId);

        // 设置请求体
        ChatMessageRequest body = new ChatMessageRequest();
        body.setMessage(question).setRe_chat(false).setStream(false);
        // 构建 POST 请求
        String url = String.format(demo_url + "/api/application/chat_message/%s", sessionId);
        HttpResponse response = HttpRequest.post(url)
                .header("Accept", "application/json;charset=UTF-8")
                .header("Authorization", Authorization)
                .body(JSON.toJSONString(body))
                .execute();
        String responseBody = response.body();
        JSONObject jsonResponse = JSON.parseObject(responseBody);
        String content = jsonResponse.getString("data");
        System.out.println("getStatus====>" + response.getStatus());
        System.out.println("jsonResponse==>" + jsonResponse);
        System.out.println(content);
    }

    private String getNewSessionId() {
        String url = String.format(demo_url + "/api/application/%s/chat/open", Authorization_id);
        HttpResponse response = HttpRequest.get(url)
                .header("Accept", "application/json;charset=UTF-8")
                .header("Authorization", Authorization)
                .execute();

        String responseBody = response.body();
        JSONObject jsonResponse = JSON.parseObject(responseBody);
        String sessionId = jsonResponse.getString("data");
        return sessionId;
    }


    @Data
    @Accessors(chain = true)
    public static class ChatMessageRequest {
        private String message;
        private boolean re_chat;
        private boolean stream;
    }

}

2.2的计算公式和3.1的最大并发数计算公式不一致