# Spaces: Build error
import ast
import html
import warnings

# Silence library warnings before the heavy third-party imports below.
warnings.filterwarnings("ignore")

import faiss
import gradio as gr
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from transformers import AutoModel, AutoTokenizer
# Encoder whose `encode()` output feeds the FAISS nearest-neighbour search.
# No device move is performed (the original `.to("cuda")` call is disabled).
Encoding_model = 'jinaai/jina-embeddings-v2-base-zh'
model = AutoModel.from_pretrained(
    Encoding_model,
    trust_remote_code=True,
    torch_dtype=torch.bfloat16,
)

# Second encoder used to re-score the retrieved candidates. The name
# `similarity_model` first holds the checkpoint id and is then rebound to the
# loaded model, so only the model object remains afterwards.
similarity_model = 'Alibaba-NLP/gte-multilingual-base'
similarity_tokenizer = AutoTokenizer.from_pretrained(similarity_model)
similarity_model = AutoModel.from_pretrained(
    similarity_model,
    trust_remote_code=True,
)
def get_not_empty_data(df, x_column="text", y_column="label"):
    """Group the labelled rows of *df* by text and merge their label lists.

    Rows whose label cell is exactly the string ``"[]"`` are dropped. Every
    remaining label cell is parsed with ``ast.literal_eval`` and the labels of
    rows sharing the same text are merged without duplicates.

    Args:
        df: Source frame holding one text column and one label column.
        x_column: Name of the text column.
        y_column: Name of the column holding stringified label lists.

    Returns:
        A DataFrame with column ``"x"`` (each distinct text, in first-seen
        order) and column ``"y"`` (the list of distinct labels for that text,
        in first-seen order).
    """
    labelled = df[df[y_column] != "[]"]
    merged = {}
    for text, raw_labels in zip(labelled[x_column], labelled[y_column]):
        # Dict keys act as an ordered set, so the label order is reproducible
        # between runs (a plain set() would not guarantee that for strings).
        merged.setdefault(text, {}).update(
            dict.fromkeys(ast.literal_eval(raw_labels))
        )
    return pd.DataFrame({
        "x": list(merged),
        "y": [list(labels) for labels in merged.values()],
    })
# Load the labelled corpus and keep two parallel arrays: `x_dict` holds the
# distinct texts and `y_dict` the merged label list of the text at the same
# position.
data_all = pd.read_excel("data_Excel_format.xlsx")
df_dict_all = get_not_empty_data(data_all)
x_dict, y_dict = df_dict_all["x"].values, df_dict_all["y"].values
def calc_scores(x):
    """Return the dot products of the first row of *x* with every other row.

    The result keeps a leading axis of length one, i.e. it has shape
    ``(1, len(x) - 1)``.
    """
    query = x[:1]
    candidates = x[1:]
    return query @ candidates.T
def get_idxs(threshold, max_len, arr):
    """Return the positions in *arr* whose value reaches *threshold*.

    Args:
        threshold: Minimum value (inclusive) an entry must have to be kept.
        max_len: Maximum number of positions to return.
        arr: One-dimensional NumPy array of scores.

    Returns:
        A NumPy array of positions. When fewer than *max_len* entries qualify
        they are returned in ascending position order; otherwise the
        *max_len* best entries are returned, ordered by descending score.
    """
    res = np.where(arr >= threshold)[0]
    if len(res) < max_len:
        return res
    # Honour the caller's limit (the cut-off used to be hard-coded to 3).
    # A stable sort keeps tied scores in position order.
    ranked = res[np.argsort(-arr[res], kind="stable")]
    return ranked[:max_len]
def merge_set_to_list(set_list):
    """Return the union of all sets in *set_list* as a new ``set``.

    Despite the name, the result is a set, not a list. An empty *set_list*
    yields an empty set.
    """
    return set().union(*set_list)
