# Source code for texar.torch.data.tokenizers.gpt2_tokenizer

# Copyright 2019 The Texar Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pre-trained GPT-2 tokenizer.

Code structure adapted from:
    `https://github.com/huggingface/pytorch-transformers/blob/master/pytorch_transformers/tokenization_gpt2.py`
"""

from typing import Any, Dict, List, Optional, Tuple

import os
import json
import regex as re

from texar.torch.modules.pretrained.gpt2 import PretrainedGPT2Mixin
from texar.torch.data.tokenizers.tokenizer_base import TokenizerBase
from texar.torch.data.tokenizers.gpt2_tokenizer_utils import \
    bytes_to_unicode, get_pairs

__all__ = [
    'GPT2Tokenizer',
]


class GPT2Tokenizer(TokenizerBase, PretrainedGPT2Mixin):
    r"""Pre-trained GPT2 Tokenizer.

    Args:
        pretrained_model_name (optional): a `str`, the name of
            pre-trained model (e.g., `117M`). Please refer to
            :class:`~texar.torch.modules.PretrainedGPT2Mixin` for
            all supported models.
            If None, the model name in :attr:`hparams` is used.
        cache_dir (optional): the path to a folder in which the
            pre-trained models will be cached. If `None` (default),
            a default directory (``texar_data`` folder under user's home
            directory) will be used.
        hparams (dict or HParams, optional): Hyperparameters. Missing
            hyperparameter will be set to default values. See
            :meth:`default_hparams` for the hyperparameter structure
            and default values.
    """

    _IS_PRETRAINED = True

    # Maximum input length (in tokens) for each known model name.
    _MAX_INPUT_SIZE = {
        'gpt2-small': 1024,
        'gpt2-medium': 1024,
        'gpt2-large': 1024,
        'gpt2-xl': 1024,
    }
    _DEPRECATED_MAX_INPUT_SIZE = {
        '117M': 1024,
        '345M': 1024,
    }
    _MAX_INPUT_SIZE.update(_DEPRECATED_MAX_INPUT_SIZE)

    # File names used when saving the vocabulary (see `save_vocab`).
    _VOCAB_FILE_NAMES = {
        'vocab_file': 'encoder.json',
        'merges_file': 'vocab.bpe',
    }

    # File names looked up inside the pre-trained model directory,
    # keyed by model name.
    _VOCAB_FILE_MAP = {
        'vocab_file': {
            'gpt2-small': 'encoder.json',
            'gpt2-medium': 'encoder.json',
            'gpt2-large': 'encoder.json',
            'gpt2-xl': 'encoder.json',
            '117M': 'encoder.json',
            '345M': 'encoder.json',
        },
        'merges_file': {
            'gpt2-small': 'vocab.bpe',
            'gpt2-medium': 'vocab.bpe',
            'gpt2-large': 'vocab.bpe',
            'gpt2-xl': 'vocab.bpe',
            '117M': 'vocab.bpe',
            '345M': 'vocab.bpe',
        },
    }

    def __init__(self,
                 pretrained_model_name: Optional[str] = None,
                 cache_dir: Optional[str] = None,
                 hparams=None):
        r"""Load the vocabulary and BPE merges and build lookup tables.

        Raises:
            ValueError: If the vocabulary file or the merges file is
                unspecified or does not exist.
        """
        self.load_pretrained_config(pretrained_model_name, cache_dir, hparams)

        super().__init__(hparams=None)

        self.config = {
            'errors': self.hparams['errors']
        }

        if self.pretrained_model_dir is not None:
            # Use the *resolved* model name everywhere in this branch. The
            # constructor argument may be `None` when the name was supplied
            # through `hparams`, so it must not be used for the lookups.
            model_name = self.pretrained_model_name
            assert model_name is not None
            vocab_file = os.path.join(
                self.pretrained_model_dir,
                self._VOCAB_FILE_MAP['vocab_file'][model_name])
            merges_file = os.path.join(
                self.pretrained_model_dir,
                self._VOCAB_FILE_MAP['merges_file'][model_name])
            max_len = self._MAX_INPUT_SIZE.get(model_name)
            if max_len:
                self.max_len = max_len
        else:
            vocab_file = self.hparams['vocab_file']
            merges_file = self.hparams['merges_file']
            if self.hparams.get('max_len'):
                self.max_len = self.hparams['max_len']

        # `None` is checked explicitly so that a missing hyperparameter is
        # reported with the same clear error as a missing file, rather than
        # a `TypeError` from `os.path.isfile(None)`.
        if vocab_file is None or not os.path.isfile(vocab_file):
            raise ValueError("Can't find a vocabulary file at path "
                             "'{}'".format(vocab_file))

        if merges_file is None or not os.path.isfile(merges_file):
            raise ValueError("Can't find a merges file at path "
                             "'{}'".format(merges_file))

        with open(vocab_file, encoding="utf-8") as fp:
            self.encoder = json.load(fp)
        self.decoder = {v: k for k, v in self.encoder.items()}
        self.errors = self.hparams["errors"]  # how to handle errors in decoding
        self.byte_encoder = bytes_to_unicode()
        self.byte_decoder = {v: k for k, v in self.byte_encoder.items()}

        with open(merges_file, encoding='utf-8') as fp:
            # Drop the first line (the file header, cf. the header written by
            # `save_vocab`) and the empty string after the final newline.
            bpe_data = fp.read().split('\n')[1:-1]
        bpe_merges = [tuple(merge.split()) for merge in bpe_data]
        self.bpe_ranks = dict(zip(bpe_merges, range(len(bpe_merges))))
        # Memoizes `_bpe` results per pre-token.
        self.cache: Dict[str, str] = {}

        # Should have added re.IGNORECASE so BPE merges can happen for
        # capitalized versions of contractions
        self.pat = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")  # pylint: disable=line-too-long

    def _map_text_to_token(self, text: str) -> List[str]:  # type: ignore
        r"""Tokenize a string.

        The text is split with :attr:`pat`; each piece is converted to its
        byte-level unicode representation and then split by BPE.
        """
        bpe_tokens: List[str] = []
        for token in re.findall(self.pat, text):
            token = ''.join(
                self.byte_encoder[b] for b in token.encode('utf-8'))
            bpe_tokens.extend(self._bpe(token).split(' '))
        return bpe_tokens

    def save_vocab(self, save_dir: str) -> Tuple[str, str]:
        r"""Save the tokenizer vocabulary and merge files to a directory.

        Args:
            save_dir: An existing directory to write the files into.

        Returns:
            A tuple of `(vocab_file, merge_file)` paths that were written.

        Raises:
            ValueError: If ``save_dir`` is not a directory.
        """
        if not os.path.isdir(save_dir):
            raise ValueError("Vocabulary path ({}) should be a "
                             "directory".format(save_dir))

        vocab_file = os.path.join(save_dir,
                                  self._VOCAB_FILE_NAMES['vocab_file'])
        merge_file = os.path.join(save_dir,
                                  self._VOCAB_FILE_NAMES['merges_file'])

        with open(vocab_file, 'w', encoding='utf-8') as f:
            f.write(json.dumps(self.encoder, ensure_ascii=False))

        index = 0
        with open(merge_file, "w", encoding="utf-8") as writer:
            writer.write(u'#version: 0.2\n')
            # Write merges in rank order; warn (but continue) if the ranks
            # have gaps, resynchronizing the expected index afterwards.
            for bpe_tokens, token_index in sorted(self.bpe_ranks.items(),
                                                  key=lambda kv: kv[1]):
                if index != token_index:
                    print("Saving vocabulary to {}: BPE merge indices are "
                          "not consecutive. Please check that the tokenizer "
                          "is not corrupted!".format(merge_file))
                    index = token_index
                writer.write(' '.join(bpe_tokens) + u'\n')
                index += 1

        return (vocab_file, merge_file)

    def _bpe(self, token: str) -> str:
        r"""Apply byte-pair-encoding merges to a single pre-token.

        Returns the resulting sub-tokens joined by single spaces. Results
        are memoized in :attr:`cache`.
        """
        if token in self.cache:
            return self.cache[token]
        word = tuple(token)
        pairs = get_pairs(word)

        if not pairs:
            return token

        while True:
            # Pick the adjacent pair with the lowest (best) merge rank.
            bigram = min(pairs, key=lambda pair: self.bpe_ranks.get(
                pair, float('inf')))
            if bigram not in self.bpe_ranks:
                # No known merge applies any more.
                break
            first, second = bigram
            new_word: List[str] = []
            i = 0
            while i < len(word):
                try:
                    j = word.index(first, i)
                    new_word.extend(word[i:j])
                    i = j
                except ValueError:
                    # `first` does not occur again: copy the remainder.
                    new_word.extend(word[i:])
                    break

                if word[i] == first and i < len(word) - 1 \
                        and word[i + 1] == second:
                    new_word.append(first + second)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            word = tuple(new_word)
            if len(word) == 1:
                break
            pairs = get_pairs(word)
        result = ' '.join(word)
        self.cache[token] = result
        return result

    @property
    def vocab_size(self) -> int:
        r"""The number of entries in the vocabulary."""
        return len(self.encoder)

    def _map_token_to_id(self, token: str) -> int:
        r"""Maps a token to an id using the vocabulary.

        Tokens missing from the vocabulary map to the id of ``unk_token``.
        """
        return self.encoder.get(token, self.encoder.get(self.unk_token))

    def _map_id_to_token(self, index: int) -> str:
        r"""Maps an id to a token using the vocabulary."""
        token = self.decoder.get(index)
        assert isinstance(token, str)
        return token

    def map_token_to_text(self, tokens: List[str]) -> str:
        r"""Maps a sequence of tokens (string) in a single string.

        Each character of the concatenated tokens is mapped back to its
        byte, and the bytes are decoded as UTF-8 using the configured
        error-handling mode.
        """
        text = ''.join(tokens)
        text = bytearray([self.byte_decoder[c] for c in text]).decode(
            'utf-8', errors=self.errors)
        return text

    def encode_text(  # type: ignore
            self,
            text: str,
            max_seq_length: Optional[int] = None,
            append_eos_token: bool = True) -> Tuple[List[int], int]:
        r"""Adds special tokens to a sequence and computes the corresponding
        sequence length for GPT2 specific tasks. The sequence will be truncated
        if its length is larger than ``max_seq_length``.

        A GPT2 sequence has the following format:
        `[bos_token]` X `[eos_token]` `[pad_token]`

        Args:
            text: Input text.
            max_seq_length: Maximum sequence length. If `None`,
                :attr:`max_len` is used.
            append_eos_token: Whether to append ``eos_token`` after the
                sequence.

        Returns:
            A tuple of `(input_ids, seq_len)`, where

            - ``input_ids``: A list of input token ids with added
              special tokens.
            - ``seq_len``: The sequence length.

        Raises:
            ValueError: If ``max_seq_length`` is too small to hold the
                special tokens.
        """
        if max_seq_length is None:
            max_seq_length = self.max_len

        # One slot for `bos_token`, plus one for `eos_token` if requested.
        num_special = 2 if append_eos_token else 1
        # A smaller limit would turn the truncation slice bound negative and
        # produce a sequence of the wrong length, so reject it up front.
        if max_seq_length < num_special:
            raise ValueError(
                "`max_seq_length` ({}) must be at least {} to hold the "
                "special tokens".format(max_seq_length, num_special))

        token_ids = self.map_text_to_id(text)
        assert isinstance(token_ids, list)

        bos_token_id = self._map_token_to_id(self.bos_token)
        eos_token_id = self._map_token_to_id(self.eos_token)
        pad_token_id = self._map_token_to_id(self.pad_token)

        input_ids = [bos_token_id] + token_ids[:max_seq_length - num_special]
        if append_eos_token:
            input_ids.append(eos_token_id)

        seq_len = len(input_ids)

        # Pad up to the maximum sequence length.
        input_ids = input_ids + [pad_token_id] * (max_seq_length - seq_len)

        assert len(input_ids) == max_seq_length
        return input_ids, seq_len

    @staticmethod
    def default_hparams() -> Dict[str, Any]:
        r"""Returns a dictionary of hyperparameters with default values.

        * The tokenizer is determined by the constructor argument
          :attr:`pretrained_model_name` if it's specified. In this case,
          `hparams` are ignored.
        * Otherwise, the tokenizer is determined by
          `hparams['pretrained_model_name']` if it's specified. All other
          configurations in `hparams` are ignored.
        * If the above two are `None`, the tokenizer is defined by the
          configurations in `hparams`.

        .. code-block:: python

            {
                "pretrained_model_name": "117M",
                "vocab_file": None,
                "merges_file": None,
                "max_len": 1024,
                "bos_token": "<|endoftext|>",
                "eos_token": "<|endoftext|>",
                "unk_token": "<|endoftext|>",
                "pad_token": "<|endoftext|>",
                "errors": "replace",
                "name": "gpt2_tokenizer",
            }

        Here:

        `"pretrained_model_name"`: str or None
            The name of the pre-trained GPT2 model.

        `"vocab_file"`: str or None
            The path to a vocabulary json file mapping tokens to ids.

        `"merges_file"`: str or None
            The path to a merges file.

        `"max_len"`: int
            The maximum sequence length that this model might ever be used
            with.

        `"bos_token"`: str
            Beginning of sentence token

        `"eos_token"`: str
            End of sentence token

        `"unk_token"`: str
            Unknown token

        `"pad_token"`: str
            Padding token

        `"errors"`: str
            Response when mapping tokens to text fails. The possible values
            are `ignore`, `replace`, and `strict`.

        `"name"`: str
            Name of the tokenizer.
        """
        return {
            'pretrained_model_name': '117M',
            'vocab_file': None,
            'merges_file': None,
            'max_len': 1024,
            'bos_token': '<|endoftext|>',
            'eos_token': '<|endoftext|>',
            'unk_token': '<|endoftext|>',
            'pad_token': '<|endoftext|>',
            'errors': 'replace',
            'name': 'gpt2_tokenizer',
            '@no_typecheck': ['pretrained_model_name'],
        }

    @classmethod
    def _transform_config(cls, pretrained_model_name: str,
                          cache_dir: str):
        r"""Returns the configuration of the pre-trained GPT2 tokenizer.

        The returned values are fixed; neither argument is consulted.
        """
        return {
            'vocab_file': None,
            'merges_file': None,
            'max_len': 1024,
            'bos_token': '<|endoftext|>',
            'eos_token': '<|endoftext|>',
            'unk_token': '<|endoftext|>',
            'pad_token': '<|endoftext|>',
            'errors': 'replace',
        }