课程 8:PySpark 与 Redis 入门
本课程介绍用于大规模数据处理的 Apache PySpark 和用于高性能内存数据存储的 Redis。我们将涵盖每种技术的基础知识,并简要讨论它们如何协同工作。
1. PySpark 简介
Apache Spark 是一个为速度、易用性和复杂分析而构建的强大开源分布式处理系统。PySpark 是 Apache Spark 的 Python API,允许您使用 Python 语言驾驭 Spark 的强大功能。
什么是 Apache Spark 和 PySpark?
- Apache Spark:一个用于大数据处理的统一分析引擎,内置了流处理、SQL (Spark SQL)、机器学习 (MLlib) 和图处理 (GraphX) 模块。它可以独立运行,也可以在 Apache Mesos、Kubernetes 上运行,或者(对我们来说很重要)在 Apache Hadoop YARN 上运行。
- PySpark:允许 Python 开发人员编写 Spark 应用程序。它结合了 Python 的简洁性和 Spark 的强大功能。
Spark 核心概念
- SparkSession:Spark 中 DataFrame 和 SQL 功能的主要入口点(自 Spark 2.0 起)。它封装了 SparkContext、SQLContext、HiveContext 和 StreamingContext。您可以使用
SparkSession.builder.appName("MyApp").getOrCreate()创建它。 - SparkContext:Spark 2.0 之前 Spark 功能的主要入口点,主要用于创建 RDD。可以通过 SparkSession 对象的
spark.sparkContext访问。 - 弹性分布式数据集 (RDDs / Resilient Distributed Datasets):DataFrame 出现之前 Spark 的基本数据结构。RDD 是一个不可变的、容错的、可并行处理的分布式对象集合。虽然功能强大,但对于结构化或半结构化数据,通常首选 DataFrame。
- DataFrame:一个分布式的数据集合,数据被组织成命名的列。它在概念上等同于关系数据库中的表或 R/Python (Pandas) 中的数据框,但是是分布式的。DataFrame 通过 Catalyst 优化器和 Tungsten 执行引擎提供了更高级别的 API 和优化。
基本 PySpark 操作
PySpark 操作分为转换 (Transformations) 和行动 (Actions)。
- 转换 (Transformations):从现有 RDD/DataFrame 创建新的 RDD/DataFrame(例如
map、filter、select、groupBy)。转换是惰性的,这意味着 Spark 在调用行动之前不会执行它们。 - 行动 (Actions):触发转换的执行,并将结果返回给驱动程序或将数据写入外部存储系统(例如
collect、count、show、save)。
PySpark 代码示例
要运行这些示例,您需要安装 PySpark (pip install pyspark)。对于本地模式,严格来说不需要 Hadoop/Spark 集群,因为 PySpark 会在您的机器上运行一个本地“微型集群”。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
# 1. 创建一个 SparkSession
spark = SparkSession.builder \
.appName("PySparkIntro") \
.master("local[*]") \
.getOrCreate() # 使用 local[*] 在本地运行,利用所有可用核心
# 访问 SparkContext (如果需要 RDDs)
sc = spark.sparkContext
# 2. 创建 RDDs
print("\\n--- RDD 操作 ---")
data_list_rdd = [1, 2, 3, 4, 5, 5, 4, 3, 2, 1]
rdd = sc.parallelize(data_list_rdd)
print(f"初始 RDD: {rdd.collect()}")
# 转换: map (每个元素乘以 2)
rdd_mapped = rdd.map(lambda x: x * 2)
print(f"map (x*2) 后的 RDD: {rdd_mapped.collect()}")
# 转换: filter (保留大于 5 的元素)
rdd_filtered = rdd_mapped.filter(lambda x: x > 5)
print(f"filter (x > 5) 后的 RDD: {rdd_filtered.collect()}")
# 行动: count
print(f"过滤后 RDD 中的元素数量: {rdd_filtered.count()}")
# 行动: take (获取前 N 个元素)
print(f"从过滤后 RDD 中获取前 2 个元素: {rdd_filtered.take(2)}")
# 3. 创建 DataFrames
print("\\n--- DataFrame 操作 ---")
data_list_df = [("Alice", 25, "Engineer"),
("Bob", 30, "Doctor"),
("Charlie", 35, "Engineer"),
("David", 25, "Manager"),
("Eve", 30, "Engineer")]
columns = ["Name", "Age", "Occupation"]
df = spark.createDataFrame(data_list_df, columns)
print("初始 DataFrame:")
df.show() # 行动: 显示 DataFrame 内容
# 转换: select 选择特定列
df_selected = df.select("Name", "Occupation")
print("选择 Name 和 Occupation 列后的 DataFrame:")
df_selected.show()
# 转换: filter 过滤行 (Age > 28)
df_filtered_df = df.filter(col("Age") > 28) # 或者 df.filter("Age > 28")
print("过滤 Age > 28 后的 DataFrame:")
df_filtered_df.show()
# 转换: groupBy 和 aggregate (按职业计算平均年龄)
df_grouped = df.groupBy("Occupation").agg(avg("Age").alias("AverageAge"))
print("各职业的平均年龄:")
df_grouped.show()
# 行动: collect (以 Row 对象列表的形式获取所有数据)
collected_data = df_grouped.collect()
print(f"收集到的分组数据: {collected_data}")
# 4. 停止 SparkSession
spark.stop()
PySpark on Hadoop
虽然 PySpark 可以在本地模式下运行,但其真正的威力来自于在集群上运行。PySpark 与 Hadoop 无缝集成:
- YARN (Yet Another Resource Negotiator):PySpark 应用程序可以作为 YARN 应用程序运行。YARN 负责在 Hadoop 集群中为您的 Spark 作业分配资源(CPU、内存)。
- HDFS (Hadoop Distributed File System):PySpark 可以从 HDFS 读取数据并将数据写入 HDFS,这使其非常适合处理已存储在 Hadoop 集群(如课程 6 中概念性设置的集群)中的大型数据集。
当向 YARN 集群提交 PySpark 作业时,通常会使用 spark-submit 并指定一个类似 yarn 的 master URL(例如 spark-submit --master yarn my_pyspark_app.py)。
2. Redis 简介
Redis (Remote Dictionary Server) 是一个开源的内存数据结构存储系统,可用作数据库、缓存、消息代理和流处理引擎。它以其速度和灵活性而闻名。
什么是 Redis?
Redis 主要将数据存储在内存中,这使得读写操作非常快。它支持丰富的数据结构,使其能够灵活应对各种使用场景。
Redis 主要特性:
- 性能:由于其内存特性,可实现非常高的吞吐量和低延迟。
- 键值存储:Redis 的核心是一个键值存储系统,但“值”可以是复杂的数据结构。
- 丰富的数据结构:
- 字符串 (Strings):简单的文本或二进制数据,最大可达 512MB。
- 列表 (Lists):有序的字符串集合,类似于 Python 列表。可用于实现队列或栈。
- 哈希 (Hashes):存储包含字段-值对的对象,类似于 Python 字典。
- 集合 (Sets):无序的唯一字符串集合。可用于跟踪唯一项、执行集合操作(并集、交集)。
- 有序集合 (Sorted Sets):每个成员都与一个分数相关联的集合。成员按分数排序,使其适用于排行榜或优先队列。
- 持久化:Redis 提供数据持久化选项(RDB 快照、AOF 日志),因此即使服务器重新启动,数据也不会丢失。
- Lua 脚本:在服务器上执行复杂的原子操作。
- 发布/订阅 (Pub/Sub):内置的发布/订阅消息传递功能。
基本 Redis 命令与 Python 示例 (`redis-py`)
要将 Redis 与 Python 结合使用,请安装 redis-py 库:pip install redis
对于本地开发,您可以使用 Docker 运行 Redis:docker run -d -p 6379:6379 --name my-redis redis:alpine
以下示例使用 redis-py 与 Redis 服务器进行交互。
import redis
# 连接到 Redis (确保 Redis 服务器正在运行)
try:
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) # decode_responses=True 以获取字符串而非字节
r.ping() # 检查连接
print("成功连接到 Redis!") # "Successfully connected to Redis!"
except redis.exceptions.ConnectionError as e:
print(f"无法连接到 Redis: {e}") # "Could not connect to Redis:"
exit()
# 1. 字符串操作
print("\\n--- 字符串操作 ---")
r.set('user:1000:name', 'Alice Wonderland')
user_name = r.get('user:1000:name')
print(f"GET user:1000:name -> {user_name}")
r.set('counter', 10)
r.incr('counter') # 将值增加 1
r.incrby('counter', 5) # 将值增加 5
print(f"GET counter -> {r.get('counter')}")
# 2. 列表操作 (例如,用于简单队列)
print("\\n--- 列表操作 ---")
list_key = 'task_queue'
r.delete(list_key) # 清空列表以重新开始
r.lpush(list_key, 'task1', 'task2', 'task3') # 推入左侧 (头部)
print(f"LPUSH 到 {list_key}: task1, task2, task3")
r.rpush(list_key, 'task0') # 推入右侧 (尾部)
print(f"RPUSH 到 {list_key}: task0")
tasks = r.lrange(list_key, 0, -1) # 获取列表中的所有元素
print(f"LRANGE {list_key} (全部) -> {tasks}")
task = r.rpop(list_key) # 从右侧弹出 (如果是 LPUSH 推入则为 FIFO,如果是 RPUSH 推入则为 LIFO)
print(f"RPOP 从 {list_key} -> {task}")
print(f"LRANGE {list_key} (RPOP 之后) -> {r.lrange(list_key, 0, -1)}")
# 3. 哈希操作 (例如,用于存储对象属性)
print("\\n--- 哈希操作 ---")
hash_key = 'user:1001'
r.delete(hash_key)
r.hset(hash_key, mapping={ # 在新版 redis-py 中,hset 可以直接接受 mapping
'name': 'Bob The Builder',
'age': 40,
'occupation': 'Builder'
})
print(f"HSET 操作 {hash_key}")
user_name_hash = r.hget(hash_key, 'name')
print(f"HGET {hash_key} 字段 'name' -> {user_name_hash}")
user_info = r.hgetall(hash_key) # 获取哈希中的所有字段和值
print(f"HGETALL {hash_key} -> {user_info}")
# 4. 集合操作 (例如,用于唯一项)
print("\\n--- 集合操作 ---")
set_key = "unique_visitors_today"
r.delete(set_key)
r.sadd(set_key, "user:A", "user:B", "user:C", "user:A") # "user:A" 只添加一次
print(f"SADD 到 {set_key}: user:A, user:B, user:C, user:A")
print(f"SMEMBERS {set_key} -> {r.smembers(set_key)}")
print(f"SISMEMBER {set_key} 'user:B' -> {r.sismember(set_key, 'user:B')}") # 检查成员是否存在
print(f"SCARD {set_key} (基数/数量) -> {r.scard(set_key)}")
# 清理一些键 (可选)
# r.delete('user:1000:name', 'counter', list_key, hash_key, set_key)
以上示例展示了基本命令。Redis 为每种数据类型提供了更多命令,以及事务、流水线和更高级的功能。
3. PySpark 与 Redis 集成 (概念性)
PySpark 和 Redis 可以在各种场景中有效地协同工作,利用 Spark 的处理能力和 Redis 的数据访问速度。
常见用例:
- 缓存查找数据:
PySpark 作业通常需要通过从外部表或维度数据中查找值来丰富数据。如果此查找数据很大但适合放入 Redis,或者其经常访问的部分适合放入 Redis,则 Redis 可以作为快速缓存。每个 Spark 执行器都可以直接查询 Redis,从而减少数据库负载并加快查找速度。
示例:一个处理用户活动的 Spark 作业可能会查找存储在 Redis 中的用户个人资料详细信息(例如,人口统计信息、偏好设置)。
- 存储聚合结果或摘要:
处理大量数据后,PySpark 可以计算聚合、摘要或机器学习模型特征。这些结果如果不是特别大,可以写入 Redis,以便下游应用程序、仪表板或 API 快速访问。
示例:一个每日 Spark 作业分析销售数据,并将各地区最畅销产品存储在 Redis 哈希中,供实时仪表板使用。
- 快速中间数据存储:
在多阶段 Spark 应用程序中,需要由后续阶段或其他应用程序快速访问的中间结果可以潜在地存储在 Redis 中,尽管对于作业内数据,通常首选 Spark 自己的缓存机制。
- 提供机器学习模型:
PySpark 计算的特征可以存储在 Redis 中。当预测请求传入时,应用程序可以从 Redis 中快速获取这些特征以输入模型(该模型也可能通过 Redis 或其他系统提供服务)。
将 PySpark 连接到 Redis 时,通常会在每个 Spark 任务/分区内初始化 Redis 连接(例如,在 mapPartitions 函数内部),以避免连接对象本身的序列化问题。像 spark-redis 这样的库也可以促进更集成的访问。
结论
PySpark 为分布式数据处理提供了一个稳健且可扩展的引擎,允许对大型数据集进行复杂分析。Redis 提供了一个高性能的内存数据存储,非常适合缓存、快速查找和管理动态数据结构。了解这两种技术以及它们如何相互补充,为构建高效的大数据应用程序开辟了强大的可能性。