import os
import traceback
from pathlib import Path
from typing import Dict, Any, Optional, Union, Literal

from marker.models import create_model_dict
#from marker.converters.extraction import ExtractionConverter as MarkerExtractor  ## structured pydantic extraction
from marker.converters.pdf import PdfConverter as MarkerConverter  ## full document conversion/extraction
from marker.config.parser import ConfigParser  ## process custom configuration
from marker.services.openai import OpenAIService as MarkerOpenAIService
from marker.settings import settings

from utils.logger import get_logger

logger = get_logger(__name__)


# Create/load models once. Called to avoid reloading models for each instance.
def load_models():
    """Create Marker's model dict. Initiates download of the models on first use."""
    return create_model_dict()
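

# A minimal alternative sketch (an assumption, not used elsewhere in this module):
# memoising the loader lets repeated DocumentConverter instances share one model
# download/load without routing the cache through a separate globals module.
from functools import lru_cache

@lru_cache(maxsize=1)
def load_models_cached():
    """Cached variant of load_models(); the model dict is built once per process."""
    return create_model_dict()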


# Full document converter
class DocumentConverter:
    """
    Business-logic wrapper using Marker's OpenAI LLM service to
    convert documents (PDF, HTML files) into markdown + assets.
    """
    def __init__(self,
                 model_id: str,
                 hf_provider: str,  ## currently unused; reserved for a future Marker Hugging Face client (see note below)
                 temperature: float,
                 top_p: float,
                 api_token: str,
                 openai_base_url: str = "https://router.huggingface.co/v1",
                 openai_image_format: Optional[str] = "webp",
                 max_workers: Optional[int] = 1,  ## passed to config_dict["pdftext_workers"]
                 max_retries: Optional[int] = 2,
                 debug: Optional[bool] = None,
                 output_format: Literal["markdown", "json", "html"] = "markdown",
                 output_dir: Optional[Union[str, Path]] = "output_dir",
                 use_llm: Optional[bool] = None,
                 force_ocr: Optional[bool] = None,
                 strip_existing_ocr: Optional[bool] = None,
                 disable_ocr_math: Optional[bool] = None,
                 page_range: Optional[str] = None,  ## e.g. "0,4-8,16"; Marker parses it as List[int]
                 ):
        self.model_id = model_id
        self.openai_api_key = api_token  ## replaces the dependency on self.client.openai_api_key
        self.openai_base_url = openai_base_url
        self.temperature = temperature
        self.top_p = top_p
        self.llm_service = MarkerOpenAIService
        self.openai_image_format = openai_image_format  ## "png" gives better compatibility
        self.max_workers = max_workers  ## passed to config_dict["pdftext_workers"]
        self.max_retries = max_retries  ## passed to __call__
        self.debug = debug
        self.output_format = output_format
        self.output_dir = settings.DEBUG_DATA_FOLDER if debug else output_dir  ## NB: no trailing comma, which would silently make this a tuple
        self.use_llm = use_llm if use_llm else False
        self.force_ocr = force_ocr if force_ocr else False
        self.strip_existing_ocr = strip_existing_ocr
        self.disable_ocr_math = disable_ocr_math
        self.page_range = page_range if page_range else None
        self.converter = None

        # 0) Instantiate the LLM client (OpenAIChatClient): get a provider-agnostic chat function.
        ##SMY: #future. Plan to integrate into Marker: it uses its own LLM services (clients). As at 1.9.2, there is no Hugging Face client service.
        '''
        try:
            self.client = OpenAIChatClient(
                model_id=model_id,
                hf_provider=hf_provider,
                api_token=api_token,
                temperature=temperature,
                top_p=top_p,
            )
            logger.log(level=20, msg="✔️ OpenAIChatClient instantiated:", extra={"model_id": self.client.model_id, "chatclient": str(self.client)})
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"❌ Error initialising OpenAIChatClient: {exc}\n{tb}")
            raise RuntimeError(f"❌ Error initialising OpenAIChatClient: {exc}\n{tb}")
        '''
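
        # Hedged sketch (an assumption, untested): until Marker ships a native Hugging Face
        # service, one route is to subclass its OpenAI-compatible service and default the
        # base URL to the HF router, then pass that subclass as the llm_service below:
        #
        #   class HuggingFaceService(MarkerOpenAIService):
        #       """OpenAI-compatible Marker service pointed at the Hugging Face router."""
        #       openai_base_url: str = "https://router.huggingface.co/v1"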

        # 1) Define the custom configuration for the Hugging Face LLM.
        #    typing.Dict / typing.Any keep the dictionary type hints flexible.
        try:
            self.config_dict: Dict[str, Any] = self.get_config_dict()
            ##SMY: drop "page_range" when it is falsy (None or empty); a truthy value is kept as-is
            if not self.config_dict.get("page_range"):
                self.config_dict.pop("page_range", None)
            # use_llm test moved to get_config_dict()
            if not self.config_dict.get("force_ocr") or self.config_dict.get("force_ocr") == 'False':
                self.config_dict.pop("force_ocr", None)
            logger.log(level=20, msg="✔️ config_dict custom configured:", extra={"service": "openai"})
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"❌ Error configuring custom config_dict: {exc}\n{tb}")
            raise RuntimeError(f"❌ Error configuring custom config_dict: {exc}\n{tb}")

        # 2) Use Marker's ConfigParser to process the configuration.
        try:
            config_parser: ConfigParser = ConfigParser(self.config_dict)
            logger.log(level=20, msg="✔️ parsed/processed custom config_dict:", extra={"config": str(config_parser)})
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"❌ Error parsing/processing custom config_dict: {exc}\n{tb}")
            raise RuntimeError(f"❌ Error parsing/processing custom config_dict: {exc}\n{tb}")

        # 3) Load models if not already loaded (e.g. in reload mode)
        from globals import config_load_models
        model_dict = None  ## keep bound even if loading fails below
        try:
            if config_load_models.model_dict:
                model_dict = config_load_models.model_dict
            else:
                model_dict = load_models()
        except OSError as exc_ose:
            tb = traceback.format_exc()
            logger.warning(f"⚠️ OSError: the paging file is too small (to complete reload): {exc_ose}\n{tb}")
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"❌ Error loading models (reload): {exc}\n{tb}")
            raise RuntimeError(f"❌ Error loading models (reload): {exc}\n{tb}")

        # 4) Instantiate Marker's PdfConverter with the config managed by config_parser
        try:
            # Resolve the service class to its dotted-path string only when the LLM is enabled.
            ##SMY: str(self.llm_service) looks like "<class 'marker.services.openai.OpenAIService'>"; split/slice extracts the dotted path
            llm_service_str = None if not self.use_llm or self.use_llm == "False" else str(self.llm_service).split("'")[1]
            # Set the api_key required by Marker (handles Marker's assertion test on OpenAI)
            if llm_service_str:
                os.environ["OPENAI_API_KEY"] = api_token if api_token else os.getenv("OPENAI_API_KEY") or os.getenv("GEMINI_API_KEY") or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")
            config_dict = config_parser.generate_config_dict()
            self.converter = MarkerConverter(
                artifact_dict=model_dict if model_dict else create_model_dict(),
                config=config_dict,
                llm_service=llm_service_str,  ##SMY: Marker expects the dotted-path string here, not the service object itself
                processor_list=config_parser.get_processors(),
                renderer=config_parser.get_renderer(),
            )
            logger.log(level=20, msg="✔️ MarkerConverter instantiated successfully:", extra={"converter.config": str(self.converter.config.get("openai_base_url")), "use_llm": self.converter.use_llm})
            ##SMY: do not return self.converter here: __init__() should return None, not 'PdfConverter'
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"❌ Error initialising MarkerConverter: {exc}\n{tb}")
            raise RuntimeError(f"❌ Error initialising MarkerConverter: {exc}\n{tb}")

    # Define the custom configuration for the HF LLM.
    def get_config_dict(self) -> Dict[str, Any]:
        """Define the custom configuration for the Hugging Face LLM, combining Marker's CLI options and LLM settings."""
        try:
            ##SMY: defensive unwrap in case a stray trailing comma upstream turned these into one-element tuples
            self.use_llm = self.use_llm[0] if isinstance(self.use_llm, tuple) else self.use_llm
            self.page_range = self.page_range[0] if isinstance(self.page_range, tuple) else self.page_range
            ##SMY: TODO: convert to {inputs} and call from gradio_ui
            config_dict = {
                "output_format"     : self.output_format,       # "markdown", "json" or "html"
                "pdftext_workers"   : self.max_workers,         ## number of workers to use for pdftext
                "debug"             : self.debug,
                "output_dir"        : self.output_dir,
                "force_ocr"         : self.force_ocr,
                "strip_existing_ocr": self.strip_existing_ocr,
                "disable_ocr_math"  : self.disable_ocr_math,
                "page_range"        : self.page_range,          ## e.g. "0,4-8,16"
            }
            ## LLM keys enable higher-quality processing; see MarkerOpenAIService
            if self.use_llm and self.use_llm != 'False':
                config_dict.update({
                    "use_llm"            : self.use_llm,
                    "openai_model"       : self.model_id,
                    "openai_api_key"     : self.openai_api_key,
                    "openai_base_url"    : self.openai_base_url,
                    "temperature"        : self.temperature,
                    "top_p"              : self.top_p,
                    "openai_image_format": self.openai_image_format,  ## "png" gives better compatibility
                })
            return config_dict
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(f"❌ Error configuring custom config_dict: {exc}\n{tb}")
            raise RuntimeError(f"❌ Error configuring custom config_dict: {exc}\n{tb}")
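

# Hedged usage sketch (not part of the original module): the model id, provider and
# file path below are illustrative assumptions, not values the module prescribes.
if __name__ == "__main__":
    from marker.output import text_from_rendered

    doc_converter = DocumentConverter(
        model_id="meta-llama/Llama-3.1-8B-Instruct",  # hypothetical example model id
        hf_provider="hf-inference",                   # hypothetical example provider
        temperature=0.1,
        top_p=0.9,
        api_token=os.getenv("HF_TOKEN", ""),
        use_llm=False,                                # OCR/layout pipeline only, no LLM calls
    )
    rendered = doc_converter.converter("example.pdf")  # hypothetical sample path
    text, _, images = text_from_rendered(rendered)
    logger.info("Converted document: %d characters of %s", len(text), doc_converter.output_format)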