实战项目:电影推荐系统
💡 项目目标:构建一个完整的电影推荐系统,学习协同过滤和内容推荐算法的实际应用。
项目概述
推荐系统是机器学习在商业中最成功的应用之一,广泛应用于电商、视频、音乐等平台。
✅ 项目特色:
• 真实数据集(MovieLens)
• 多种推荐算法对比
• 完整的评估体系
• 可部署的Web应用
数据集介绍
我们使用MovieLens数据集,包含用户对电影的评分数据。
# 数据加载和探索
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Load the rating and movie metadata tables from the working directory.
ratings = pd.read_csv("ratings.csv")
movies = pd.read_csv("movies.csv")
print("评分数据形状:", ratings.shape)
print("电影数据形状:", movies.shape)

# Preview the first rows of each table.
# The leading "\n" must be written as an escape: a plain "..." literal cannot
# span physical lines.
print("\n评分数据预览:")
print(ratings.head())
print("\n电影数据预览:")
print(movies.head())

# Basic statistics.
# Single quotes are used inside the f-strings because reusing the enclosing
# quote character inside a replacement field only parses on Python 3.12+.
print(f"\n用户数量: {ratings['userId'].nunique()}")
print(f"电影数量: {ratings['movieId'].nunique()}")
print(f"评分数量: {len(ratings)}")
print(f"评分范围: {ratings['rating'].min()} - {ratings['rating'].max()}")
数据探索性分析
# Rating distribution analysis: three histograms drawn side by side.
user_rating_counts = ratings.groupby("userId").size()
movie_rating_counts = ratings.groupby("movieId").size()

# One entry per panel: (values, number of bins, title, x label, y label).
panels = [
    (ratings["rating"], 10, "评分分布", "评分", "频次"),
    (user_rating_counts, 50, "用户评分数量分布", "评分数量", "用户数"),
    (movie_rating_counts, 50, "电影被评分次数分布", "被评分次数", "电影数"),
]

