import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict
# 1. 基本条件概率计算
def basic_conditional_probability(n_trials=10000, seed=42):
    """Estimate plain and conditional probabilities from simulated coin tosses.

    A fair coin is tossed ``n_trials`` times. The empirical P(heads),
    P(tails) and P(next toss is heads | current toss is heads) are printed
    and returned. The conditional estimate is taken over every overlapping
    pair of adjacent tosses.

    Args:
        n_trials: Number of tosses to simulate; must be at least 1.
        seed: Seed of the private random generator used for the simulation.

    Returns:
        Tuple ``(p_heads, p_tails, p_consecutive_heads)``. The last entry is
        ``0`` when no pair starts with heads.

    Raises:
        ValueError: If ``n_trials`` is smaller than 1.
    """
    if n_trials < 1:
        raise ValueError("n_trials must be at least 1")

    def simulate_coin_tosses():
        """Return an array of ``n_trials`` fair tosses, each 'H' or 'T'."""
        # A private generator keeps NumPy's global random state untouched.
        rng = np.random.RandomState(seed)
        return rng.choice(['H', 'T'], size=n_trials, p=[0.5, 0.5])

    def calculate_conditional_probability():
        """Compute, print and return the three empirical probabilities."""
        tosses = simulate_coin_tosses()
        is_heads = tosses == 'H'

        # Marginal probabilities.
        p_heads = np.mean(is_heads)
        p_tails = np.mean(tosses == 'T')

        # Pairs (i, i + 1): the first element of every pair is tosses[:-1],
        # the second is tosses[1:].
        first_is_heads = is_heads[:-1]
        total_pairs = int(np.count_nonzero(first_is_heads))
        consecutive_heads = int(np.count_nonzero(first_is_heads & is_heads[1:]))
        p_consecutive_heads = consecutive_heads / total_pairs if total_pairs > 0 else 0

        print(f"正面的概率: {p_heads:.4f}")
        print(f"反面的概率: {p_tails:.4f}")
        print(f"在第一次是正面的情况下,第二次也是正面的概率: {p_consecutive_heads:.4f}")
        return p_heads, p_tails, p_consecutive_heads

    return calculate_conditional_probability()
# 2. 医疗诊断示例
def medical_diagnosis_example(p_disease=0.01,
                              p_positive_given_disease=0.95,
                              p_positive_given_no_disease=0.05):
    """Apply Bayes' theorem to a diagnostic test and print the result.

    Args:
        p_disease: Prior P(disease), i.e. the prevalence in the population.
        p_positive_given_disease: P(positive test | disease).
        p_positive_given_no_disease: P(positive test | no disease), the
            false-positive rate.

    Returns:
        The posterior P(disease | positive test).

    Raises:
        ValueError: If an argument lies outside ``[0, 1]`` or if a positive
            test has probability zero, which leaves the posterior undefined.
    """
    for name, value in (
        ("p_disease", p_disease),
        ("p_positive_given_disease", p_positive_given_disease),
        ("p_positive_given_no_disease", p_positive_given_no_disease),
    ):
        if not 0.0 <= value <= 1.0:
            raise ValueError(f"{name} must be within [0, 1], got {value!r}")

    def bayes_theorem():
        """Compute P(disease | positive test) with Bayes' theorem."""
        # Law of total probability:
        # P(+) = P(+|disease) P(disease) + P(+|no disease) P(no disease)
        p_positive = (p_positive_given_disease * p_disease +
                      p_positive_given_no_disease * (1 - p_disease))
        if p_positive == 0:
            raise ValueError(
                "P(positive test) is zero; the posterior is undefined")

        # P(disease|+) = P(+|disease) P(disease) / P(+)
        p_disease_given_positive = (p_positive_given_disease * p_disease) / p_positive

        print(f"疾病的基础概率: {p_disease:.4f}")
        print(f"测试阳性的概率: {p_positive:.4f}")
        print(f"在测试阳性的情况下,实际患病的概率: {p_disease_given_positive:.4f}")
        return p_disease_given_positive

    return bayes_theorem()
