4x V100 deployment command set and test result notes
sudo docker run --gpus all -v /home/darius/model:/model -p 23333:23333 --ipc=host openmmlab/lmdeploy:latest lmdeploy serve api_server /model/Qwen3-30B-A3B-GPTQ-Int4 --model-name Qwen3-A3B --server-port 23333 --tp 4 --cache-max-entry-count 0.7 --tool-call-parser qwen
vllm serve Qwen2.5-VL-7B-GPTQ-INT4/ --max-model-len 8192 --swap-space 16 --gpu-memory-utilization 0.4 -tp 4
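A quick readiness probe before running the benchmark (a minimal sketch; it assumes the servers expose the OpenAI-compatible /v1/models route, which both lmdeploy's api_server and vLLM normally do, and the URL/port must match your deployment):

# health_check.py -- standalone sketch, separate from the benchmark script below
import json
import urllib.request

BASE_URL = "http://127.0.0.1:23333/v1"  # lmdeploy port from the command above; vLLM defaults to 8000

with urllib.request.urlopen(f"{BASE_URL}/models", timeout=10) as resp:
    payload = json.load(resp)
print("models served:", [m["id"] for m in payload.get("data", [])])

The full benchmark script (llm-test.py):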
import argparse
import asyncio
import time
import numpy as np
import aiohttp
import json
import base64  # used to embed images as data URIs for multimodal requests
import os
from typing import List, Dict, Any, Tuple, Union
import logging
# System-monitoring library (optional; the stress-test monitor checks for it at runtime)
try:
    import psutil
except ImportError:
    psutil = None
# Default parameters
DEFAULT_API_URL = "http://ks.sligenai.cn:5005/v1"
DEFAULT_MODEL = "Qwen2.5-VL-7B-GPTQ-INT4/"  # Note: must match the model name registered by the server; the multimodal test requires a vision-capable model
API_AUTH_TOKEN = "none"  # replace with your token
# Configure logging
# Initial configuration to print to console
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
async def completion_request(
session: aiohttp.ClientSession,
url: str,
model: str,
messages_content: Union[str, List[Dict[str, Any]]], # Changed type hint to accept string or list of content parts
max_tokens: int,
stream: bool = False
) -> Tuple[float, int]:
"""发送请求到OpenAI兼容的API并计算处理时间"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {API_AUTH_TOKEN}"
}
messages_payload = []
if isinstance(messages_content, str):
# Text-only prompt
messages_payload = [{"role": "user", "content": messages_content}]
elif isinstance(messages_content, list):
# Multimodal content (list of text/image_url objects following OpenAI Vision API format)
messages_payload = [{"role": "user", "content": messages_content}]
else:
logging.error(f"无效的 messages_content 类型: {type(messages_content)}. 预期 str 或 List[Dict].")
return 0, 0
payload = {
"model": model,
"messages": messages_payload,
"max_tokens": max_tokens,
"stream": stream
}
start_time = time.time()
tokens_received = 0
try:
if not stream:
            # Non-streaming request; used mainly for the text-input processing-speed test.
            # For a multimodal non-streaming request, the returned token count is 1,
            # i.e. one request unit processed.
            async with session.post(f"{url}/chat/completions", headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=300)) as response:
response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)
result = await response.json()
end_time = time.time()
processing_time = end_time - start_time
# For input test, tokens is a measure of prompt length or input processed.
# For text-only, it's simple word count.
if isinstance(messages_content, str):
tokens = len(messages_content.split()) # Simple word count as input tokens
else:
# For multimodal non-stream, it's hard to define "input tokens" in a comparable way.
# We will return 1 to indicate a single multimodal request was processed.
# The focus for multimodal will typically be on output tokens (streaming).
tokens = 1
return processing_time, tokens
else:
            # Streaming request; measures token-generation speed (for both text and multimodal output)
            async with session.post(f"{url}/chat/completions", headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=300)) as response:
response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)
# Use the time when the first token is received as the start of effective generation.
# The total time for the connection includes network latency and initial processing.
first_token_received_time = 0
async for line in response.content:
if line.strip():
try:
# Handling different possible stream data formats (e.g., "data: {json}")
if line.startswith(b"data: "):
line = line[6:]
# Check for [DONE] signal first, which indicates end of stream
if line.strip() == b"[DONE]":
break
data = json.loads(line)
# Check for content in choices[0].delta (standard OpenAI stream format)
if "choices" in data and data["choices"] and "delta" in data["choices"][0]:
if "content" in data["choices"][0]["delta"]:
content = data["choices"][0]["delta"]["content"]
# Only count if content is not empty
if content:
if first_token_received_time == 0:
first_token_received_time = time.time() # Mark time when first content token arrives
tokens_received += 1
except json.JSONDecodeError:
# logging.debug(f"Could not decode JSON from stream line: {line.strip()}")
pass # Silently ignore malformed lines that aren't valid JSON
except Exception as e:
logging.error(f"处理流式响应行时发生错误: {e} - 行: {line.strip().decode()}")
pass # Continue processing other lines even if one fails
end_time = time.time()
# Calculate generation time.
# If tokens were received, use the time from the first token until the end.
# If no tokens were received (e.g., empty response, or an error after initial connection),
# use the total duration of the request from its start.
if tokens_received > 0 and first_token_received_time > 0:
generation_time = end_time - first_token_received_time
else:
# If no tokens generated (e.g., error or empty response), use total time for this attempt.
generation_time = end_time - start_time
return generation_time, tokens_received
    except aiohttp.ClientError as e:
        logging.error(f"HTTP client error: {e}")
        return 0, 0
    except json.JSONDecodeError as e:
        logging.error(f"JSON decode error: {e}")
        return 0, 0
    except asyncio.TimeoutError:
        logging.error("Request timed out (300 s).")
        return 0, 0
    except Exception as e:
        logging.error(f"Unexpected error during request: {e}")
        return 0, 0
async def run_input_test(threads: int, prompt_length: int, api_url: str, model: str) -> List[float]:
"""测试文本输入处理速度 (tokens/second)"""
async with aiohttp.ClientSession() as session:
tasks = []
# 创建随机文本作为提示, 确保长度
base_prompt = "请详细解释人工智能的历史和发展。"
# Ensure the prompt is at least prompt_length characters long
prompt = (base_prompt * (prompt_length // len(base_prompt) + 1))[:prompt_length]
logging.info(f"文本输入测试: 使用 {threads} 个并发,提示长度 {prompt_length} 字符。")
for _ in range(threads):
# Pass the prompt string directly to messages_content
tasks.append(completion_request(session, api_url, model, prompt, 1, stream=False))
results = await asyncio.gather(*tasks)
        # Compute tokens/second for each request
tokens_per_second = []
for processing_time, tokens in results:
if processing_time > 0 and tokens > 0:
tokens_per_second.append(tokens / processing_time)
elif processing_time == 0 and tokens > 0: # If time is 0 but tokens > 0, it's extremely fast.
tokens_per_second.append(float('inf'))
else:
logging.warning(f"文本输入测试: 无效结果 (时间: {processing_time:.4f}s, Tokens: {tokens})")
return tokens_per_second
async def run_output_test(threads: int, tokens_to_generate: int, api_url: str, model: str) -> Tuple[List[float], float, float, float]:
"""测试文本输出生成速度 (tokens/second)"""
async with aiohttp.ClientSession() as session:
tasks = []
# 使用简单的提示来生成指定数量的token
prompt = "请写一篇关于人工智能的短文,包含尽可能多的内容。"
logging.info(f"文本输出测试: 使用 {threads} 个并发,每个请求生成 {tokens_to_generate} 个token。")
batch_start_time = time.time()
for _ in range(threads):
# Pass the prompt string directly to messages_content
tasks.append(completion_request(session, api_url, model, prompt, tokens_to_generate, stream=True))
individual_results = await asyncio.gather(*tasks)
batch_end_time = time.time()
        # Compute tokens/second for each request
tokens_per_second = []
total_tokens = 0
max_individual_generation_time = 0 # Max time for a single connection to finish its generation
for generation_time, tokens in individual_results:
if generation_time > 0 and tokens > 0:
tokens_per_second.append(tokens / generation_time)
total_tokens += tokens
max_individual_generation_time = max(max_individual_generation_time, generation_time)
elif generation_time == 0 and tokens > 0:
tokens_per_second.append(float('inf')) # Indicate very fast generation for this request
total_tokens += tokens
else:
logging.warning(f"文本输出测试: 无效结果 (时间: {generation_time:.4f}s, Tokens: {tokens})")
# Calculate total throughput based on the total batch time (start of first request to end of last request)
total_batch_time = batch_end_time - batch_start_time
total_throughput = total_tokens / total_batch_time if total_batch_time > 0 else 0
# Calculate theoretical throughput: total tokens divided by the time it took for the slowest single connection
theoretical_throughput = total_tokens / max_individual_generation_time if max_individual_generation_time > 0 else 0
return tokens_per_second, total_throughput, total_tokens, theoretical_throughput
async def run_multimodal_test(threads: int, image_path: str, text_prompt: str, tokens_to_generate: int, api_url: str, model: str) -> Tuple[List[float], float, float, float]:
"""测试多模态模型输出生成速度 (tokens/second)"""
if not os.path.exists(image_path):
logging.error(f"错误: 图片文件不存在于指定路径: {image_path}")
return [], 0, 0, 0
try:
with open(image_path, "rb") as image_file:
encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
        # Try to determine the MIME type from the file extension
image_ext = os.path.splitext(image_path)[1].lower()
if image_ext == '.jpg' or image_ext == '.jpeg':
mime_type = "image/jpeg"
elif image_ext == '.png':
mime_type = "image/png"
elif image_ext == '.gif':
mime_type = "image/gif"
elif image_ext == '.webp': # Add webp support
mime_type = "image/webp"
else:
logging.warning(f"未知图片格式 '{image_ext}'。将使用 'application/octet-stream' 作为MIME类型,可能导致API解析问题。")
mime_type = "application/octet-stream" # Fallback to generic binary
image_data_uri = f"data:{mime_type};base64,{encoded_image}"
except Exception as e:
logging.error(f"读取或编码图片失败: {image_path} - {e}")
return [], 0, 0, 0
    # Build the multimodal message content following the OpenAI Vision API messages format
messages_content = [
{"type": "text", "text": text_prompt},
{"type": "image_url", "image_url": {"url": image_data_uri}}
]
async with aiohttp.ClientSession() as session:
tasks = []
logging.info(f"多模态测试: 使用 {threads} 个并发,每个请求生成 {tokens_to_generate} 个token,图片路径: {image_path}")
batch_start_time = time.time()
for _ in range(threads):
# Pass the multimodal content list to messages_content
tasks.append(completion_request(session, api_url, model, messages_content, tokens_to_generate, stream=True))
individual_results = await asyncio.gather(*tasks)
batch_end_time = time.time()
tokens_per_second = []
total_tokens = 0
max_individual_generation_time = 0
for generation_time, tokens in individual_results:
if generation_time > 0 and tokens > 0:
tokens_per_second.append(tokens / generation_time)
total_tokens += tokens
max_individual_generation_time = max(max_individual_generation_time, generation_time)
elif generation_time == 0 and tokens > 0:
tokens_per_second.append(float('inf')) # Indicate very fast generation for this request
total_tokens += tokens
else:
logging.warning(f"多模态输出测试: 无效结果 (时间: {generation_time:.4f}s, Tokens: {tokens})")
total_batch_time = batch_end_time - batch_start_time
total_throughput = total_tokens / total_batch_time if total_batch_time > 0 else 0
theoretical_throughput = total_tokens / max_individual_generation_time if max_individual_generation_time > 0 else 0
return tokens_per_second, total_throughput, total_tokens, theoretical_throughput
async def monitor_system_metrics(interval: int, log_func):
"""
周期性地收集并记录 CPU 性能指标。
log_func: 用于记录日志的函数 (e.g., logging.info)
"""
if not psutil:
log_func("psutil 库未安装,系统监控已禁用。请运行 pip install psutil")
return
log_func("开始系统性能监控 (仅CPU)...")
try:
while True:
cpu_percent = psutil.cpu_percent(interval=None) # Non-blocking call for CPU usage
cpu_temp_info = "N/A"
if hasattr(psutil, 'sensors_temperatures') and psutil.sensors_temperatures():
temps = psutil.sensors_temperatures()
# Attempt to find CPU core temperatures or a general CPU temperature
found_temp = False
for sensor_name, entries in temps.items():
# Look for common sensor names indicating CPU or core temperatures
if 'cpu' in sensor_name.lower() or 'core' in sensor_name.lower() or 'package' in sensor_name.lower():
core_temps_list = [f"{t.current:.1f}°C" for t in entries if t.current is not None]
if core_temps_list:
cpu_temp_info = f"CPU温度: {', '.join(core_temps_list)}"
found_temp = True
break
if not found_temp:
cpu_temp_info = "详细CPU温度不可用 (请检查 psutil sensors_temperatures)"
log_func(f"系统指标: CPU 使用率 {cpu_percent:.1f}%, {cpu_temp_info}")
await asyncio.sleep(interval)
except asyncio.CancelledError:
log_func("系统性能监控任务已取消。")
except Exception as e:
log_func(f"系统性能监控发生错误: {e}")
async def async_main():
    parser = argparse.ArgumentParser(description="Concurrency benchmark for a local LLM behind an OpenAI-compatible API")
    parser.add_argument("-st", "--start_threads", type=int, default=10, help="starting concurrency (step-test mode), or the fixed concurrency in stress-test mode")
    parser.add_argument("-et", "--end_threads", type=int, default=200, help="ending concurrency (step-test mode only)")
    parser.add_argument("-ts", "--step_threads", type=int, default=10, help="concurrency step increment (step-test mode only)")
    parser.add_argument("-pp", "--prompt_length", type=int, default=100, help="prompt length in characters for the text input test")
    parser.add_argument("-tg", "--tokens_to_generate", type=int, default=100, help="tokens to generate for the text and multimodal output tests")
    parser.add_argument("-u", "--url", type=str, default=DEFAULT_API_URL, help="API URL")
    parser.add_argument("-m", "--model", type=str, default=DEFAULT_MODEL, help="model name")
    parser.add_argument("-i", "--input", action="store_true", help="run the text input test")
    parser.add_argument("-o", "--output", action="store_true", help="run the text output test")
    parser.add_argument("-mm", "--multimodal", action="store_true", help="run the multimodal test (requires a model that accepts image input)")
    parser.add_argument("-ip", "--image_path", type=str, default="testphoto.jpg", help="image path for the multimodal test (e.g. testphoto.jpg or testphoto.png)")
    parser.add_argument("-mtp", "--multimodal_text_prompt", type=str, default="请描述图片内容并写一段相关的创意短文。", help="text prompt for the multimodal test")
    parser.add_argument("-l", "--log_file", type=str, default="performance_test_log.txt", help="log file for results")
    parser.add_argument("-d", "--duration_hours", type=float, default=0, help="duration of the sustained stress test in hours; 0 runs the step-test mode")
args = parser.parse_args()
# Add file handler for logging
try:
file_handler = logging.FileHandler(args.log_file, mode='w', encoding='utf-8')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(file_handler)
logging.info(f"测试结果将同时输出到控制台和文件: {args.log_file}")
except IOError as e:
logging.error(f"无法创建或写入日志文件 {args.log_file}: {e}. 将只输出到控制台。")
# If no specific test type is selected, run text input and text output tests by default
if not args.input and not args.output and not args.multimodal:
args.input = True
args.output = True
if args.duration_hours > 0:
        # Stress-test mode
        logging.info(f"Entering stress-test mode: duration {args.duration_hours} hours, concurrency {args.start_threads}")
        stress_test_duration_seconds = args.duration_hours * 3600
        stress_test_concurrency = args.start_threads  # use start_threads as the fixed concurrency for the stress test
        # Start the system-monitoring task (CPU only)
monitor_task = asyncio.create_task(monitor_system_metrics(interval=10, log_func=logging.info))
all_input_rates = [] # For text input test results
all_output_rates = [] # For text output test results
all_multimodal_output_rates = [] # For multimodal output test results
# These lists will aggregate throughput and tokens from ALL generation tests (text and multimodal)
all_total_throughputs_combined = []
all_theoretical_throughputs_combined = []
all_total_tokens_generated_combined = []
start_stress_test_time = time.time()
batch_counter = 0
logging.info(f"API URL: {args.url}, 模型: {args.model}")
logging.info(f"文本输入测试提示长度: {args.prompt_length}, 文本/多模态输出生成token数: {args.tokens_to_generate}")
while time.time() - start_stress_test_time < stress_test_duration_seconds:
batch_start_time = time.time()
batch_counter += 1
logging.info(f"\n{'='*15} 压力测试批次 {batch_counter} {'='*15}")
remaining_time_seconds = stress_test_duration_seconds - (time.time() - start_stress_test_time)
logging.info(f" 距离测试结束还有约 {remaining_time_seconds / 3600:.2f} 小时 ({remaining_time_seconds:.0f} 秒)")
# Run text input test if enabled
if args.input:
logging.info("------ 运行文本输入测试 (批次) ------")
input_results = await run_input_test(
stress_test_concurrency,
args.prompt_length,
args.url,
args.model
)
if input_results:
all_input_rates.extend(input_results)
logging.info(f" 批次文本输入平均速率: {np.mean(input_results):.2f} tokens/s")
else:
logging.warning(" 批次文本输入测试未返回有效结果。")
# Run text output test if enabled
if args.output:
logging.info("------ 运行文本输出测试 (批次) ------")
output_results = await run_output_test(
stress_test_concurrency,
args.tokens_to_generate,
args.url,
args.model
)
if output_results:
all_output_rates.extend(output_results[0])
all_total_throughputs_combined.append(output_results[1])
all_total_tokens_generated_combined.append(output_results[2])
all_theoretical_throughputs_combined.append(output_results[3])
logging.info(f" 批次文本输出平均生成率: {np.mean(output_results[0]):.2f} tokens/s/conn")
logging.info(f" 批次文本总吞吐量: {output_results[1]:.2f} tokens/s")
else:
logging.warning(" 批次文本输出测试未返回有效结果。")
# Run multimodal test if enabled
if args.multimodal:
logging.info("------ 运行多模态测试 (批次) ------")
multimodal_results = await run_multimodal_test(
stress_test_concurrency,
args.image_path,
args.multimodal_text_prompt,
args.tokens_to_generate,
args.url,
args.model
)
if multimodal_results:
all_multimodal_output_rates.extend(multimodal_results[0])
all_total_throughputs_combined.append(multimodal_results[1])
all_total_tokens_generated_combined.append(multimodal_results[2])
all_theoretical_throughputs_combined.append(multimodal_results[3])
logging.info(f" 批次多模态平均生成率: {np.mean(multimodal_results[0]):.2f} tokens/s/conn")
logging.info(f" 批次多模态总吞吐量: {multimodal_results[1]:.2f} tokens/s")
else:
logging.warning(" 批次多模态测试未返回有效结果。")
batch_end_time = time.time()
batch_duration = batch_end_time - batch_start_time
logging.info(f"批次 {batch_counter} 耗时: {batch_duration:.2f} 秒")
# Add a small delay between batches if the batch execution time is too short
# This helps prevent spinning CPU on very fast APIs or overwhelming client-side resources
if batch_duration < 1:
await asyncio.sleep(1 - batch_duration)
        # Stress-test overall summary
        logging.info(f"\n{'='*20} Stress-test overall results {'='*20}")
        if all_input_rates:
            logging.info(f"  Overall average text-input preprocessing rate: {np.mean(all_input_rates):.2f} tokens/second")
            logging.info(f"  Overall text-input preprocessing std dev: {np.std(all_input_rates):.2f} tokens/second")
        if all_output_rates:
            logging.info(f"  Overall average text token generation rate (per connection): {np.mean(all_output_rates):.2f} tokens/second")
            logging.info(f"  Overall text token generation std dev (per connection): {np.std(all_output_rates):.2f} tokens/second")
        if all_multimodal_output_rates:
            logging.info(f"  Overall average multimodal token generation rate (per connection): {np.mean(all_multimodal_output_rates):.2f} tokens/second")
            logging.info(f"  Overall multimodal token generation std dev (per connection): {np.std(all_multimodal_output_rates):.2f} tokens/second")
        # Combined generation throughput (text + multimodal)
        if all_total_throughputs_combined:
            logging.info(f"  Overall average total throughput (all generation tests): {np.mean(all_total_throughputs_combined):.2f} tokens/second")
            overall_total_tokens_sum = sum(all_total_tokens_generated_combined)
            overall_actual_duration = time.time() - start_stress_test_time
            overall_throughput_over_duration = overall_total_tokens_sum / overall_actual_duration if overall_actual_duration > 0 else 0
            logging.info(f"  Actual throughput over the full test duration (all generation tests): {overall_throughput_over_duration:.2f} tokens/second")
            if np.mean(all_theoretical_throughputs_combined) > 0:
                overall_efficiency_pct = (np.mean(all_total_throughputs_combined) / np.mean(all_theoretical_throughputs_combined)) * 100
                logging.info(f"  Overall average efficiency factor (all generation tests): {overall_efficiency_pct:.2f}%")
            else:
                logging.warning("Cannot compute the overall average efficiency factor: theoretical throughput is 0.")
            logging.info(f"  Total tokens generated (all generation tests): {overall_total_tokens_sum}")
        else:
            logging.warning("No output tests ran or all requests failed; overall output metrics are unavailable.")
# Cancel and wait for the system monitor task to finish
monitor_task.cancel()
await monitor_task
else:
        # Step-test mode
        logging.info(f"Starting step performance test: concurrency from {args.start_threads} to {args.end_threads}, step {args.step_threads}.")
        logging.info(f"API URL: {args.url}, model: {args.model}")
        logging.info(f"Text input prompt length: {args.prompt_length}, text/multimodal output tokens to generate: {args.tokens_to_generate}")
for current_threads in range(args.start_threads, args.end_threads + 1, args.step_threads):
logging.info(f"\n{'='*20} 正在测试并发数: {current_threads} {'='*20}")
input_avg_rate = 0.0
input_std_dev = 0.0
output_avg_rate = 0.0
output_std_dev = 0.0
text_total_throughput = 0.0
text_theoretical_throughput = 0.0
text_scaling_efficiency_pct = 0.0
text_total_tokens_generated = 0
multimodal_avg_rate = 0.0
multimodal_std_dev = 0.0
multimodal_total_throughput = 0.0
multimodal_theoretical_throughput = 0.0
multimodal_scaling_efficiency_pct = 0.0
multimodal_total_tokens_generated = 0
if args.input:
logging.info("------ 运行文本输入测试 ------")
tokens_per_second_input = await run_input_test(
current_threads,
args.prompt_length,
args.url,
args.model
)
if tokens_per_second_input:
input_avg_rate = np.mean(tokens_per_second_input)
input_std_dev = np.std(tokens_per_second_input)
logging.info("文本输入测试结果:")
logging.info(f" 平均预处理速率: {input_avg_rate:.2f} tokens/second")
logging.info(f" 标准差: {input_std_dev:.2f} tokens/second")
else:
logging.warning("文本输入测试未返回有效结果或所有请求失败。")
if args.output:
logging.info("------ 运行文本输出测试 ------")
tokens_per_second_output, text_total_throughput, text_total_tokens_generated, text_theoretical_throughput = await run_output_test(
current_threads,
args.tokens_to_generate,
args.url,
args.model
)
if tokens_per_second_output:
output_avg_rate = np.mean(tokens_per_second_output)
output_std_dev = np.std(tokens_per_second_output)
logging.info("文本输出测试结果:")
logging.info(f" 平均token生成率: {output_avg_rate:.2f} tokens/second/connection")
logging.info(f" 标准差: {output_std_dev:.2f} tokens/second/connection")
logging.info(f" 总token吞吐量: {text_total_throughput:.2f} tokens/second (所有{current_threads}个连接)")
if text_theoretical_throughput > 0:
logging.info(f" 理论最大吞吐量: {text_theoretical_throughput:.2f} tokens/second (基于最慢连接)")
text_scaling_efficiency_pct = (text_total_throughput / text_theoretical_throughput * 100)
logging.info(f" 效率系数: {text_scaling_efficiency_pct:.2f}% (实际/理论)")
else:
logging.warning(" 理论最大吞吐量为0,无法计算效率系数。")
logging.info(f" 总生成tokens: {text_total_tokens_generated}")
else:
logging.warning("文本输出测试未返回有效结果或所有请求失败。")
if args.multimodal:
logging.info("------ 运行多模态测试 ------")
tokens_per_second_multimodal, multimodal_total_throughput, multimodal_total_tokens_generated, multimodal_theoretical_throughput = await run_multimodal_test(
current_threads,
args.image_path,
args.multimodal_text_prompt,
args.tokens_to_generate,
args.url,
args.model
)
if tokens_per_second_multimodal:
multimodal_avg_rate = np.mean(tokens_per_second_multimodal)
multimodal_std_dev = np.std(tokens_per_second_multimodal)
logging.info("多模态测试结果:")
logging.info(f" 平均token生成率: {multimodal_avg_rate:.2f} tokens/second/connection")
logging.info(f" 标准差: {multimodal_std_dev:.2f} tokens/second/connection")
logging.info(f" 总token吞吐量: {multimodal_total_throughput:.2f} tokens/second (所有{current_threads}个连接)")
if multimodal_theoretical_throughput > 0:
logging.info(f" 理论最大吞吐量: {multimodal_theoretical_throughput:.2f} tokens/second (基于最慢连接)")
multimodal_scaling_efficiency_pct = (multimodal_total_throughput / multimodal_theoretical_throughput * 100)
logging.info(f" 效率系数: {multimodal_scaling_efficiency_pct:.2f}% (实际/理论)")
else:
logging.warning(" 理论最大吞吐量为0,无法计算效率系数。")
logging.info(f" 总生成tokens: {multimodal_total_tokens_generated}")
else:
logging.warning("多模态测试未返回有效结果或所有请求失败。")
            # Per-step summary
            logging.info("\n====== Performance test summary ======")
            logging.info(f"  Current model: {args.model}")
            logging.info(f"  Concurrent connections: {current_threads}")
            if args.input:
                logging.info(f"  Text input test - average preprocessing rate: {input_avg_rate:.2f} tokens/second")
                logging.info(f"  Text input test - std dev: {input_std_dev:.2f} tokens/second")
            if args.output:
                logging.info(f"  Text output test - total throughput: {text_total_throughput:.2f} tokens/second")
                logging.info(f"  Text output test - average generation rate per connection: {output_avg_rate:.2f} tokens/second")
                logging.info(f"  Text output test - efficiency factor: {text_scaling_efficiency_pct:.2f}%")
            if args.multimodal:
                logging.info(f"  Multimodal test - total throughput: {multimodal_total_throughput:.2f} tokens/second")
                logging.info(f"  Multimodal test - average generation rate per connection: {multimodal_avg_rate:.2f} tokens/second")
                logging.info(f"  Multimodal test - efficiency factor: {multimodal_scaling_efficiency_pct:.2f}%")
            logging.info("Recommendations:")
            if args.output:
                if current_threads == args.start_threads and args.end_threads > args.start_threads:
                    logging.info("- (text output) Consider increasing concurrency further to find the system's maximum throughput.")
                if output_avg_rate > 0 and output_std_dev / output_avg_rate > 0.5 and current_threads > 1:
                    logging.info("- (text output) Large variance between connections; possible resource contention or network jitter.")
                if current_threads > args.start_threads and text_scaling_efficiency_pct < 80 and text_scaling_efficiency_pct > 0:
                    logging.info("- (text output) Scalability may be limited; adding more connections may not improve performance linearly.")
                elif current_threads > args.start_threads and text_scaling_efficiency_pct >= 95:
                    logging.info("- (text output) The system handles concurrency well and scales efficiently.")
            if args.multimodal:
                if current_threads == args.start_threads and args.end_threads > args.start_threads:
                    logging.info("- (multimodal) Consider increasing concurrency further to find the system's maximum throughput.")
                if multimodal_avg_rate > 0 and multimodal_std_dev / multimodal_avg_rate > 0.5 and current_threads > 1:
                    logging.info("- (multimodal) Large variance between connections; possible resource contention or network jitter.")
                if current_threads > args.start_threads and multimodal_scaling_efficiency_pct < 80 and multimodal_scaling_efficiency_pct > 0:
                    logging.info("- (multimodal) Scalability may be limited; adding more connections may not improve performance linearly.")
                elif current_threads > args.start_threads and multimodal_scaling_efficiency_pct >= 95:
                    logging.info("- (multimodal) The system handles concurrency well and scales efficiently.")
            logging.info(f"{'='*50}\n")
if __name__ == "__main__":
    # Run the top-level async function with asyncio.run()
asyncio.run(async_main())
"""
运行注释和运行示例:
脚本说明:
这是一个用于测试兼容 OpenAI API 的本地大语言模型 (LLM) 并发性能的 Python 脚本。
它支持三种测试模式:
1. **步进测试模式 (默认)**: 逐步增加并发连接数,测试在不同并发下的性能表现。
python llm-test.py -i -o -st 10 -et 300 -ts 10 -pp 500 -tg 500 -l custom_model_test2.log
2. **压力测试模式**: 以固定的并发数持续运行指定时长,模拟长时间高负载情况,并提供 CPU 功耗和温度监控。
python llm-test.py -d 8 -st 300 -pp 500 -tg 500 -l custom_model_stress_test.log
3. **多模态测试功能**: 增加了对支持图像输入的多模态模型的测试,通过将图片编码为 base64 字符串发送。
python llm-test.py -i -o -mm -st 10 -et 300 -ts 10 -pp 500 -tg 500 -ip testphoto.jpg -mtp "这张图片里有什么?请详细描述。" -l custom_model_full_test.log
多模态测试注意事项:
API兼容性: 请确保您测试的后端API (例如 http://ks.sligenai.cn:5004/v1) 支持 OpenAI 的 Vision API 消息格式,即 messages 字段中可以包含 { "type": "image_url", "image_url": { "url": "data:image/..." } }。
模型支持: DEFAULT_MODEL 需要是实际支持多模态输入(视觉能力)的模型。如果您的模型不支持,测试将失败或返回错误。
图片文件: 请确保 testphoto.jpg (或您通过 -ip 指定的文件) 存在于脚本运行的相同目录下,且是可读的图片文件 (JPG, PNG, GIF, WEBP等)。
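For reference, the request body the script builds for a multimodal call looks like this (a sketch of the OpenAI Vision-style payload assembled in completion_request/run_multimodal_test; the base64 string is abbreviated here):
```python
payload = {
    "model": "Qwen2.5-VL-7B-GPTQ-INT4/",
    "messages": [{
        "role": "user",
        "content": [
            {"type": "text", "text": "这张图片里有什么?请详细描述。"},
            {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,/9j/4AAQ..."}},  # abbreviated
        ],
    }],
    "max_tokens": 100,  # matches --tokens_to_generate
    "stream": True,
}
```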
Main features:
- **Text input test (-i)**: measures the average speed (tokens/second) at which the API processes plain-text prompts.
- **Text output test (-o)**: measures the average plain-text generation speed (tokens/second/connection) and total throughput (tokens/second).
- **Multimodal test (-mm)**: measures the average generation speed and total throughput for combined image + text input.
- **Concurrency control**: configurable start/end concurrency and step increment.
- **Stress testing**: configurable test duration in hours.
- **System monitoring**: in stress-test mode, CPU usage and CPU temperature are monitored and logged in real time.
- **Logging**: all results and monitoring data are written to both the console and the specified log file.
Install dependencies:
```bash
pip install aiohttp numpy psutil
```
"""