def get_predict_result(index, score, threshold, max_len):
    """Pick the best-scoring retrieved sentences and merge their labels.

    Args:
        index: Array of corpus row ids (positions into ``x_dict``/``y_dict``)
            for the retrieved candidates; flattened before use.
        score: Array of similarity scores aligned with *index*; flattened
            before use.
        threshold: Minimum score (inclusive) a candidate must reach.
        max_len: Maximum number of candidates to keep.

    Returns:
        A tuple ``(labels, sentences)``: the set of labels of the kept
        candidates and the array of their texts. When no candidate
        qualifies, ``(set(), [])`` is returned.
    """
    score = np.asarray(score).flatten()
    index = np.asarray(index).flatten()
    # FAISS pads missing neighbours with id -1; indexing the corpus with -1
    # would silently pick its last row, so such hits are discarded.
    keep = np.where((score >= threshold) & (index >= 0))[0]
    if len(keep) == 0:
        return set(), []
    if len(keep) >= max_len:
        # Rank by score (not by corpus row id) and honour max_len.
        keep = keep[np.argsort(-score[keep], kind="stable")][:max_len]
    res_index = index[keep]
    res = merge_set_to_list([set(i) for i in y_dict[res_index]])
    return res, x_dict[res_index]
# Build the inner-product FAISS index over the encoded corpus texts.
# The corpus is `x_dict`; the loop previously iterated over an undefined
# name `x`, which raised NameError as soon as the module was imported.
bsize = 256
chunks = [np.empty(shape=[0, 768], dtype="float32")]
with torch.no_grad():
    for i in range(0, len(x_dict), bsize):
        # Hand the encoder a plain list of strings rather than an ndarray.
        chunks.append(model.encode(x_dict[i:i + bsize].tolist()))
# Concatenate once instead of re-copying the whole matrix per batch, and
# enforce the contiguous float32 layout FAISS requires.
vec = np.ascontiguousarray(np.concatenate(chunks), dtype="float32")
index = faiss.IndexFlatIP(768)
# With L2-normalised vectors the inner product equals cosine similarity.
faiss.normalize_L2(vec)
index.add(vec)
# Persist the index, then reload it from disk.
faiss.write_index(index, "all_index.faiss")
index = faiss.read_index("all_index.faiss")
def predict_label(x, threshold=0.85, n_nearest=10, max_result_len=3):
    """Predict the label set of every sentence in *x*.

    Each sentence is encoded with ``model``, its *n_nearest* neighbours are
    fetched from the FAISS ``index``, the sentence and its neighbours are
    re-encoded with ``similarity_model``, and neighbours whose re-computed
    similarity reaches *threshold* contribute their labels.

    Args:
        x: Sequence of sentences (strings).
        threshold: Minimum similarity (inclusive) for a neighbour to count.
        n_nearest: Number of neighbours fetched from the index per sentence.
        max_result_len: Maximum number of neighbours whose labels are merged.

    Returns:
        A list with one entry per processed batch (the batch size is 1, so
        one per sentence); each entry is a list holding the label set of
        every sentence in that batch.
    """
    bsize = 1
    dimension = 768
    # FAISS and Tensor.reshape need a real int, whatever the caller passes.
    n_nearest = int(n_nearest)
    y_pred = []
    with torch.no_grad():
        for i in range(0, len(x), bsize):
            sentences = x[i:i + bsize]
            # FAISS only accepts contiguous float32 matrices.
            vec = np.ascontiguousarray(model.encode(sentences), dtype="float32")
            faiss.normalize_L2(vec)
            _, indexes = index.search(vec, n_nearest)
            # One row per query: the query followed by its retrieved texts.
            x_pred = np.array(
                [[sentences[j]] + s.tolist() for j, s in enumerate(x_dict[indexes])]
            )
            batch_dict = similarity_tokenizer(
                x_pred.flatten().tolist(),
                max_length=768,
                padding=True,
                truncation=True,
                return_tensors='pt',
            )
            outputs = similarity_model(**batch_dict)
            # Take the first-token vector and truncate along the embedding
            # axis (the previous `[:dimension]` sliced the batch axis).
            embeddings = outputs.last_hidden_state[:, 0][:, :dimension]
            embeddings = F.normalize(embeddings, p=2, dim=1)
            embeddings = (
                embeddings.reshape(len(x_pred), n_nearest + 1, dimension)
                .detach()
                .cpu()
                .numpy()
            )
            rerank_scores = [calc_scores(group) for group in embeddings]
            pred = [
                get_predict_result(
                    indexes[k],
                    rerank_scores[k],
                    threshold=threshold,
                    max_len=max_result_len,
                )
                for k in range(len(rerank_scores))
            ]
            y_pred.append([labels for labels, _ in pred])
    return y_pred
