1 Pathway简介
Pathway 是一个高性能、低延迟的 Python ETL(Extract, Transform, Load)框架,专为流处理、实时分析、大型语言模型(LLM)管道以及检索增强生成(RAG)设计。它提供了一个简单易用的 Python API,开发者可以无缝集成常用的 Python 机器学习库,例如 LangChain 或其他嵌入模型。Pathway 的代码设计兼顾开发和生产环境,能够高效处理批处理和流式数据。
官网地址:https://pathway.com/
其核心是一个基于 Rust 开发的引擎,采用 Differential Dataflow 技术,支持增量计算、多线程、多进程以及分布式计算,从而实现卓越的性能和可扩展性。
- Pathway的特点
- 流处理 Pathway 专注于实时数据流处理,能够以低延迟和高吞吐量处理动态数据,非常适合实时问答、监控系统等场景。
- LLM集成 Pathway 提供了专门的 LLM 工具集,支持构建实时 LLM 和 RAG 管道。它包括对 LLM 服务(如 OpenAI)的封装、文本解析器、嵌入生成器和文本分割器等实用工具。
- 数据连接器 Pathway 支持多种数据源,包括 Kafka、Google Drive、PostgreSQL、SharePoint 等。此外,通过与 Airbyte 的集成,Pathway 可以连接超过 300 种数据源,极大地扩展了其适用范围。
- 可扩展性 得益于 Rust 引擎的支持,Pathway 能够轻松实现分布式计算,适合处理大规模数据集和复杂任务。
- 社区支持 Pathway 拥有一个活跃的社区,提供详尽的文档、教程以及 Discord 平台上的技术支持,帮助用户快速上手并解决问题。