plt.figure(figsize=(15, 5))
for position, (values, bin_count, title, x_label, y_label) in enumerate(panels, start=1):
    # Select the panel first so the histogram is drawn on it.
    plt.subplot(1, 3, position)
    values.hist(bins=bin_count, edgecolor="black")
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
plt.tight_layout()
plt.show()
# Popular-movie analysis: rating count and mean rating per movie, restricted
# to movies with at least 100 ratings and ordered by rating count.
rating_stats = ratings.groupby("movieId")["rating"].agg(["count", "mean"]).round(2)
rating_stats.columns = ["评分次数", "平均评分"]
frequently_rated = rating_stats[rating_stats["评分次数"] >= 100]
popular_movies = frequently_rated.sort_values("评分次数", ascending=False)
print("最受欢迎的电影(评分次数最多):")
print(popular_movies.head(10))
推荐算法实现
1. 基于用户的协同过滤
找到相似用户,推荐他们喜欢的电影。
协同过滤的核心思想:"喜欢相似物品的用户,可能对其他物品也有相似的偏好"
# 用户协同过滤实现
from sklearn.metrics.pairwise import cosine_similarity
from scipy.sparse import csr_matrix
class UserBasedCF:
    """User-based collaborative filtering recommender.

    Builds a user x movie rating matrix from ``ratings_df`` (columns
    ``userId``, ``movieId`` and ``rating``), measures cosine similarity
    between user rows, and scores movies the target user has not rated by
    the similarity-weighted average rating of the most similar users.
    """

    def __init__(self, ratings_df):
        """Store the ratings table; the matrices are built on first use."""
        self.ratings_df = ratings_df
        self.user_item_matrix = None
        self.user_similarity = None

    def create_user_item_matrix(self):
        """Create the user x movie rating matrix; missing ratings become 0."""
        self.user_item_matrix = self.ratings_df.pivot_table(
            index="userId",
            columns="movieId",
            values="rating",
        ).fillna(0)

    def calculate_user_similarity(self):
        """Compute pairwise cosine similarity between the user rating rows."""
        user_matrix = csr_matrix(self.user_item_matrix.values)
        self.user_similarity = cosine_similarity(user_matrix)

    def get_recommendations(self, user_id, n_recommendations=10, n_neighbors=10):
        """Recommend movies that ``user_id`` has not rated yet.

        Args:
            user_id: Label of the target user in the ``userId`` column.
            n_recommendations: Maximum number of movies to return.
            n_neighbors: Number of most similar users whose ratings are
                combined (previously fixed at 10).

        Returns:
            A list of ``(movie_id, score)`` tuples ordered by descending
            score. ``score`` is the similarity-weighted mean of the
            neighbours' ratings, taken over the neighbours that rated the
            movie. Movies rated by none of the neighbours are omitted.

        Raises:
            KeyError: If ``user_id`` does not appear in the rating matrix.
        """
        if self.user_item_matrix is None:
            self.create_user_item_matrix()
        if self.user_similarity is None:
            self.calculate_user_similarity()

        user_idx = self.user_item_matrix.index.get_loc(user_id)
        user_similarities = self.user_similarity[user_idx]

        # Remove the target user explicitly rather than assuming it sorts
        # first: another user with a tied similarity may be ranked ahead.
        ranked_users = np.argsort(user_similarities)[::-1]
        similar_users = ranked_users[ranked_users != user_idx][:n_neighbors]

        # Accumulate one neighbour at a time over whole rows of the matrix.
        # This keeps the per-movie summation order of a scalar loop while
        # avoiding one pandas lookup per (movie, neighbour) cell.
        rating_matrix = self.user_item_matrix.values
        n_movies = rating_matrix.shape[1]
        weighted_sum = np.zeros(n_movies)
        similarity_sum = np.zeros(n_movies)
        for neighbor_idx in similar_users:
            neighbor_ratings = rating_matrix[neighbor_idx]
            has_rating = neighbor_ratings > 0
            similarity = user_similarities[neighbor_idx]
            weighted_sum[has_rating] += similarity * neighbor_ratings[has_rating]
            similarity_sum[has_rating] += similarity

        # Keep movies the user has not rated that have a positive weight sum.
        not_rated = ~(rating_matrix[user_idx] > 0)
        scorable = not_rated & (similarity_sum > 0)
        recommendations = [
            (movie_id, weighted_sum[col] / similarity_sum[col])
            for col, movie_id in enumerate(self.user_item_matrix.columns)
            if scorable[col]
        ]

        # The sort is stable, so tied scores keep the matrix column order.
        recommendations.sort(key=lambda item: item[1], reverse=True)
        return recommendations[:n_recommendations]
# Usage example: print the top five recommendations for user 1.
cf_model = UserBasedCF(ratings)
recommendations = cf_model.get_recommendations(user_id=1, n_recommendations=5)
print("用户1的推荐电影:")
for movie_id, score in recommendations:
    # Look the title up in the movie metadata table.
    title_matches = movies.loc[movies["movieId"] == movie_id, "title"]
    movie_title = title_matches.iloc[0]
    print(f"{movie_title}: {score:.3f}")
