# Demo entry 6356018

semi-supervised EM

Submitted by anonymous on Apr 16, 2017 at 09:26
Language: Python. Code size: 6.3 kB.

```import numpy as np

# Implementation of Naive Bayes trained by EM for semi-supervised text classification

def softmax(loga, k=-np.inf, out=None):
if out is None: out = np.empty_like(loga).astype(np.double)
m = np.max(loga)
logam = loga - m
sup = logam > k
inf = np.logical_not(sup)
out[sup] = np.exp(logam[sup])
out[inf] = 0.0
out /= np.sum(out)
return out

def logsum(loga, k=-np.inf):
B = np.max(loga)
logaB = aB = loga - B
sup = logaB > k
inf = np.logical_not(sup)
aB[sup] = np.exp(logaB[sup])
aB[inf] = 0.0
return (np.log(np.sum(aB)) + B)

def loglikelihood(td, delta, tdu, p_w_c_log, p_c_log):
V, Xl = td.shape
V_, Xu = tdu.shape
Xl_, M = delta.shape

lik = 0.0

## labeled
# log P(x|c)
p_x_c_log = np.zeros((Xl,M), np.double)
for w,d in zip(*td.nonzero()):
p_x_c_log[d,:] += p_w_c_log[w,:] * td[w,d]

# add log P(c) + lop P(x|c) if x has label c
for d,c in zip(*delta.nonzero()):
lik += p_c_log[c] + p_x_c_log[d,c]

## unlabelled
# log P(x|c)
p_x_c_log = np.zeros((Xu,M), np.double)
for w,d in zip(*tdu.nonzero()):
p_x_c_log[d,:] += p_w_c_log[w,:] * tdu[w,d]

p_x_c_log += p_c_log[np.newaxis,:]

for d in range(Xu):
lik += logsum(p_x_c_log[d,:], k=-10)

return lik

def normalize_p_c(p_c):
M = len(p_c)
denom = M + np.sum(p_c)
p_c += 1.0
p_c /= denom

def normalize_p_w_c(p_w_c):
V, X = p_w_c.shape
denoms = V + np.sum(p_w_c, axis=0)
p_w_c += 1.0
p_w_c /= denoms[np.newaxis,:]

class SemiNB(object):

def __init__(self, model=None):
"""
model: a model, as returned by get_model() or train().
"""
self.p_w_c = None
self.p_c = None
if model is not None: self.set_model(model)
self.debug = False

def train(self, td, delta, normalize=True, sparse=True):
"""
td: term-document matrix V x X
delta: X x M matrix
where delta(d,c) = 1 if document d belongs to class c
"""

X_, M = delta.shape
V, X = td.shape
assert(X_ == X)

# P(c)
self.p_c = np.sum(delta, axis=0)

# P(w|c)
self.p_w_c = np.zeros((V,M), dtype=np.double)

if sparse:
# faster when delta is sparse
# select indices of documents that have class c
for d,c in zip(*delta.nonzero()):
# select indices of terms that are non-zero
for w in np.flatnonzero(td[:,d]):
self.p_w_c[w,c] += td[w,d] * delta[d,c]
else:
# faster when delta is non-sparse
for w,d in zip(*td.nonzero()):
self.p_w_c[w,:] += td[w,d] * delta[d,:]

if normalize:
normalize_p_c(self.p_c)
normalize_p_w_c(self.p_w_c)

return self.get_model()

def train_semi(self, td, delta, tdu, maxiter=50, eps=0.01):
"""
td: V x X term document matrix
delta: X x M label matrix
tdu: V x Xu term document matrix (unlabeled)
maxiter: maximum number of iterations
eps: stop if no more progress than esp
"""
X_, M = delta.shape
V, X = td.shape
assert(X_ == X)

# compute counts for labeled data once for all
self.train(td, delta, normalize=False)
p_c_l = np.array(self.p_c, copy=True)
p_w_c_l = np.array(self.p_w_c, copy=True)

# normalize to get initial classifier
normalize_p_c(self.p_c)
normalize_p_w_c(self.p_w_c)

lik = loglikelihood(td, delta, tdu, np.log(self.p_w_c), np.log(self.p_c))

for iteration in range(1, maxiter+1):
# E-step: find the probabilistic labels for unlabeled data
delta_u = self.predict_proba_all(tdu)

# M-step: train classifier with the union of
#         labeled and unlabeled data
self.train(tdu, delta_u, normalize=False, sparse=False)
self.p_c += p_c_l
self.p_w_c += p_w_c_l
normalize_p_c(self.p_c)
normalize_p_w_c(self.p_w_c)

lik_new = loglikelihood(td, delta, tdu,
np.log(self.p_w_c), np.log(self.p_c))
lik_diff = lik_new - lik
assert(lik_diff >= -1e-10)
lik = lik_new

if lik_diff < eps:
print "No more progress, stopping EM at iteration", iteration
break

if self.debug:
print "Iteration", iteration
print "L += %f" % lik_diff

return self.get_model()

def p_x_c_log_all(self, td):
M = len(self.p_c)
V, X = td.shape
p_x_c_log = np.zeros((X,M), np.double)
p_w_c_log = np.log(self.p_w_c)

# log P(x|c)
for w,d in zip(*td.nonzero()):
p_x_c_log[d,:] += p_w_c_log[w,:] * td[w,d]

return p_x_c_log

def predict_proba(self, x):
"""
x: a V array representing a document

Compute a M array containing P(c|x).
"""
return self.predict_proba_all(x[:,np.newaxis])[0,:]

def predict_proba_all(self, td):
"""
td: V x X term document matrix

Compute an X x M matrix of P(c|x) for all x in td.
"""
V, X = td.shape

# log P(x|c)
p_x_c_log = self.p_x_c_log_all(td)

p_x_c_log += np.log(self.p_c)[np.newaxis,:]

# sotfmax(log P(x|c) + log P(c)) = P(c|x)
for d in range(X):
softmax(p_x_c_log[d,:], k=-10, out=p_x_c_log[d,:])

return p_x_c_log

def predict(self, x):
"""
x: a V array representing a document

Compute the predicted class index.
"""
return self.predict_all(x[:,np.newaxis])[0]

def predict_all(self, td):

# log P(x|c)
p_x_c_log = self.p_x_c_log_all(td)

p_x_c_log += np.log(self.p_c)[np.newaxis,:]

return p_x_c_log.argmax(axis=1)

def get_model(self):
return (self.p_w_c, self.p_c)

def set_model(self, model):
self.p_w_c, self.p_c = model
```

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.