引言
在处理客户咨询时,我们经常遇到关于MaxKB支持的最大并发数的问题,尤其是在举办大型活动或千人会议时。本文将基于实测结果,为您提供MaxKB的并发支持能力,适用于MaxKB版本1.9及以上。
一、测试环境
- 云服务提供商 :阿里云
- 服务器配置 :CPU密集型,4核心32GB内存,5M带宽
- 测试结果 :最大并发支持1500-1800用户
二、关键参数解释与计算公式
2.1 名词解释:
- Worker数 :Python执行时的工作进程数量。从MaxKB 1.8版本起,默认设置为CPU核心数的一半(向下取整)。
- 最大Worker数 :服务器能够配置的最大工作进程数。
- 最大并发数 :理论上支持的最大并发数,超过此值可能导致页面卡死。
- 实际并发数 :实际运行中的最大并发数。
- 数据库最大连接数(max_connections) :高并发时需增加数据库连接数以避免报错。
2.2 计算公式:
- 最大Worker数 = 2 × CPU核心数 + 1
- 最大并发数 = Worker数 / 最大Worker数 × 200
- 数据库最大连接数 = Worker数 × 400(向上取整)
- PostgreSQL内存限制 = 内存值 × 1/4 到 内存值 × 1/2
三、实例计算
3.1 根据服务器配置计算并发数
假设服务器配置为4核心8GB内存:
- 最大Worker数 = 2 × 4 + 1 = 9
- 最大并发数 = 9 × 200 = 1800(建议实际并发数小于1800)
- 数据库最大连接数 = 9 × 400 = 3600(建议设置为4000)
- PostgreSQL内存限制 = 8GB × 1/4 到 8GB × 1/2 = 2GB 到 4GB
3.2 根据并发需求计算服务器配置
若需支持3000并发:
- 最大Worker数 = 3000 / 200 = 15
- CPU核心数 = (15 - 1) / 2 = 7(建议申请8核心)
- 内存配置:8核心8GB、8核心16GB、8核心32GB均可
四、步骤设置
4.1 设置 PgSql 内存
在/opt/maxkb目录下找到docker-compose-pgsql.yml文件,按需设置mem_limt。
4.2 设置 Worker 数
在安装部署之后,在 /opt/maxkb 目录下面,找到 docker-compose.yaml 文件,设置worker数。
4.3 设置 max_connections 数
在/opt/maxkb/data找到postgresql.conf文件,设置max_connections。
4.4 重启MaxKB并检查配置
使用以下命令重启MaxKB并检查配置是否成功:
docker-compose -f docker-compose.yml -f docker-compose-pgsql.yml up -d 或者 mkctl reload
重启后,使用以下命令检查Worker数是否正确:
ps -ef | grep gun
看到返回信息的worker 数为自己设置的就好。
4.5 检查数据库最大连接数
## 进入到pgsql容器里面,执行查看最大连接数即可验证。
docker exec -it pgsql bash
psql
## 查看最大连接数
show max_connections;
## 拓展查询:
## 查看当前连接数
select count(*) from pg_stat_activity;
## 释放空闲连接
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE state='idle';
五、并发脚本验证
package org.example.wpsautoproject;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.experimental.Accessors;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MaxKbThreadTest {
// static String Authorization = "application-bc3b5aff9036f48fc648101f9b03d9bb"; //wlx
static String Authorization = "application-25f025d67a5ca8cce3e547f7dbef75bc";
// static String Authorization_id = "b89897f4-b9f9-11ef-a5d1-0242ac120003";//wlx
// http://democenter.fit2cloud.cn:28090/api/application/f2de9864-bdaa-11ef-8e75-0242ac120003
static String Authorization_id = "829c653a-86b0-11ef-a2b7-0242ac120004";//客户
static String demo_url = "http://democenter.fit2cloud.cn:28090";
@SneakyThrows
@Test
public void apitest1() {
int concurrentRequests = 10;
for (int index = 0; index < 5; index++) {
List<Thread> threads = new CopyOnWriteArrayList<>();
System.out.println("Starting iteration " + (index + 1));
for (int i = 0; i < concurrentRequests; i++) {
Thread t = new Thread(() -> {
long startTime = System.currentTimeMillis();
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is running.");
questionAI("人工客服"); // 用户问题
} catch (Exception e) {
e.printStackTrace(); // 记录异常信息
} finally {
long endTime = System.currentTimeMillis();
System.out.println("Thread " + Thread.currentThread().getName() + " finished in " + (endTime - startTime) + " ms");
}
}, "RequestThread-" + (i + 1)); // 为线程指定名称
threads.add(t);
}
threads.forEach(Thread::start);
for (Thread t : threads) {
try {
t.join(); // 等待线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
e.printStackTrace(); // 记录异常信息
}
}
System.out.println("All threads completed in iteration " + (index + 1));
try {
Thread.sleep(120000); // 等待2分钟
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
e.printStackTrace(); // 记录异常信息
}
}
}
@Test
public void test_pool() throws InterruptedException {
int CONCURRENT_REQUESTS = 50; // 线程数
int ITERATIONS = 1; // 由于高并发,可能只需要迭代一次
long WAITING_TIME_MS = 120000; // 2 minutes in milliseconds
long totalStartTime = System.currentTimeMillis(); // 记录总开始时间
AtomicInteger totalExecutedThreads = new AtomicInteger(0); // 用于记录总的执行线程数
for (int index = 0; index < ITERATIONS; index++) {
ExecutorService executor = Executors.newCachedThreadPool(); // 使用缓存线程池
System.out.println("Starting iteration " + (index + 1));
for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
executor.submit(() -> {
long startTime = System.currentTimeMillis();
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is running.");
questionAI("人工客服");
} catch (Exception e) {
e.printStackTrace(); // 记录异常信息
} finally {
long endTime = System.currentTimeMillis();
System.out.println("Thread " + Thread.currentThread().getName() + " finished in " + (endTime - startTime) + " ms");
totalExecutedThreads.incrementAndGet(); // 增加总执行线程数
}
});
}
executor.shutdown(); // 启动一次性执行后关闭线程池
try {
if (!executor.awaitTermination(WAITING_TIME_MS, TimeUnit.MILLISECONDS)) {
System.out.println("Forcing shutdown after waiting for " + WAITING_TIME_MS + "ms");
executor.shutdownNow(); // 强制停止
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
executor.shutdownNow();
}
System.out.println("All threads completed in iteration " + (index + 1));
}
long totalEndTime = System.currentTimeMillis(); // 记录总结束时间
System.out.println("Total time to execute all threads: " + (totalEndTime - totalStartTime) + " ms");
System.out.println("Total threads executed: " + totalExecutedThreads.get()); // 打印总执行的线程数
}
public void questionAI(String question) {
// 获取会话ID
String sessionId = getNewSessionId();
System.out.println("sessionId==>" + sessionId);
// 设置请求体
ChatMessageRequest body = new ChatMessageRequest();
body.setMessage(question).setRe_chat(false).setStream(false);
// 构建 POST 请求
String url = String.format(demo_url + "/api/application/chat_message/%s", sessionId);
HttpResponse response = HttpRequest.post(url)
.header("Accept", "application/json;charset=UTF-8")
.header("Authorization", Authorization)
.body(JSON.toJSONString(body))
.execute();
String responseBody = response.body();
JSONObject jsonResponse = JSON.parseObject(responseBody);
String content = jsonResponse.getString("data");
System.out.println("getStatus====>" + response.getStatus());
System.out.println("jsonResponse==>" + jsonResponse);
System.out.println(content);
}
private String getNewSessionId() {
String url = String.format(demo_url + "/api/application/%s/chat/open", Authorization_id);
HttpResponse response = HttpRequest.get(url)
.header("Accept", "application/json;charset=UTF-8")
.header("Authorization", Authorization)
.execute();
String responseBody = response.body();
JSONObject jsonResponse = JSON.parseObject(responseBody);
String sessionId = jsonResponse.getString("data");
return sessionId;
}
@Data
@Accessors(chain = true)
public static class ChatMessageRequest {
private String message;
private boolean re_chat;
private boolean stream;
}
}