2. 基于物品的协同过滤
根据用户喜欢的电影,找到与之相似的其他电影并推荐给用户。
# 物品协同过滤实现
class ItemBasedCF:
    """Item-based collaborative filtering over a ratings table.

    Two movies are considered similar when the cosine similarity of their
    rating vectors (one entry per user, 0 where a user gave no rating) is
    high.
    """

    def __init__(self, ratings_df):
        """Store the ratings table; similarities are computed on first use."""
        self.ratings_df = ratings_df
        self.item_similarity = None
        # Movie labels in the row order of ``item_similarity``.
        self.item_ids = None

    def calculate_item_similarity(self):
        """Compute the movie x movie cosine similarity matrix."""
        # Rows are movies, columns are users; missing ratings become 0.
        item_user_matrix = self.ratings_df.pivot_table(
            index="movieId",
            columns="userId",
            values="rating",
        ).fillna(0)
        item_matrix = csr_matrix(item_user_matrix.values)
        self.item_similarity = cosine_similarity(item_matrix)
        self.item_ids = item_user_matrix.index

    def get_similar_items(self, movie_id, n_similar=10):
        """Return the movies most similar to ``movie_id``.

        Args:
            movie_id: Label of the query movie in the ``movieId`` column.
            n_similar: Maximum number of similar movies to return.

        Returns:
            A list of ``(movie_id, similarity)`` tuples ordered by
            descending similarity. The query movie is never included.

        Raises:
            KeyError: If ``movie_id`` does not appear in the ratings.
        """
        if self.item_similarity is None:
            self.calculate_item_similarity()

        movie_idx = self.item_ids.get_loc(movie_id)
        similarities = self.item_similarity[movie_idx]

        # Drop the query movie by index instead of skipping the first ranked
        # entry: a movie with a tied similarity can be ranked ahead of it,
        # in which case the query movie would be returned as its own
        # neighbour.
        ranked = np.argsort(similarities)[::-1]
        similar_indices = ranked[ranked != movie_idx][:n_similar]

        return [(self.item_ids[idx], similarities[idx]) for idx in similar_indices]
# Usage example: print the five movies most similar to movie 1.
item_cf = ItemBasedCF(ratings)
similar_movies = item_cf.get_similar_items(movie_id=1, n_similar=5)
print("与电影1相似的电影:")
for movie_id, similarity in similar_movies:
    # Look the title up in the movie metadata table.
    title_matches = movies.loc[movies["movieId"] == movie_id, "title"]
    movie_title = title_matches.iloc[0]
    print(f"{movie_title}: {similarity:.3f}")
3. 矩阵分解(SVD)
使用奇异值分解降维,发现潜在因子。
# SVD矩阵分解推荐
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import StandardScaler
class SVDRecommender:
    """Matrix-factorization recommender built on truncated SVD.

    The user x movie rating matrix (missing ratings filled with 0) is
    factorized by ``TruncatedSVD``. A predicted rating is the dot product of
    a user's factor vector with a movie's factor column.

    NOTE(review): because unrated cells are treated as 0 during
    factorization, predictions approximate the zero-filled matrix, not only
    the observed ratings.
    """

    def __init__(self, ratings_df, n_components=50):
        """Store the ratings table and configure the decomposition.

        Args:
            ratings_df: Table with ``userId``, ``movieId`` and ``rating``
                columns.
            n_components: Number of latent factors passed to
                ``TruncatedSVD``.
        """
        self.ratings_df = ratings_df
        self.n_components = n_components
        self.svd = TruncatedSVD(n_components=n_components, random_state=42)
        # All four attributes below are populated by fit().
        self.user_factors = None
        self.item_factors = None
        self.user_ids = None
        self.movie_ids = None

    def fit(self):
        """Factorize the user x movie rating matrix and return ``self``."""
        user_item_matrix = self.ratings_df.pivot_table(
            index="userId",
            columns="movieId",
            values="rating",
        ).fillna(0)

        self.user_factors = self.svd.fit_transform(user_item_matrix)
        self.item_factors = self.svd.components_
        # Keep the labels so user/movie ids can be mapped to positions.
        self.user_ids = user_item_matrix.index
        self.movie_ids = user_item_matrix.columns
        return self

    def _ensure_fitted(self):
        """Fit on first use, like the other recommenders in this file."""
        if self.user_factors is None or self.item_factors is None:
            self.fit()

    def predict_rating(self, user_id, movie_id):
        """Predict the rating ``user_id`` would give ``movie_id``.

        Raises:
            KeyError: If the user or the movie was not in the fitted data.
        """
        self._ensure_fitted()
        user_idx = self.user_ids.get_loc(user_id)
        movie_idx = self.movie_ids.get_loc(movie_id)
        return np.dot(self.user_factors[user_idx], self.item_factors[:, movie_idx])

    def get_recommendations(self, user_id, n_recommendations=10):
        """Return the top predicted movies the user has not rated.

        Args:
            user_id: Label of the target user.
            n_recommendations: Maximum number of movies to return.

        Returns:
            A list of ``(movie_id, predicted_rating)`` tuples ordered by
            descending predicted rating.

        Raises:
            KeyError: If the user was not in the fitted data.
        """
        self._ensure_fitted()
        user_idx = self.user_ids.get_loc(user_id)

        # Predicted ratings for every movie, in ``movie_ids`` order.
        predicted_ratings = np.dot(self.user_factors[user_idx], self.item_factors)

        # A set makes the per-movie membership test below O(1).
        user_ratings = self.ratings_df[self.ratings_df["userId"] == user_id]
        rated_movies = set(user_ratings["movieId"])

        recommendations = [
            (movie_id, predicted)
            for movie_id, predicted in zip(self.movie_ids, predicted_ratings)
            if movie_id not in rated_movies
        ]
        recommendations.sort(key=lambda item: item[1], reverse=True)
        return recommendations[:n_recommendations]
