|
|
import json
|
|
|
import os
|
|
|
|
|
|
def parse_llm_response(response):
    """Parse an LLM response into the standard triple format.

    The text may wrap its JSON payload in a Markdown code fence (with or
    without a ``json`` tag) and may surround it with free text. If the text
    contains both ``[`` and ``]``, only the span from the first ``[`` to the
    last ``]`` is decoded.

    Args:
        response: A string, a dict with a ``'content'`` key, or an object
            with a ``content`` attribute.

    Returns:
        ``{"triple_list": [[entity1, relation, entity2], ...]}`` built from
        every decoded item that is a dict holding all three keys (items that
        are not dicts or miss a key are skipped). A single JSON object is
        treated as a one-element list. Returns ``None`` when no text content
        can be obtained from ``response`` or the payload is not valid JSON.
    """
    if isinstance(response, str):
        content = response
    elif isinstance(response, dict):
        content = response.get('content')
    else:
        content = getattr(response, 'content', None)

    # Everything below relies on str methods; bail out early instead of
    # raising AttributeError/TypeError on None or structured content.
    if not isinstance(content, str):
        print(f"无法从响应中获取文本内容: {type(content).__name__}")
        return None

    # Prefer a ```json fence, then any fence. partition() tolerates an opener
    # that is not followed by a newline and a missing closing fence; indexing
    # the result of split() raised IndexError in those cases.
    for opener in ('```json', '```'):
        _, found, rest = content.partition(opener)
        if not found:
            continue
        body, closer, _ = rest.partition('\n```')
        if not closer:
            # Closing fence on the same line as the payload, or absent.
            body = rest.partition('```')[0]
        content = body
        break

    # Drop any chatter around the outermost JSON array.
    start = content.find('[')
    end = content.rfind(']')
    if start != -1 and end != -1:
        content = content[start:end + 1]

    content = content.strip()

    try:
        triples = json.loads(content)
    except json.JSONDecodeError as e:
        print(f"JSON解析错误,原始内容: {content}")
        print(f"错误详情: {str(e)}")
        return None

    if isinstance(triples, dict):
        # A lone object is a single triple, not a collection of keys.
        triples = [triples]
    elif not isinstance(triples, list):
        # Valid JSON scalar: nothing that could be a triple.
        triples = []

    required_keys = ("entity1", "relation", "entity2")
    return {
        "triple_list": [
            [triple["entity1"], triple["relation"], triple["entity2"]]
            for triple in triples
            if isinstance(triple, dict)
            and all(key in triple for key in required_keys)
        ]
    }
|
|
|
|
|
|
def save_to_json(text, formatted_triples, model_series, output_dir='./output'):
    """Append one extraction result to ``<output_dir>/<model_series>.json``.

    The file holds a JSON list of ``{"text": ..., "triple_list": ...}``
    records. It is read in full, extended with the new record and rewritten.

    Args:
        text: Source text the triples were extracted from.
        formatted_triples: Dict with a ``"triple_list"`` key. ``None`` or a
            dict without that key is stored as an empty triple list, so that
            a failed parse still leaves a record for ``text``.
        model_series: Base name (without extension) of the output file.
        output_dir: Directory for the output file; created if missing.

    Raises:
        json.JSONDecodeError: If the existing file is non-empty but does not
            contain valid JSON (it is left untouched in that case).
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = f'{output_dir}/{model_series}.json'

    existing_data = []
    if os.path.exists(output_path):
        with open(output_path, 'r', encoding='utf-8') as f:
            raw = f.read()
        # An empty file is not valid JSON; treat it as "no records yet"
        # rather than crashing in the decoder.
        if raw.strip():
            existing_data = json.loads(raw)

    # Files holding a single bare value (e.g. one record dict) are upgraded
    # to a list so the new record can be appended.
    if not isinstance(existing_data, list):
        existing_data = [existing_data]

    new_item = {
        "text": text,
        "triple_list": (formatted_triples or {}).get("triple_list", []),
    }
    existing_data.append(new_item)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, ensure_ascii=False, indent=4)
|
|
|
|
|
|
def save_raw_response(response, prompt, model_series, output_dir='./output/two_shot_raw'):
    """Append one raw prompt/response pair to ``<output_dir>/<model_series>.json``.

    The file holds a JSON list of ``{"prompt": ..., "response": ...}``
    records. It is read in full, extended with the new record and rewritten.

    Args:
        response: Raw model output. In a string, each literal backslash-``n``
            sequence is replaced by a real newline; any other object is
            stored as ``str(response)``.
        prompt: Prompt that produced the response.
        model_series: Base name (without extension) of the output file.
        output_dir: Directory for the output file; created if missing.

    Raises:
        json.JSONDecodeError: If the existing file is non-empty but does not
            contain valid JSON (it is left untouched in that case).
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = f'{output_dir}/{model_series}.json'

    existing_data = []
    if os.path.exists(output_path):
        with open(output_path, 'r', encoding='utf-8') as f:
            raw = f.read()
        # An empty file is not valid JSON; treat it as "no records yet"
        # rather than crashing in the decoder.
        if raw.strip():
            existing_data = json.loads(raw)

    # Same upgrade as save_to_json: a file holding a single bare value
    # (e.g. one record dict) becomes a list so append() works.
    if not isinstance(existing_data, list):
        existing_data = [existing_data]

    if isinstance(response, str):
        response_text = response.replace('\\n', '\n')
    else:
        response_text = str(response)

    existing_data.append({
        "prompt": prompt,
        "response": response_text,
    })

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, ensure_ascii=False, indent=4)
|
|
|
|