"""
Hidden Markov Models (HMMs) are largely used to assign the correct label
sequence to sequential data, or to assess the probability of a given label
and data sequence. These models are finite state machines characterised by
a number of states, transitions between these states, and output symbols
emitted while in each state. The HMM is an extension of the Markov chain,
where each state corresponds deterministically to a given event. In the HMM
the observation is a probabilistic function of the state. HMMs share the
Markov chain's assumption that the probability of transition from one state
to another depends only on the current state - i.e. the series of states
that led to the current state is not used. They are also time invariant.

The HMM is a directed graph, with probability-weighted edges (representing
the probability of a transition between the source and sink states) where
each vertex emits an output symbol when entered. The symbol (or observation)
is non-deterministically generated. For this reason, knowing that a sequence
of output observations was generated by a given HMM does not mean that the
corresponding sequence of states (and what the current state is) is known.
This is the 'hidden' in the hidden Markov model.

Formally, a HMM can be characterised by:
- the output observation alphabet. This is the set of symbols which may be
  observed as output of the system.
- the set of states.
- the transition probabilities M{a_{ij} = P(s_t = j | s_{t-1} = i)}. These
  represent the probability of transition to each state from a given
  state.
- the output probability matrix M{b_i(k) = P(X_t = o_k | s_t = i)}. These
  represent the probability of observing each symbol in a given state.
- the initial state distribution. This gives the probability of starting
  in each state.
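
To make this concrete, consider the toy market model used in the demos at
the end of this module: the states are 'bull', 'bear' and 'static', the
output alphabet is 'up', 'down' and 'unchanged', and the parameters include,
for example, M{a_{bull,bear} = 0.2}, M{b_{bull}(up) = 0.7} and
M{pi_{bull} = 0.5}.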

To ground this discussion, take a common NLP application, part-of-speech
(POS) tagging. An HMM is desirable for this task as the highest probability
tag sequence can be calculated for a given sequence of word forms. This
differs from other tagging techniques which often tag each word
individually, seeking to optimise each individual tagging greedily without
regard to the optimal combination of tags for a larger unit, such as a
sentence. The HMM does this with the Viterbi algorithm, which efficiently
computes the optimal path through the graph given the sequence of word
forms.

In POS tagging the states usually have a 1:1 correspondence with the tag
alphabet - i.e. each state represents a single tag. The output observation
alphabet is the set of word forms (the lexicon), and the remaining three
parameters are derived by a training regime. With this information the
probability of a given sentence can be easily derived, by simply summing the
probability of each distinct path through the model. Similarly, the highest
probability tagging sequence can be derived with the Viterbi algorithm,
yielding a state sequence which can be mapped into a tag sequence.
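
Concretely, the joint probability of an observation sequence M{O = o_1 ...
o_T} and a state (tag) sequence M{S = s_1 ... s_T} factorises as:

    M{P(O, S) = pi_{s_1} b_{s_1}(o_1) prod_{t=2..T} a_{s_{t-1} s_t} b_{s_t}(o_t)}

and the probability of the observations alone, M{P(O)}, is the sum of this
quantity over all state sequences S, computed efficiently by the forward
algorithm.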

This discussion assumes that the HMM has been trained. This is probably the
most difficult task with the model, and requires either MLE estimates of the
parameters or unsupervised learning using the Baum-Welch algorithm, a
variant of EM.
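
An indicative usage sketch, following the demos at the end of this module
(here 'sentences' is assumed to be a list of (word, tag) pair sequences,
and the word strings are purely illustrative):

    trainer = HiddenMarkovModelTrainer(tag_set, symbols)
    model = trainer.train_supervised(sentences)
    model.tag([('the', None), ('dog', None), ('barked', None)])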
"""

from nltk_lite.probability import *
from numpy import *
import re


_NINF = float('-1e300')

_TEXT = 0
_TAG = 1


class HiddenMarkovModel(object):
    """
    Hidden Markov model class, a generative model for labelling sequence
    data. These models define the joint probability of a sequence of symbols
    and their labels (state transitions) as the product of the starting state
    probability, the probability of each state transition, and the
    probability of each observation being generated from each state. This is
    described in more detail in the module documentation.

    This implementation is based on the HMM description in Chapter 8, Huang,
    Acero and Hon, Spoken Language Processing.
    """
    def __init__(self, symbols, states, transitions, outputs, priors):
        """
        Creates a hidden Markov model parametrised by the states,
        transition probabilities, output probabilities and priors.

        @param symbols: the set of output symbols (alphabet)
        @type symbols: seq of any
        @param states: a set of states representing state space
        @type states: seq of any
        @param transitions: transition probabilities; Pr(s_i | s_j)
            is the probability of transition to state i
            given the model is in state j
        @type transitions: C{ConditionalProbDistI}
        @param outputs: output probabilities; Pr(o_k | s_i) is the
            probability of emitting symbol k when entering
            state i
        @type outputs: C{ConditionalProbDistI}
        @param priors: initial state distribution; Pr(s_i) is the
            probability of starting in state i
        @type priors: C{ProbDistI}
        """
        self._states = states
        self._transitions = transitions
        self._symbols = symbols
        self._outputs = outputs
        self._priors = priors
        self._cache = None

    def probability(self, sequence):
        """
        Returns the probability of the given symbol sequence. If the sequence
        is labelled, then returns the joint probability of the symbol, state
        sequence. Otherwise, uses the forward algorithm to find the
        probability over all label sequences.

        @return: the probability of the sequence
        @rtype: float
        @param sequence: the sequence of symbols which must contain the TEXT
            property, and optionally the TAG property
        @type sequence: Token
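
        A minimal sketch, using the toy market model built in the demo at
        the end of this module (the values are purely illustrative):

            model.probability([('up', None), ('up', None)])      # sums over all taggings
            model.probability([('up', 'bull'), ('up', 'bull')])  # one joint tagging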
        """
        return exp(self.log_probability(sequence))

    def log_probability(self, sequence):
        """
        Returns the log-probability of the given symbol sequence. If the
        sequence is labelled, then returns the joint log-probability of the
        symbol, state sequence. Otherwise, uses the forward algorithm to find
        the log-probability over all label sequences.

        @return: the log-probability of the sequence
        @rtype: float
        @param sequence: the sequence of symbols which must contain the TEXT
            property, and optionally the TAG property
        @type sequence: Token
        """

        T = len(sequence)
        N = len(self._states)

        if T > 0 and sequence[0][_TAG]:
            # the sequence is tagged - compute the joint log-probability of
            # the symbol and state sequence directly
            last_state = sequence[0][_TAG]
            p = self._priors.logprob(last_state) + \
                self._outputs[last_state].logprob(sequence[0][_TEXT])
            for t in range(1, T):
                state = sequence[t][_TAG]
                p += self._transitions[last_state].logprob(state) + \
                     self._outputs[state].logprob(sequence[t][_TEXT])
                last_state = state
            return p
        else:
            # the sequence is untagged - sum over all state sequences using
            # the forward algorithm
            alpha = self._forward_probability(sequence)
            p = _log_add(*alpha[T-1, :])
            return p

    def tag(self, unlabeled_sequence):
        """
        Tags the sequence with the highest probability state sequence. This
        uses the best_path method to find the Viterbi path.

        @return: a labelled sequence of symbols
        @rtype: list
        @param unlabeled_sequence: the sequence of unlabeled symbols
        @type unlabeled_sequence: list
        """

        path = self.best_path(unlabeled_sequence)
        for i in range(len(path)):
            unlabeled_sequence[i] = (unlabeled_sequence[i][_TEXT], path[i])
        return unlabeled_sequence

    def _output_logprob(self, state, symbol):
        """
        @return: the log probability of the symbol being observed in the
            given state
        @rtype: float
        """
        return self._outputs[state].logprob(symbol)

    def _create_cache(self):
        # caches the model parameters as numpy arrays of log-probabilities
        # (P: priors, X: transitions, O: outputs) plus a symbol -> index
        # mapping S, for use by the array-based Viterbi search in best_path
        if not self._cache:
            N = len(self._states)
            M = len(self._symbols)
            P = zeros(N, float32)
            X = zeros((N, N), float32)
            O = zeros((N, M), float32)
            for i in range(N):
                si = self._states[i]
                P[i] = self._priors.logprob(si)
                for j in range(N):
                    X[i, j] = self._transitions[si].logprob(self._states[j])
                for k in range(M):
                    O[i, k] = self._outputs[si].logprob(self._symbols[k])
            S = {}
            for k in range(M):
                S[self._symbols[k]] = k
            self._cache = (P, O, X, S)

    def best_path(self, unlabeled_sequence):
        """
        Returns the state sequence of the optimal (most probable) path
        through the HMM. Uses the Viterbi algorithm to calculate this part by
        dynamic programming.

        @return: the state sequence
        @rtype: sequence of any
        @param unlabeled_sequence: the sequence of unlabeled symbols
        @type unlabeled_sequence: list
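
        The dynamic program fills a T x N array V of log-probabilities using
        the recurrence:

            V[0, j] = log pi_j + log b_j(o_0)
            V[t, j] = max_i (V[t-1, i] + log a_{ij}) + log b_j(o_t)

        together with an array B of back-pointers recording the maximising i,
        from which the optimal path is recovered by walking backwards from
        the best final state.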
        """

        symbols = [token[_TEXT] for token in unlabeled_sequence]
        T = len(symbols)
        N = len(self._states)
        self._create_cache()
        P, O, X, S = self._cache

        V = zeros((T, N), float32)
        B = ones((T, N), int) * -1

        # find the starting log probabilities for each state
        V[0] = P + O[:, S[symbols[0]]]
        # find the maximum log probability of reaching each state at time t,
        # recording the best previous state in B
        for t in range(1, T):
            for j in range(N):
                vs = V[t-1, :] + X[:, j]
                best = argmax(vs)
                V[t, j] = vs[best] + O[j, S[symbols[t]]]
                B[t, j] = best

        # find the highest probability final state, then trace back through
        # the back-pointers to recover the optimal state sequence
        current = argmax(V[T-1, :])
        sequence = [current]
        for t in range(T-1, 0, -1):
            last = B[t, current]
            sequence.append(last)
            current = last

        sequence.reverse()
        return map(self._states.__getitem__, sequence)

    def _best_path_simple(self, unlabeled_sequence):
        """
        Returns the state sequence of the optimal (most probable) path
        through the HMM. Uses the Viterbi algorithm to calculate this part by
        dynamic programming. This uses a simple, direct method, and is
        included for teaching purposes.

        @return: the state sequence
        @rtype: sequence of any
        @param unlabeled_sequence: the sequence of unlabeled symbols
        @type unlabeled_sequence: list
        """

        T = len(unlabeled_sequence)
        N = len(self._states)
        V = zeros((T, N), float64)
        B = {}

        # find the starting log probabilities for each state
        symbol = unlabeled_sequence[0][_TEXT]
        for i, state in enumerate(self._states):
            V[0, i] = self._priors.logprob(state) + \
                      self._output_logprob(state, symbol)
            B[0, state] = None

        # find the maximum log probabilities for reaching each state at time t
        for t in range(1, T):
            symbol = unlabeled_sequence[t][_TEXT]
            for j in range(N):
                sj = self._states[j]
                best = None
                for i in range(N):
                    si = self._states[i]
                    va = V[t-1, i] + self._transitions[si].logprob(sj)
                    if not best or va > best[0]:
                        best = (va, si)
                V[t, j] = best[0] + self._output_logprob(sj, symbol)
                B[t, sj] = best[1]

        # find the highest probability final state
        best = None
        for i in range(N):
            val = V[T-1, i]
            if not best or val > best[0]:
                best = (val, self._states[i])

        # traverse the back-pointers B to find the state sequence
        current = best[1]
        sequence = [current]
        for t in range(T-1, 0, -1):
            last = B[t, current]
            sequence.append(last)
            current = last

        sequence.reverse()
        return sequence

    def random_sample(self, rng, length):
        """
        Randomly sample the HMM to generate a sentence of a given length.
        This samples the prior distribution then the observation distribution
        and transition distribution for each subsequent observation and
        state. This will mostly generate unintelligible garbage, but can
        provide some amusement.

        @return: the randomly created state/observation sequence,
            generated according to the HMM's probability
            distributions. The SUBTOKENS have TEXT and TAG
            properties containing the observation and state
            respectively.
        @rtype: list
        @param rng: random number generator
        @type rng: Random (or any object with a random() method)
        @param length: desired output length
        @type length: int
        """

        # sample the starting state and symbol prob dists
        tokens = []
        state = self._sample_probdist(self._priors, rng.random(), self._states)
        symbol = self._sample_probdist(self._outputs[state],
                                       rng.random(), self._symbols)
        tokens.append((symbol, state))

        for i in range(1, length):
            # sample the state transition and symbol prob dists
            state = self._sample_probdist(self._transitions[state],
                                          rng.random(), self._states)
            symbol = self._sample_probdist(self._outputs[state],
                                           rng.random(), self._symbols)
            tokens.append((symbol, state))

        return tokens

    def _sample_probdist(self, probdist, p, samples):
        # inverse transform sampling: walk the cumulative distribution until
        # it covers the uniform random value p
        cum_p = 0
        for sample in samples:
            add_p = probdist.prob(sample)
            if cum_p <= p <= cum_p + add_p:
                return sample
            cum_p += add_p
        raise Exception('Invalid probability distribution - '
                        'does not sum to one')

    def entropy(self, unlabeled_sequence):
        """
        Returns the entropy over labellings of the given sequence. This is
        given by:

            H(O) = - sum_S Pr(S | O) log Pr(S | O)

        where the summation ranges over all state sequences, S. Let
        M{Z = Pr(O) = sum_S Pr(S, O)} where the summation ranges over all
        state sequences and O is the observation sequence. As such the
        entropy can be re-expressed as:

            H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ]
              = log Z - sum_S Pr(S | O) log Pr(S, O)
              = log Z - sum_S Pr(S | O) [ log Pr(S_0) +
                    sum_t log Pr(S_t | S_{t-1}) + sum_t log Pr(O_t | S_t) ]

        The order of summation for the log terms can be flipped, allowing
        dynamic programming to be used to calculate the entropy.
        Specifically, we use the forward and backward probabilities (alpha,
        beta) giving:

            H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0)
                - sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj)
                    beta_t+1(sj) / Z * log Pr(sj | si)
                - sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st)

        This simply uses alpha and beta to find the probabilities of partial
        sequences, constrained to include the given state(s) at some point in
        time.
        """

        T = len(unlabeled_sequence)
        N = len(self._states)

        alpha = self._forward_probability(unlabeled_sequence)
        beta = self._backward_probability(unlabeled_sequence)
        normalisation = _log_add(*alpha[T-1, :])

        entropy = normalisation

        # starting state term
        for i, state in enumerate(self._states):
            p = exp(alpha[0, i] + beta[0, i] - normalisation)
            entropy -= p * self._priors.logprob(state)

        # state transition terms
        for t0 in range(T - 1):
            t1 = t0 + 1
            for i0, s0 in enumerate(self._states):
                for i1, s1 in enumerate(self._states):
                    p = exp(alpha[t0, i0] + self._transitions[s0].logprob(s1) +
                            self._outputs[s1].logprob(unlabeled_sequence[t1][_TEXT]) +
                            beta[t1, i1] - normalisation)
                    entropy -= p * self._transitions[s0].logprob(s1)

        # symbol emission terms
        for t in range(T):
            for i, state in enumerate(self._states):
                p = exp(alpha[t, i] + beta[t, i] - normalisation)
                entropy -= p * self._outputs[state].logprob(unlabeled_sequence[t][_TEXT])

        return entropy

    def point_entropy(self, unlabeled_sequence):
        """
        Returns the pointwise entropy over the possible states at each
        position in the chain, given the observation sequence.
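
        That is, for each position t it computes
        M{H_t = - sum_s Pr(s_t = s | O) log Pr(s_t = s | O)}, where the
        posterior M{Pr(s_t = s | O) = alpha_t(s) beta_t(s) / Z} is derived
        from the forward and backward probabilities.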
        """

        T = len(unlabeled_sequence)
        N = len(self._states)

        alpha = self._forward_probability(unlabeled_sequence)
        beta = self._backward_probability(unlabeled_sequence)
        normalisation = _log_add(*alpha[T-1, :])

        entropies = zeros(T, float64)
        probs = zeros(N, float64)
        for t in range(T):
            for s in range(N):
                probs[s] = alpha[t, s] + beta[t, s] - normalisation

            for s in range(N):
                entropies[t] -= exp(probs[s]) * probs[s]

        return entropies

    def _exhaustive_entropy(self, unlabeled_sequence):
        # computes the entropy over labellings by brute force, enumerating
        # all N^T state sequences; used to sanity-check the dynamic
        # programming implementation in entropy
        T = len(unlabeled_sequence)
        N = len(self._states)

        # build all possible labellings of the sequence
        labellings = [[state] for state in self._states]
        for t in range(T - 1):
            current = labellings
            labellings = []
            for labelling in current:
                for state in self._states:
                    labellings.append(labelling + [state])

        # score each labelling, and normalise by the total
        log_probs = []
        for labelling in labellings:
            labelled_sequence = unlabeled_sequence[:]
            for t, label in enumerate(labelling):
                labelled_sequence[t] = (labelled_sequence[t][_TEXT], label)
            lp = self.log_probability(labelled_sequence)
            log_probs.append(lp)
        normalisation = _log_add(*log_probs)

        entropy = 0
        for lp in log_probs:
            lp -= normalisation
            entropy -= exp(lp) * lp

        return entropy

    def _exhaustive_point_entropy(self, unlabeled_sequence):
        # computes the pointwise entropy by brute force, enumerating all N^T
        # state sequences; used to sanity-check point_entropy
        T = len(unlabeled_sequence)
        N = len(self._states)

        labellings = [[state] for state in self._states]
        for t in range(T - 1):
            current = labellings
            labellings = []
            for labelling in current:
                for state in self._states:
                    labellings.append(labelling + [state])

        log_probs = []
        for labelling in labellings:
            labelled_sequence = unlabeled_sequence[:]
            for t, label in enumerate(labelling):
                labelled_sequence[t] = (labelled_sequence[t][_TEXT], label)
            lp = self.log_probability(labelled_sequence)
            log_probs.append(lp)

        normalisation = _log_add(*log_probs)

        # accumulate the posterior probability of each state at each time
        probabilities = zeros((T, N), float64)
        probabilities[:] = _NINF
        for labelling, lp in zip(labellings, log_probs):
            lp -= normalisation
            for t, label in enumerate(labelling):
                index = self._states.index(label)
                probabilities[t, index] = _log_add(probabilities[t, index], lp)

        entropies = zeros(T, float64)
        for t in range(T):
            for s in range(N):
                entropies[t] -= exp(probabilities[t, s]) * probabilities[t, s]

        return entropies

    def _forward_probability(self, unlabeled_sequence):
        """
        Return the forward probability matrix, a T by N array of
        log-probabilities, where T is the length of the sequence and N is the
        number of states. Each entry (t, s) gives the probability of being in
        state s at time t after observing the partial symbol sequence up to
        and including t.

        @param unlabeled_sequence: the sequence of unlabeled symbols
        @type unlabeled_sequence: list
        @return: the forward log probability matrix
        @rtype: array
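
        The recurrence evaluated (in log space) is:

            alpha_0(i) = pi_i b_i(o_0)
            alpha_t(i) = [ sum_j alpha_{t-1}(j) a_{ji} ] b_i(o_t)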
        """
        T = len(unlabeled_sequence)
        N = len(self._states)
        alpha = zeros((T, N), float64)

        # initialise with the priors and the first symbol's emission
        symbol = unlabeled_sequence[0][_TEXT]
        for i, state in enumerate(self._states):
            alpha[0, i] = self._priors.logprob(state) + \
                          self._outputs[state].logprob(symbol)
        # inductively calculate the remaining forward probabilities
        for t in range(1, T):
            symbol = unlabeled_sequence[t][_TEXT]
            for i, si in enumerate(self._states):
                alpha[t, i] = _NINF
                for j, sj in enumerate(self._states):
                    alpha[t, i] = _log_add(alpha[t, i], alpha[t-1, j] +
                                           self._transitions[sj].logprob(si))
                alpha[t, i] += self._outputs[si].logprob(symbol)

        return alpha

    def _backward_probability(self, unlabeled_sequence):
        """
        Return the backward probability matrix, a T by N array of
        log-probabilities, where T is the length of the sequence and N is the
        number of states. Each entry (t, s) gives the probability of
        generating the remaining partial symbol sequence from t+1 .. T, given
        that the model is in state s at time t.

        @return: the backward log probability matrix
        @rtype: array
        @param unlabeled_sequence: the sequence of unlabeled symbols
        @type unlabeled_sequence: list
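
        The recurrence evaluated (in log space) is:

            beta_{T-1}(i) = 1
            beta_t(i) = sum_j a_{ij} b_j(o_{t+1}) beta_{t+1}(j)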
        """
        T = len(unlabeled_sequence)
        N = len(self._states)
        beta = zeros((T, N), float64)

        # initialise the backward probabilities: log(1) = 0
        beta[T-1, :] = log(1)

        # inductively calculate the remaining backward probabilities
        for t in range(T-2, -1, -1):
            symbol = unlabeled_sequence[t+1][_TEXT]
            for i, si in enumerate(self._states):
                beta[t, i] = _NINF
                for j, sj in enumerate(self._states):
                    beta[t, i] = _log_add(beta[t, i],
                                          self._transitions[si].logprob(sj) +
                                          self._outputs[sj].logprob(symbol) +
                                          beta[t + 1, j])

        return beta

    def __repr__(self):
        return '<HiddenMarkovModel %d states and %d output symbols>' \
               % (len(self._states), len(self._symbols))


class HiddenMarkovModelTrainer(object):
    """
    Algorithms for learning HMM parameters from training data. These include
    both supervised learning (MLE) and unsupervised learning (Baum-Welch).
    """
    def __init__(self, states=None, symbols=None):
        """
        Creates an HMM trainer to induce an HMM with the given states and
        output symbol alphabet. A supervised and unsupervised training
        method may be used. If either of the states or symbols are not given,
        these may be derived from supervised training.

        @param states: the set of state labels
        @type states: sequence of any
        @param symbols: the set of observation symbols
        @type symbols: sequence of any
        """
        if states:
            self._states = states
        else:
            self._states = []
        if symbols:
            self._symbols = symbols
        else:
            self._symbols = []

    def train(self, labelled_sequences=None, unlabeled_sequences=None,
              **kwargs):
        """
        Trains the HMM using both (or either of) supervised and unsupervised
        techniques.

        @return: the trained model
        @rtype: HiddenMarkovModel
        @param labelled_sequences: the supervised training data, a set of
            labelled sequences of observations
        @type labelled_sequences: list
        @param unlabeled_sequences: the unsupervised training data, a set of
            sequences of observations
        @type unlabeled_sequences: list
        @param kwargs: additional arguments to pass to the training methods
        """
        assert labelled_sequences or unlabeled_sequences
        model = None
        if labelled_sequences:
            model = self.train_supervised(labelled_sequences, **kwargs)
        if unlabeled_sequences:
            if model: kwargs['model'] = model
            model = self.train_unsupervised(unlabeled_sequences, **kwargs)
        return model

    def train_unsupervised(self, unlabeled_sequences, **kwargs):
        """
        Trains the HMM using the Baum-Welch algorithm to maximise the
        probability of the data sequence. This is a variant of the EM
        algorithm, and is unsupervised in that it doesn't need the state
        sequences for the symbols. The code is based on 'A Tutorial on Hidden
        Markov Models and Selected Applications in Speech Recognition',
        Lawrence Rabiner, IEEE, 1989.

        @return: the trained model
        @rtype: HiddenMarkovModel
        @param unlabeled_sequences: the training data, a set of
            sequences of observations
        @type unlabeled_sequences: list
        @param kwargs: may include the following parameters::
            model - a HiddenMarkovModel instance used to begin the Baum-Welch
                algorithm
            max_iterations - the maximum number of EM iterations
            convergence_logprob - the maximum change in log probability to
                allow convergence
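
        An indicative usage sketch, as in the demos at the end of this
        module ('sequences' is assumed to be a list of lists of
        (symbol, None) pairs):

            trainer = HiddenMarkovModelTrainer(states, symbols)
            model = trainer.train_unsupervised(sequences, max_iterations=10)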
        """

        N = len(self._states)
        M = len(self._symbols)
        symbol_dict = dict((self._symbols[i], i) for i in range(M))

        # create a uniform HMM, which will be iteratively refined, unless
        # an existing model is passed in
        model = kwargs.get('model')
        if not model:
            priors = UniformProbDist(self._states)
            transitions = DictionaryConditionalProbDist(
                dict((state, UniformProbDist(self._states))
                     for state in self._states))
            output = DictionaryConditionalProbDist(
                dict((state, UniformProbDist(self._symbols))
                     for state in self._states))
            model = HiddenMarkovModel(self._symbols, self._states,
                                      transitions, output, priors)

        # wrap the distributions in mutable versions so they can be updated
        # in place on each iteration
        model._priors = MutableProbDist(model._priors, self._states)
        model._transitions = DictionaryConditionalProbDist(
            dict((s, MutableProbDist(model._transitions[s], self._states))
                 for s in self._states))
        model._outputs = DictionaryConditionalProbDist(
            dict((s, MutableProbDist(model._outputs[s], self._symbols))
                 for s in self._states))

        # iterate until convergence or the iteration limit is reached
        converged = False
        last_logprob = None
        iteration = 0
        max_iterations = kwargs.get('max_iterations', 1000)
        epsilon = kwargs.get('convergence_logprob', 1e-6)
        while not converged and iteration < max_iterations:
            A_numer = ones((N, N), float64) * _NINF
            B_numer = ones((N, M), float64) * _NINF
            A_denom = ones(N, float64) * _NINF
            B_denom = ones(N, float64) * _NINF

            logprob = 0
            for sequence in unlabeled_sequences:
                sequence = list(sequence)
                if not sequence:
                    continue

                # compute forward and backward probabilities
                alpha = model._forward_probability(sequence)
                beta = model._backward_probability(sequence)

                # find the log probability of the sequence
                T = len(sequence)
                lpk = _log_add(*alpha[T-1, :])
                logprob += lpk

                # accumulate the expected transition and output counts for
                # this sequence, following Rabiner's re-estimation formulas
                local_A_numer = ones((N, N), float64) * _NINF
                local_B_numer = ones((N, M), float64) * _NINF
                local_A_denom = ones(N, float64) * _NINF
                local_B_denom = ones(N, float64) * _NINF

                for t in range(T):
                    x = sequence[t][_TEXT]
                    if t < T - 1:
                        xnext = sequence[t+1][_TEXT]
                    xi = symbol_dict[x]
                    for i in range(N):
                        si = self._states[i]
                        if t < T - 1:
                            for j in range(N):
                                sj = self._states[j]
                                local_A_numer[i, j] = \
                                    _log_add(local_A_numer[i, j],
                                             alpha[t, i] +
                                             model._transitions[si].logprob(sj) +
                                             model._outputs[sj].logprob(xnext) +
                                             beta[t+1, j])
                            local_A_denom[i] = _log_add(local_A_denom[i],
                                alpha[t, i] + beta[t, i])
                        else:
                            # B's denominator sums over all t; local_A_denom
                            # already holds the sum up to T-2, so adding the
                            # final term completes it
                            local_B_denom[i] = _log_add(local_A_denom[i],
                                alpha[t, i] + beta[t, i])

                        local_B_numer[i, xi] = _log_add(local_B_numer[i, xi],
                            alpha[t, i] + beta[t, i])

                # add these sums, normalised by the sequence probability, to
                # the global A and B values
                for i in range(N):
                    for j in range(N):
                        A_numer[i, j] = _log_add(A_numer[i, j],
                                                 local_A_numer[i, j] - lpk)
                    for k in range(M):
                        B_numer[i, k] = _log_add(B_numer[i, k],
                                                 local_B_numer[i, k] - lpk)

                    A_denom[i] = _log_add(A_denom[i], local_A_denom[i] - lpk)
                    B_denom[i] = _log_add(B_denom[i], local_B_denom[i] - lpk)

            # use the calculated values to update the transition and output
            # probability values
            for i in range(N):
                si = self._states[i]
                for j in range(N):
                    sj = self._states[j]
                    model._transitions[si].update(sj, A_numer[i,j] -
                                                  A_denom[i])
                for k in range(M):
                    ok = self._symbols[k]
                    model._outputs[si].update(ok, B_numer[i,k] - B_denom[i])

            # test for convergence
            if iteration > 0 and abs(logprob - last_logprob) < epsilon:
                converged = True

            print 'iteration', iteration, 'logprob', logprob
            iteration += 1
            last_logprob = logprob

        return model

    def train_supervised(self, labelled_sequences, **kwargs):
        """
        Supervised training maximising the joint probability of the symbol
        and state sequences. This is done via collecting frequencies of
        transitions between states, symbol observations while within each
        state and which states start a sentence. These frequency
        distributions are then normalised into probability estimates, which
        can be smoothed if desired.

        @return: the trained model
        @rtype: HiddenMarkovModel
        @param labelled_sequences: the training data, a set of
            labelled sequences of observations
        @type labelled_sequences: list
        @param kwargs: may include an 'estimator' parameter, a function taking
            a C{FreqDist} and a number of bins and returning a C{ProbDistI};
            otherwise a MLE estimate is used
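
        An indicative usage sketch with a smoothed estimator, as in the POS
        demos at the end of this module ('sentences' is assumed to be a list
        of (word, tag) sequences):

            trainer = HiddenMarkovModelTrainer(tag_set, symbols)
            hmm = trainer.train_supervised(sentences,
                estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins))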
        """

        # default to the MLE estimate
        estimator = kwargs.get('estimator')
        if estimator is None:
            estimator = lambda fdist, bins: MLEProbDist(fdist)

        # count occurrences of starting states, transitions out of each
        # state and output symbols observed in each state
        starting = FreqDist()
        transitions = ConditionalFreqDist()
        outputs = ConditionalFreqDist()
        for sequence in labelled_sequences:
            lasts = None
            for token in sequence:
                state = token[_TAG]
                symbol = token[_TEXT]
                if lasts is None:
                    starting.inc(state)
                else:
                    transitions[lasts].inc(state)
                outputs[state].inc(symbol)
                lasts = state

                # update the state and symbol alphabets as new values are seen
                if state not in self._states:
                    self._states.append(state)
                if symbol not in self._symbols:
                    self._symbols.append(symbol)

        # create probability distributions (with smoothing)
        N = len(self._states)
        pi = estimator(starting, N)
        A = ConditionalProbDist(transitions, estimator, False, N)
        B = ConditionalProbDist(outputs, estimator, False, len(self._symbols))

        return HiddenMarkovModel(self._symbols, self._states, A, B, pi)


def _log_add(*values):
    """
    Adds the logged values, returning the logarithm of the addition.
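
    Uses the standard log-sum-exp trick: with M{x = max(values)},
    M{log(sum_i exp(v_i)) = x + log(sum_i exp(v_i - x))}, so for example
    _log_add(log(0.25), log(0.25)) evaluates to log(0.5).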
    """
    x = max(values)
    if x > _NINF:
        sum_diffs = 0
        for value in values:
            sum_diffs += exp(value - x)
        return x + log(sum_diffs)
    else:
        return x


def demo():
    # demonstrates HMM probability calculation, tagging and entropy using a
    # toy model of the market (see also demo_bw below, which shares these
    # parameters)

    symbols = ['up', 'down', 'unchanged']
    states = ['bull', 'bear', 'static']

    def pd(values, samples):
        d = {}
        for value, item in zip(values, samples):
            d[item] = value
        return DictionaryProbDist(d)

    def cpd(array, conditions, samples):
        d = {}
        for values, condition in zip(array, conditions):
            d[condition] = pd(values, samples)
        return DictionaryConditionalProbDist(d)

    A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64)
    A = cpd(A, states, states)
    B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64)
    B = cpd(B, states, symbols)
    pi = array([0.5, 0.2, 0.3], float64)
    pi = pd(pi, states)

    model = HiddenMarkovModel(symbols=symbols, states=states,
                              transitions=A, outputs=B, priors=pi)

    print 'Testing', model

    for test in [['up', 'up'], ['up', 'down', 'up'],
                 ['down'] * 5, ['unchanged'] * 5 + ['up']]:

        sequence = [(t, None) for t in test]

        print 'Testing with state sequence', test
        print 'probability =', model.probability(sequence)
        print 'tagging = ', model.tag(sequence)
        print 'p(tagged) = ', model.probability(sequence)
        print 'H = ', model.entropy(sequence)
        print 'H_exh = ', model._exhaustive_entropy(sequence)
        print 'H(point) = ', model.point_entropy(sequence)
        print 'H_exh(point)=', model._exhaustive_point_entropy(sequence)
        print


def load_pos(num_sents):
    from nltk_lite.corpora import brown
    from itertools import islice

    sentences = list(islice(brown.tagged(), num_sents))

    tag_set = ["'", "''", '(', ')', '*', ',', '.', ':', '--', '``', 'abl',
        'abn', 'abx', 'ap', 'ap$', 'at', 'be', 'bed', 'bedz', 'beg', 'bem',
        'ben', 'ber', 'bez', 'cc', 'cd', 'cd$', 'cs', 'do', 'dod', 'doz',
        'dt', 'dt$', 'dti', 'dts', 'dtx', 'ex', 'fw', 'hv', 'hvd', 'hvg',
        'hvn', 'hvz', 'in', 'jj', 'jjr', 'jjs', 'jjt', 'md', 'nn', 'nn$',
        'nns', 'nns$', 'np', 'np$', 'nps', 'nps$', 'nr', 'nr$', 'od', 'pn',
        'pn$', 'pp$', 'ppl', 'ppls', 'ppo', 'pps', 'ppss', 'ql', 'qlp', 'rb',
        'rb$', 'rbr', 'rbt', 'rp', 'to', 'uh', 'vb', 'vbd', 'vbg', 'vbn',
        'vbz', 'wdt', 'wp$', 'wpo', 'wps', 'wql', 'wrb']

    sequences = []
    sequence = []
    symbols = set()
    start_re = re.compile(r'[^-*+]*')
    for sentence in sentences:
        for i in range(len(sentence)):
            word, tag = sentence[i]
            word = word.lower()
            symbols.add(word)
            m = start_re.match(tag)
            # clean up the tag, mapping tags outside the tag set to '*'
            tag = m.group(0)
            if tag not in tag_set:
                tag = '*'
            sentence[i] = (word, tag)

    return sentences, tag_set, list(symbols)

def test_pos(model, sentences, display=False):
    from sys import stdout

    count = correct = 0
    for sentence in sentences:
        orig_tags = [token[_TAG] for token in sentence]
        sentence = [(token[_TEXT], None) for token in sentence]
        new_tags = model.best_path(sentence)
        if display:
            print 'Untagged:'
            print sentence
            print 'HMM-tagged:'
            print new_tags
            print 'Entropy:'
            print model.entropy(sentence)
            print '-' * 60
        else:
            print '\b.',
            stdout.flush()
        for orig_tag, new_tag in zip(orig_tags, new_tags):
            count += 1
            if orig_tag == new_tag:
                correct += 1

    print 'accuracy over', count, 'tokens %.1f' % (100.0 * correct / count)

def demo_pos():
    # demonstrates POS tagging using supervised training

    print
    print "HMM POS tagging demo"
    print

    print 'Training HMM...'
    labelled_sequences, tag_set, symbols = load_pos(200)
    trainer = HiddenMarkovModelTrainer(tag_set, symbols)
    hmm = trainer.train_supervised(labelled_sequences[10:],
                    estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins))

    print 'Testing...'
    test_pos(hmm, labelled_sequences[:10], True)

def _untag(sentences):
    unlabeled = []
    for sentence in sentences:
        unlabeled.append([(token[_TEXT], None) for token in sentence])
    return unlabeled


def demo_pos_bw():
    # demonstrates the Baum-Welch algorithm in POS tagging

    print
    print "Baum-Welch demo for POS tagging"
    print

    print 'Training HMM (supervised)...'
    sentences, tag_set, symbols = load_pos(210)
    # rebuild the symbol alphabet from the words actually observed
    symbols = set()
    for sentence in sentences:
        for token in sentence:
            symbols.add(token[_TEXT])

    trainer = HiddenMarkovModelTrainer(tag_set, list(symbols))
    hmm = trainer.train_supervised(sentences[10:200],
                    estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins))
    print 'Training (unsupervised)...'
    # use only a few held-out sentences, as Baum-Welch is slow
    unlabeled = _untag(sentences[200:210])
    hmm = trainer.train_unsupervised(unlabeled, model=hmm, max_iterations=5)
    test_pos(hmm, sentences[:10], True)


def demo_bw():
    # demo Baum-Welch unsupervised training on sequences sampled from a
    # known model

    print
    print "Baum-Welch demo for market example"
    print

    # the toy market model, as in demo() above
    symbols = ['up', 'down', 'unchanged']
    states = ['bull', 'bear', 'static']

    def pd(values, samples):
        d = {}
        for value, item in zip(values, samples):
            d[item] = value
        return DictionaryProbDist(d)

    def cpd(array, conditions, samples):
        d = {}
        for values, condition in zip(array, conditions):
            d[condition] = pd(values, samples)
        return DictionaryConditionalProbDist(d)

    A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64)
    A = cpd(A, states, states)
    B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64)
    B = cpd(B, states, symbols)
    pi = array([0.5, 0.2, 0.3], float64)
    pi = pd(pi, states)

    model = HiddenMarkovModel(symbols=symbols, states=states,
                              transitions=A, outputs=B, priors=pi)

    # generate some random sequences from the model to train on
    training = []
    import random
    rng = random.Random()
    for i in range(10):
        item = model.random_sample(rng, 5)
        training.append([(token[0], None) for token in item])

    # train on those examples, starting with the model that generated them
    trainer = HiddenMarkovModelTrainer(states, symbols)
    hmm = trainer.train_unsupervised(training, model=model, max_iterations=1000)

if __name__ == '__main__':
    demo()
    demo_pos()
    demo_pos_bw()