2 Pathway与RAGFlow的对比
为了更好地理解 Pathway 的定位,我们将其与另一个开源 RAG 引擎 RAGFlow 进行对比:
- RAGFlow
- 定位:专注于深度文档理解和知识图谱提取,特别适合企业文档问答和知识管理场景。
- 优势:提供用户友好的图形界面,支持复杂格式文档(如 PDF、Word)的解析。
- 适用场景:更适合静态文档处理和知识库构建。
- Pathway
- 定位:强调高性能、实时数据处理和 LLM 集成,适合实时问答和高性能 RAG 应用。
- 优势:支持流式数据处理,提供 YAML 配置和图形界面,易于快速开发和部署。
- 适用场景:更适合动态数据处理和实时响应场景。
- 易用性 Pathway 通过 Python API 和 YAML 配置简化了开发流程,开发者可以快速构建和调整应用。
- 性能 Pathway 的 Rust 引擎和增量计算能力使其在处理大规模数据和实时任务时表现优异。
- 可扩展性 Pathway 支持分布式计算,能够轻松扩展以应对企业级需求。
3 Pathway的Python演示代码
以下是一个完整的 Pathway RAG 应用示例,展示如何读取文档、生成嵌入并运行向量存储服务器。此代码假设您有一个包含文档的 ./data 文件夹,并使用 OpenAI 的嵌入模型。
import pathway as pw
# Declare the Schema of your tables using pw.Schema.
# There are two input tables: (1) measurements which is
# live stream and (2) threshold which is a CSV that might be modified over time.
# Both have two columns: a name (str) and a float.
class MeasurementSchema(pw.Schema):
name: str
value: float
class ThresholdSchema(pw.Schema):
name: str
threshold: float
# Define Kafka configuration to connect to your Kafka instance
rdkafka_settings = {
"bootstrap.servers": "server-address:9092",
"security.protocol": "sasl_ssl",
"sasl.mechanism": "SCRAM-SHA-256",
"group.id": "$GROUP_NAME",
"session.timeout.ms": "6000",
"sasl.username": "username",
"sasl.password": "********",
}
# Accessing the measurements using the Kafka Connector
measurements_table = pw.io.kafka.read(
rdkafka_settings,
topic="topic",
schema=MeasurementSchema,
format="json",
autocommit_duration_ms=1000
)
# Accessing the threshold data stored in CSV files
thresholds_table = pw.io.csv(
'./threshold-data/',
schema=ThresholdSchema,
)
# Joining tables on the column name
joined_table = (
# The left table is measurements_table (referred as pw.left)
measurements_table
.join(
# The right table is thresholds_table (referred as pw.right)
thresholds_table,
# The join is done on the column name of each table
pw.left.name==pw.right.name,
)
# The columns of the joined table are chosen using select
.select(
# All the columns of measurements are kept
*pw.left,
# The threshold column of the threshold table is kept
pw.right.threshold
)
)
alerts_table = (
joined_values
# Filtering value strictly higher than the threshold.
.filter(pw.this.value > pw.this.threshold)
# Only name and value fields are kept
.select(pw.this.name, pw.this.value)
)
# Sending the results to another Kafka topic, on the same Kafka instance
pw.io.kafka.write(
alerts_table, rdkafka_settings, topic_name="alerts_topic", format="json"
)
# Launching Pathway computation.
pw.run()
代码说明
- 依赖导入
- pathway:Pathway 的核心库。
- VectorStoreServer:Pathway 提供的向量存储服务类。
- OpenAIEmbeddings:LangChain 提供的 OpenAI 嵌入模型。
- CharacterTextSplitter:用于将文档分割成小块。
- API密钥设置
- 从环境变量中读取 OpenAI API 密钥,确保安全性。
- 数据读取
- 使用 pw.io.fs.read 从 ./data 文件夹读取文件,支持流式处理(mode=”streaming”)。
- 嵌入和分割
- 使用 OpenAI 的嵌入模型生成向量。
- 将文档分割成大小为 1000 字符的块,重叠 200 字符以保留上下文。
- 服务器运行
- 创建并运行 VectorStoreServer,使用 FAISS 作为向量存储后端。
- 服务器监听 0.0.0.0:8756,可以通过网络访问。
运行准备
- 安装依赖:
- bash
pip install pathway langchain-openai
- 设置环境变量:
- bash
export OPENAI_API_KEY="your-openai-api-key"
- 准备数据:在项目根目录下创建 ./data 文件夹,放入需要处理的文档(如 .txt 文件)。
运行代码后,服务器将启动并监听指定端口,您可以通过 HTTP 请求查询嵌入后的文档内容。
- Pathway的应用场景
- 实时问答 Pathway 的流处理能力使其能够实时响应用户查询,适用于客服系统或在线问答平台。
- LLM管道 支持构建复杂的 LLM 工作流,例如自动文本摘要、翻译或内容生成。
- 数据分析 Pathway 的高性能特性使其适合实时分析大规模数据集,例如金融市场监控或日志分析。
- Pathway的未来发展
Pathway 目前处于快速发展阶段,未来计划包括:
- 支持更多数据源,进一步丰富其连接器生态。
- 优化 LLM 和 RAG 功能,提升性能和用户体验。
- 通过社区反馈不断改进,提供更多实用工具和模板。

- 结论
Pathway 是一个功能强大且易于使用的 Python 框架,特别适合需要高性能、实时数据处理和 LLM 集成的场景。其灵活的 API 和 Rust 驱动的引擎为开发者提供了高效的工具,用于构建和部署复杂的 RAG 应用。无论是实时问答、数据分析还是 LLM 管道管理,Pathway 都能提供卓越的性能和开发体验。
RA/SD 衍生者AI训练营。发布者:稻草人,转载请注明出处:https://www.shxcj.com/ai%e6%95%99%e6%88%91%e5%81%9a%e4%ba%8b%e4%b9%8brag%e5%bc%80%e5%8f%91-23-rag%e6%a1%86%e6%9e%b6%e4%b9%8bpathway%e5%b9%b3%e5%8f%b0%e6%a1%86%e6%9e%b6%e7%a0%94%e7%a9%b6/