AI教我做事之RAG开发-23 RAG框架之Pathway平台框架研究

1 Pathway简介

Pathway 是一个高性能、低延迟的 Python ETL(Extract, Transform, Load)框架,专为流处理、实时分析、大型语言模型(LLM)管道以及检索增强生成(RAG)设计。它提供了一个简单易用的 Python API,开发者可以无缝集成常用的 Python 机器学习库,例如 LangChain 或其他嵌入模型。Pathway 的代码设计兼顾开发和生产环境,能够高效处理批处理和流式数据。

官网地址:https://pathway.com/

其核心是一个基于 Rust 开发的引擎,采用 Differential Dataflow 技术,支持增量计算、多线程、多进程以及分布式计算,从而实现卓越的性能和可扩展性。

  1. Pathway的特点
  • 流处理 Pathway 专注于实时数据流处理,能够以低延迟和高吞吐量处理动态数据,非常适合实时问答、监控系统等场景。
  • LLM集成 Pathway 提供了专门的 LLM 工具集,支持构建实时 LLM 和 RAG 管道。它包括对 LLM 服务(如 OpenAI)的封装、文本解析器、嵌入生成器和文本分割器等实用工具。
  • 数据连接器 Pathway 支持多种数据源,包括 Kafka、Google Drive、PostgreSQL、SharePoint 等。此外,通过与 Airbyte 的集成,Pathway 可以连接超过 300 种数据源,极大地扩展了其适用范围。
  • 可扩展性 得益于 Rust 引擎的支持,Pathway 能够轻松实现分布式计算,适合处理大规模数据集和复杂任务。
  • 社区支持 Pathway 拥有一个活跃的社区,提供详尽的文档、教程以及 Discord 平台上的技术支持,帮助用户快速上手并解决问题。
AI教我做事之RAG开发-23 RAG框架之Pathway平台框架研究

2 Pathway与RAGFlow的对比

为了更好地理解 Pathway 的定位,我们将其与另一个开源 RAG 引擎 RAGFlow 进行对比:

  • RAGFlow
    • 定位:专注于深度文档理解和知识图谱提取,特别适合企业文档问答和知识管理场景。
    • 优势:提供用户友好的图形界面,支持复杂格式文档(如 PDF、Word)的解析。
    • 适用场景:更适合静态文档处理和知识库构建。
  • Pathway
    • 定位:强调高性能、实时数据处理和 LLM 集成,适合实时问答和高性能 RAG 应用。
    • 优势:支持流式数据处理,提供 YAML 配置和图形界面,易于快速开发和部署。
    • 适用场景:更适合动态数据处理和实时响应场景。
  • 易用性 Pathway 通过 Python API 和 YAML 配置简化了开发流程,开发者可以快速构建和调整应用。
  • 性能 Pathway 的 Rust 引擎和增量计算能力使其在处理大规模数据和实时任务时表现优异。
  • 可扩展性 Pathway 支持分布式计算,能够轻松扩展以应对企业级需求。

3 Pathway的Python演示代码

以下是一个完整的 Pathway RAG 应用示例,展示如何读取文档、生成嵌入并运行向量存储服务器。此代码假设您有一个包含文档的 ./data 文件夹,并使用 OpenAI 的嵌入模型。

import pathway as pw

# Declare the Schema of your tables using pw.Schema.
# There are two input tables: (1) measurements which is 
# live stream and (2) threshold which is a CSV that might be modified over time.
# Both have two columns: a name (str) and a float.
class MeasurementSchema(pw.Schema):
    name: str
    value: float

class ThresholdSchema(pw.Schema):
    name: str
    threshold: float

# Define Kafka configuration to connect to your Kafka instance
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# Accessing the measurements using the Kafka Connector
measurements_table = pw.io.kafka.read(
    rdkafka_settings,
    topic="topic",
    schema=MeasurementSchema,
    format="json",
    autocommit_duration_ms=1000
)

# Accessing the threshold data stored in CSV files
thresholds_table = pw.io.csv(
    './threshold-data/',
    schema=ThresholdSchema,
)

