Source code for chemdataextractor.nlp.tag

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Tagger implementations. Used for part-of-speech tagging and named entity recognition.

"""
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
from abc import ABCMeta, abstractmethod
from collections import defaultdict
import io
import logging
import pickle
import random
import re

import dawg
import pycrfsuite
import six

from ..data import load_model, find_data
from .lexicon import Lexicon


log = logging.getLogger(__name__)


class BaseTagger(six.with_metaclass(ABCMeta)):
    """Abstract tagger class from which all taggers inherit.

    Subclasses must implement a ``tag()`` method.
    """

    @abstractmethod
    def tag(self, tokens):
        """Return a list of (token, tag) tuples for the given list of token strings.

        :param list(str) tokens: The list of tokens to tag.
        :rtype: list(tuple(str, str))
        """
        return

    def tag_sents(self, sentences):
        """Apply the ``tag`` method to each sentence in ``sentences``."""
        return [self.tag(s) for s in sentences]

    def evaluate(self, gold):
        """Evaluate the accuracy of this tagger using a gold standard corpus.

        :param list(list(tuple(str, str))) gold: The list of tagged sentences to score the tagger on.
        :returns: Tagger accuracy value.
        :rtype: float
        """
        tagged_sents = self.tag_sents([w for (w, t) in sent] for sent in gold)
        gold_tokens = sum(gold, [])
        test_tokens = sum(tagged_sents, [])
        accuracy = float(sum(x == y for x, y in six.moves.zip(gold_tokens, test_tokens))) / len(test_tokens)
        return accuracy
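
# A sketch of the ``gold`` format that ``evaluate`` expects (illustrative tags
# and accuracy value, using a hypothetical ``some_tagger`` instance): each
# sentence is a list of (token, tag) pairs, and accuracy is the fraction of
# tokens whose predicted tag matches the gold tag.
#
#     >>> gold = [[('The', 'AT'), ('cat', 'NN')], [('Dogs', 'NNS'), ('bark', 'VB')]]
#     >>> some_tagger.evaluate(gold)
#     0.75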


class NoneTagger(BaseTagger):
    """Tag every token with None."""

    def tag(self, tokens):
        """Return a (token, None) tuple for each token in ``tokens``."""
        return [(token, None) for token in tokens]


class RegexTagger(BaseTagger):
    """Regular Expression Tagger."""

    # TODO: I think NLTK RegexTagger has recently been improved to be more like this, so maybe we can just remove this?
    # We aren't actually using this anywhere because the regex ability in parsers is more flexible...
    # But may be useful for users that want an easy way to override some other tagger?

    #: Regular expression patterns in (regex, tag) tuples.
    patterns = [
        (r'^-?[0-9]+(\.[0-9]+)?$', 'CD'),  # cardinal numbers (dot escaped so only a literal decimal point matches)
        (r'(The|the|A|a|An|an)$', 'AT'),   # articles
        (r'.*able$', 'JJ'),                # adjectives
        (r'.*ness$', 'NN'),                # nouns formed from adjectives
        (r'.*ly$', 'RB'),                  # adverbs
        (r'.*s$', 'NNS'),                  # plural nouns
        (r'.*ing$', 'VBG'),                # gerunds
        (r'.*ed$', 'VBD'),                 # past tense verbs
        (r'.*', 'NN'),                     # nouns (default)
    ]

    #: The lexicon to use.
    lexicon = Lexicon()

    def __init__(self, patterns=None, lexicon=None):
        """
        :param list(tuple(string, string)) patterns: List of (regex, tag) pairs.
        """
        self.patterns = patterns if patterns is not None else self.patterns
        self.regexes = [(re.compile(pattern, re.I | re.U), tag) for pattern, tag in self.patterns]
        self.lexicon = lexicon if lexicon is not None else self.lexicon
        log.debug('%s: Initializing with %s patterns' % (self.__class__.__name__, len(self.patterns)))

    def tag(self, tokens):
        """Return a list of (token, tag) tuples for a given list of tokens."""
        tags = []
        for token in tokens:
            normalized = self.lexicon[token].normalized
            for regex, tag in self.regexes:
                if regex.match(normalized):
                    tags.append((token, tag))
                    break
            else:
                tags.append((token, None))
        return tags
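
# Usage sketch for RegexTagger with the default patterns. The exact output
# depends on Lexicon normalization, but for plain ASCII tokens it should be:
#
#     >>> RegexTagger().tag(['The', 'reaction', 'proceeded', 'slowly'])
#     [('The', 'AT'), ('reaction', 'NN'), ('proceeded', 'VBD'), ('slowly', 'RB')]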


class AveragedPerceptron(object):
    """Averaged Perceptron implementation.

    Based on implementation by Matthew Honnibal, released under the MIT license.

    See more:
    http://spacy.io/blog/part-of-speech-POS-tagger-in-python/
    https://github.com/sloria/textblob-aptagger
    """

    def __init__(self):
        # Each feature gets its own weight vector, so weights is a dict-of-dicts
        self.weights = {}
        self.classes = set()
        # The accumulated values, for the averaging. Keyed by feature/class tuples
        self._totals = defaultdict(int)
        # The last time the feature was changed, for the averaging. Keyed by feature/class tuples
        self._tstamps = defaultdict(int)
        # Number of instances seen
        self.i = 0

    def predict(self, features):
        """Dot-product the features and current weights and return the best label."""
        scores = defaultdict(float)
        for feat in features:
            if feat not in self.weights:
                continue
            weights = self.weights[feat]
            for label, weight in weights.items():
                scores[label] += weight
        # Do a secondary alphabetic sort, for stability
        return max(self.classes, key=lambda label: (scores[label], label))

    def update(self, truth, guess, features):
        """Update the feature weights."""
        def upd_feat(c, f, w, v):
            param = (f, c)
            self._totals[param] += (self.i - self._tstamps[param]) * w
            self._tstamps[param] = self.i
            self.weights[f][c] = w + v

        self.i += 1
        if truth == guess:
            return None
        for f in features:
            weights = self.weights.setdefault(f, {})
            upd_feat(truth, f, weights.get(truth, 0.0), 1.0)
            upd_feat(guess, f, weights.get(guess, 0.0), -1.0)
        return None

    def average_weights(self):
        """Average weights from all iterations."""
        for feat, weights in self.weights.items():
            new_feat_weights = {}
            for clas, weight in weights.items():
                param = (feat, clas)
                total = self._totals[param]
                total += (self.i - self._tstamps[param]) * weight
                averaged = round(total / float(self.i), 3)
                if averaged:
                    new_feat_weights[clas] = averaged
            self.weights[feat] = new_feat_weights
        return None
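
    # Worked example of the lazy averaging above (hypothetical numbers): if a
    # feature/class weight was set to 2.0 at instance 10 and never touched
    # again, and training ends at self.i == 50, then average_weights adds
    # (50 - 10) * 2.0 = 80.0 to the accumulated total before dividing by 50.
    # Assuming the total was zero up to instance 10, the averaged weight is
    # 1.6 -- exactly as if the weight had been summed at every instance.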

    def save(self, path):
        """Save the pickled model weights."""
        with io.open(path, 'wb') as fout:
            return pickle.dump(dict(self.weights), fout)

    def load(self, path):
        """Load the pickled model weights."""
        with io.open(path, 'rb') as fin:
            self.weights = pickle.load(fin)
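
# Minimal usage sketch for AveragedPerceptron (hypothetical feature strings):
#
#     >>> ap = AveragedPerceptron()
#     >>> ap.classes = {'NN', 'VB'}
#     >>> feats = ['word=run', 'prev=to']
#     >>> ap.update('VB', 'NN', feats)   # truth was 'VB', the guess 'NN' was wrong
#     >>> ap.predict(feats)
#     'VB'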


class ApTagger(six.with_metaclass(ABCMeta, BaseTagger)):
    """Greedy Averaged Perceptron tagger, based on implementation by Matthew Honnibal, released under the MIT license.

    See more:
    http://spacy.io/blog/part-of-speech-POS-tagger-in-python/
    https://github.com/sloria/textblob-aptagger
    """

    START = ['-START-', '-START2-']
    lexicon = Lexicon()
    clusters = False

    def __init__(self, model=None, lexicon=None, clusters=None):
        """Initialize the tagger. Model loading is deferred until first use."""
        self.perceptron = AveragedPerceptron()
        self.tagdict = {}
        self.classes = set()
        self.model = model if model is not None else self.model
        self.lexicon = lexicon if lexicon is not None else self.lexicon
        self.clusters = clusters if clusters is not None else self.clusters
        log.debug('%s: Initializing with %s' % (self.__class__.__name__, self.model))

    def tag(self, tokens):
        """Return a list of (token, tag) tuples for a given list of tokens."""
        # Lazy load model first time we tag
        if not self.classes:
            self.load(self.model)
        prev, prev2 = self.START
        tags = []
        for i, token in enumerate(tokens):
            tag = self.tagdict.get(token)
            if not tag:
                features = self._get_features(i, tokens, prev, prev2)
                tag = self.perceptron.predict(features)
            tags.append((token, tag))
            prev2 = prev
            prev = tag
        return tags
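
    # ApTagger is abstract: a concrete subclass supplies _get_features. A toy
    # sketch (hypothetical subclass with a deliberately tiny feature set):
    #
    #     >>> class ToyApTagger(ApTagger):
    #     ...     model = 'toy_model.pickle'
    #     ...     def _get_features(self, i, context, prev, prev2):
    #     ...         return ['bias', 'word=' + context[i], 'prev=' + prev]
    #     >>> tagger = ToyApTagger()
    #     >>> tagger.train([[('The', 'DT'), ('cat', 'NN')]], nr_iter=5)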

    def train(self, sentences, nr_iter=5):
        """Train a model from sentences.

        :param sentences: A list of sentences, each of which is a list of (token, tag) tuples.
        :param nr_iter: Number of training iterations.
        """
        self._make_tagdict(sentences)
        self.perceptron.classes = self.classes
        for iter_ in range(nr_iter):
            c = 0
            n = 0
            for sentence in sentences:
                prev, prev2 = self.START
                context = [t[0] for t in sentence]
                for i, (token, tag) in enumerate(sentence):
                    guess = self.tagdict.get(token)
                    if not guess:
                        feats = self._get_features(i, context, prev, prev2)
                        guess = self.perceptron.predict(feats)
                        self.perceptron.update(tag, guess, feats)
                    prev2 = prev
                    prev = guess
                    c += guess == tag
                    n += 1
            random.shuffle(sentences)
            log.debug('Iter %s: %s/%s=%s' % (iter_, c, n, (float(c) / n) * 100))
        self.perceptron.average_weights()

    def save(self, f):
        """Save pickled model to file."""
        return pickle.dump((self.perceptron.weights, self.tagdict, self.classes, self.clusters), f, protocol=pickle.HIGHEST_PROTOCOL)

    def load(self, model):
        """Load pickled model."""
        self.perceptron.weights, self.tagdict, self.classes, self.clusters = load_model(model)
        self.perceptron.classes = self.classes

    @abstractmethod
    def _get_features(self, i, context, prev, prev2):
        """Map tokens into a feature representation."""
        pass

    def _make_tagdict(self, sentences):
        """Make a tag dictionary for single-tag words."""
        counts = defaultdict(lambda: defaultdict(int))
        for sent in sentences:
            for word, tag in sent:
                counts[word][tag] += 1
                self.classes.add(tag)
        freq_thresh = 20
        ambiguity_thresh = 0.97
        for word, tag_freqs in counts.items():
            tag, mode = max(tag_freqs.items(), key=lambda item: item[1])
            n = sum(tag_freqs.values())
            # Don't add rare words to the tag dictionary, only add quite unambiguous words
            if n >= freq_thresh and (float(mode) / n) >= ambiguity_thresh:
                self.tagdict[word] = tag
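
# Worked example of the _make_tagdict thresholds (hypothetical counts): a word
# seen 25 times, 24 of them as 'NN', has mode/n = 24/25 = 0.96 < 0.97, so it
# stays out of the tagdict and is always scored by the perceptron; at 25/25 the
# ratio is 1.0 and the word is tagged 'NN' unconditionally at tag time.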


class CrfTagger(BaseTagger):
    """Tagger that uses Conditional Random Fields (CRF)."""

    lexicon = Lexicon()
    clusters = False

    #: Parameters to pass to training algorithm. See http://www.chokkan.org/software/crfsuite/manual.html
    params = {
        # These parameters are valid for the default LBFGS training algorithm. Change if using another.
        'c1': 1.0,                              # Coefficient for L1 regularization (OWL-QN). Default 0.
        'c2': 0.001,                            # Coefficient for L2 regularization. Default 1.
        'max_iterations': 50,                   # The maximum number of iterations for L-BFGS optimization. Default INT_MAX.
        'feature.possible_transitions': False,  # Force to generate all possible transition features. Default False.
        'feature.possible_states': False,       # Force to generate all possible state features. Default False.
        # 'feature.minfreq': 2,                 # The minimum frequency of features. Default 0.
        # 'epsilon':                            # Epsilon for testing the convergence of the objective. Default 0.00001.
    }

    def __init__(self, model=None, lexicon=None, clusters=None, params=None):
        """Initialize the tagger. Model loading is deferred until first use."""
        self.model = model if model is not None else self.model
        self.lexicon = lexicon if lexicon is not None else self.lexicon
        self.clusters = clusters if clusters is not None else self.clusters
        self.params = params if params is not None else self.params
        self._tagger = pycrfsuite.Tagger()
        self._loaded_model = False

    def load(self, model):
        """Load a CRF model file with pycrfsuite."""
        log.debug('Loading %s' % model)
        self._tagger.open(find_data(model))
        self._loaded_model = True

    def tag(self, tokens):
        """Return a list of ((token, tag), label) tuples for a given list of (token, tag) tuples."""
        # Lazy load model first time we tag
        if not self._loaded_model:
            self.load(self.model)
        features = [self._get_features(tokens, i) for i in range(len(tokens))]
        labels = self._tagger.tag(features)
        tagged_sent = list(zip(tokens, labels))
        return tagged_sent

    def train(self, sentences, model):
        """Train the CRF tagger using CRFSuite.

        :param sentences: Annotated sentences.
        :param model: Path at which to save the trained model.
        """
        trainer = pycrfsuite.Trainer(verbose=True)
        trainer.set_params(self.params)
        for sentence in sentences:
            tokens, labels = zip(*sentence)
            features = [self._get_features(tokens, i) for i in range(len(tokens))]
            trainer.append(features, labels)
        trainer.train(model)
        self.load(model)
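
# CrfTagger likewise delegates feature extraction to a subclass _get_features
# method. A toy training sketch (hypothetical subclass, labels and file path;
# on such a trivial corpus the tagger should reproduce the training labels):
#
#     >>> class ToyCrfTagger(CrfTagger):
#     ...     model = 'toy_crf.model'
#     ...     def _get_features(self, tokens, i):
#     ...         return ['word=%s' % tokens[i]]
#     >>> t = ToyCrfTagger()
#     >>> t.train([[('Water', 'B-CM'), ('boils', 'O')]], 'toy_crf.model')
#     >>> t.tag(['Water', 'boils'])
#     [('Water', 'B-CM'), ('boils', 'O')]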


class DictionaryTagger(BaseTagger):
    """Dictionary Tagger. Tag tokens based on inclusion in a DAWG."""

    #: The lexicon to use.
    lexicon = Lexicon()
    #: DAWG model file path.
    model = None
    #: Entity tag. Matches will be tagged like 'B-CM' and 'I-CM' according to IOB scheme. TODO: Optional no B/I?
    entity = 'CM'
    #: Delimiters that define where matches are allowed to start or end.
    delimiters = re.compile(r'(^.|\b|\s|\W|.$)')
    #: Whether dictionary matches are case sensitive.
    case_sensitive = False

    def __init__(self, words=None, model=None, entity=None, case_sensitive=None, lexicon=None):
        """
        :param list(list(string)) words: list of words, each of which is a list of tokens.
        """
        self._dawg = dawg.CompletionDAWG()
        self.model = model if model is not None else self.model
        self.entity = entity if entity is not None else self.entity
        self.case_sensitive = case_sensitive if case_sensitive is not None else self.case_sensitive
        self.lexicon = lexicon if lexicon is not None else self.lexicon
        self._loaded_model = False
        if words is not None:
            self.build(words)

    def load(self, model):
        """Load a DAWG model file from disk."""
        self._dawg.load(find_data(model))
        self._loaded_model = True

    def save(self, path):
        """Save the DAWG to disk."""
        self._dawg.save(path)

    def build(self, words):
        """Construct dictionary DAWG from tokenized words."""
        words = [self._normalize(tokens) for tokens in words]
        self._dawg = dawg.CompletionDAWG(words)
        self._loaded_model = True

    def _normalize(self, tokens):
        """Normalization transform to apply to both dictionary words and input tokens."""
        if self.case_sensitive:
            return ' '.join(self.lexicon[t].normalized for t in tokens)
        else:
            return ' '.join(self.lexicon[t].lower for t in tokens)

    def tag(self, tokens):
        """Return a list of (token, tag) tuples for a given list of tokens."""
        if not self._loaded_model:
            self.load(self.model)
        tags = [None] * len(tokens)
        norm = self._normalize(tokens)
        length = len(norm)
        # A set of allowed indexes for matches to start or end at
        delims = [0] + [i for span in [m.span() for m in self.delimiters.finditer(norm)] for i in span] + [length]
        # Token indices
        token_at_index = []
        for i, t in enumerate(tokens):
            token_at_index.extend([i] * (len(self.lexicon[t].normalized) + 1))
        start_i = 0
        end_i = 1
        matches = {}
        next_start = end_i
        # TODO: This could be a little more efficient by skipping indexes forward to next delim points.
        while True:
            current = norm[start_i:end_i]
            if self._dawg.has_keys_with_prefix(current):
                # print('%s:%s:%s' % (start_i, end_i, current))
                # If the current span is in the dawg, and isn't followed by an alphanumeric character
                if current in self._dawg and start_i in delims and end_i in delims:
                    # print(current)
                    # Subsequent longer matches with same start_i will overwrite values in matches dict
                    matches[start_i] = (start_i, end_i, current)
                    # We can skip forward to after this match next time we increment start_i
                    next_start = end_i
                # Increment end_i provided we aren't already at the end of the input
                if end_i < length:
                    end_i += 1
                    continue
            # Increment start_i provided we aren't already at the end of the input
            start_i = next_start
            if start_i >= length - 1:
                break
            end_i = start_i + 1
            next_start = end_i
        # Apply matches as tags to the relevant tokens
        for start_i, end_i, current in matches.values():
            start_token = token_at_index[start_i]
            end_token = token_at_index[end_i]
            # Possible for match to start in 'I' token from prev match. Merge matches by not overwriting to 'B'.
            if not tags[start_token] == 'I-%s' % self.entity:
                tags[start_token] = 'B-%s' % self.entity
            tags[start_token + 1:end_token + 1] = ['I-%s' % self.entity] * (end_token - start_token)
        tokentags = list(zip(tokens, tags))
        return tokentags
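
# Usage sketch for DictionaryTagger: build the DAWG directly from tokenized
# entries and tag a sentence (illustrative chemical names; matching is
# case-insensitive by default):
#
#     >>> dt = DictionaryTagger(words=[['sodium', 'chloride'], ['ethanol']])
#     >>> dt.tag(['Dissolve', 'sodium', 'chloride', 'in', 'ethanol'])
#     [('Dissolve', None), ('sodium', 'B-CM'), ('chloride', 'I-CM'), ('in', None), ('ethanol', 'B-CM')]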