# 3. 朴素贝叶斯分类器
def naive_bayes_classifier():
    """Train a tiny naive Bayes spam classifier and print demo predictions.

    The model is trained on ten labelled Chinese e-mail subjects, then used
    to classify four unseen subjects; each prediction is printed.

    Returns:
        Tuple ``(class_probs, word_probs)``. ``class_probs`` maps each label
        to its prior. ``word_probs`` maps each label to the add-one smoothed
        P(token | label) for every token of the training vocabulary.
    """
    # Training data: (subject, label) pairs.
    training_data = [
        ("免费获得100万美元", "垃圾邮件"),
        ("会议安排在明天下午", "正常邮件"),
        ("限时优惠,立即购买", "垃圾邮件"),
        ("项目报告已更新", "正常邮件"),
        ("中奖通知,点击领取", "垃圾邮件"),
        ("团队会议取消", "正常邮件"),
        ("免费试用,无需付费", "垃圾邮件"),
        ("季度报告已提交", "正常邮件"),
        ("恭喜您中奖了", "垃圾邮件"),
        ("客户反馈收集", "正常邮件")
    ]

    def extract_features(text):
        """Return the set of tokens found in ``text``.

        Pure-ASCII chunks separated by whitespace are kept as whole words.
        Any chunk containing non-ASCII characters is broken into its
        alphanumeric characters: Chinese text has no spaces between words,
        so whitespace splitting alone would turn a whole sentence into one
        token that never matches anything else.
        """
        features = set()
        for token in text.lower().split():
            if all(ord(ch) < 128 for ch in token):
                features.add(token)
            else:
                features.update(ch for ch in token if ch.isalnum())
        return features

    def train_naive_bayes(data):
        """Estimate class priors and smoothed token likelihoods from ``data``."""
        class_counts = defaultdict(int)
        word_counts = defaultdict(lambda: defaultdict(int))
        vocabulary = set()
        total_docs = len(data)

        for text, label in data:
            class_counts[label] += 1
            # Features are a set, so each token counts once per document.
            for word in extract_features(text):
                word_counts[label][word] += 1
                vocabulary.add(word)

        # P(label)
        class_probabilities = {
            label: count / total_docs for label, count in class_counts.items()
        }

        # P(token | label) with add-one (Laplace) smoothing over the whole
        # vocabulary, so a token seen only in the other class still gets a
        # small non-zero probability here.
        vocab_size = len(vocabulary)
        word_probabilities = defaultdict(lambda: defaultdict(float))
        for label in class_counts:
            denominator = sum(word_counts[label].values()) + vocab_size
            for word in sorted(vocabulary):
                smoothed_count = word_counts[label].get(word, 0) + 1
                word_probabilities[label][word] = smoothed_count / denominator

        return class_probabilities, word_probabilities

    def predict(text, class_probs, word_probs):
        """Return the label with the highest posterior log-probability."""
        features = extract_features(text)
        predictions = {}
        for label in class_probs:
            # Work in log space to avoid numeric underflow.
            log_prob = np.log(class_probs[label])
            likelihoods = word_probs[label]
            for word in features:
                # Tokens outside the training vocabulary carry no evidence
                # for either class and are skipped.
                if word in likelihoods:
                    log_prob += np.log(likelihoods[word])
            predictions[label] = log_prob
        # On a tie, max() keeps the first label in iteration order.
        return max(predictions, key=predictions.get)

    # Train the model.
    class_probs, word_probs = train_naive_bayes(training_data)

    # Classify a few unseen subjects.
    test_emails = [
        "免费获得现金奖励",
        "明天的工作安排",
        "限时特价促销",
        "项目进度报告"
    ]
    print("朴素贝叶斯分类结果:")
    for email in test_emails:
        prediction = predict(email, class_probs, word_probs)
        print(f"邮件: '{email}' -> 预测: {prediction}")

    return class_probs, word_probs