# Joining tables on the column name
joined_table = (
    # The left table is measurements_table (referred as pw.left)
    measurements_table
    .join(
        # The right table is thresholds_table (referred as pw.right)
        thresholds_table,
        # The join is done on the column name of each table 
        pw.left.name==pw.right.name,
    )
    # The columns of the joined table are chosen using select
    .select(
        # All the columns of measurements are kept
        *pw.left,
        # The threshold column of the threshold table is kept
        pw.right.threshold
    )
)

alerts_table = (
    joined_values
    # Filtering value strictly higher than the threshold.
    .filter(pw.this.value > pw.this.threshold)
    # Only name and value fields are kept
    .select(pw.this.name, pw.this.value)
)

# Sending the results to another Kafka topic, on the same Kafka instance
pw.io.kafka.write(
    alerts_table, rdkafka_settings, topic_name="alerts_topic", format="json"
)

# Launching Pathway computation.
pw.run()

代码说明

  1. 依赖导入
    1. pathway:Pathway 的核心库。
    2. VectorStoreServer:Pathway 提供的向量存储服务类。
    3. OpenAIEmbeddings:LangChain 提供的 OpenAI 嵌入模型。
    4. CharacterTextSplitter:用于将文档分割成小块。
  2. API密钥设置
    1. 从环境变量中读取 OpenAI API 密钥,确保安全性。
  3. 数据读取
    1. 使用 pw.io.fs.read 从 ./data 文件夹读取文件,支持流式处理(mode=”streaming”)。
  4. 嵌入和分割
    1. 使用 OpenAI 的嵌入模型生成向量。
    2. 将文档分割成大小为 1000 字符的块,重叠 200 字符以保留上下文。
  5. 服务器运行
    1. 创建并运行 VectorStoreServer,使用 FAISS 作为向量存储后端。
    2. 服务器监听 0.0.0.0:8756,可以通过网络访问。

运行准备

  • 安装依赖
  • bash
  • pip install pathway langchain-openai
  • 设置环境变量
  • bash
  • export OPENAI_API_KEY="your-openai-api-key"
  • 准备数据:在项目根目录下创建 ./data 文件夹,放入需要处理的文档(如 .txt 文件)。

运行代码后,服务器将启动并监听指定端口,您可以通过 HTTP 请求查询嵌入后的文档内容。

  1. Pathway的应用场景
  • 实时问答 Pathway 的流处理能力使其能够实时响应用户查询,适用于客服系统或在线问答平台。
  • LLM管道 支持构建复杂的 LLM 工作流,例如自动文本摘要、翻译或内容生成。
  • 数据分析 Pathway 的高性能特性使其适合实时分析大规模数据集,例如金融市场监控或日志分析。
  1. Pathway的未来发展

Pathway 目前处于快速发展阶段,未来计划包括:

  • 支持更多数据源,进一步丰富其连接器生态。
  • 优化 LLM 和 RAG 功能,提升性能和用户体验。
  • 通过社区反馈不断改进,提供更多实用工具和模板。
AI教我做事之RAG开发-23 RAG框架之Pathway平台框架研究
  1. 结论

Pathway 是一个功能强大且易于使用的 Python 框架,特别适合需要高性能、实时数据处理和 LLM 集成的场景。其灵活的 API 和 Rust 驱动的引擎为开发者提供了高效的工具,用于构建和部署复杂的 RAG 应用。无论是实时问答、数据分析还是 LLM 管道管理,Pathway 都能提供卓越的性能和开发体验。

RA/SD 衍生者AI训练营。发布者:稻草人,转载请注明出处:https://www.shxcj.com/ai%e6%95%99%e6%88%91%e5%81%9a%e4%ba%8b%e4%b9%8brag%e5%bc%80%e5%8f%91-23-rag%e6%a1%86%e6%9e%b6%e4%b9%8bpathway%e5%b9%b3%e5%8f%b0%e6%a1%86%e6%9e%b6%e7%a0%94%e7%a9%b6/

(0)
上一篇 1天前
下一篇 23小时前

相关推荐

发表回复

登录后才能评论
本文授权以下站点有原版访问授权 https://www.shxcj.com https://www.2img.ai https://www.2video.cn