Machine Learning in Action Chapter04 Classifying with probability theory: naive Bayes

Machine Learning in Action Chapter04 Classifying with probability theory: naive Bayes

基于贝叶斯决策理论的分类方法

朴素贝叶斯是贝叶斯决策理论的一部分。

  • 优点:在数据较小的情况下,仍然有效,可以处理多类别问题
  • 缺点:对于输入数据的准备方式较为敏感
  • 使用数据类型:标称型数据

其实这里的贝叶斯决策的核心思想就是选择具有最高概率的决策。对某个事件所属各个列表的可能性进行计算,取最大值所在类别为结果。

条件概率

\[p(c|x)=\frac{p(x|c)p(c)}{p(x)}\]

使用条件概率来分类

使用贝叶斯决策理论来计算两个概率 \(p_1(x,y)\)\(p_2(x,y)\)

  • 如果 \(p_1(x,y)>p_2(x,y)\),那么属于类型 \(c_1\)
  • 如果 \(p_2(x,y)>p_1(x,y)\),那么属于类型 \(c_2\)

使用朴素贝叶斯进行文档分类

朴素贝叶斯的一般过程

1
2
3
4
5
6
(1)收集数据: 可以使用任何方法。
(2)准备数据: 需要数值型或者布尔型数据。
(3)分析数据: 有大量特征时,绘制特征作用不大,此时使用直方图效果更好。
(4)训练算法: 计算不同的独立特征的条件概率。
(5)测试算法: 计算错误率。
(6)使用算法: 一个常见的朴素贝叶斯应用是文档分类。可以在任意的分类场景中使用朴素贝叶斯分类器,不一定非要是文本。

朴素贝叶斯的两个假设(何为“朴素”):

  • 所有样本特征之间相互独立
  • 每个特征同等重要

使用 Python 进行文本分类