# 4. 推荐系统示例
def recommendation_system_example():
    """Recommend film genres from pairwise "likes" conditional probabilities.

    Five users' genre ratings are turned into like/dislike flags (a rating
    of 4 or more counts as a like). For a fixed test user, every genre the
    user does not already like is scored by the average of
    P(likes candidate | likes g) over the genres g the user likes.

    Returns:
        List of ``(genre, score)`` tuples sorted by descending score.
    """
    # User -> genre -> rating.
    ratings_data = {
        '用户1': {'动作片': 5, '喜剧片': 3, '恐怖片': 1, '爱情片': 4},
        '用户2': {'动作片': 4, '喜剧片': 5, '恐怖片': 2, '爱情片': 5},
        '用户3': {'动作片': 1, '喜剧片': 4, '恐怖片': 5, '爱情片': 2},
        '用户4': {'动作片': 3, '喜剧片': 4, '恐怖片': 3, '爱情片': 5},
        '用户5': {'动作片': 5, '喜剧片': 2, '恐怖片': 4, '爱情片': 1}
    }

    def calculate_conditional_probabilities():
        """Return ``table[a][b]`` = P(likes b | likes a) for every a != b."""
        # A rating of 4 or more counts as "likes".
        liked_flags = {
            user: {genre: rating >= 4 for genre, rating in ratings.items()}
            for user, ratings in ratings_data.items()
        }

        genres = ['动作片', '喜剧片', '恐怖片', '爱情片']
        table = {}
        for given in genres:
            # Users who like the conditioning genre.
            fans = [flags for flags in liked_flags.values() if flags[given]]
            row = {}
            for other in genres:
                if other == given:
                    continue
                if fans:
                    also_like = sum(1 for flags in fans if flags[other])
                    row[other] = also_like / len(fans)
                else:
                    row[other] = 0
            table[given] = row
        return table

    def recommend_movies(user_ratings):
        """Score the genres the user does not already like, best first."""
        conditional_probs = calculate_conditional_probabilities()

        # Genres this user already likes.
        liked_genres = [genre for genre, rating in user_ratings.items() if rating >= 4]

        scores = {}
        for candidate in ['动作片', '喜剧片', '恐怖片', '爱情片']:
            if candidate in liked_genres:
                continue
            total = sum(conditional_probs[liked].get(candidate, 0)
                        for liked in liked_genres)
            scores[candidate] = total / len(liked_genres) if liked_genres else 0

        return sorted(scores.items(), key=lambda item: item[1], reverse=True)

    # Demo run with a fixed user.
    test_user = {'动作片': 5, '喜剧片': 2, '恐怖片': 1, '爱情片': 3}
    recommendations = recommend_movies(test_user)

    print("推荐系统结果:")
    print(f"用户评分: {test_user}")
    print("推荐电影类型(按推荐分数排序):")
    for genre, score in recommendations:
        print(f" {genre}: {score:.3f}")

    return recommendations