# Usage example: fit a 50-factor model on all ratings and print the top five
# recommendations for user 1.
svd_model = SVDRecommender(ratings, n_components=50)
svd_model.fit()
svd_recommendations = svd_model.get_recommendations(user_id=1, n_recommendations=5)
print("SVD推荐结果:")
for movie_id, score in svd_recommendations:
    # Look the title up in the movie metadata table.
    title_matches = movies.loc[movies["movieId"] == movie_id, "title"]
    movie_title = title_matches.iloc[0]
    print(f"{movie_title}: {score:.3f}")
模型评估
评估指标
推荐系统评估指标:下面的代码使用均方根误差(RMSE)和平均绝对误差(MAE)衡量预测评分与真实评分之间的差距,两者都是数值越小表示预测越准确。
# 模型评估
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
def evaluate_model(model, test_data):
    """Score a rating-prediction model on held-out ratings.

    Args:
        model: Object exposing ``predict_rating(user_id, movie_id)``.
        test_data: DataFrame with ``userId``, ``movieId`` and ``rating``
            columns.

    Returns:
        A ``(rmse, mae)`` tuple computed over the rows the model could
        predict.

    Raises:
        ValueError: If the model could not predict a single row.
    """
    predictions = []
    actuals = []
    # Iterate the columns directly: ``iterrows`` would upcast the integer
    # ids to float when packing each mixed-dtype row into a Series.
    rows = zip(test_data["userId"], test_data["movieId"], test_data["rating"])
    for user_id, movie_id, actual_rating in rows:
        try:
            predicted_rating = model.predict_rating(user_id, movie_id)
        except LookupError:
            # Skip rows the model cannot look up, e.g. a user or movie it
            # has not seen. Any other exception is a real bug and propagates.
            continue
        predictions.append(predicted_rating)
        actuals.append(actual_rating)

    if not predictions:
        raise ValueError(
            "evaluate_model: the model produced no predictions for test_data"
        )

    rmse = np.sqrt(mean_squared_error(actuals, predictions))
    mae = mean_absolute_error(actuals, predictions)
    return rmse, mae
# Hold out 20% of the ratings as a test set.
train_data, test_data = train_test_split(ratings, test_size=0.2, random_state=42)
# Fit the SVD model on the training split only; fit() returns the model.
svd_model = SVDRecommender(train_data).fit()
# Evaluate on the first 1000 held-out rows rather than the full test set.
evaluation_sample = test_data.head(1000)
rmse, mae = evaluate_model(svd_model, evaluation_sample)
print(f"RMSE: {rmse:.3f}")
print(f"MAE: {mae:.3f}")
系统部署
Flask Web应用
# Flask应用示例
from flask import Flask, request, jsonify, render_template
import pickle
app = Flask(__name__)

