课程 9:PySpark 数据挖掘及 Redis 结果存储

本课程演示一个使用 PySpark 的实际数据挖掘任务——特别是词频统计——并展示如何使用 Redis 高效地存储和检索这些结果。这将 Spark 的分布式处理能力与 Redis 的快速内存访问相结合。

1. 使用 PySpark 进行数据挖掘:词频统计示例

词频统计是分布式数据处理中一个经典的入门示例。它演示了 PySpark 中使用 RDD (弹性分布式数据集) 的关键转换和行动操作。

逐步进行 PySpark 词频统计

确保已安装 PySpark (pip install pyspark)。此示例在本地模式下运行。

1. 创建样本数据并初始化 SparkSession

我们将首先定义一些样本文本并创建一个 SparkSession。

from pyspark.sql import SparkSession

# 创建一个 SparkSession
spark = SparkSession.builder \
    .appName("PySparkWordCountToRedis") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext # 获取 SparkContext 用于 RDD 操作

# 样本文本数据
text_data = [
    "Hello Spark, this is a sample text.",
    "Spark is great for big data processing.",
    "This sample text will be used for word count.",
    "Hello again Spark, process this text."
]

# 从字符串列表创建 RDD
text_rdd = sc.parallelize(text_data)
print("初始 RDD 内容:") # "Initial RDD content:"
for line in text_rdd.collect():
    print(line)

2. 使用 RDD 转换执行词频统计

我们将使用几个 RDD 转换来统计每个单词的出现次数:

  • flatMap():将每一行分割成单个单词。使用 lower() 使计数不区分大小写。
  • map():为每个单词创建 (单词, 1) 的键值对。
  • reduceByKey():聚合每个单词的计数。
# FlatMap 将行分割成单词,转换为小写,并移除标点符号 (基本处理)
words_rdd = text_rdd.flatMap(lambda line: line.lower().replace('.', '').replace(',', '').split(" "))

# 将每个单词映射为 (单词, 1) 元组
word_map_rdd = words_rdd.map(lambda word: (word, 1))

# 过滤掉可能由多个空格产生的空字符串
word_map_rdd = word_map_rdd.filter(lambda x: x[0] != "")

# 按键聚合以统计每个单词的出现次数
word_counts_rdd = word_map_rdd.reduceByKey(lambda a, b: a + b)

# 行动:收集结果
word_counts_result = word_counts_rdd.collect()

print("\\nPySpark 词频统计结果:") # "Word Count Results from PySpark:"
for word, count in word_counts_result:
    print(f"'{word}': {count}")

DataFrame 方法 (替代方案):对于更结构化的数据或复杂操作,通常首选 DataFrame。可以通过将 RDD 转换为 DataFrame 并使用 DataFrame API 操作(如 explodegroupBycount)来完成类似的词频统计。

2. 将 PySpark 结果存储在 Redis 中

一旦我们从 PySpark 获得了分析结果,就可以将它们存储在 Redis 中,以便其他应用程序快速访问或用于缓存。

在 Redis 哈希中存储词频统计结果

Redis 哈希 (Hash) 是存储词频统计的合适数据结构,其中每个单词是哈希中的一个字段,其计数是值。我们将使用 `redis-py` 库。

确保 Redis 正在运行且可访问。您可以通过 Docker 运行它:docker run -d -p 6379:6379 --name my-redis redis:alpine

安装 Python Redis 客户端:pip install redis

1. 连接到 Redis 并存储结果

import redis

# 假设 word_counts_result 可从上面的 PySpark 部分获得
# word_counts_result = [('hello', 2), ('spark', 3), ...] 

# Redis 连接详细信息
redis_host = 'localhost'
redis_port = 6379
redis_db = 0
redis_hash_key = 'pyspark_word_counts' # Redis 中哈希的名称

try:
    r = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
    r.ping()
    print("\\n成功连接到 Redis!") # "Successfully connected to Redis!"

    # 为此示例清除哈希中的任何旧数据
    r.delete(redis_hash_key)

    # 将结果存储在 Redis 哈希中
    # 使用管道进行更高效的批量插入
    pipe = r.pipeline()
    for word, count in word_counts_result:
        if word: # 确保单词不为空
            pipe.hset(redis_hash_key, word, count)
    pipe.execute()
    
    print(f"已在 Redis 哈希 '{redis_hash_key}' 中存储 {len(word_counts_result)} 个词频统计。") # "Stored ... word counts in Redis hash ..."

except redis.exceptions.ConnectionError as e:
    print(f"无法连接到 Redis: {e}") # "Could not connect to Redis:"
    # 在实际应用程序中,应适当处理此错误
    # 对于此脚本,如果 Redis 不可用,我们将退出
    spark.stop() # 如果 Redis 失败,则停止 Spark 会话
    exit()
except Exception as e:
    print(f"Redis 操作期间发生错误: {e}") # "An error occurred during Redis operations:"
    spark.stop()
    exit()

2. 从 Redis 检索结果