# HTML prefix placed in front of every rendered result: a style rule for the
# element with id "custom_id", followed by a colour legend. The legend reads
# (in order): red = potential risk, blue = permission acquisition,
# purple = data collection, green = data / permission management,
# brown = sharing, entrusting, transfer, public disclosure.
CSS_Content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
#custom_id {
border: 2px solid red;
padding: 10px;
background-color: lightgray;
}
</style>
</head>
</html>
<span style="color: red;line-height:1;">红色字体:潜在风险</span><br>
<span style="color: blue;line-height:1;">蓝色字体:权限获取</span><br>
<span style="color: purple;line-height:1;">紫色字体:数据收集</span><br>
<span style="color: green;line-height:1;">绿色字体:数据、权限管理</span><br>
<span style="color: brown;line-height:1;">棕色字体:共享、委托、转让、公开(披露)</span><br>
"""
# Maps a label to the colour used to render a sentence carrying it.
# Insertion order matters: generate_HTML walks the keys in order and uses the
# first one present in a sentence's labels, so earlier entries take priority.
color_dict = {"潜在风险":"red",
"权限获取":"blue",
"数据收集":"purple",
"数据、权限管理":"green",
"共享、委托、转让、公开(披露)":"brown"
}
def generate_HTML(text, threshold=0.85, n_nearest=10, max_result_len=3):
    """Render *text* as HTML with each sentence coloured by its top label.

    The text is split into paragraphs on newlines and into sentences on the
    Chinese full stop "。". Every sentence is wrapped in a ``<span>``; when
    one of its predicted labels appears in ``color_dict`` the span gets the
    colour of the first matching key.

    Args:
        text: Raw user input.
        threshold: Forwarded to ``predict_label``.
        n_nearest: Forwarded to ``predict_label``.
        max_result_len: Forwarded to ``predict_label``.

    Returns:
        An HTML string starting with ``CSS_Content`` followed by one
        ``<br>``-terminated line per paragraph.
    """
    paragraphs = [line.split("。") for line in text.split("\n")]
    parts = [CSS_Content]
    for paragraph in paragraphs:
        pred_label = predict_label(paragraph, threshold, n_nearest, max_result_len)
        spans = []
        for sentence, prediction in zip(paragraph, pred_label):
            labels = prediction[0]
            # color_dict is ordered by decreasing importance, so only the
            # colour of the first matching label is used.
            color = next(
                (value for name, value in color_dict.items() if name in labels),
                None,
            )
            style = f' style="color: {color};line-height:1;"' if color else ""
            # Escape the user's text so "<", ">" and "&" are shown literally
            # instead of being interpreted as markup.
            spans.append(f"<span{style}>{html.escape(sentence)}</span>")
        parts.append("。".join(spans))
        parts.append("<br>")
    return "".join(parts)
# Gradio UI: a text input, three tuning sliders, a submit button and an HTML
# output area that initially shows the colour legend.
with gr.Blocks() as demo:
    with gr.Row():
        input_text = gr.Textbox(lines=25, label="输入")  # "input"
    with gr.Row():
        # "similarity threshold"
        threshold = gr.Slider(
            minimum=0.5,
            maximum=0.85,
            value=0.75,
            step=0.05,
            interactive=True,
            label="相似度阈值",
        )
        # "number of coarse-filter sentences"
        n_nearest = gr.Slider(
            minimum=3,
            maximum=10,
            value=10,
            step=1,
            interactive=True,
            label="粗筛语句数量",
        )
        # "number of fine-filter sentences"
        max_result_len = gr.Slider(
            minimum=1,
            maximum=5,
            value=3,
            step=1,
            interactive=True,
            label="精筛语句数量",
        )
    with gr.Row():
        submit_button = gr.Button("检测")  # "detect"
    with gr.Row():
        # The id ties the output to the "#custom_id" rule in CSS_Content.
        output_text = gr.HTML(CSS_Content, elem_id="custom_id")
    submit_button.click(
        fn=generate_HTML,
        inputs=[input_text, threshold, n_nearest, max_result_len],
        outputs=output_text,
    )

demo.launch()