# Load the trained recommender once, at import time.
# NOTE(review): unpickling executes code embedded in the file, so
# "recommendation_model.pkl" must come from a trusted source.
with open("recommendation_model.pkl", "rb") as model_file:
    model = pickle.load(model_file)
@app.route("/")
def home():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
def _error_response(message, status_code):
    """Build the JSON error payload together with its HTTP status code."""
    return jsonify({"status": "error", "message": message}), status_code


@app.route("/recommend", methods=["POST"])
def recommend():
    """Return movie recommendations for the user named in the JSON body.

    Request body: ``{"user_id": <id>, "n_recommendations": <int, optional>}``
    (``n_recommendations`` defaults to 10).

    Responses:
        200: ``{"status": "success", "recommendations": [...]}`` where each
            entry has ``title``, ``genres`` and ``score``.
        400: the body is not a JSON object, ``user_id`` is missing, or
            ``n_recommendations`` is not a positive integer.
        404: the model raised ``KeyError`` for the user.
        500: any other failure while building the recommendations.

    Error bodies keep the ``{"status": "error", "message": ...}`` shape.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # client gets the JSON error below instead of Flask's HTML error page.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or "user_id" not in payload:
        return _error_response("request body must be JSON with a 'user_id'", 400)

    user_id = payload["user_id"]
    n_recommendations = payload.get("n_recommendations", 10)
    # bool is a subclass of int, so it has to be rejected explicitly.
    if (
        isinstance(n_recommendations, bool)
        or not isinstance(n_recommendations, int)
        or n_recommendations < 1
    ):
        return _error_response("'n_recommendations' must be a positive integer", 400)

    try:
        recommendations = model.get_recommendations(user_id, n_recommendations)
    except KeyError:
        # The recommenders in this tutorial resolve users with
        # Index.get_loc, which raises KeyError for an unknown label.
        return _error_response(f"unknown user_id: {user_id!r}", 404)
    except Exception as e:
        app.logger.exception("recommendation failed for user_id=%r", user_id)
        return _error_response(str(e), 500)

    try:
        movie_list = []
        for movie_id, score in recommendations:
            matches = movies[movies["movieId"] == movie_id]
            if matches.empty:
                # No metadata for this movie: skip it rather than failing
                # the whole response.
                continue
            movie_info = matches.iloc[0]
            movie_list.append({
                "title": movie_info["title"],
                "genres": movie_info["genres"],
                # float() guarantees a JSON-serializable number even when
                # the model returns a NumPy scalar.
                "score": round(float(score), 3),
            })
    except Exception as e:
        app.logger.exception("building the movie list failed")
        return _error_response(str(e), 500)

    return jsonify({
        "status": "success",
        "recommendations": movie_list,
    })
if __name__ == "__main__":
    import os

    # Debug mode enables Werkzeug's interactive debugger, which lets anyone
    # who can reach the server run arbitrary Python. Keep it off unless it
    # is explicitly requested with FLASK_DEBUG=1.
    app.run(debug=os.environ.get("FLASK_DEBUG", "0") == "1")
✅ 项目成果:
• 实现了多种推荐算法
• 建立了完整的评估体系
• 部署了可用的Web应用
• 掌握了推荐系统的核心技术
⚠️ 优化方向:
• 冷启动问题:新用户和新物品的推荐
• 实时推荐:处理用户实时行为
• 多样性:避免推荐结果过于相似
• 可解释性:解释推荐理由
项目总结
通过这个项目,我们学习了推荐系统的核心算法和实现方法。推荐系统是机器学习在商业中最有价值的应用之一,掌握这些技术对于数据科学家来说非常重要。
恭喜完成机器学习基础课程的学习!