# 5. 隐马尔可夫模型示例
def hidden_markov_model_example():
    """Run the forward algorithm on a two-state weather HMM and plot results.

    Hidden states are the weather (sunny / rainy); observations are the
    activity of the day (walk / shopping / cinema). The forward
    probabilities are computed for a fixed observation sequence, the state
    with the largest forward probability is reported for every time step,
    and two matplotlib panels are shown.

    Returns:
        Tuple ``(predicted_weather, forward_probs)``: the list of state names
        chosen per time step and the ``(n_states, n_observations)`` array of
        forward probabilities.
    """
    # P(next state | current state).
    transition_probs = {
        '晴天': {'晴天': 0.8, '雨天': 0.2},
        '雨天': {'晴天': 0.3, '雨天': 0.7}
    }
    # P(observation | state).
    emission_probs = {
        '晴天': {'散步': 0.6, '购物': 0.3, '看电影': 0.1},
        '雨天': {'散步': 0.1, '购物': 0.4, '看电影': 0.5}
    }
    # P(state at t = 0).
    initial_probs = {'晴天': 0.6, '雨天': 0.4}

    def forward_algorithm(observations):
        """Return the forward-probability matrix for ``observations``.

        Entry ``[j, t]`` is the joint probability of the first ``t + 1``
        observations and of being in state ``j`` at step ``t``.
        """
        states = ['晴天', '雨天']
        alpha = np.zeros((len(states), len(observations)))

        # Base case: initial distribution times the first emission.
        first_obs = observations[0]
        for row, state in enumerate(states):
            alpha[row, 0] = initial_probs[state] * emission_probs[state][first_obs]

        # Recursion: accumulate over every predecessor state.
        for t in range(1, len(observations)):
            obs = observations[t]
            for j, current_state in enumerate(states):
                alpha[j, t] = sum(
                    alpha[i, t - 1]
                    * transition_probs[prev_state][current_state]
                    * emission_probs[current_state][obs]
                    for i, prev_state in enumerate(states)
                )
        return alpha

    def predict_weather(observations):
        """Pick, per time step, the state with the largest forward probability.

        NOTE: this is a step-by-step argmax over the forward matrix, not a
        Viterbi decoding of the jointly most likely state sequence.
        """
        forward = forward_algorithm(observations)
        states = ['晴天', '雨天']
        best_rows = np.argmax(forward, axis=0)
        predicted_states = [states[row] for row in best_rows]
        return predicted_states, forward

    # Demo sequence.
    test_observations = ['散步', '购物', '看电影', '散步']
    predicted_weather, forward_probs = predict_weather(test_observations)

    print("隐马尔可夫模型结果:")
    print(f"观察序列: {test_observations}")
    print(f"预测天气: {predicted_weather}")

    # Left panel: emission probabilities as grouped bars.
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    activities = ['散步', '购物', '看电影']
    positions = np.arange(len(activities))
    bar_width = 0.35
    sunny_probs = [emission_probs['晴天'][name] for name in activities]
    rainy_probs = [emission_probs['雨天'][name] for name in activities]
    plt.bar(positions - bar_width/2, sunny_probs, bar_width, label='晴天', alpha=0.8)
    plt.bar(positions + bar_width/2, rainy_probs, bar_width, label='雨天', alpha=0.8)
    plt.xlabel('活动')
    plt.ylabel('概率')
    plt.title('发射概率')
    plt.xticks(positions, activities)
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Right panel: chosen state per time step (state names on the y axis).
    plt.subplot(1, 2, 2)
    time_steps = range(len(test_observations))
    plt.plot(time_steps, predicted_weather, 'ro-', label='预测天气')
    plt.xlabel('时间步')
    plt.ylabel('天气状态')
    plt.title('天气预测序列')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return predicted_weather, forward_probs
