
OpenAI ChatGPT API 與 React JS 的完美結(jié)合:全面指南
touch .gitignore
在下一節(jié)中,我們將使用 FastAPI 和 Python 構(gòu)建我們的聊天 Web 服務(wù)器。
在本節(jié)中,我們將使用 FastAPI 構(gòu)建聊天服務(wù)器來與用戶通信。我們將使用 WebSockets 來確保客戶端和服務(wù)器之間的雙向通信,以便我們可以實(shí)時(shí)向用戶發(fā)送響應(yīng)。
要啟動(dòng)我們的服務(wù)器,我們需要設(shè)置我們的 Python 環(huán)境。在 VS Code 中打開項(xiàng)目文件夾,然后打開終端。
從項(xiàng)目根目錄進(jìn)入服務(wù)器目錄,并運(yùn)行以下命令:
python3.8 -m venv env
這將為我們的Python項(xiàng)目創(chuàng)建一個(gè)名為“env”的虛擬環(huán)境。要激活虛擬環(huán)境,請運(yùn)行以下命令:
source env/bin/activate
接下來,在 Python 環(huán)境中安裝幾個(gè)庫。
pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis
接下來,在終端中運(yùn)行“touch .env”命令來創(chuàng)建一個(gè)環(huán)境文件。我們將在.env文件中定義應(yīng)用程序變量和秘密變量。
添加應(yīng)用程序環(huán)境變量,并將其設(shè)置為“development”,如下所示:export APP_ENV=development。接下來,我們將使用FastAPI服務(wù)器設(shè)置一個(gè)開發(fā)服務(wù)器。
在服務(wù)器目錄的根目錄中,創(chuàng)建一個(gè)名為 然后粘貼以下開發(fā)服務(wù)器代碼的新文件:main.py
from fastapi import FastAPI, Requestimport uvicornimport os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True) else:
pass
首先,我們導(dǎo)入FastAPI并將其初始化為api。然后我們從python-dotenv
庫中導(dǎo)入load_dotenv
,并初始化它以加載.env文件中的變量,
然后,我們創(chuàng)建一個(gè)簡單的測試路由來測試 API。測試路由將返回一個(gè)簡單的 JSON 響應(yīng),告訴我們 API 已聯(lián)機(jī)。
最后,我們通過使用uvicorn.run
并提供所需的參數(shù)來設(shè)置開發(fā)服務(wù)器。API將在端口3500上運(yùn)行。
最后,在終端中使用python main.py
運(yùn)行服務(wù)器。一旦你在終端上看到應(yīng)用程序啟動(dòng)完成,在瀏覽器上導(dǎo)航到URL http://localhost:3500/test,你應(yīng)該會得到一個(gè)這樣的網(wǎng)頁:
在本節(jié)中,我們將為API添加路由。首先,創(chuàng)建一個(gè)名為“src”的新文件夾,這個(gè)文件夾將作為我們存放所有API代碼的主要目錄。
創(chuàng)建一個(gè)名為routes的子文件夾,cd到該文件夾中,創(chuàng)建一個(gè)名為chat.py的新文件,然后添加以下代碼:
import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
return None
我們創(chuàng)建了三個(gè)終端節(jié)點(diǎn):
/token
將向用戶頒發(fā)用于訪問聊天會話的會話令牌。由于聊天應(yīng)用程序?qū)⒐_開放,我們不想擔(dān)心身份驗(yàn)證,只需保持簡單 – 但我們?nèi)匀恍枰环N方法來識別每個(gè)唯一的用戶會話。/refresh_token
如果連接丟失,將獲取用戶的會話歷史記錄,只要令牌仍處于活動(dòng)狀態(tài)且未過期。/chat
將打開一個(gè) WebSocket 來在客戶端和服務(wù)器之間發(fā)送消息。接下來,將聊天路由連接到我們的主API。首先,我們需要從src
導(dǎo)入chat
。在main.py
文件中聊天。然后,我們將通過在初始化的FastAPI類上調(diào)用include_router
方法并傳遞chat作為參數(shù)來包含路由器。
更新您的代碼,如下所示:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
我們將使用uuid4生成用戶令牌,并利用這個(gè)令牌為聊天終端節(jié)點(diǎn)創(chuàng)建動(dòng)態(tài)路由。鑒于該終端節(jié)點(diǎn)是公開可訪問的,因此關(guān)于JWT和身份驗(yàn)證的詳細(xì)內(nèi)容,我們在此就不做深入探討了。
如果最初uuid
未安裝,請pip install uuid
.接下來,在 chat.py 中,導(dǎo)入 UUID,并使用以下/token
代碼更新路由:
from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPExceptionimport uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
token = str(uuid.uuid4())
data = {"name": name, "token": token}
return data
在上面的代碼中,客戶端提供了他們的名稱,這是必需的。我們快速檢查以確保 name 字段不為空,然后使用 uuid4 生成一個(gè) token。
會話數(shù)據(jù)是 name 和 token 的簡單字典。最終,我們需要持久保存此會話數(shù)據(jù)并設(shè)置超時(shí),但現(xiàn)在我們只需將其返回給客戶端即可。
因?yàn)槲覀儗y試 WebSocket 端點(diǎn),所以我們需要使用像 Postman 這樣的工具來允許這樣做(因?yàn)?FastAPI 上的默認(rèn) swagger 文檔不支持 WebSockets)。
在Postman中,為您的開發(fā)環(huán)境創(chuàng)建一個(gè)集合,并向localhost:3500/token
發(fā)送一個(gè)POST請求,指定名稱作為查詢參數(shù)并傳遞一個(gè)值。您應(yīng)該得到如下所示的響應(yīng):
令牌生成器郵差
在src根目錄下,創(chuàng)建一個(gè)名為socket的新文件夾,并添加一個(gè)名為connection.py
的文件。在這個(gè)文件中,我們將定義控制到WebSockets的連接的類,以及所有連接和斷開連接的助手方法。
在connection.py
添加以下代碼:
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
ConnectionManager
類是用active_connections
屬性初始化的,該屬性是一個(gè)活動(dòng)連接列表。
然后,異步連接方法將接受WebSocket
并將其添加到活動(dòng)連接列表中,而disconnect
方法將從活動(dòng)連接列表中刪除WebSocket
。
最后,send_personal_message
方法將接收消息和我們想要發(fā)送消息的Websocke
t,并異步發(fā)送消息。
WebSockets是一個(gè)相當(dāng)廣泛的主題,我們這里所討論的只是冰山一角。但即便如此,所學(xué)的內(nèi)容應(yīng)該已經(jīng)足夠您創(chuàng)建多個(gè)連接,并能夠異步地處理發(fā)送到這些連接的消息了。
你可以閱讀更多關(guān)于 FastAPI Websockets 和 Sockets 編程 的內(nèi)容。
要使用ConnectionManager
,請?jiān)?code>src.routes.chat.py中導(dǎo)入并初始化它,并用下面的代碼更新/chat
WebSocket路由:
from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
在websocket_endpoint
函數(shù)(它接受一個(gè)WebSocket)中,我們將新的WebSocket添加到連接管理器并運(yùn)行一個(gè)while True
循環(huán),以確保套接字保持打開狀態(tài)。除非套接字?jǐn)嚅_連接。
當(dāng)連接打開時(shí),我們使用websocket.receive_test()
接收客戶端發(fā)送的任何消息,并將它們打印到終端。
然后,我們現(xiàn)在將硬編碼的響應(yīng)發(fā)送回客戶端。最終,從客戶端收到的消息將發(fā)送到 AI 模型,而返回給客戶端的響應(yīng)則將是AI模型生成的回復(fù)。
在Postman中,我們可以通過創(chuàng)建一個(gè)新的WebSocket請求來測試這個(gè)端點(diǎn),并連接到WebSocket端點(diǎn)localhost:3500/chat
。
當(dāng)您單擊 connect 時(shí),Messages 窗格將顯示 API 客戶端已連接到 URL,并且套接字已打開。
要對此進(jìn)行測試,請向聊天服務(wù)器發(fā)送消息 “Hello Bot”,您應(yīng)該會立即收到測試響應(yīng) “Response: Simulating response from the GPT service” ,如下所示:
為了能夠區(qū)分兩個(gè)不同的客戶端會話并限制聊天會話,我們將使用定時(shí)令牌,作為查詢參數(shù)傳遞給 WebSocket 連接。
在 socket 文件夾中,創(chuàng)建一個(gè)名為utils.py
然后添加以下代碼的文件:
from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token
get_token 函數(shù)接收 WebSocket 和令牌,然后檢查令牌是 None 還是 null。
如果滿足這種情況的話,該函數(shù)會返回策略沖突的狀態(tài),如果可用,該函數(shù)將僅返回令牌。我們稍后將通過額外的令牌驗(yàn)證來擴(kuò)展此功能。
為了使用這個(gè)函數(shù),我們將它注入到/chat
路由中。FastAPI提供了一個(gè)Depends
類來輕松地注入依賴項(xiàng),因此我們不必修改裝飾器。
將/chat
路由更新為以下內(nèi)容:
from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
現(xiàn)在,當(dāng)您嘗試在 Postman 中連接到/chat
終端節(jié)點(diǎn)時(shí),您將收到 403 錯(cuò)誤。目前,提供令牌作為查詢參數(shù),并為令牌提供任何值。然后您應(yīng)該能夠像以前一樣進(jìn)行連接,只是現(xiàn)在連接需要一個(gè)令牌。
郵差聊天測試與令牌
恭喜你走到了這一步!您的chat.py
文件現(xiàn)在應(yīng)如下所示:
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
data = {"name": name, "token": token}
return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
在本教程的下一部分,我們將重點(diǎn)介紹如何處理應(yīng)用程序的狀態(tài)以及在客戶端和服務(wù)器之間傳遞數(shù)據(jù)。
我們的應(yīng)用程序目前不存儲任何狀態(tài),并且無法識別用戶或存儲和檢索聊天數(shù)據(jù)。我們還會在聊天會話期間向客戶端返回硬編碼響應(yīng)。
在本教程的這一部分中,我們將介紹以下內(nèi)容:
Redis 是一種開源內(nèi)存數(shù)據(jù)存儲,您可以將其用作數(shù)據(jù)庫、緩存、消息代理和流式處理引擎。它支持多種數(shù)據(jù)結(jié)構(gòu),是具有實(shí)時(shí)功能的分布式應(yīng)用程序的完美解決方案。
Redis Enterprise Cloud 是 Redis 提供的一項(xiàng)完全托管的云服務(wù),可幫助我們無限大規(guī)模部署 Redis 集群,而無需擔(dān)心基礎(chǔ)設(shè)施。
在本教程中,我們將使用免費(fèi)的 Redis Enterprise Cloud 實(shí)例。您可以在此處免費(fèi)開始使用 Redis Cloud,并按照本教程設(shè)置 Redis 數(shù)據(jù)庫和 Redis Insight,這是一個(gè)與 Redis 交互的 GUI。
一旦你的Redis數(shù)據(jù)庫配置完成,請?jiān)陧?xiàng)目的根目錄下(注意,這個(gè)位置是在服務(wù)器文件夾的外部)新建一個(gè)名為“worker
”的文件夾。
我們將我們的 worker 環(huán)境與 Web 服務(wù)器隔離開來,這樣當(dāng)客戶端向我們的 WebSocket 發(fā)送消息時(shí),Web 服務(wù)器就不必處理對第三方服務(wù)的請求。此外,還可以為其他用戶釋放資源。
這個(gè)工作程序服務(wù)通過Redis來處理與推理API的后端通信。
來自所有已連接客戶端的請求將附加到消息隊(duì)列(生產(chǎn)者),而 Worker 則使用消息,將請求發(fā)送到推理 API,并將響應(yīng)附加到響應(yīng)隊(duì)列。
API 收到響應(yīng)后,會將其發(fā)送回客戶端。
在生產(chǎn)者和消費(fèi)者之間的傳輸過程中,客戶端可以發(fā)送多條消息,這些消息將排隊(duì)并按順序響應(yīng)。
理想情況下,我們可以讓這個(gè) worker 在完全不同的服務(wù)器上運(yùn)行,在它自己的環(huán)境中,但現(xiàn)在,我們將在本地機(jī)器上創(chuàng)建自己的 Python 環(huán)境。
您可能想知道 – 為什么我們需要worker?想象一下這樣一個(gè)場景:Web 服務(wù)器還創(chuàng)建對第三方服務(wù)的請求。這意味著,在套接字連接期間等待第三方服務(wù)的響應(yīng)時(shí),服務(wù)器將被阻止,資源被占用,直到從 API 獲得響應(yīng)。
您可以在發(fā)送硬編碼響應(yīng)和發(fā)送新消息之前創(chuàng)建一個(gè)隨機(jī)的sleep time.sleep(10)
來進(jìn)行試驗(yàn)。然后,您可以嘗試在新的Postman會話中使用不同的令牌來建立連接。
您會注意到,在隨機(jī)睡眠超時(shí)之前,聊天會話不會連接。
雖然我們可以在更注重生產(chǎn)的服務(wù)器設(shè)置中使用異步技術(shù)和工作線程池,但隨著并發(fā)用戶數(shù)量的增長,這也不夠。
最終,我們希望通過使用 Redis 來代理我們的聊天 API 和第三方 API 之間的通信,從而避免占用 Web 服務(wù)器資源。
接下來打開一個(gè)新終端,cd 進(jìn)入 worker 文件夾,并創(chuàng)建并激活一個(gè)新的 Python 虛擬環(huán)境,類似于我們在第 1 部分中所做的。
接下來,安裝以下依賴項(xiàng):
pip install aiohttp aioredis python-dotenv
我們將使用 aioredis 客戶端與 Redis 數(shù)據(jù)庫連接。我們還將使用 requests 庫向 Huggingface 推理 API 發(fā)送請求。
創(chuàng)建兩個(gè)文件.env
和main.py
。然后創(chuàng)建一個(gè)名為src
的文件夾。另外,創(chuàng)建一個(gè)名為redis
的文件夾,并添加一個(gè)名為config.py
的新文件。
在.env
文件中,添加以下代碼,并確保使用 Redis 集群中提供的憑證更新字段。
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
在 config.py 中添加下面的 Redis 類:
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
我們創(chuàng)建一個(gè)Redis對象,并從環(huán)境變量初始化所需的參數(shù)。然后我們創(chuàng)建一個(gè)異步方法create_connection
來創(chuàng)建一個(gè)Redis連接,并返回從aioredi
s方法from_url
獲得的連接池。
接下來,我們通過運(yùn)行以下代碼在 main.py 中測試 Redis 連接。這將創(chuàng)建一個(gè)新的 Redis 連接池,設(shè)置一個(gè)簡單的鍵 “key”,并為其分配一個(gè)字符串 “value”。
from src.redis.config import Redis
import asyncio
async def main():
redis = Redis()
redis = await redis.create_connection()
print(redis)
await redis.set("key", "value")
if __name__ == "__main__":
asyncio.run(main())
現(xiàn)在打開 Redis Insight(如果您按照教程下載并安裝它)您應(yīng)該會看到如下內(nèi)容:
Redis Insight 測試
現(xiàn)在我們已經(jīng)設(shè)置了 worker 環(huán)境,我們可以在 Web 服務(wù)器上創(chuàng)建一個(gè) producer,并在 worker 上創(chuàng)建一個(gè) consumer。
首先,讓我們在服務(wù)器上再次創(chuàng)建Redis類。在服務(wù)器上。SRC創(chuàng)建一個(gè)名為redis
的文件夾,并添加config.py
和producer.py
兩個(gè)文件。
在 config.py
中,添加以下代碼,就像我們對 worker 環(huán)境所做的那樣:
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
在 .env 文件中,還要添加 Redis 憑證:
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
最后,在 server.src.redis.producer.py
添加 the following code:
from .config import Redis
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel):
try:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
except Exception as e:
print(f"Error sending msg to stream => {e}")
我們創(chuàng)建了一個(gè)使用 Redis 客戶端初始化的 Producer 類。我們使用此客戶端通過add_to_stream
該方法將數(shù)據(jù)添加到流中,該方法采用數(shù)據(jù)和 Redis 通道名稱。
用于向流通道添加數(shù)據(jù)的 Redis 命令是xadd
,它在 aioredis 中同時(shí)具有高級和低級函數(shù)。
接下來,要運(yùn)行我們新創(chuàng)建的 Producer,請更新 chat.py
和WebSocket /chat
終端節(jié)點(diǎn),如下所示。請注意更新后的通道名稱 :message_channel
from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
try:
while True:
data = await websocket.receive_text()
print(data)
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
接下來,在 Postman 中,創(chuàng)建一個(gè)連接并發(fā)送任意數(shù)量的消息,這些消息顯示 Hello
.您應(yīng)該將流消息打印到終端,如下所示:
終端信道消息測試
在 Redis Insight 中,您將看到一個(gè)新創(chuàng)建的隊(duì)列mesage_channel
和一個(gè)帶時(shí)間戳的隊(duì)列,其中填充了從客戶端發(fā)送的消息。此帶時(shí)間戳的隊(duì)列對于保持消息的順序非常重要。
Redis洞察頻道
接下來,我們將為聊天消息創(chuàng)建一個(gè)模型。回想一下,我們通過 WebSockets 發(fā)送文本數(shù)據(jù),但我們的聊天數(shù)據(jù)需要包含更多的信息,而不僅僅是文本。我們需要在發(fā)送聊天消息時(shí)為其加上時(shí)間戳,并為每條消息分配一個(gè)唯一的ID,同時(shí)收集關(guān)于聊天會話的相關(guān)數(shù)據(jù),最后以JSON格式存儲這些數(shù)據(jù)。
我們可以將這些 JSON 數(shù)據(jù)存儲在 Redis 中,這樣就不會在連接丟失后丟失聊天記錄,因?yàn)槲覀兊?WebSocket 不存儲狀態(tài)。
在server.src
中創(chuàng)建名為schema
的新文件夾。然后創(chuàng)建一個(gè)名為chat.py
in server.src.schema
中添加 the following code 的文件:
from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = uuid.uuid4()
msg: str
timestamp = str(datetime.now())
class Chat(BaseModel):
token: str
messages: List[Message]
name: str
session_start = str(datetime.now())
我們使用Pydantic的BaseModel
類對聊天數(shù)據(jù)建模。Chat類將保存單個(gè)Chat
會話的數(shù)據(jù)。它將存儲令牌、用戶名以及使用datetime.now()
為聊天會話開始時(shí)間自動(dòng)生成的時(shí)間戳。
在這個(gè)聊天會話中發(fā)送和接收的消息存儲在一個(gè)Message
類中,該類使用uid4
動(dòng)態(tài)地創(chuàng)建一個(gè)聊天id。初始化Message
類時(shí)需要提供的唯一數(shù)據(jù)是消息文本。
為了使用 Redis JSON 的能力來存儲我們的聊天記錄,我們需要安裝 Redis labs 提供的 rejson。
在終端,cd到服務(wù)器并使用pip install rejson
安裝rejson
。然后更新server.src.redis.config.py
中的Redis類,使其包含create_rejson_connection
方法:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
我們正在添加create_rejson_connection
方法,通過rejson Client連接到Redis。這為我們提供了在Redis中創(chuàng)建和操作JSON數(shù)據(jù)的方法,這在aioredis中是不可用的。
接下來,在server.src.routes.chat.py
中,我們可以更新/token
端點(diǎn)來創(chuàng)建一個(gè)新的Chat
實(shí)例,并將會話數(shù)據(jù)存儲在Redis JSON中,如下所示:
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create new chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
注意:因?yàn)檫@是一個(gè)演示應(yīng)用程序,所以我不想在 Redis 中存儲聊天數(shù)據(jù)太久。因此,我使用 aioredis 客戶端在令牌上添加了 60 分鐘的超時(shí)(rejson 不實(shí)施超時(shí))。這意味著 60 分鐘后,聊天會話數(shù)據(jù)將丟失。
這一步是必要的,因?yàn)槲覀儧]有對用戶進(jìn)行身份驗(yàn)證,并且我們希望在特定的時(shí)間段后能夠清除聊天數(shù)據(jù)。不過,這個(gè)步驟是可選的,您可以根據(jù)自己的需求選擇是否包含它。
接下來,在 Postman 中,當(dāng)您發(fā)送 POST 請求以創(chuàng)建新令牌時(shí),您將獲得如下所示的結(jié)構(gòu)化響應(yīng)。您還可以檢查 Redis Insight,查看與令牌一起存儲為 JSON 鍵的聊天數(shù)據(jù),以及以值形式存儲的數(shù)據(jù)。
令牌生成器更新
現(xiàn)在我們已經(jīng)生成并存儲了一個(gè)令牌,現(xiàn)在是更新/chat
WebSocket中的get_token
依賴項(xiàng)的好時(shí)機(jī)了。我們這樣做是為了在開始聊天會話之前檢查有效的令牌。
在server.src.socket.utils.py
中更新get_token
函數(shù)來檢查是否在Redis實(shí)例中存在token。如果是,則返回令牌,這意味著套接字連接是有效的。如果它不存在,我們關(guān)閉連接。
/token
創(chuàng)建的令牌將在60分鐘后停止存在。因此,如果在嘗試開始聊天時(shí)生成錯(cuò)誤響應(yīng),我們可以在前端使用一些簡單的邏輯來重定向用戶以生成新的令牌。
from ..redis.config import Redis
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
redis_client = await redis.create_connection()
isexists = await redis_client.exists(token)
if isexists == 1:
return token
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")
要測試依賴關(guān)系,請使用我們一直在使用的隨機(jī)令牌連接到聊天會話,您應(yīng)該會收到 403 錯(cuò)誤。(請注意,您必須在 Redis Insight 中手動(dòng)刪除令牌。)
現(xiàn)在復(fù)制發(fā)送post請求到/token
端點(diǎn)時(shí)生成的令牌(或創(chuàng)建一個(gè)新請求),并將其作為值粘貼到/chat
WebSocket所需的令牌查詢參數(shù)中。然后連接。你應(yīng)該能成功連接。
帶令牌的聊天會話
綜上所述,您的 chat.py 應(yīng)如下所示。
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create nee chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
print(chat_session.dict())
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
干得好,走到這一步!在下一節(jié)中,我們將重點(diǎn)介紹與 AI 模型通信以及處理客戶端、服務(wù)器、worker 和外部 API 之間的數(shù)據(jù)傳輸。
在本節(jié)中,我們將重點(diǎn)介紹如何構(gòu)建一個(gè)包裝器來與 transformer 模型通信,以對話格式將提示從用戶發(fā)送到 API,以及接收和轉(zhuǎn)換聊天應(yīng)用程序的響應(yīng)。
我們不會在 Hugginface 上構(gòu)建或部署任何語言模型。相反,我們的重點(diǎn)將放在利用Huggingface的加速推理API來連接預(yù)先訓(xùn)練好的模型上。
我們將使用的模型是 EleutherAI 提供的 GPT-J-6B 模型。這是一個(gè)生成語言模型,使用 60 億個(gè)參數(shù)進(jìn)行了訓(xùn)練。
Huggingface 為我們提供了一個(gè)按需受限的 API 來連接這個(gè)模型,它幾乎是免費(fèi)的。
要開始使用 Huggingface,請創(chuàng)建一個(gè)免費(fèi)帳戶。在您的設(shè)置中,生成新的訪問令牌。對于最多 30k 個(gè)令牌,Huggingface 免費(fèi)提供對推理 API 的訪問。
您可以在這里監(jiān)控API的使用情況。請務(wù)必確保這個(gè)令牌的安全,切勿將其公開。
注意:我們將使用 HTTP 連接與 API 通信,因?yàn)槲覀兪褂玫氖敲赓M(fèi)帳戶。但是 PRO Huggingface 帳戶支持使用 WebSockets 進(jìn)行流式處理,請參閱并行性和批處理作業(yè)。
這有助于顯著縮短模型和聊天應(yīng)用程序之間的響應(yīng)時(shí)間,我希望在后續(xù)文章中介紹這種方法。
首先,我們將 Huggingface 連接憑證添加到工作程序目錄中的 .env 文件中。
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
接下來,在worker.src
創(chuàng)建一個(gè)名為model
的文件夾,然后添加一個(gè)文件gptj.py
。然后添加下面的GPT類:
import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": True,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = input
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
print(json.loads(response.content.decode("utf-8")))
return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
GPT類是用Huggingface模型url
、身份驗(yàn)證header
和預(yù)定義的payload
初始化的。但是有效負(fù)載輸入是一個(gè)動(dòng)態(tài)字段,由查詢方法提供,并在我們向Huggingface端點(diǎn)發(fā)送請求之前更新
最后,我們通過直接在GPT類的實(shí)例上運(yùn)行查詢方法來測試這一點(diǎn)。在終端中,運(yùn)行python src/model/gptj.py
,你應(yīng)該得到這樣的響應(yīng)(只是要記住,你的響應(yīng)肯定與此不同):
[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]
接下來,我們向 input 添加一些調(diào)整,通過更改 input 的格式,使與模型的交互更具對話性。
GPT
像這樣更新類:
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": False,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = f"Human: {input} Bot:"
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
data = json.loads(response.content.decode("utf-8"))
text = data[0]['generated_text']
res = str(text.split("Human:")[0]).strip("\n").strip()
return res
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
我們使用字符串文本f"Human: {input} Bot:"
更新了輸入。人工輸入被放置在字符串中,機(jī)器人提供響應(yīng)。這種輸入格式將 GPT-J6B 變成了對話模型。您可能會注意到的其他變化包括
對于我們發(fā)送到模型的每個(gè)新輸入,模型無法記住對話歷史記錄。如果我們想在對話中保留上下文,這一點(diǎn)很重要。
但請記住,隨著我們發(fā)送到模型的 Token 數(shù)量增加,處理成本會變得更高,響應(yīng)時(shí)間也會更長。
因此,我們需要探索一種有效方法來檢索短期歷史記錄,并將其發(fā)送給模型進(jìn)行處理我們還需要弄清楚一個(gè)最佳點(diǎn) – 我們想要檢索多少歷史數(shù)據(jù)并將其發(fā)送到模型?
要處理聊天記錄,我們需要回退到我們的 JSON 數(shù)據(jù)庫。我們將使用token
來獲取上次聊天數(shù)據(jù),然后在收到響應(yīng)時(shí),將響應(yīng)附加到 JSON 數(shù)據(jù)庫。
更新worker.src.redis.config.py
以包含create_rejson_connection
該方法。此外,使用身份驗(yàn)證數(shù)據(jù)更新 .env 文件,并確保已安裝 rejson。
您的worker.src.redis.config.py
應(yīng)如下所示:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
雖然您的 .env 文件應(yīng)如下所示:
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
接下來,在worker.src.redis
創(chuàng)建一個(gè)新的文件命名為cache.py
并添加以下代碼:
from .config import Redis
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
緩存是用一個(gè)json客戶端初始化的,get_chat_history
方法接受一個(gè)令牌,從Redis獲取該令牌的聊天歷史。確保從json中導(dǎo)入了Path對象。
接下來,使用以下worker.main.py
更新 :
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
if __name__ == "__main__":
asyncio.run(main())
我在Postman中硬編碼了一個(gè)從以前的測試中創(chuàng)建的樣例令牌。如果沒有創(chuàng)建令牌,只需向/token
發(fā)送一個(gè)新請求并復(fù)制令牌,然后在終端中運(yùn)行python main.py
。您應(yīng)該看到終端中的數(shù)據(jù)如下所示:
{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
接下來,我們需要在我們的緩存類中添加一個(gè)add_message_to_cache
方法,該方法將特定令牌的消息添加到Redis。
async def add_message_to_cache(self, token: str, message_data: dict):
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
rejson 提供的jsonarrappend
方法將新消息附加到 message 數(shù)組中。
請注意,要訪問 message 數(shù)組,我們需要提供 .messages
Path 的參數(shù)。如果您的消息數(shù)據(jù)具有不同/嵌套結(jié)構(gòu),只需提供要將新數(shù)據(jù)追加到的數(shù)組的路徑。
要測試此方法,請使用以下代碼更新 main.py 文件中的 main 函數(shù):
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
我們正在向緩存發(fā)送硬編碼消息,并從緩存中獲取聊天記錄。當(dāng)您在終端的 worker 目錄中運(yùn)行python main.py
時(shí),您應(yīng)該在終端中打印出類似這樣的東西,并將消息添加到 message 數(shù)組中。
{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
最后,我們需要更新 main 函數(shù)以將消息數(shù)據(jù)發(fā)送到 GPT 模型,并使用客戶端和模型之間發(fā)送的最后 4 條消息更新輸入。
首先,讓我們使用add_message_to_cache
新參數(shù) “source” 更新我們的函數(shù),該參數(shù)將告訴我們消息是人類還是機(jī)器人。然后,我們可以使用此 arg 將 “Human:” 或 “Bot:” 標(biāo)簽添加到數(shù)據(jù)中,然后再將其存儲到緩存中。
更新 add_message_to_cache
Cache 類中的方法,如下所示:
async def add_message_to_cache(self, token: str, source: str, message_data: dict):
if source == "human":
message_data['msg'] = "Human: " + (message_data['msg'])
elif source == "bot":
message_data['msg'] = "Bot: " + (message_data['msg'])
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
然后更新 worker 目錄中 main.py 中的 main 函數(shù),并運(yùn)行python main.py
以查看 Redis 數(shù)據(jù)庫中的新結(jié)果。
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
接下來,我們需要更新 main 函數(shù)以將新消息添加到緩存中,從緩存中讀取前 4 條消息,然后使用 query 方法對模型進(jìn)行 API 調(diào)用。它將具有一個(gè)有效負(fù)載,該負(fù)載由最后 4 條消息的復(fù)合字符串組成。
您可以隨時(shí)調(diào)整要提取的歷史記錄中消息的數(shù)量,但在我看來,為了演示目的,選擇4條消息是一個(gè)相當(dāng)合適的數(shù)字。
在worker.src
中,創(chuàng)建一個(gè)新的文件夾架構(gòu)。然后創(chuàng)建一個(gè)名為chat.py
并將我們的消息架構(gòu)粘貼到 chat.py 中的新文件,如下所示:
from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = str(uuid.uuid4())
msg: str
timestamp = str(datetime.now())
接下來,更新 main.py 文件,如下所示:
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "3",
"msg": "I would like to go to the moon to, would you take me?",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())
在上面的代碼中,我們將新的消息數(shù)據(jù)添加到緩存中。此消息最終將來自消息隊(duì)列。接下來,我們從緩存中獲取聊天記錄,其中現(xiàn)在將包含我們添加的最新數(shù)據(jù)。
請注意,我們使用相同的硬編碼令牌添加到緩存并從緩存中獲取,暫時(shí)只是為了測試這一點(diǎn)。
接下來,我們修剪緩存數(shù)據(jù)并僅提取最后 4 項(xiàng)。然后我們通過提取列表中的 msg 并將其連接到空字符串來合并輸入數(shù)據(jù)。
最后,我們?yōu)闄C(jī)器人響應(yīng)創(chuàng)建一個(gè)新的 Message 實(shí)例,并將響應(yīng)添加到緩存中,將源指定為 “bot”
接下來,運(yùn)行幾次,每次運(yùn)行python main.py
時(shí)根據(jù)需要更改人工消息和 id。您應(yīng)該與模型進(jìn)行完整的對話輸入和輸出。
打開 Redis Insight,您應(yīng)該會看到類似于以下內(nèi)容的內(nèi)容:
接下來,我們要?jiǎng)?chuàng)建一個(gè)使用者并更新我們的worker.main.py
去連接消息隊(duì)列。我們希望它能夠?qū)崟r(shí)提取令牌數(shù)據(jù),因?yàn)槲覀兡壳罢趯α钆坪拖⑤斎脒M(jìn)行硬編碼。
在worker.src.redis
中創(chuàng)建名為stream.py
的新文件。使用以下代碼添加一個(gè)StreamConsumer
類:
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
StreamConsumer
類是用Redis客戶端初始化的。consume_stream
方法使用aioredis提供的xread
方法從消息通道的隊(duì)列中提取一條新消息。
接下來,用while循環(huán)更新worker.main.py
文件,以保持與消息通道的連接處于活動(dòng)狀態(tài),如下所示:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(token)
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
這是一個(gè)相當(dāng)大的更新,所以讓我們一步一步來:
我們使用while True
循環(huán),以便 worker 可以在線偵聽隊(duì)列中的消息。
接下來,我們通過調(diào)用consume_stream
方法等待來自message_channel的新消息。如果隊(duì)列中有消息,則提取message_id、令牌和消息。然后我們創(chuàng)建Message類的新實(shí)例,將消息添加到緩存中,然后獲取最后4條消息。我們將其設(shè)置為GPT模型查詢方法的輸入。
獲得響應(yīng)后,使用add_message_to_cache
方法將響應(yīng)添加到緩存中,然后從隊(duì)列中刪除消息。
到目前為止,我們正在從客戶端向 message_channel 發(fā)送聊天消息(由查詢 AI 模型的工作程序接收)以獲取響應(yīng)。
接下來,我們需要將此響應(yīng)發(fā)送給客戶端。只要套接字連接仍處于打開狀態(tài),客戶端就應(yīng)該能夠接收響應(yīng)。
如果連接已關(guān)閉,則客戶端始終可以使用refresh_token
終端節(jié)點(diǎn)從聊天歷史記錄中獲取響應(yīng)。
在worker.src.redis
創(chuàng)建一個(gè)名為producer.py
的新文件Producer
中,并添加一個(gè)類似于我們在聊天 Web 服務(wù)器上的類:
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel) -> bool:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
接下來,在main.py
文件中,更新 main 函數(shù)以初始化創(chuàng)建者,創(chuàng)建流數(shù)據(jù),并使用add_to_stream
方法將響應(yīng)發(fā)送到 response_channel
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
producer = Producer(redis_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
stream_data = {}
stream_data[str(token)] = str(msg.dict())
await producer.add_to_stream(stream_data, "response_channel")
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
接下來,我們需要讓客戶端知道我們何時(shí)收到來自 socket 終端節(jié)點(diǎn)/chat
中 worker 的響應(yīng)。我們通過監(jiān)聽響應(yīng)流來實(shí)現(xiàn)這一點(diǎn)。在這里,我們無需包含while循環(huán),因?yàn)橹灰B接保持開啟狀態(tài),套接字就會持續(xù)進(jìn)行監(jiān)聽。
請注意,我們還需要通過添加邏輯來檢查連接的令牌是否等于響應(yīng)中的令牌,從而檢查響應(yīng)是針對哪個(gè)客戶端的。然后,我們在讀取響應(yīng)隊(duì)列中的消息后將其刪除。
在server.src.redis
中創(chuàng)建一個(gè)名為 stream.py 的新文件并添加我們的StreamConsumer
類,如下所示:
from .config import Redis
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
接下來,更新/chat
套接字端點(diǎn),如下所示:
from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
consumer = StreamConsumer(redis_client)
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[str(token)] = str(data)
await producer.add_to_stream(stream_data, "message_channel")
response = await consumer.consume_stream(stream_channel="response_channel", block=0)
print(response)
for stream, messages in response:
for message in messages:
response_token = [k.decode('utf-8')
for k, v in message[1].items()][0]
if token == response_token:
response_message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(message[0].decode('utf-8'))
print(token)
print(response_token)
await manager.send_personal_message(response_message, websocket)
await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
except WebSocketDisconnect:
manager.disconnect(websocket)
最后,我們需要更新終端節(jié)點(diǎn)/refresh_token
,以使用我們的Cache
類從 Redis 數(shù)據(jù)庫獲取聊天記錄。
在 server.src.redis
中,添加一個(gè)cache.py
文件并添加以下代碼:
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
接下來,在server.src.routes.chat.py
中導(dǎo)入緩存類并更新/token
端點(diǎn)如下:
from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
json_client = redis.create_rejson_connection()
cache = Cache(json_client)
data = await cache.get_chat_history(token)
if data == None:
raise HTTPException(
status_code=400, detail="Session expired or does not exist")
else:
return data
現(xiàn)在,當(dāng)我們使用任何令牌向/refresh_token
終端節(jié)點(diǎn)發(fā)送 GET 請求時(shí),終端節(jié)點(diǎn)將從 Redis 數(shù)據(jù)庫獲取數(shù)據(jù)。
如果 Token 未超時(shí),則數(shù)據(jù)將發(fā)送給用戶。或者,如果未找到令牌,它將發(fā)送 400 響應(yīng)。
最后,我們將通過在 Postman 中創(chuàng)建多個(gè)聊天會話、在 Postman 中連接多個(gè)客戶端以及在客戶端上與機(jī)器人聊天來測試聊天系統(tǒng)。
最后,我們將嘗試獲取客戶的聊天記錄,并希望得到適當(dāng)?shù)幕貞?yīng)。
讓我們快速回顧一下我們使用聊天系統(tǒng)取得的成就。聊天客戶端為與客戶端的每個(gè)聊天會話創(chuàng)建一個(gè)令牌。此令牌用于標(biāo)識每個(gè)客戶端,連接到或 Web 服務(wù)器的客戶端發(fā)送的每條消息都在 Redis 通道 (message_chanel) 中排隊(duì),由令牌標(biāo)識。
我們的 worker environment 從這個(gè)通道讀取數(shù)據(jù)。它不知道客戶端是誰(除了它是一個(gè)唯一的令牌),并使用隊(duì)列中的消息向 Huggingface 推理 API 發(fā)送請求。
當(dāng)它收到響應(yīng)時(shí),響應(yīng)將添加到響應(yīng)渠道中,并更新聊天歷史記錄。偵聽 response_channel 的客戶端在收到帶有其令牌的響應(yīng)后立即將響應(yīng)發(fā)送到客戶端。
如果套接字仍處于打開狀態(tài),則發(fā)送此響應(yīng)。如果套接字已關(guān)閉,我們確定響應(yīng)被保留,因?yàn)轫憫?yīng)已添加到聊天歷史記錄中。即使發(fā)生了頁面刷新或連接中斷,客戶端仍然能夠獲取到歷史記錄。
恭喜你走到了這一步!您已經(jīng)能夠構(gòu)建一個(gè)有效的聊天系統(tǒng)。
在后續(xù)文章中,我將重點(diǎn)介紹如何為客戶端構(gòu)建聊天用戶界面、創(chuàng)建單元和功能測試、使用 WebSockets 和異步請求微調(diào)我們的工作線程環(huán)境以加快響應(yīng)時(shí)間,并最終在 AWS 上部署聊天應(yīng)用程序。
原文鏈接:https://www.freecodecamp.org/news/how-to-build-an-ai-chatbot-with-redis-python-and-gpt/
OpenAI ChatGPT API 與 React JS 的完美結(jié)合:全面指南
面向營銷人員的 API:前 7 名免費(fèi) REST API
常用文檔轉(zhuǎn)換API匯總
2024年國內(nèi)熱門天氣環(huán)境API
使用第三方API擴(kuò)展低代碼/無代碼平臺的功能
AI 驅(qū)動(dòng)的 API 如何改變招聘:2024 年國內(nèi)外頂級招聘相關(guān)API
Ipstack 案例研究:Airbnb 如何使用地理位置 IP 地址來展示房源
網(wǎng)易企業(yè)郵箱API 終極指南:功能、定價(jià)和實(shí)施
OpenAI API定價(jià)及成本計(jì)算