Unable to run model inference successfully on a 5880 GPU
The source code is as follows:
'''
import os
from typing import List, Dict, Optional, Tuple, Union

import torch
import torchvision.transforms as T
from torchvision.transforms.functional import InterpolationMode
from PIL import Image
from transformers import AutoModel, AutoTokenizer

os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'


class InternVl35Inf:
    def __init__(self,
                 model_id: str,
                 dtype: str = "bfloat16",
                 input_size: int = 448,
                 max_num_tiles: int = 12,
                 prompt: Optional[str] = None):
        """
        Args:
            model_id: model path or HuggingFace model name
            dtype: "bfloat16" | "float16" | "float32"
            input_size: input image size
            max_num_tiles: maximum number of tiles per image (including the thumbnail)
            prompt: custom prompt text
        """
        self.model_id = model_id
        self.dtype = self._parse_dtype(dtype)
        self.input_size = input_size
        self.max_num_tiles = max_num_tiles
        self.prompt = prompt or "question"

        # Load the model directly and shard it across GPUs automatically
        self.model = AutoModel.from_pretrained(
            self.model_id,
            torch_dtype=self.dtype,
            low_cpu_mem_usage=True,
            use_flash_attn=True,
            trust_remote_code=True,
            device_map="auto"  # automatically place the model on the available GPUs
        ).eval()
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_id, trust_remote_code=True, use_fast=False
        )

    # --------------------
    # Public interface
    # --------------------
    def read_wl(self,
                image: Union[str, Image.Image],
                max_new_tokens: int = 128) -> str:
        """
        Single-image inference.

        Args:
            image: file path (str) or PIL.Image.Image
            max_new_tokens: maximum number of generated tokens
        """
        # torch.cuda.empty_cache()  # clear the CUDA cache before each call
        if isinstance(image, str):
            pixel_values = self._load_image(image)           # load from a path
        elif isinstance(image, Image.Image):
            pixel_values = self._load_image_from_pil(image)  # load from a PIL.Image
        else:
            raise TypeError("image must be a str path or a PIL.Image.Image")

        gen_cfg = dict(
            max_new_tokens=max_new_tokens,
            do_sample=False,
            pad_token_id=self.tokenizer.eos_token_id
        )
        print("Starting inference...")
        output = self.model.chat(
            self.tokenizer, pixel_values, self.prompt, generation_config=gen_cfg
        )
        print("Generated tokens:", output)
        print("Inference finished...")
        return (output or "").strip()

        # print("Inference finished...")
        # return (output or "").strip()
        # with torch.inference_mode():
        #     print("Calling the model...")
        #     # import pdb; pdb.set_trace()
        #     output = self.model.chat(
        #         self.tokenizer, pixel_values, self.prompt, generation_config=gen_cfg
        #     )
        #     print("Inference finished...")
        #     return (output or "").strip()

    def read_images(self,
                    images: List[Union[str, Image.Image]],
                    names: List[str] = None,
                    batch_size: int = 8,
                    max_new_tokens: int = 128) -> Dict[str, str]:
        """
        Batch inference: the inputs can be paths or PIL.Image objects.

        Args:
            images: list of str or PIL.Image.Image
            names: optional names for the results (defaults to the file name or an index)
        """
        results: Dict[str, str] = {}
        total = len(images)
        if names is None:
            names = []
            for i, img in enumerate(images):
                if isinstance(img, str):
                    names.append(os.path.basename(img))
                else:
                    names.append(f"image_{i}.png")

        for start in range(0, total, batch_size):
            end = min(start + batch_size, total)
            batch_imgs = images[start:end]
            batch_names = names[start:end]
            # torch.cuda.empty_cache()  # clear the CUDA cache before each batch
            print(f"Processing items {start + 1} to {end} ...")
            for img, name in zip(batch_imgs, batch_names):
                try:
                    results[name] = self.read_wl(img, max_new_tokens=max_new_tokens)
                except Exception as e:
                    results[name] = f"Inference failed: {str(e)}"
        return results

    # --------------------
    # Internal helpers
    # --------------------
    @staticmethod
    def _parse_dtype(dtype: str):
        d = dtype.lower()
        if d in ("bf16", "bfloat16"):
            return torch.bfloat16
        if d in ("fp16", "float16", "half"):
            return torch.float16
        return torch.float32

    def _build_transform(self) -> T.Compose:
        return T.Compose([
            T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
            T.Resize((self.input_size, self.input_size), interpolation=InterpolationMode.BICUBIC),
            T.ToTensor(),
            T.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
        ])

    def _find_closest_aspect_ratio(self,
                                   aspect_ratio: float,
                                   target_ratios: List[Tuple[int, int]],
                                   width: int,
                                   height: int) -> Tuple[int, int]:
        best_ratio_diff = float("inf")
        best_ratio = (1, 1)
        area = width * height
        for ratio in target_ratios:
            target_aspect_ratio = ratio[0] / ratio[1]
            ratio_diff = abs(aspect_ratio - target_aspect_ratio)
            if ratio_diff < best_ratio_diff:
                best_ratio_diff = ratio_diff
                best_ratio = ratio
            elif ratio_diff == best_ratio_diff:
                if area > 0.5 * self.input_size * self.input_size * ratio[0] * ratio[1]:
                    best_ratio = ratio
        return best_ratio

    def _dynamic_preprocess(self,
                            image: Image.Image,
                            min_num: int = 1,
                            max_num: Optional[int] = None,
                            use_thumbnail: bool = True) -> List[Image.Image]:
        if max_num is None:
            max_num = self.max_num_tiles
        orig_width, orig_height = image.size
        aspect_ratio = orig_width / orig_height

        target_ratios = set(
            (i, j) for n in range(min_num, max_num + 1)
            for i in range(1, n + 1) for j in range(1, n + 1)
            if i * j <= max_num and i * j >= min_num
        )
        target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

        target_aspect_ratio = self._find_closest_aspect_ratio(
            aspect_ratio, target_ratios, orig_width, orig_height
        )
        target_width = self.input_size * target_aspect_ratio[0]
        target_height = self.input_size * target_aspect_ratio[1]
        blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

        resized_img = image.resize((target_width, target_height), resample=Image.BICUBIC)
        processed_images = []
        for i in range(blocks):
            box = (
                (i % (target_width // self.input_size)) * self.input_size,
                (i // (target_width // self.input_size)) * self.input_size,
                ((i % (target_width // self.input_size)) + 1) * self.input_size,
                ((i // (target_width // self.input_size)) + 1) * self.input_size
            )
            processed_images.append(resized_img.crop(box))
        if use_thumbnail and len(processed_images) != 1:
            processed_images.append(image.resize((self.input_size, self.input_size)))
        return processed_images

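    # Example (illustrative values only): for a 1280x720 input with input_size=448
    # and max_num_tiles=12, the closest grid is (2, 1), so the image is resized to
    # 896x448 and cut into 2 tiles; with the thumbnail appended, 3 tiles are
    # returned and _load_image_from_pil stacks them into a tensor of shape
    # (3, 3, 448, 448). The tile count directly drives the visual token count.
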
    def _load_image(self, image_file: str) -> torch.Tensor:
        """Load from a file path."""
        print(f"Loading image: {image_file}")
        image = Image.open(image_file).convert("RGB")
        print(f"Image size: {image.size}")
        return self._load_image_from_pil(image)

    def _load_image_from_pil(self, image: Image.Image) -> torch.Tensor:
        """Load from a PIL.Image."""
        print("Preprocessing image...")
        transform = self._build_transform()
        tiles = self._dynamic_preprocess(image, max_num=self.max_num_tiles, use_thumbnail=True)
        print(f"Preprocessing done, produced {len(tiles)} tiles")
        pixel_values = [transform(img) for img in tiles]
        return torch.stack(pixel_values).to(self.model.device, dtype=self.dtype)  # use the automatically selected device


# --------------------
# Usage example
# --------------------
if __name__ == "__main__":
    MODEL_PATH = "models/InternVL3_5-14B/"

    import time
    start_time = time.time()
    print("Loading model...")
    reader = InternVl35Inf(model_id=MODEL_PATH, dtype="bfloat16")
    print(f"Model loaded in {time.time() - start_time:.2f} s")

    # Single-image inference (from a path)
    print(reader.read_wl("data/net02.jpg"))
'''
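For reference, batch inference through the read_images method defined above would be invoked roughly as sketched below; the second image path is a placeholder, not a file from the original report:
'''
# Sketch: batch inference with the class from the listing above.
reader = InternVl35Inf(model_id="models/InternVL3_5-14B/", dtype="bfloat16")
results = reader.read_images(
    ["data/net02.jpg", "data/net03.jpg"],  # second path is a placeholder
    batch_size=2,
    max_new_tokens=128,
)
for name, text in results.items():
    print(name, "->", text)
'''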
Output of the single-image example (inference takes more than 5 minutes and the result is garbled text):
Figure 1: 5880 GPU, transformers==4.55.4
Figure 2: 5880 GPU, transformers==4.53.2
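For completeness, the environment details cited for the two runs above (GPU model and transformers version) can be printed with a short snippet like the following; it only uses standard torch and transformers attributes:
'''
import torch
import transformers

# Report library versions and GPU details relevant to the runs above.
print("torch:", torch.__version__)
print("CUDA build:", torch.version.cuda)
print("transformers:", transformers.__version__)
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))
    print("compute capability:", torch.cuda.get_device_capability(0))
    print("bf16 supported:", torch.cuda.is_bf16_supported())
'''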