try:
    # 从哈希中检索所有词频统计
    retrieved_word_counts = r.hgetall(redis_hash_key)
    print(f"\\n从 Redis 哈希 '{redis_hash_key}' 检索到的所有词频统计:") # "Retrieved all word counts from Redis hash..."
    if retrieved_word_counts:
        for word, count in retrieved_word_counts.items():
            print(f"'{word}': {count}")
    else:
        print("在 Redis 中未找到词频统计(或键不存在)。") # "No word counts found in Redis (or key does not exist)."

    # 检索特定单词的计数
    specific_word = 'spark'
    count_specific_word = r.hget(redis_hash_key, specific_word)
    if count_specific_word:
        print(f"\\n单词 '{specific_word}' 的计数: {count_specific_word}") # "Count for word..."
    else:
        print(f"\\n在 Redis 哈希中未找到单词 '{specific_word}'。") # "Word ... not found in Redis hash."

except Exception as e:
    print(f"从 Redis 检索数据时发生错误: {e}") # "An error occurred while retrieving data from Redis:"

finally:
    # 在所有操作结束时停止 SparkSession
    spark.stop()
    print("\\nSparkSession 已停止。") # "SparkSession stopped."

替代 Redis 结构:用于排序结果的有序集合 (Sorted Set)

如果您想轻松检索按频率排序的单词,Redis 有序集合将非常适用。您可以使用单词作为成员,其计数作为分数:
r.zadd('word_counts_sorted_set', {word: count for word, count in word_counts_result})
然后,您可以使用 r.zrevrange('word_counts_sorted_set', 0, N-1, withscores=True) 检索前 N 个单词。

3. 与先前课程和 Docker 设置的集成

此示例可以利用先前课程中讨论的环境。

使用 Docker化的 Hadoop 和 Redis 运行

  • Hadoop 上的 PySpark (课程 6):

    如果您的输入数据(例如大型文本文件)位于 HDFS(来自课程 6)中,则可以将 PySpark 应用程序配置为在 YARN 集群上运行。spark-submit 脚本将用于将应用程序部署到集群。Spark 作业将从 HDFS 读取,执行词频统计,然后将结果(在驱动程序处收集或按分区处理)写入 Redis。

  • Redis 实例 (课程 8):

    课程 8 中设置的 Redis 实例(或类似的 Docker化 Redis)将用作结果的存储。PySpark 应用程序(无论是在本地运行、在 YARN 上运行还是在其自己的 Docker 容器中运行)都需要网络访问此 Redis 容器的 IP 地址和端口(通常为 6379)。

用于 Spark 和 Redis 的概念性 Docker Compose

对于运行与 Redis 交互的 PySpark 作业的自包含环境(无需课程 6 中的完整 Hadoop 设置),您可以使用 Docker Compose。这对于开发和测试使用 Redis 的 Spark 应用程序非常有用。

# docker-compose-spark-redis.yml
version: '3.8'

services:
  spark-master:
    image: bitnami/spark:3 # 使用特定版本,例如 3.2.1
    container_name: spark-master
    hostname: spark-master
    ports:
      - "8080:8080" # Spark Master Web UI
      - "7077:7077" # Spark Master RPC
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
    networks:
      - spark_redis_network

  spark-worker:
    image: bitnami/spark:3
    container_name: spark-worker-1
    hostname: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081" # Spark Worker Web UI
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
    networks:
      - spark_redis_network

  redis:
    image: redis:alpine
    container_name: redis_service
    hostname: redis_service
    ports:
      - "6379:6379"
    networks:
      - spark_redis_network

networks:
  spark_redis_network:
    driver: bridge

使用此 Docker Compose 运行 PySpark 脚本:

  1. 启动服务:docker-compose -f docker-compose-spark-redis.yml up -d
  2. 将您的 PySpark 脚本提交到 Spark master。通常,您可以从主机执行此操作(如果主机上安装了 PySpark),或者从另一个具有 PySpark 并可以网络访问 Spark master 的容器中执行。
    从主机提交的示例:
    spark-submit --master spark://localhost:7077 your_pyspark_script.py
    (如果从同一 Docker 网络内的容器运行脚本,请确保脚本连接到 Redis 地址为 redis_service:6379;如果 Redis 端口已映射并且脚本在主机上运行,则连接到 localhost:6379)。
  3. 或者,对于本课程脚本(在本地模式下运行 Spark)的更简单设置,您只需要 Redis 容器。PySpark 脚本本身将在您的主机(或任何装有 Python、PySpark 和 Redis 库的机器)上运行,并通过其映射端口(例如 `localhost:6379`)连接到 Redis 容器。

结论

本课程演示了一种常见模式:使用 PySpark 执行数据分析和挖掘任务,然后将有价值的(通常是聚合的)结果存储在像 Redis 这样的快速键值存储中。这使得其他应用程序可以快速访问这些见解,而无需重新运行复杂的计算。这种组合利用了两种技术的优势,可实现高效的大数据工作流程。