# 6. 数据稀疏性问题
def data_sparsity_example():
    """Show how data sparsity shrinks the set of estimable item-item probabilities.

    For several sparsity levels a random user-item rating matrix is
    generated and the item-to-item conditional probabilities
    P(user interacted with j | user interacted with i) are estimated. The
    share of non-zero off-diagonal estimates ("coverage") is printed and
    plotted.

    Returns:
        List of dicts, one per sparsity level, with the keys ``'sparsity'``,
        ``'coverage'`` and ``'non_zero_probs'``.
    """

    def generate_sparse_data(n_users=1000, n_items=100, sparsity=0.95):
        """Return an ``(n_users, n_items)`` rating matrix; 0 means no interaction.

        ``sparsity`` is the requested fraction of empty cells. User/item
        pairs are drawn independently, so repeated pairs overwrite each
        other and the matrix can end up slightly sparser than requested.
        """
        # A private generator keeps NumPy's global random state untouched.
        rng = np.random.RandomState(42)
        data = np.zeros((n_users, n_items))

        n_interactions = int(n_users * n_items * (1 - sparsity))
        user_indices = rng.randint(0, n_users, n_interactions)
        item_indices = rng.randint(0, n_items, n_interactions)
        ratings = rng.randint(1, 6, n_interactions)

        # Sequential assignment: for a repeated pair the last rating wins.
        for user, item, rating in zip(user_indices, item_indices, ratings):
            data[user, item] = rating
        return data

    def calculate_conditional_probabilities_sparse(data):
        """Return the matrix whose ``[i, j]`` entry estimates P(j | i).

        The diagonal and the rows of items nobody interacted with stay zero.
        """
        # 1.0 where the user interacted with the item, else 0.0.
        interacted = (data > 0).astype(float)

        # (interacted.T @ interacted)[i, j] counts the users who touched both
        # item i and item j; self-pairs on the diagonal are not wanted.
        cooccurrence = interacted.T @ interacted
        np.fill_diagonal(cooccurrence, 0)

        # Number of users per item; divide only the rows that have any.
        item_totals = interacted.sum(axis=0)
        has_users = item_totals > 0
        conditional_probs = np.zeros_like(cooccurrence)
        conditional_probs[has_users] = (
            cooccurrence[has_users] / item_totals[has_users, None]
        )
        return conditional_probs

    def evaluate_sparsity_impact():
        """Measure the coverage of the estimates at several sparsity levels."""
        sparsity_levels = [0.9, 0.95, 0.98, 0.99]
        results = []
        for sparsity in sparsity_levels:
            data = generate_sparse_data(sparsity=sparsity)
            conditional_probs = calculate_conditional_probabilities_sparse(data)

            # Share of off-diagonal entries with a non-zero estimate.
            non_zero_probs = np.sum(conditional_probs > 0)
            total_probs = conditional_probs.size - conditional_probs.shape[0]
            coverage = non_zero_probs / total_probs

            results.append({
                'sparsity': sparsity,
                'coverage': coverage,
                'non_zero_probs': non_zero_probs
            })
            print(f"稀疏性 {sparsity:.2f}: 覆盖率 {coverage:.4f}, 非零概率数量 {non_zero_probs}")
        return results

    results = evaluate_sparsity_impact()

    # Left panel: coverage against sparsity.
    plt.figure(figsize=(10, 4))
    plt.subplot(1, 2, 1)
    sparsities = [r['sparsity'] for r in results]
    coverages = [r['coverage'] for r in results]
    plt.plot(sparsities, coverages, 'bo-')
    plt.xlabel('数据稀疏性')
    plt.ylabel('条件概率覆盖率')
    plt.title('稀疏性对条件概率估计的影响')
    plt.grid(True, alpha=0.3)

    # Right panel: number of non-zero estimates per sparsity level.
    plt.subplot(1, 2, 2)
    non_zero_counts = [r['non_zero_probs'] for r in results]
    plt.bar(range(len(sparsities)), non_zero_counts)
    plt.xlabel('稀疏性级别')
    plt.ylabel('非零条件概率数量')
    plt.title('非零条件概率数量')
    plt.xticks(range(len(sparsities)), [f'{s:.2f}' for s in sparsities])
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return results
# 运行所有示例
if __name__ == "__main__":
    # Run every demo in order, printing its banner first. The banners after
    # the first start with a newline to leave a blank line between sections.
    examples = [
        ("=== 基本条件概率计算 ===", basic_conditional_probability),
        ("\n=== 医疗诊断示例 ===", medical_diagnosis_example),
        ("\n=== 朴素贝叶斯分类器 ===", naive_bayes_classifier),
        ("\n=== 推荐系统示例 ===", recommendation_system_example),
        ("\n=== 隐马尔可夫模型示例 ===", hidden_markov_model_example),
        ("\n=== 数据稀疏性问题 ===", data_sparsity_example),
    ]
    for banner, run_example in examples:
        print(banner)
        run_example()