Spaces:
Runtime error
Runtime error
| # Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor. | |
| # | |
| # #TODO: license: MIT pending (evaluation suite itself can be completely open, nothing copyleft from the dataset reaches us here) | |
| """TODO: Add a description here.""" | |
| # TODO: Add BibTeX citation | |
| _CITATION = """\ | |
| @InProceedings{huggingface:module, | |
| title = {A great new module}, | |
| authors={huggingface, Inc.}, | |
| year={2023} | |
| } | |
| """ | |
| # TODO: Add description of the module here | |
| _DESCRIPTION = """\ | |
| This EvaluationSuite currently solves {1} tasks to test code intelligence of genereative language models for "creative programming" (fragment shaders). | |
| """ | |
| # via https://huggingface.co/docs/evaluate/evaluation_suite | |
| import evaluate | |
| from evaluate import evaluator #used by Suite.run() | |
| from evaluate.evaluator.utils import DatasetColumn # used in .prepare_data() | |
| from evaluate.evaluation_suite import SubTask | |
| from datasets import Dataset | |
| from typing import Any, Callable, Dict, List, Optional, Union # used in .prepare_pipeline() | |
| import transformers | |
| from transformers import Pipeline, pipeline, GenerationConfig, AutoTokenizer #GenerationConfig to specify greedy and avoid error | |
| from datasets import load_dataset #used by Suite.run() | |
| # write a custom evaluator, inherent from: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluator/text_generation.py#L31 | |
| class ReturnGenerationEvaluator(evaluate.TextGenerationEvaluator): | |
| def __init__(self, task="text-generation", default_metric_name="exact_match", predictions_prefix: str = "generated"): | |
| super().__init__(task=task, default_metric_name=default_metric_name) | |
| self.predictions_prefix = predictions_prefix | |
| greedy_cfg = GenerationConfig( | |
| do_sample = False, # default to ensure greedy | |
| num_beams = 1, # same as above | |
| ) | |
| PIPELINE_KWARGS = {"return_full_text": False, "generation_config":greedy_cfg} #these kwargs are for the pipeline call, not the pipeline init - but that seems to still work. | |
| # for the pipeline init we need to copy the whole function and add two lines. this still prints errors due to the pad_toke_id = eos_token_id change. | |
| # from: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluator/base.py#L375 | |
| def prepare_pipeline( | |
| self, | |
| model_or_pipeline: Union[str, "Pipeline", Callable, "PreTrainedModel", "TFPreTrainedModel"], # noqa: F821 | |
| tokenizer: Union["PreTrainedTokenizerBase", "FeatureExtractionMixin"] = None, # noqa: F821 | |
| feature_extractor: Union["PreTrainedTokenizerBase", "FeatureExtractionMixin"] = None, # noqa: F821 | |
| device: int = None, | |
| ): | |
| """ | |
| Prepare pipeline. | |
| Args: | |
| model_or_pipeline (`str` or `Pipeline` or `Callable` or `PreTrainedModel` or `TFPreTrainedModel`, | |
| defaults to `None`): | |
| If the argument in not specified, we initialize the default pipeline for the task. If the argument is of the type `str` or | |
| is a model instance, we use it to initialize a new `Pipeline` with the given model. Otherwise we assume the | |
| argument specifies a pre-initialized pipeline. | |
| preprocessor (`PreTrainedTokenizerBase` or `FeatureExtractionMixin`, *optional*, defaults to `None`): | |
| Argument can be used to overwrite a default preprocessor if `model_or_pipeline` represents a model for | |
| which we build a pipeline. If `model_or_pipeline` is `None` or a pre-initialized pipeline, we ignore | |
| this argument. | |
| Returns: | |
| The initialized pipeline, with modifications for the specific task of generating text, even with long inputs. | |
| """ | |
| if device is None: | |
| device = self._infer_device() | |
| if ( | |
| isinstance(model_or_pipeline, str) | |
| or isinstance(model_or_pipeline, transformers.PreTrainedModel) | |
| or isinstance(model_or_pipeline, transformers.TFPreTrainedModel) | |
| ): | |
| if isinstance(model_or_pipeline, str): | |
| # load tokenizer manually, since the pipeline does fail to do so at times. needed for bigcode/santacoder for example. | |
| tokenizer = AutoTokenizer.from_pretrained(model_or_pipeline, trust_remote_code=True) | |
| pipe = pipeline( | |
| self.task, | |
| model=model_or_pipeline, | |
| tokenizer=tokenizer, | |
| feature_extractor=feature_extractor, | |
| device=device, | |
| # my additions here: | |
| handle_long_generation= "hole", #our solution? relevant: https://github.com/huggingface/transformers/issues/14033#issuecomment-948385227 | |
| # pad_token_id=tokenizer.eos_token_id, #to avoid the warning, however there might be issues as tokenizers will call this differently. | |
| do_sample=False, #important to get reproduceable results but we need to make sure the generator is deterministic | |
| trust_remote_code=True, # do we need this for some custom models? need to test if it works right here. one example is bigcode/santacoder | |
| ) | |
| else: | |
| if model_or_pipeline is None: | |
| pipe = pipeline(self.task, device=device) | |
| else: | |
| pipe = model_or_pipeline | |
| # if tokenizer is not None and feature_extractor is not None: | |
| # logger.warning("Ignoring the value of the preprocessor argument (`tokenizer` or `feature_extractor`).") #excluded warning because I didn't import logger | |
| if (pipe.task != self.task) and not (self.task == "translation" and pipe.task.startswith("translation")): | |
| raise ValueError( | |
| f"Incompatible `model_or_pipeline`. Please specify `model_or_pipeline` compatible with the `{self.task}` task." | |
| ) | |
| # fixinging default for max_lenght | |
| pipe.model.config.max_length = self._resolve_context_lenght(pipe=pipe) | |
| # update the generation config with information from the pipe | |
| self._update_generation_config(pipe) | |
| return pipe | |
| def _update_generation_config(self, pipe): | |
| """ | |
| Update the generation config with information from the pipe. Sets eos_token_id and pad_token_id. | |
| Args: | |
| pipe (:class:`~transformers.Pipeline`): we need to access the tokenizer.vocab | |
| returns: | |
| None | |
| """ | |
| semicolon_token_ids = [v for k,v in pipe.tokenizer.vocab.items() if ";" in k] # this requires the tokenizer, which we only have once a pipe is made. | |
| # GenerationConfig.update also exists, but it does only replace, not add kwargs. | |
| self.greedy_cfg.eos_token_id = semicolon_token_ids # eos_token_id can be a list, so we give them all possible tokens. | |
| self.greedy_cfg.pad_token_id = semicolon_token_ids[0] # pad_token_id has to be an int, so we just take the first one. | |
| return None # doesn't do anything? | |
| def _resolve_context_lenght(self, model_or_pipeline=None, pipe=None): #TODO should really copy the typing hints here. | |
| if isinstance(model_or_pipeline, transformers.GPT2Model): # you are comparing a string here -.- | |
| return model_or_pipeline.config.n_ctx # how GPT2 models might handle is, seen with | |
| if pipe is not None: #should I figure out a way to pass this. | |
| return pipe.tokenizer.model_max_length # this is set to something small for pipeline default task, but we would want to put it to the max instead. | |
| # tokenizer needs to know the context length for our pipe strategy, but it has to be passed to the tokenizer, not model. | |
| # the tokenizer should read from the model config, but that can be wrong, or it has a task overwrite (for "text-generation" for example you get 50) | |
| #model_or_pipeline only exists via the .compute call, so we have to take it in | |
| # model_or_pipeline.tokenier.config.max_new_tokens = 1024 # we shouldn't return it, but overwrite the tokenizer config, which the pipeline relies on. | |
| return 1024 # we shouldn't return it, but overwrite the tokenizer config, which the pipeline relies on. | |
| def _estimate_stopping(self, labels, **kwargs): | |
| """ estimates max_new_tokens for the pipeline call | |
| by counting the characters in the longest string of the references adding 5 (for good measure but probably not needed) | |
| Args: | |
| labels: A list of dicts by knowing the labels | |
| Returns: | |
| `int`: the estimated max_new_tokens, should be smaller than context_lenght in all cases | |
| """ | |
| context_lenght = self._resolve_context_lenght(**kwargs) | |
| estimate = min(max([len(ref) for ref in labels]) + 5, context_lenght) #does the min call get done inside the pipeline anyway? is there even a single case where the return statement is this long? | |
| return estimate | |
| # this one needs to be adjusted | |
| def predictions_processor(self, predictions, *args, **kwargs): | |
| """ | |
| processes the output of the pipeline to be compatible with the metric. | |
| generated texts cut off by the first semicolon and whitespaces are stripped (using python str builtins) | |
| Args: | |
| predictions: A list of lists of dicts | |
| Returns: | |
| `dict`: All the processed text are flattened and stored under the "predictions" key. | |
| """ | |
| return {"predictions": [pred[f"{self.predictions_prefix}_text"].split(";")[0].strip() for pred_list in predictions for pred in pred_list]} | |
| # straight copy, doesn't seem to give me the | |
| def prepare_data(self, data: Dataset, input_column: str, label_column: str, *args, **kwargs): | |
| """ | |
| Prepare data. | |
| Args: | |
| data (`Dataset`): Specifies the dataset we will run evaluation on. | |
| input_column (`str`, defaults to `"text"`): | |
| the name of the column containing the text feature in the dataset specified by `data`. | |
| label_column (`str`, defaults to `"label"`): | |
| the name of the column containing the labels in the dataset specified by `data`. | |
| Returns: | |
| `dict`: metric inputs. everything before the first semicolon and whitespaces are stripped (using python str builtins, just like the pred prep) | |
| `list`: pipeline inputs. | |
| """ | |
| self.check_required_columns(data, {"input_column": input_column, "label_column": label_column}) #this will throw and exception with useful error messages | |
| # don't put everything in the return statement, so you have the control... | |
| references = [ref.split(";")[0].strip() for ref in data[label_column]] | |
| self.PIPELINE_KWARGS.update({"max_new_tokens": self._estimate_stopping(references)}) #this is a hack, does it work tho? | |
| return {"references": references}, data[input_column] #DatasetColumn(data, input_column) doesn't seem to work. data[input_column] does, but ignores any of the features of the helper class.. | |
| # via: https://huggingface.co/docs/evaluate/evaluation_suite | |
| # relevant source: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluation_suite/__init__.py | |
| class Suite(evaluate.EvaluationSuite): | |
| def __init__(self, name): | |
| super().__init__(name) | |
| self.preprocessor = lambda x: {"return_statement": x["return_statement"].split(";")[0]} #like this? refactored to RetrunGenerationEvaluator | |
| self.suite = [ | |
| # more subtasks are only possible once we can pass custom evaluators. -> https://github.com/huggingface/evaluate/pull/367 | |
| SubTask( #this one is adjusted already | |
| task_type="text-generation", #this call an evaluator, but can you specify your own custom evaluator instead? | |
| data="Vipitis/Shadertoys-fine", | |
| subset="return_completion", | |
| split="test", # use this to select a subset of the data during testing, perhaps remove later? | |
| args_for_task={ | |
| # "metric": "exact_match", | |
| "input_column": "body", | |
| "label_column": "return_statement", | |
| } | |
| ) | |
| ] | |
| # from: https://github.com/huggingface/evaluate/blob/v0.4.0/src/evaluate/evaluation_suite/__init__.py#LL103C5-L129C27 | |
| def run( | |
| self, model_or_pipeline: Union[str, "Pipeline", Callable, "PreTrainedModel", "TFPreTrainedModel"] = "Vipitis/CodeGPT-small-java-adaptedGPT2-transfer-shadertoys", #not so useful default model? | |
| snippet: int = "" # noqa: F821 | |
| ) -> Dict[str, float]: | |
| self.assert_suite_nonempty() | |
| results_all = [] | |
| for task in self.suite: | |
| task_name = task.data | |
| if task.data_preprocessor: # task requires extra preprocessing is all done inside the Evaluator | |
| ds = load_dataset(task.data, name=task.subset, split=(task.split + f"[:{snippet}]")) | |
| task.data = ds.map(task.data_preprocessor) | |
| task_evaluator = ReturnGenerationEvaluator() #this is the change we make: specify our custom evaluator from above. | |
| args_for_task = task.args_for_task | |
| args_for_task["model_or_pipeline"] = model_or_pipeline | |
| args_for_task["data"] = task.data | |
| args_for_task["subset"] = task.subset | |
| args_for_task["split"] = (task.split + f"[:{snippet}]") #make a downselection of the split via keywordarg in the .run() call? | |
| results = task_evaluator.compute(**args_for_task) | |
| results["model_cp"] = model_or_pipeline #added this to the output, should be useful. But be careful when passed something that is not a string. #TODO: currently the same for all tasks, maybe move to the list? | |
| results["task_name"] = task_name + "/" + task.subset if task.subset else task_name | |
| results["data_preprocessor"] = str(task.data_preprocessor) if task.data_preprocessor is not None else None | |
| results_all.append(results) | |
| return results_all |