准备数据:从文本中构建词向量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def load_data_set():
"""
创建数据集,都是假的 fake data set
:return: 单词列表posting_list, 所属类别class_vec
"""
posting_list = [
['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
['my', 'dalmation', 'is', 'so', 'cute', 'I', 'love', 'him'],
['stop', 'posting', 'stupid', 'worthless', 'gar e'],
['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
class_vec = [0, 1, 0, 1, 0, 1] # 1 is 侮辱性的文字, 0 is not
return posting_list, class_vec

def create_vocab_list(data_set):
"""
获取所有单词的集合
:param data_set: 数据集
:return: 所有单词的集合(即不含重复元素的单词列表)
"""
vocab_set = set() # create empty set
for item in data_set:
# | 求两个集合的并集
vocab_set = vocab_set | set(item)
return list(vocab_set)

def set_of_words2vec(vocab_list, input_set):
"""
遍历查看该单词是否出现,出现该单词则将该单词置1
:param vocab_list: 所有单词集合列表
:param input_set: 输入数据集
:return: 匹配列表[0,1,0,1...],其中 1与0 表示词汇表中的单词是否出现在输入的数据集中
"""
# 创建一个和词汇表等长的向量,并将其元素都设置为0
result = [0] * len(vocab_list)
# 遍历文档中的所有单词,如果出现了词汇表中的单词,则将输出的文档向量中的对应值设为1
for word in input_set:
if word in vocab_list:
result[vocab_list.index(word)] = 1
else:
# 这个后面应该注释掉,因为对你没什么用,这只是为了辅助调试的
# print('the word: {} is not in my vocabulary'.format(word))
pass
return result

训练算法:从词向量计算概率

用 w 表示单词向量(句),c 表示类别,则替换条件概率公示的 x,y 得:

\[p(c_i|w)=\frac{p(w|c_i)p(c_i)}{p(w)}\]

根据朴素贝叶斯假设,讲 w 展开为一个个独立特征,则将上述概率写为 \(p(w_0,w_1,\dots,w_N|c_i)\),则可以简化为 \(p(w_0|c_i)p(w_1|c_i)\dots p(w_N|c_i)\)。该函数伪代码如下:

1
2
3
4
5
6
7
8
9
计算每个类别中的文档数目
对每篇训练文档:
对每个类别:
如果词条出现在文档中 -> 增加该词条的计数值
增加所有词条的计数值
对每个类别:
对每个词条:
将该词条的数目除以总词条数目得到条件概率
返回每个类别的条件概

贝叶斯分类器训练函数为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def _train_naive_bayes(train_mat, train_category):
"""
朴素贝叶斯分类原版
:param train_mat: type is ndarray
总的输入文本,大致是 [[0,1,0,1], [], []]
:param train_category: 文件对应的类别分类, [0, 1, 0],
列表的长度应该等于上面那个输入文本的长度
:return:
"""
train_doc_num = len(train_mat)
words_num = len(train_mat[0])
# 因为侮辱性的被标记为了1, 所以只要把他们相加就可以得到侮辱性的有多少
# 侮辱性文件的出现概率,即train_category中所有的1的个数,
# 代表的就是多少个侮辱性文件,与文件的总数相除就得到了侮辱性文件的出现概率
pos_abusive = np.sum(train_category) / train_doc_num
# 单词出现的次数
# 原版
p0num = np.zeros(words_num)
p1num = np.zeros(words_num)

# 整个数据集单词出现的次数(原来是0,后面改成2了)
p0num_all = 0
p1num_all = 0

for i in range(train_doc_num):
# 遍历所有的文件,如果是侮辱性文件,就计算此侮辱性文件中出现的侮辱性单词的个数
if train_category[i] == 1:
p1num += train_mat[i]
p1num_all += np.sum(train_mat[i])
else:
p0num += train_mat[i]
p0num_all += np.sum(train_mat[i])
# 后面需要改成改成取 log 函数
p1vec = p1num / p1num_all
p0vec = p0num / p0num_all
return p0vec, p1vec, pos_abusive

测试算法:根据现实情况修改分类器

在 Python 中,多个很小的数相乘,会四舍五入得到零,为了避免概率为零的情况发生,对乘积取自然对数。代数中,有 \(ln(a+b)=ln(a)+ln(b)\)

新的朴素贝叶斯分类函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def train_naive_bayes(train_mat, train_category):
"""
朴素贝叶斯分类修正版, 注意和原来的对比,为什么这么做可以查看书
:param train_mat: type is ndarray
总的输入文本,大致是 [[0,1,0,1], [], []]
:param train_category: 文件对应的类别分类, [0, 1, 0],
列表的长度应该等于上面那个输入文本的长度
:return:
"""
train_doc_num = len(train_mat)
words_num = len(train_mat[0])
# 因为侮辱性的被标记为了1, 所以只要把他们相加就可以得到侮辱性的有多少
# 侮辱性文件的出现概率,即train_category中所有的1的个数,
# 代表的就是多少个侮辱性文件,与文件的总数相除就得到了侮辱性文件的出现概率
pos_abusive = np.sum(train_category) / train_doc_num
# 单词出现的次数
# 原版,变成ones是修改版,这是为了防止数字过小溢出
# p0num = np.zeros(words_num)
# p1num = np.zeros(words_num)
p0num = np.ones(words_num)
p1num = np.ones(words_num)
# 整个数据集单词出现的次数(原来是0,后面改成2了)
p0num_all = 2.0
p1num_all = 2.0

for i in range(train_doc_num):
# 遍历所有的文件,如果是侮辱性文件,就计算此侮辱性文件中出现的侮辱性单词的个数
if train_category[i] == 1:
p1num += train_mat[i]
p1num_all += np.sum(train_mat[i])
else:
p0num += train_mat[i]
p0num_all += np.sum(train_mat[i])
# 后面改成取 log 函数
p1vec = np.log(p1num / p1num_all)
p0vec = np.log(p0num / p0num_all)
return p0vec, p1vec, pos_abusive

准备数据:文档词袋模型

在之前的 set_of_words2vec 中,我们将每个词的出现与否作为一个特征,这可以被描述为词集模型,但是一个词在一个文档中若出现不止一次,则词集模型不能表示文档的全部信息。下面是修改后的词袋模型:

1
2
3
4
5
6
7
8
9
def bag_words2vec(vocab_list, input_set):
# 注意和原来的做对比
result = [0] * len(vocab_list)
for word in input_set:
if word in vocab_list:
result[vocab_list.index(word)] += 1
else:
print('the word: {} is not in my vocabulary'.format(word))
return result

示例:用朴素贝叶斯过滤垃圾邮件

通用框架:

1
2
3
4
5
6
(1) 收集数据:提供文本文件。  
(2) 准备数据:将文本文件解析成词条向量。
(3) 分析数据:检查词条确保解析的正确性。
(4) 训练算法:使用我们之前建立的trainNB0()函数。
(5) 测试算法:使用classifyNB(),并且构建一个新的测试函数来计算文档集的错误率。
(6) 使用算法:构建一个完整的程序对一组文档进行分类,将错分的文档输出到屏幕上

准备数据:切分文本

文本文件内容如下:

使用正则表达式来切分文本:

1
2
3
4
5
6
7
8
9
10
11
12
13
def text_parse(big_str):
"""
这里就是做词划分
:param big_str: 某个被拼接后的字符串
:return: 全部是小写的word列表,去掉少于 2 个字符的字符串
"""
import re
# 其实这里比较推荐用 \W+ 代替 \W*,
# 因为 \W*会match empty patten,在py3.5+之后就会出现什么问题,推荐自己修改尝试一下,可能就会re.split理解更深了
token_list = re.split(r'\W+', big_str)
if len(token_list) == 0:
print(token_list)
return [tok.lower() for tok in token_list if len(tok) > 2]

测试算法:使用朴素贝叶斯进行交叉验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def spam_test():
"""
对贝叶斯垃圾邮件分类器进行自动化处理。
:return: nothing
"""
doc_list = []
class_list = []
full_text = []
for i in range(1, 26):
# 添加垃圾邮件信息
# 这里需要做一个说明,为什么我会使用try except 来做
# 因为我们其中有几个文件的编码格式是 windows 1252 (spam: 17.txt, ham: 6.txt...)
# 这里其实还可以 :
# import os
# 然后检查 os.system(' file {}.txt'.format(i)),看一下返回的是什么
# 如果正常能读返回的都是: ASCII text
# 对于except需要处理的都是返回: Non-ISO extended-ASCII text, with very long lines
try:
words = text_parse(open('./email/spam/{}.txt'.format(i)).read())
except:
words = text_parse(open('./email/spam/{}.txt'.format(i), encoding='Windows 1252').read())
doc_list.append(words)
full_text.extend(words)
class_list.append(1)
try:
# 添加非垃圾邮件
words = text_parse(open('./email/ham/{}.txt'.format(i)).read())
except:
words = text_parse(open('./email/ham/{}.txt'.format(i), encoding='Windows 1252').read())
doc_list.append(words)
full_text.extend(words)
class_list.append(0)
# 创建词汇表
vocab_list = create_vocab_list(doc_list)

import random
# 生成随机取10个数, 为了避免警告将每个数都转换为整型
test_set = [int(num) for num in random.sample(range(50), 10)]
# 并在原来的training_set中去掉这10个数
training_set = list(set(range(50)) - set(test_set))

training_mat = []
training_class = []
for doc_index in training_set:
training_mat.append(set_of_words2vec(vocab_list, doc_list[doc_index]))
training_class.append(class_list[doc_index])
p0v, p1v, p_spam = train_naive_bayes(
np.array(training_mat),
np.array(training_class)
)

# 开始测试
error_count = 0
for doc_index in test_set:
word_vec = set_of_words2vec(vocab_list, doc_list[doc_index])
if classify_naive_bayes(
np.array(word_vec),
p0v,
p1v,
p_spam
) != class_list[doc_index]:
error_count += 1
print('the error rate is {}'.format(
error_count / len(test_set)
))

示例:使用朴素贝叶斯分类器从个人广告中获取区域倾向

收集数据:导入 RSS 源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def calc_most_freq(vocab_list, full_text):
# RSS源分类器及高频词去除函数
from operator import itemgetter
freq_dict = {}
for token in vocab_list:
freq_dict[token] = full_text.count(token)
sorted_freq = sorted(freq_dict.items(), key=itemgetter(1), reverse=True)
return sorted_freq[0:30]

def local_words(feed1, feed0):
# 下面操作和上面那个 spam_test函数基本一样,理解了一个,两个都ok
doc_list = []
class_list = []
full_text = []
# 找出两个中最小的一个
min_len = min(len(feed0), len(feed1))
for i in range(min_len):
# 类别 1
word_list = text_parse(feed1['entries'][i]['summary'])
doc_list.append(word_list)
full_text.extend(word_list)
class_list.append(1)
# 类别 0
word_list = text_parse(feed0['entries'][i]['summary'])
doc_list.append(word_list)
full_text.extend(word_list)
class_list.append(0)
vocab_list = create_vocab_list(doc_list)
# 去掉高频词
top30words = calc_most_freq(vocab_list, full_text)
for pair in top30words:
if pair[0] in vocab_list:
vocab_list.remove(pair[0])
# 获取训练数据和测试数据

import random
# 生成随机取10个数, 为了避免警告将每个数都转换为整型
test_set = [int(num) for num in random.sample(range(2 * min_len), 20)]
# 并在原来的training_set中去掉这10个数
training_set = list(set(range(2 * min_len)) - set(test_set))

# 把这些训练集和测试集变成向量的形式
training_mat = []
training_class = []
for doc_index in training_set:
training_mat.append(bag_words2vec(vocab_list, doc_list[doc_index]))
training_class.append(class_list[doc_index])
p0v, p1v, p_spam = train_naive_bayes(
np.array(training_mat),
np.array(training_class)
)
error_count = 0
for doc_index in test_set:
word_vec = bag_words2vec(vocab_list, doc_list[doc_index])
if classify_naive_bayes(
np.array(word_vec),
p0v,
p1v,
p_spam
) != class_list[doc_index]:
error_count += 1
print("the error rate is {}".format(error_count / len(test_set)))
return vocab_list, p0v, p1v

分析数据:显示地域相关的用词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def get_top_words():
import feedparser
ny = feedparser.parse('http://newyork.craigslist.org/stp/index.rss')
sf = feedparser.parse('http://sfbay.craigslist.org/stp/index.rss')
vocab_list, p_sf, p_ny = local_words(ny, sf)
top_ny = []
top_sf = []
for i in range(len(p_sf)):
if p_sf[i] > -6.0:
top_sf.append((vocab_list[i], p_sf[i]))
if p_ny[i] > -6.0:
top_ny.append((vocab_list[i], p_ny[i]))
sorted_sf = sorted(top_sf, key=lambda pair: pair[1], reverse=True)
sorted_ny = sorted(top_ny, key=lambda pair: pair[1], reverse=True)
print('\n----------- this is SF ---------------\n')
for item in sorted_sf:
print(item[0])
print('\n----------- this is NY ---------------\n')
for item in sorted_ny:
print(item[0])

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×