import os
# Install CPU-only torch at startup (Hugging Face Space workaround)
os.system('pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cpu')
import io
import re
import fitz
import PIL
import torch
import cv2
import pytesseract
import pandas as pd
import numpy as np
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
from itertools import groupby
from scipy import ndimage
from transformers import AutoProcessor, AutoModelForTokenClassification
from transformers import LayoutLMv3FeatureExtractor, LayoutLMv3ImageProcessor
# import paddleocr
# from paddleocr import PaddleOCR

auth_token = os.environ.get("HUGGING_FACE_HUB_TOKEN")

import warnings
# Ignore warning messages
warnings.filterwarnings("ignore")
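# Label schema: 84 entity classes covering issuer/receiver/destination address blocks,
# document metadata, line-item ("detail_*") table columns, totals, and transport fields.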
id2label = {
    0: 'others',
    # issuer fields
    1: 'issuer_name', 2: 'issuer_addr', 3: 'issuer_cap', 4: 'issuer_city', 5: 'issuer_prov',
    6: 'issuer_state', 7: 'issuer_tel', 8: 'issuer_id', 9: 'issuer_fax', 10: 'issuer_vat',
    11: 'issuer_contact', 12: 'issuer_contact_email', 13: 'issuer_contact_phone',
    # receiver fields
    14: 'receiver_name', 15: 'receiver_addr', 16: 'receiver_cap', 17: 'receiver_city',
    18: 'receiver_prov', 19: 'receiver_state', 20: 'receiver_tel', 21: 'receiver_fax',
    22: 'receiver_vat', 23: 'receiver_id', 24: 'receiver_contact',
    # destination fields
    25: 'dest_name', 26: 'dest_addr', 27: 'dest_cap', 28: 'dest_city', 29: 'dest_prov',
    30: 'dest_state', 31: 'dest_tel', 32: 'dest_fax', 33: 'dest_vat',
    # document metadata
    34: 'doc_type', 35: 'doc_nr', 36: 'doc_date', 37: 'order_nr', 38: 'order_date',
    39: 'service_order', 40: 'shipment_nr', 41: 'client_reference', 42: 'client_vat',
    43: 'client_id', 44: 'client_code', 45: 'time', 46: 'notes', 47: 'client_tel',
    # line-item (detail) fields
    48: 'art_code', 49: 'ref_code', 50: 'order_reason', 51: 'order_ref', 52: 'order_ref_date',
    53: 'detail_desc', 54: 'lot_id', 55: 'lot_qty', 56: 'detail_um', 57: 'detail_qty',
    58: 'detail_tare', 59: 'detail_grossw', 60: 'detail_packages', 61: 'detail_netw',
    62: 'detail_origin',
    # payment and totals
    63: 'payment_bank', 64: 'payment_terms', 65: 'tot_qty', 66: 'tot_grossw', 67: 'tot_netw',
    68: 'tot_volume',
    # shipment / transport
    69: 'shipment_reason', 70: 'package_type', 71: 'transport_respons', 72: 'transport_vectors',
    73: 'transport_terms', 74: 'transport_datetime', 75: 'return_plt', 76: 'nonreturn_plt',
    77: 'dest_signature', 78: 'driver_signature', 79: 'transport_signature',
    80: 'page', 81: 'varieta', 82: 'raccolta', 83: 'detail_volume',
}
# Default Tesseract settings (OCR Engine Mode 3, page segmentation mode 6)
custom_config = r'--oem 3 --psm 6'
# lang = 'eng+deu+ita+chi_sim'
lang = 'eng'

# Google Vision OCR
from google.cloud import vision
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "test-apikey.json"

processor = AutoProcessor.from_pretrained("microsoft/layoutlmv3-base", apply_ocr=False)
model = AutoModelForTokenClassification.from_pretrained("sxandie/doc-ai-information-extraction", use_auth_token=auth_token)
from tabulate import tabulate

def print_df(df):
    print(tabulate(df, headers=df.columns, tablefmt='psql'))
def process_image_pytesseract(image, width, height):
    # the width/height arguments are unused; the size is re-read from the image itself
    width, height = image.size
    # the image processor's Tesseract language parameter is `ocr_lang`
    feature_extractor = LayoutLMv3ImageProcessor(apply_ocr=True, ocr_lang=lang)
    encoding_feature_extractor = feature_extractor(image, return_tensors="pt", truncation=True)
    words, boxes = encoding_feature_extractor.words, encoding_feature_extractor.boxes
    return words, boxes
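# Google Vision returns each word as a quadrilateral (4 vertices). The helper below
# scales two opposite corners into LayoutLM's 0-1000 coordinate space and reorders
# them so the box is always [x_min, y_min, x_max, y_max].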
def create_bounding_box5(vertices, width_scale, height_scale):
    # Get the x, y coordinates of two opposite corners
    x1 = int(vertices[0].x * width_scale)
    y1 = int(vertices[0].y * height_scale)
    x2 = int(vertices[2].x * width_scale)
    y2 = int(vertices[2].y * height_scale)
    # Validate x1 < x2
    if x1 > x2:
        x1, x2 = x2, x1
    # Validate y1 < y2
    if y1 > y2:
        y1, y2 = y2, y1
    # Return valid bounding box
    return [x1, y1, x2, y2]
# Google Vision OCR
def process_image_GoogleVision(image, width, height):
    client = vision.ImageAnnotatorClient()
    with io.BytesIO() as output:
        image.save(output, format='JPEG')
        content = output.getvalue()
    image = vision.Image(content=content)
    response = client.text_detection(image=image)
    texts = response.text_annotations
    # text_annotations[0] is the full-page text block; keep only the word-level entries
    texts = texts[1:]
    # Get the bounding box vertices and drop the first vertex of each quad;
    # create_bounding_box5 reads two opposite corners and reorders them
    bboxes = [text.bounding_poly.vertices[1:] for text in texts]
    # Create the list of words and boxes
    words = [text.description for text in texts]
    boxes = [create_bounding_box5(bbox, 1000 / width, 1000 / height) for bbox in bboxes]
    return words, boxes
def generate_unique_colors(id2label):
    # Assign each label a distinct color drawn from PIL's named-color map
    label_ints = np.random.choice(len(PIL.ImageColor.colormap), len(id2label), replace=False)
    label_color_pil = list(PIL.ImageColor.colormap.values())
    label_color = [label_color_pil[i] for i in label_ints]
    color = {}
    for k, v in id2label.items():
        color[v] = label_color[k]
    return color
def create_bounding_box1(bbox_data, width_scale: float, height_scale: float):
    xs = []
    ys = []
    for x, y in bbox_data:
        xs.append(x)
        ys.append(y)
    left = int(max(0, min(xs) * width_scale))
    top = int(max(0, min(ys) * height_scale))
    right = int(min(1000, max(xs) * width_scale))
    bottom = int(min(1000, max(ys) * height_scale))
    return [left, top, right, bottom]
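# LayoutLM normalizes every box to a 0-1000 grid regardless of page size. The inverse
# mapping below rescales a normalized box back to pixel coordinates, e.g. for an
# A4 page scanned at 300 dpi (2480x3508 px):
#   unnormalize_box([100, 200, 300, 400], 2480, 3508) -> [248.0, 701.6, 744.0, 1403.2]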
def unnormalize_box(bbox, width, height):
    return [
        width * (bbox[0] / 1000),
        height * (bbox[1] / 1000),
        width * (bbox[2] / 1000),
        height * (bbox[3] / 1000),
    ]

def iob_to_label(label):
    return id2label.get(label, 'others')
# Self-contained variant that runs Tesseract OCR itself before encoding.
# (The main pipeline uses process_image_encoding below with externally supplied words/boxes.)
def process_image(image):
    width, height = image.size
    feature_extractor = LayoutLMv3FeatureExtractor(apply_ocr=True)
    encoding_feature_extractor = feature_extractor(image, return_tensors="pt", truncation=True)
    words, boxes = encoding_feature_extractor.words, encoding_feature_extractor.boxes
    # encode (the processor was created with apply_ocr=False, so words/boxes must be passed in)
    inference_image = [image.convert("RGB")]
    encoding = processor(inference_image, words, boxes=boxes, truncation=True, return_offsets_mapping=True,
                         return_tensors="pt", padding="max_length", stride=128, max_length=512,
                         return_overflowing_tokens=True)
    offset_mapping = encoding.pop('offset_mapping')
    overflow_to_sample_mapping = encoding.pop('overflow_to_sample_mapping')
    # change the shape of pixel values
    x = []
    for i in range(0, len(encoding['pixel_values'])):
        x.append(encoding['pixel_values'][i])
    x = torch.stack(x)
    encoding['pixel_values'] = x
    # forward pass
    outputs = model(**encoding)
    # get predictions
    predictions = outputs.logits.argmax(-1).squeeze().tolist()
    token_boxes = encoding.bbox.squeeze().tolist()
    # only keep non-subword predictions
    preds = []
    l_words = []
    bboxes = []
    token_section_num = []
    if len(token_boxes) == 512:
        predictions = [predictions]
        token_boxes = [token_boxes]
    for i in range(0, len(token_boxes)):
        for j in range(0, len(token_boxes[i])):
            unnormal_box = unnormalize_box(token_boxes[i][j], width, height)
            if np.asarray(token_boxes[i][j]).shape != (4,):
                continue
            elif token_boxes[i][j] == [0, 0, 0, 0] or token_boxes[i][j] == 0:
                continue
            # if the bbox is already in the list, we just need to update its text
            elif unnormal_box not in bboxes:
                preds.append(predictions[i][j])
                l_words.append(processor.tokenizer.decode(encoding["input_ids"][i][j]))
                bboxes.append(unnormal_box)
                token_section_num.append(i)
            else:
                # we have to update the word
                _index = bboxes.index(unnormal_box)
                if token_section_num[_index] == i:
                    # check whether they're in the same section: documents with more than 512
                    # tokens are split into separate windows, so a word can appear in two
                    # windows and those repeated words must be controlled.
                    # HERE: because they're in the same section, we can merge them safely
                    l_words[_index] = l_words[_index] + processor.tokenizer.decode(encoding["input_ids"][i][j])
                else:
                    continue
    return bboxes, preds, l_words, image
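# The encoder below handles pages longer than 512 tokens by letting the processor emit
# overlapping windows (stride=128, return_overflowing_tokens=True). Each window is run
# through the model, and predictions are de-duplicated by bounding box when the windows
# are stitched back together.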
def process_image_encoding(model, processor, image, words, boxes, width, height):
    # encode
    inference_image = [image.convert("RGB")]
    encoding = processor(inference_image, words, boxes=boxes, truncation=True, return_offsets_mapping=True,
                         return_tensors="pt", padding="max_length", stride=128, max_length=512,
                         return_overflowing_tokens=True)
    offset_mapping = encoding.pop('offset_mapping')
    overflow_to_sample_mapping = encoding.pop('overflow_to_sample_mapping')
    # change the shape of pixel values
    x = []
    for i in range(0, len(encoding['pixel_values'])):
        x.append(encoding['pixel_values'][i])
    x = torch.stack(x)
    encoding['pixel_values'] = x
    # forward pass
    outputs = model(**encoding)
    # get predictions
    predictions = outputs.logits.argmax(-1).squeeze().tolist()
    token_boxes = encoding.bbox.squeeze().tolist()
    # only keep non-subword predictions
    preds = []
    l_words = []
    bboxes = []
    token_section_num = []
    if len(token_boxes) == 512:
        predictions = [predictions]
        token_boxes = [token_boxes]
    for i in range(0, len(token_boxes)):
        for j in range(0, len(token_boxes[i])):
            unnormal_box = unnormalize_box(token_boxes[i][j], width, height)
            if np.asarray(token_boxes[i][j]).shape != (4,):
                continue
            elif token_boxes[i][j] == [0, 0, 0, 0] or token_boxes[i][j] == 0:
                continue
            # if the bbox is already in the list, we just need to update its text
            elif unnormal_box not in bboxes:
                preds.append(predictions[i][j])
                l_words.append(processor.tokenizer.decode(encoding["input_ids"][i][j]))
                bboxes.append(unnormal_box)
                token_section_num.append(i)
            else:
                # we have to update the word
                _index = bboxes.index(unnormal_box)
                if token_section_num[_index] == i:
                    # same window: subword tokens of one word can be merged safely;
                    # tokens repeated across overlapping windows are skipped instead
                    l_words[_index] = l_words[_index] + processor.tokenizer.decode(encoding["input_ids"][i][j])
                else:
                    continue
    return bboxes, preds, l_words, image
def process_form_(json_df):
    labels = [x['LABEL'] for x in json_df]
    texts = [x['TEXT'] for x in json_df]
    cmb_list = []
    for i, j in enumerate(labels):
        cmb_list.append([labels[i], texts[i]])
    # collapse consecutive entries that share a label into [label, text1, text2, ...]
    grouper = lambda l: [[k] + sum((v[1::] for v in vs), []) for k, vs in groupby(l, lambda x: x[0])]
    list_final = grouper(cmb_list)
    lst_final = []
    for x in list_final:
        json_dict = {}
        json_dict[x[0]] = ' '.join(x[1:])
        lst_final.append(json_dict)
    return lst_final
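# Example: consecutive tokens with the same (color) key are joined into one value, e.g.
#   [{'LABEL': 'red', 'TEXT': 'ACME'}, {'LABEL': 'red', 'TEXT': 'SpA'}, {'LABEL': 'blue', 'TEXT': '42'}]
# becomes
#   [{'red': 'ACME SpA'}, {'blue': '42'}]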
def createExcel(maindf, detailsdf, pdffile):
    outputPath = f'{pdffile}.xlsx'
    with pd.ExcelWriter(outputPath, engine='xlsxwriter') as writer:
        maindf.to_excel(writer, sheet_name='headers', index=False)
        detailsdf.to_excel(writer, sheet_name='details', index=False)
        # auto-fit column widths on both sheets
        worksheet1 = writer.sheets["headers"]
        for idx, col in enumerate(maindf):
            series = maindf[col]
            max_len = max((
                series.astype(str).map(len).max(),
                len(str(series.name))
            )) + 1
            worksheet1.set_column(idx, idx, max_len)
        worksheet2 = writer.sheets["details"]
        for idx, col in enumerate(detailsdf):
            series = detailsdf[col]
            max_len = max((
                series.astype(str).map(len).max(),
                len(str(series.name))
            )) + 1
            worksheet2.set_column(idx, idx, max_len)
    return outputPath
def visualize_image(final_bbox, final_preds, l_words, image, label2color):
    draw = ImageDraw.Draw(image)
    font = ImageFont.load_default()
    json_df = []
    for ix, (prediction, box) in enumerate(zip(final_preds, final_bbox)):
        if prediction is not None:
            predicted_label = iob_to_label(prediction).lower()
            if predicted_label not in ["others"]:
                draw.rectangle(box, outline=label2color[predicted_label])
                draw.text((box[0] + 10, box[1] - 10), text=predicted_label, fill=label2color[predicted_label], font=font)
            json_dict = {}
            json_dict['TEXT'] = l_words[ix]
            # the label's color string is stored as the key;
            # downstream, color2label_except() maps these colors back to label names
            json_dict['LABEL'] = label2color[predicted_label]
            json_df.append(json_dict)
    return image, json_df
def rotate_image(image):
    extracted_text = pytesseract.image_to_string(image)
    # check if the image contains any text
    if not extracted_text:
        print("The image does not contain any text.")
        return None
    elif extracted_text.isspace():
        print("The image contains only spaces.")
        return None
    # read the page orientation from Tesseract's OSD output and rotate the page upright
    text = pytesseract.image_to_osd(image)
    angle = int(re.search(r'(?<=Rotate: )\d+', text).group(0))
    angle = 360 - angle
    rotated = ndimage.rotate(image, angle)
    data = Image.fromarray(rotated)
    return data
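# Skew correction below uses a projection-profile score: the binarized page is rotated
# over a small range of candidate angles, and the angle whose row-sum histogram shows
# the sharpest line-to-line transitions (largest squared differences) wins.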
# correct the skewness of images
def correct_skew(image, delta=1, limit=5):
    def determine_score(arr, angle):
        data = ndimage.rotate(arr, angle, reshape=False, order=0)
        histogram = np.sum(data, axis=1, dtype=float)
        score = np.sum((histogram[1:] - histogram[:-1]) ** 2, dtype=float)
        return histogram, score
    # Convert the PIL Image object to a numpy array
    image = np.asarray(image.convert('L'), dtype=np.uint8)
    # Apply thresholding
    thresh = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
    scores = []
    angles = np.arange(-limit, limit + delta, delta)
    for angle in angles:
        histogram, score = determine_score(thresh, angle)
        scores.append(score)
    best_angle = angles[scores.index(max(scores))]
    (h, w) = image.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, best_angle, 1.0)
    corrected = cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_CUBIC,
                               borderMode=cv2.BORDER_REPLICATE)
    return best_angle, corrected
def removeBorders(img):
    result = img.copy()
    if len(result.shape) == 2:
        # if the input image is grayscale, convert it to BGR format
        result = cv2.cvtColor(result, cv2.COLOR_GRAY2BGR)
    gray = cv2.cvtColor(result, cv2.COLOR_BGR2GRAY)  # convert to grayscale
    thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
    # Remove horizontal lines
    horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
    remove_horizontal = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, horizontal_kernel, iterations=2)
    cnts = cv2.findContours(remove_horizontal, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cnts = cnts[0] if len(cnts) == 2 else cnts[1]
    for c in cnts:
        cv2.drawContours(result, [c], -1, (255, 255, 255), 5)
    # Remove vertical lines
    vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
    remove_vertical = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, vertical_kernel, iterations=2)
    cnts = cv2.findContours(remove_vertical, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cnts = cnts[0] if len(cnts) == 2 else cnts[1]
    for c in cnts:
        cv2.drawContours(result, [c], -1, (255, 255, 255), 5)
    return result
def color2label_except(label2color, excluded_labels):
    """
    Inversely maps colors to labels based on the provided label2color dictionary,
    excluding the specified labels.

    Args:
        label2color (dict): Dictionary mapping labels to colors.
        excluded_labels (list): List of labels to exclude.

    Returns:
        dict: Dictionary mapping colors to labels, excluding the specified labels.
    """
    # Filter out excluded labels from label2color dictionary
    filtered_label2color = {label: color for label, color in label2color.items() if label not in excluded_labels}
    # Invert the filtered label2color dictionary to create color2label mapping
    return {v: k for k, v in filtered_label2color.items()}
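# add_dataframe builds the "headers" sheet: visualize_image keyed every token by its
# label color, so the color2label mapping above is used to turn those color keys back
# into column names, excluding the repeating (table) labels which go to the second sheet.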
def add_dataframe(df_main, labels_repeating, label2color):
    col_name_map = color2label_except(label2color, labels_repeating)
    columns = list(col_name_map.values())
    data = {col: [] for col in columns}
    for i in df_main:
        for k, v in i.items():
            if k in col_name_map:
                data[col_name_map[k]].append(v)
    # join the list of strings for each column and convert to a dataframe
    for col in columns:
        data[col] = [' '.join(data[col])]
    df_upper = pd.DataFrame(data)
    key_value_pairs = []
    for col in df_upper.columns:
        key_value_pairs.append({'key': col, 'value': df_upper[col][0]})
    df_key_value = pd.DataFrame(key_value_pairs)
    return df_key_value
def id2label_row(s, id2label):
    # pass through values that are already label strings; otherwise map id -> label
    if s in id2label.values():
        return s
    return id2label[s]

def dist_height(y1, y2):
    return abs(int(y1) - int(y2))

def mergeBoxes(df):
    # union of all bounding boxes in the dataframe
    xmin, ymin, xmax, ymax = [], [], [], []
    for i in range(df.shape[0]):
        box = df['bbox_column'].iloc[i]
        xmin.append(box[0])
        ymin.append(box[1])
        xmax.append(box[2])
        ymax.append(box[3])
    return [min(xmin), min(ymin), max(xmax), max(ymax)]
def transform_dataset(df, merge_labels):
    df_temp = df.iloc[merge_labels]  # a duplicate df with only the concerned rows
    df_temp.reset_index(drop=True, inplace=True)
    text = ' '.join(df_temp['scr_column'])
    bbox = mergeBoxes(df_temp)
    retain_index = merge_labels[0]  # the first index is the parent row
    # .at avoids pandas chained-assignment issues when writing single cells
    df.at[retain_index, 'scr_column'] = text
    df.at[retain_index, 'bbox_column'] = bbox
    # keep the first row & remove the rest
    df = df.loc[~df.index.isin(merge_labels[1:]), :]
    df.reset_index(drop=True, inplace=True)
    return df
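# Two boxes overlap along an axis exactly when neither interval ends before the other
# begins; box_overlap applies this 1-D interval test horizontally ("H") or vertically ("V").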
def box_overlap(box1, box2, horizontal_vertical):
    # Extract coordinates of box1
    x1_box1, y1_box1, x2_box1, y2_box1 = box1
    # Extract coordinates of box2
    x1_box2, y1_box2, x2_box2, y2_box2 = box2
    # Check if the boxes overlap on the requested axis
    if horizontal_vertical == "H":
        return x1_box1 <= x2_box2 and x2_box1 >= x1_box2
    if horizontal_vertical == "V":
        return y1_box1 <= y2_box2 and y2_box1 >= y1_box2
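# Horizontal merging: each box is first "fattened" by x_change/y_change pixels so that
# words belonging to the same field but separated by small gaps start to overlap; rows
# with the same predicted label whose fattened boxes overlap are then concatenated.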
def horizontal_merging(df, font_length, perform_overlapping=False, x_change=0, y_change=0):
    fat_df = df.copy()
    for i in range(df.shape[0]):
        box = fat_df['bbox_column'].iloc[i]
        fat_df.at[i, 'bbox_column'] = [box[0] - x_change, box[1] - y_change, box[2] + x_change, box[3] + y_change]
    if perform_overlapping == True:
        redundant_rows = []
        for i in range(fat_df.shape[0]):
            box_i = fat_df.bbox_column[i]
            indices2merge = []
            for j in range(i + 1, fat_df.shape[0]):
                if fat_df.preds_column[j] == fat_df.preds_column[i]:  # if the labels are the same
                    box_j = fat_df.bbox_column[j]
                    if abs(box_i[1] - box_j[3]) < font_length * 1.5:  # boxes within 1.5x the font height
                        # Check if the boxes overlap horizontally
                        if box_overlap(box_i, box_j, 'H'):
                            indices2merge.append(j)
                            df.at[i, 'scr_column'] += df.at[j, 'scr_column']
                            box_i = fat_df.bbox_column[j]  # chain on to the next connected word
            # once we have all the indices that belong to a particular category,
            # merge the bounding boxes, keeping them in the first node/row
            if len(indices2merge) != 0:
                # the merged box includes the parent row i itself
                df.at[i, 'bbox_column'] = mergeBoxes(df.loc[[i] + indices2merge])
                redundant_rows.extend(indices2merge)
        # now that all the transformation is done, remove the redundant rows
        return df.drop(redundant_rows)
    return df
def mergeLabelsExtensive_repeating(df_grouped, repeating_label):
    # this function merges entities with the same label into a single instance
    df_grouped = df_grouped[df_grouped['preds_column'].isin(repeating_label)]
    # re-index after filtering so positional access in horizontal_merging works
    df_grouped.reset_index(inplace=True, drop=True)
    # estimate the font height from up to the first 5 boxes
    font_length = 0
    count = 0
    while count < 5 and count < df_grouped.shape[0]:
        box_i = df_grouped['bbox_column'].iloc[count]  # box of the current label: [x1, y1, x3, y3]
        font_length += box_i[3] - box_i[1]
        count += 1
    if count:
        font_length = font_length / count
    df_grouped = horizontal_merging(df_grouped, font_length, True, 30, 0)
    return df_grouped
def group_labels_wrt_height(df):
    """
    This function groups the labels based on the height of the bounding box.
    """
    # sort the lines by height using the 'y_axis' column
    df = df.sort_values(by='y_axis')
    df.reset_index(inplace=True, drop=True)
    print("entering: group_labels_wrt_height")
    final_yaxis = []
    final_scr = []
    final_pred = []
    current_group = []
    current_scr = []
    current_pred = []
    # Iterate through the column values
    for i, (value, scr, preds) in enumerate(zip(df['y_axis'], df['scr_column'], df['preds_column'])):
        if i == 0:
            # Start a new group with the first value
            current_group.append(value)
            current_scr.append(scr)
            current_pred.append(preds)
        else:
            # Check if the difference between the current value and the previous value is <= 35
            if abs(value - df['y_axis'][i - 1]) <= 35:
                # Add the value to the current group
                current_group.append(value)
                current_scr.append(scr)
                current_pred.append(preds)
            else:
                # Start a new group with the current value
                final_yaxis.append(current_group)
                final_scr.append(current_scr)
                final_pred.append(current_pred)
                current_group = [value]
                current_scr = [scr]
                current_pred = [preds]
    # Add the last group
    final_yaxis.append(current_group)
    final_scr.append(current_scr)
    final_pred.append(current_pred)
    final_grouped_df = pd.DataFrame({'y_axis': final_yaxis, 'scr_column': final_scr, 'preds_column': final_pred})
    print("Grouped df after sorting based on height")
    print_df(final_grouped_df)
    return final_grouped_df
# searches the set of labels in the whole range
def search_labelSet_height_range(df, d, keyList):
    print("search_labelSet_height_range")
    keyDict = dict.fromkeys(keyList, [])  # stores the required information as a dictionary, then converted to a df
    print("Dataframe from which extraction is going to happen: ")
    for i in range(df.shape[0]):  # check whether each box's top y value lies within 50 px of height d
        box = df['bbox_column'].iloc[i]
        if dist_height(box[1], d) < 50:
            key = df['preds_column'].iloc[i]
            keyDict[key] = df['scr_column'].iloc[i]
    return keyDict

def clean_colText(df, column):
    # strip stray OCR artifacts ('[', '|', '+') from a text column
    df[column] = [str(v).replace('[', '').replace('|', '').replace('+', '') for v in df[column]]
    return df
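# find_repeatingLabels drives the "details" sheet: it keeps only the table (repeating)
# labels, merges fragments of the same field, groups rows by their y coordinate, and
# finally pivots each visual row into one record per line item.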
def find_repeatingLabels(df, labels_repeating):
    print("In find_repeatingLabels: ")
    row2drop = []  # drop the rows that are handled by the headers dataframe
    for i in range(df.shape[0]):
        df.at[i, 'preds_column'] = id2label_row(df['preds_column'].iloc[i], id2label)
        if df['preds_column'].iloc[i] not in labels_repeating:
            row2drop.append(i)
    df.drop(index=row2drop, inplace=True)
    df = clean_colText(df, 'scr_column')
    print("removing non-tabular labels.")
    df = mergeLabelsExtensive_repeating(df, labels_repeating)
    df.reset_index(drop=True, inplace=True)  # re-index after merging dropped rows
    print('after merging non-tabular labels: ')
    labels_repeating = list(set(list(df["preds_column"])))
    print("labels_repeating in this document are: ", labels_repeating)
    # add an extra column containing the Y-axis information (height)
    df['y_axis'] = np.nan
    for i in range(df.shape[0]):
        box = df['bbox_column'].iloc[i]
        df.at[i, 'y_axis'] = box[1]
    print("After adding y-axis data in the dataframes: ")
    df = mergeLabelsExtensive(df)
    print("after merging the df extensively")
    print("Grouping the labels wrt heights: ")
    grouped_df = group_labels_wrt_height(df)
    # once the labels are grouped, build a dictionary per line of the labels and values occurring together
    row_dicts = []  # will contain each row of df as a single dictionary
    for _, row in grouped_df.iterrows():
        row_dict = {}
        for preds, scr in zip(row['preds_column'], row['scr_column']):
            row_dict[preds] = scr
        row_dicts.append(row_dict)
    # build the final table, one row per line item (DataFrame.append was removed in pandas 2.x)
    final_df = pd.DataFrame(row_dicts, columns=labels_repeating)
    final_df = final_df.fillna('')
    return final_df
def mergeImageVertical(images):
    # pick the smallest image and resize the others to match it (image shapes may vary)
    min_shape = sorted([(np.sum(i.size), i.size) for i in images])[0][1]
    # for vertical stacking, resize each page and use vstack
    imgs_comb = np.vstack([i.resize(min_shape) for i in images])
    imgs_comb = Image.fromarray(imgs_comb)
    return imgs_comb
def perform_erosion(img):
    # Check if the image is already in grayscale
    if len(img.shape) == 2:
        gray = img
    else:
        # Convert the image to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Define the kernel for erosion and dilation
    kernel = np.ones((3, 3), np.uint8)
    # Perform erosion followed by dilation (an opening) to thin out noise
    erosion = cv2.erode(gray, kernel, iterations=1)
    dilation = cv2.dilate(erosion, kernel, iterations=1)
    # Double the size of the cleaned image
    # (the original resized `gray`, leaving the opening unused; `dilation` is used here instead)
    double_size = cv2.resize(dilation, None, fx=2, fy=2, interpolation=cv2.INTER_LINEAR)
    # Perform erosion on the doubled image
    double_erosion = cv2.erode(double_size, kernel, iterations=1)
    return double_erosion
def remove_leading_trailing_special_characters(input_string):
    # strip any non-alphanumeric characters from both ends of the string
    cleaned_string = re.sub(r'^[^A-Za-z0-9]+|[^A-Za-z0-9]+$', '', str(input_string))
    return cleaned_string

def clean_dataframe(df):
    # Apply the remove_leading_trailing_special_characters function to all string columns
    for column in df.select_dtypes(include='object').columns:
        df[column] = df[column].apply(remove_leading_trailing_special_characters)
    # Replace NaN values with blanks
    df = df.fillna('')
    return df
def mergeLabelsExtensive(df_grouped):
    i = 0
    while i < df_grouped.shape[0]:
        merge_labels = [i]  # collects the indices whose data has been merged and must be deleted
        label = df_grouped['preds_column'].iloc[i]
        box1 = df_grouped['bbox_column'].iloc[i]
        for j in range(i + 1, df_grouped.shape[0]):
            box2 = df_grouped['bbox_column'].iloc[j]
            # merge rows with the same label whose bottom edges are within 20 pixels
            if label == df_grouped['preds_column'].iloc[j] and dist_height(box1[3], box2[3]) < 20:
                merge_labels.append(j)
        print_df(df_grouped)
        df_grouped = transform_dataset(df_grouped, merge_labels)
        i = i + 1
    return df_grouped
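# NOTE: multilabelsHandle below appears to come from an earlier revision of this Space:
# it references labels ('delivery_name', ...) outside the current id2label schema and
# helpers (extract_yaxis, get_heights, search_labelSet_between_h1_h2) that are not
# defined in this file. It is never called by the Gradio pipeline.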
def multilabelsHandle(df, thermo_details):
    # Label 0 is 'others' and those values are not important, so delete them.
    df = df[df.preds_column != 0]
    df.reset_index(drop=True, inplace=True)
    for i in range(df.shape[0]):
        df.at[i, 'preds_column'] = id2label.get(df['preds_column'].iloc[i])
    df_grouped = df.copy()  # keeps only the rows with relevant labels
    for i in range(df.shape[0]):
        if df['preds_column'].iloc[i] not in thermo_details:
            df_grouped.drop(i, inplace=True)
    df_grouped.reset_index(drop=True, inplace=True)
    keyList = df_grouped['preds_column'].unique()
    df_grouped = mergeLabelsExtensive(df_grouped)
    # extract the height of the boxes
    df_grouped = extract_yaxis(df_grouped)
    shipment_labels = ['delivery_name', 'delivery_address', 'contact_phone']
    # shipment block heights act as boundaries between consecutive shipments
    heights_shipment = get_heights(df_grouped, shipment_labels)
    # segregate the other repeating values (measure, weight, volume, etc.)
    # contained between consecutive shipment heights
    df_labelSet = pd.DataFrame(columns=thermo_details)
    for i in range(len(heights_shipment)):
        if i == len(heights_shipment) - 1:
            new_df = search_labelSet_between_h1_h2(df_grouped, heights_shipment[i], 5000, keyList)
        else:
            new_df = search_labelSet_between_h1_h2(df_grouped, heights_shipment[i], heights_shipment[i + 1], keyList)
        df_labelSet = pd.concat([df_labelSet, pd.DataFrame([new_df])], ignore_index=True)
    return df_labelSet
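# End-to-end pipeline: render each PDF page, fix orientation and skew, strip table
# borders, run the selected OCR, tag tokens with LayoutLMv3, then split the results
# into a key/value "headers" dataframe and a per-line-item "details" dataframe.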
def completepreprocess(pdffile, ocr_type):
    myDataFrame = pd.DataFrame()
    myDataFrame2 = pd.DataFrame()
    merge_pages = []
    # generate colors once so they stay consistent across pages
    label2color = generate_unique_colors(id2label)
    # gr.File yields a tempfile wrapper; open the PDF by its underlying path
    doc = fitz.open(pdffile.name)
    for i in range(0, len(doc)):
        page = doc.load_page(i)
        zoom = 2
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat, dpi=300)
        image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        ro_image = rotate_image(image)
        if ro_image is None:
            return None
        angle, skewed_image = correct_skew(ro_image)
        if skewed_image is None:
            return None
        remove_border = removeBorders(skewed_image)
        image = Image.fromarray(remove_border)
        width, height = image.size
        if ocr_type == "GoogleVisionOCR":
            words, boxes = process_image_GoogleVision(image, width, height)
        else:
            words, boxes = process_image_pytesseract(image, width, height)
        bbox, preds, words, image = process_image_encoding(model, processor, image, words, boxes, width, height)
        im, df_visualize = visualize_image(bbox, preds, words, image, label2color)
        df_main = process_form_(df_visualize)
        # dataframe with one row per predicted token on this page
        page_data = {'bbox_column': bbox, 'preds_column': preds, 'scr_column': words}
        df_single_page = pd.DataFrame(page_data)
        labels_repeating = ['art_code', 'ref_code', 'detail_desc', 'lot_id', 'detail_qty', 'detail_um',
                            'detail_tare', 'detail_grossw', 'detail_netw', 'detail_origin', 'varieta', 'raccolta']
        df_repeating_page = find_repeatingLabels(df_single_page, labels_repeating)
        myDataFrame2 = pd.concat([myDataFrame2, df_repeating_page], sort=False)
        df1 = add_dataframe(df_main, labels_repeating, label2color).astype(str)
        myDataFrame = pd.concat([myDataFrame, df1], sort=False).reset_index(drop=True)
        # drop keys whose extracted value is empty
        row2drop = []
        for r in range(myDataFrame.shape[0]):
            if len(myDataFrame['value'].iloc[r]) == 0:
                row2drop.append(r)
        myDataFrame.drop(index=row2drop, inplace=True)
        myDataFrame.reset_index(drop=True, inplace=True)
        myDataFrame = myDataFrame[myDataFrame["value"].notnull()]
        myDataFrame.drop_duplicates(subset=["key"], inplace=True)
        # drop detail columns that contain only empty lists
        myDataFrame2 = myDataFrame2.loc[:, ~(myDataFrame2.apply(lambda x: all(isinstance(val, list) and len(val) == 0 for val in x)))]
        merge_pages.append(im)
    im2 = mergeImageVertical(merge_pages)
    myDataFrame2 = clean_dataframe(myDataFrame2)
    myDataFrame = clean_dataframe(myDataFrame)
    myDataFrame = myDataFrame[myDataFrame['key'] != 'others']
    output_excel_path = createExcel(myDataFrame, myDataFrame2, pdffile.name)
    return im2, myDataFrame, myDataFrame2, output_excel_path
| title = "Interactive demo: Transport Document Information Extraction model PDF/Images" | |
| description = "Results will show up in a few seconds. This model is trained on only 1326 Images whereas 226 images are used for testing purposes. The annotated image can be opened in a new window for a better view." | |
| css = """.output_image, .input_image {height: 600px !important}""" | |
| examples = [["sample_doc.pdf"]] | |
| iface = gr.Interface( | |
| fn=completepreprocess, | |
| inputs=[ | |
| gr.components.File(label="PDF"), | |
| gr.components.Dropdown(label="Select the OCR", choices=["Pytesseract","GoogleVisionOCR"]), | |
| ], | |
| outputs=[ | |
| gr.components.Image(type="pil", label="annotated image"), | |
| "dataframe", | |
| "dataframe", | |
| gr.File(label="Excel output") | |
| ], | |
| title=title, | |
| description=description, | |
| examples=examples, | |
| css=css | |
| ) | |
| iface.launch(inline=True, debug=True) |