课程 9:PySpark 数据挖掘及 Redis 结果存储
本课程演示一个使用 PySpark 的实际数据挖掘任务——特别是词频统计——并展示如何使用 Redis 高效地存储和检索这些结果。这将 Spark 的分布式处理能力与 Redis 的快速内存访问相结合。
1. 使用 PySpark 进行数据挖掘:词频统计示例
词频统计是分布式数据处理中一个经典的入门示例。它演示了 PySpark 中使用 RDD (弹性分布式数据集) 的关键转换和行动操作。
逐步进行 PySpark 词频统计
确保已安装 PySpark (pip install pyspark)。此示例在本地模式下运行。
1. 创建样本数据并初始化 SparkSession
我们将首先定义一些样本文本并创建一个 SparkSession。
from pyspark.sql import SparkSession
# 创建一个 SparkSession
spark = SparkSession.builder \
.appName("PySparkWordCountToRedis") \
.master("local[*]") \
.getOrCreate()
sc = spark.sparkContext # 获取 SparkContext 用于 RDD 操作
# 样本文本数据
text_data = [
"Hello Spark, this is a sample text.",
"Spark is great for big data processing.",
"This sample text will be used for word count.",
"Hello again Spark, process this text."
]
# 从字符串列表创建 RDD
text_rdd = sc.parallelize(text_data)
print("初始 RDD 内容:") # "Initial RDD content:"
for line in text_rdd.collect():
print(line)
2. 使用 RDD 转换执行词频统计
我们将使用几个 RDD 转换来统计每个单词的出现次数:
flatMap():将每一行分割成单个单词。使用lower()使计数不区分大小写。map():为每个单词创建 (单词, 1) 的键值对。reduceByKey():聚合每个单词的计数。
# FlatMap 将行分割成单词,转换为小写,并移除标点符号 (基本处理)
words_rdd = text_rdd.flatMap(lambda line: line.lower().replace('.', '').replace(',', '').split(" "))
# 将每个单词映射为 (单词, 1) 元组
word_map_rdd = words_rdd.map(lambda word: (word, 1))
# 过滤掉可能由多个空格产生的空字符串
word_map_rdd = word_map_rdd.filter(lambda x: x[0] != "")
# 按键聚合以统计每个单词的出现次数
word_counts_rdd = word_map_rdd.reduceByKey(lambda a, b: a + b)
# 行动:收集结果
word_counts_result = word_counts_rdd.collect()
print("\\nPySpark 词频统计结果:") # "Word Count Results from PySpark:"
for word, count in word_counts_result:
print(f"'{word}': {count}")
DataFrame 方法 (替代方案):对于更结构化的数据或复杂操作,通常首选 DataFrame。可以通过将 RDD 转换为 DataFrame 并使用 DataFrame API 操作(如 explode、groupBy 和 count)来完成类似的词频统计。
2. 将 PySpark 结果存储在 Redis 中
一旦我们从 PySpark 获得了分析结果,就可以将它们存储在 Redis 中,以便其他应用程序快速访问或用于缓存。
在 Redis 哈希中存储词频统计结果
Redis 哈希 (Hash) 是存储词频统计的合适数据结构,其中每个单词是哈希中的一个字段,其计数是值。我们将使用 `redis-py` 库。
确保 Redis 正在运行且可访问。您可以通过 Docker 运行它:docker run -d -p 6379:6379 --name my-redis redis:alpine。
安装 Python Redis 客户端:pip install redis。
1. 连接到 Redis 并存储结果
import redis
# 假设 word_counts_result 可从上面的 PySpark 部分获得
# word_counts_result = [('hello', 2), ('spark', 3), ...]
# Redis 连接详细信息
redis_host = 'localhost'
redis_port = 6379
redis_db = 0
redis_hash_key = 'pyspark_word_counts' # Redis 中哈希的名称
try:
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
r.ping()
print("\\n成功连接到 Redis!") # "Successfully connected to Redis!"
# 为此示例清除哈希中的任何旧数据
r.delete(redis_hash_key)
# 将结果存储在 Redis 哈希中
# 使用管道进行更高效的批量插入
pipe = r.pipeline()
for word, count in word_counts_result:
if word: # 确保单词不为空
pipe.hset(redis_hash_key, word, count)
pipe.execute()
print(f"已在 Redis 哈希 '{redis_hash_key}' 中存储 {len(word_counts_result)} 个词频统计。") # "Stored ... word counts in Redis hash ..."
except redis.exceptions.ConnectionError as e:
print(f"无法连接到 Redis: {e}") # "Could not connect to Redis:"
# 在实际应用程序中,应适当处理此错误
# 对于此脚本,如果 Redis 不可用,我们将退出
spark.stop() # 如果 Redis 失败,则停止 Spark 会话
exit()
except Exception as e:
print(f"Redis 操作期间发生错误: {e}") # "An error occurred during Redis operations:"
spark.stop()
exit()
2. 从 Redis 检索结果
try:
# 从哈希中检索所有词频统计
retrieved_word_counts = r.hgetall(redis_hash_key)
print(f"\\n从 Redis 哈希 '{redis_hash_key}' 检索到的所有词频统计:") # "Retrieved all word counts from Redis hash..."
if retrieved_word_counts:
for word, count in retrieved_word_counts.items():
print(f"'{word}': {count}")
else:
print("在 Redis 中未找到词频统计(或键不存在)。") # "No word counts found in Redis (or key does not exist)."
# 检索特定单词的计数
specific_word = 'spark'
count_specific_word = r.hget(redis_hash_key, specific_word)
if count_specific_word:
print(f"\\n单词 '{specific_word}' 的计数: {count_specific_word}") # "Count for word..."
else:
print(f"\\n在 Redis 哈希中未找到单词 '{specific_word}'。") # "Word ... not found in Redis hash."
except Exception as e:
print(f"从 Redis 检索数据时发生错误: {e}") # "An error occurred while retrieving data from Redis:"
finally:
# 在所有操作结束时停止 SparkSession
spark.stop()
print("\\nSparkSession 已停止。") # "SparkSession stopped."
替代 Redis 结构:用于排序结果的有序集合 (Sorted Set)
如果您想轻松检索按频率排序的单词,Redis 有序集合将非常适用。您可以使用单词作为成员,其计数作为分数:
r.zadd('word_counts_sorted_set', {word: count for word, count in word_counts_result})
然后,您可以使用 r.zrevrange('word_counts_sorted_set', 0, N-1, withscores=True) 检索前 N 个单词。
3. 与先前课程和 Docker 设置的集成
此示例可以利用先前课程中讨论的环境。
使用 Docker化的 Hadoop 和 Redis 运行
- Hadoop 上的 PySpark (课程 6):
如果您的输入数据(例如大型文本文件)位于 HDFS(来自课程 6)中,则可以将 PySpark 应用程序配置为在 YARN 集群上运行。
spark-submit脚本将用于将应用程序部署到集群。Spark 作业将从 HDFS 读取,执行词频统计,然后将结果(在驱动程序处收集或按分区处理)写入 Redis。 - Redis 实例 (课程 8):
课程 8 中设置的 Redis 实例(或类似的 Docker化 Redis)将用作结果的存储。PySpark 应用程序(无论是在本地运行、在 YARN 上运行还是在其自己的 Docker 容器中运行)都需要网络访问此 Redis 容器的 IP 地址和端口(通常为 6379)。
用于 Spark 和 Redis 的概念性 Docker Compose
对于运行与 Redis 交互的 PySpark 作业的自包含环境(无需课程 6 中的完整 Hadoop 设置),您可以使用 Docker Compose。这对于开发和测试使用 Redis 的 Spark 应用程序非常有用。
# docker-compose-spark-redis.yml
version: '3.8'
services:
spark-master:
image: bitnami/spark:3 # 使用特定版本,例如 3.2.1
container_name: spark-master
hostname: spark-master
ports:
- "8080:8080" # Spark Master Web UI
- "7077:7077" # Spark Master RPC
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
networks:
- spark_redis_network
spark-worker:
image: bitnami/spark:3
container_name: spark-worker-1
hostname: spark-worker-1
depends_on:
- spark-master
ports:
- "8081:8081" # Spark Worker Web UI
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
networks:
- spark_redis_network
redis:
image: redis:alpine
container_name: redis_service
hostname: redis_service
ports:
- "6379:6379"
networks:
- spark_redis_network
networks:
spark_redis_network:
driver: bridge
使用此 Docker Compose 运行 PySpark 脚本:
- 启动服务:
docker-compose -f docker-compose-spark-redis.yml up -d。 - 将您的 PySpark 脚本提交到 Spark master。通常,您可以从主机执行此操作(如果主机上安装了 PySpark),或者从另一个具有 PySpark 并可以网络访问 Spark master 的容器中执行。
从主机提交的示例:spark-submit --master spark://localhost:7077 your_pyspark_script.py
(如果从同一 Docker 网络内的容器运行脚本,请确保脚本连接到 Redis 地址为redis_service:6379;如果 Redis 端口已映射并且脚本在主机上运行,则连接到localhost:6379)。 - 或者,对于本课程脚本(在本地模式下运行 Spark)的更简单设置,您只需要 Redis 容器。PySpark 脚本本身将在您的主机(或任何装有 Python、PySpark 和 Redis 库的机器)上运行,并通过其映射端口(例如 `localhost:6379`)连接到 Redis 容器。
结论
本课程演示了一种常见模式:使用 PySpark 执行数据分析和挖掘任务,然后将有价值的(通常是聚合的)结果存储在像 Redis 这样的快速键值存储中。这使得其他应用程序可以快速访问这些见解,而无需重新运行复杂的计算。这种组合利用了两种技术的优势,可实现高效的大数据工作流程。