
如何快速實現REST API集成以優化業務流程
在本指南中,我們將構建一個基于 RAG 的 LLM 應用程序,我們將在其中整合外部數據源以增強 LLM 的功能。具體來說,我們將構建一個可以回答有關 Ray (一個用于生產和擴展 ML 工作負載的 Python 框架)問題的助手。
這里的目標是讓開發人員更容易采用 Ray,而且正如我們將在本指南中看到的那樣,幫助改進我們的 Ray 文檔本身并為其他 LLM 應用程序提供基礎。我們還將分享我們在此過程中遇到的挑戰以及我們如何克服這些挑戰。
注意:我們已經概括了整個指南,以便可以輕松擴展它以在您自己的數據之上構建基于 RAG 的 LLM 應用程序。
除了構建我們的 LLM 應用程序之外,我們還將專注于擴展和在生產中提供服務。與傳統機器學習甚至監督式深度學習不同,從一開始,規模就是 LLM 應用程序的瓶頸。大型數據集、模型、計算密集型工作負載、服務要求等。隨著我們周圍的世界不斷發展,我們將開發能夠處理任何規模的應用程序。
我們還將專注于評估和性能。我們的應用程序涉及許多可變的部分:嵌入模型、分塊邏輯、LLM 本身等,因此,重要的是我們要嘗試不同的配置以優化最佳質量響應。但是,評估和定量比較生成任務的不同配置并非易事。我們將分解應用程序各個部分的評估(給定查詢的檢索、給定源的生成),還評估整體性能(端到端生成)并分享優化配置的結果。
注意:在本指南中,我們將嘗試使用不同的 LLM(OpenAI、Llama 等)。您需要 OpenAI 憑據才能訪問 ChatGPT 模型和 Anyscale Endpoints(可用的公共和私有終端)來提供 + 微調 OSS LLM。
OpenAI 憑據:https://platform.openai.com/account/api-keysAnyscale Endpoints:https://www.anyscale.com/
在開始構建 RAG 應用程序之前,我們需要首先創建包含已處理數據源的向量數據庫。
我們將首先將 Ray 文檔從網站加載到本地目錄:Ray 文檔:https://docs.ray.io/en/master/?
export EFS_DIR=/desired/output/directory
wget -e robots=off --recursive --no-clobber --page-requisites \
--html-extension --convert-links --restrict-file-names=windows \
--domains docs.ray.io --no-parent --accept=html \
-P $EFS_DIR https://docs.ray.io/en/master/
然后,我們將把文檔內容加載到 Ray 數據集中,以便可以對它們執行大規模操作(例如嵌入、索引等)。對于大型數據源、模型和應用程序服務需求,規模是 LLM 應用程序的首要任務。我們希望以這樣的方式構建我們的應用程序,使它們能夠隨著我們的需求增長而擴展,而無需我們稍后更改代碼。Ray 數據集:https://docs.ray.io/en/latest/data/data.html?
# Ray dataset
DOCS_DIR = Path(EFS_DIR, "docs.ray.io/en/master/")
ds = ray.data.from_items([{"path": path} for path in DOCS_DIR.rglob("*.html")
if not path.is_dir()])
print(f"{ds.count()} documents")
現在我們有了包含所有 html 文件路徑的數據集,我們將開發一些可以適當地從這些文件中提取內容的函數。我們希望以一種通用的方式來執行此操作,以便我們可以在所有文檔頁面中執行此提?。ㄟ@樣您就可以將其用于您自己的數據源)。我們的流程是首先識別 html 頁面中的章節,然后提取它們之間的文本。我們將所有這些保存到一個字典列表中,該字典將章節內的文本映射到具有章節錨點 ID 的特定 url。
sample_html_fp = Path(EFS_DIR, "docs.ray.io/en/master/rllib/rllib-env.html")
extract_sections({"path": sample_html_fp})[0]
{'source': 'https://docs.ray.io/en/master/rllib/rllib-env.html#environments', 'text': '\nEnvironments#\nRLlib works with several different types of environments, including Farama-Foundation Gymnasium, user-defined, multi-agent, and also batched environments.\nTip\nNot all environments work with all algorithms. Check out the algorithm overview for more information.\n'}
我們可以使用 Ray Data 的 flat_map 僅用一行代碼將此提取過程(extract_section)并行應用于數據集中的所有文件路徑。 flat_map:https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.flat_map.html?
# Extract sections
sections_ds = ds.flat_map(extract_sections)
sections = sections_ds.take_all()
print (len(sections))
我們現在有了一個章節列表(包含每個章節的文本和來源),但我們現在還不應該直接將其用作 RAG 應用程序的上下文。每個章節的文本長度各不相同,而且很多都是相當大的塊。
如果我們使用這些大段文本,那么我們就會插入大量嘈雜/不需要的上下文,而且由于所有 LLM 都有最大上下文長度,我們無法容納太多其他相關上下文。因此,我們將把每個部分中的文本拆分成較小的塊。直觀地說,較小的塊將封裝單個/幾個概念,與較大的塊相比,噪聲較少。我們現在將選擇一些典型的文本拆分值(例如,chunk_size=300)來創建我們的塊,但稍后我們將嘗試使用更廣泛的值。
from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Text splitter
chunk_size = 300
chunk_overlap = 50
text_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", " ", ""],
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
)
# Chunk a sample section
sample_section = sections_ds.take(1)[0]
chunks = text_splitter.create_documents(
texts=[sample_section["text"]],
metadatas=[{"source": sample_section["source"]}])
print (chunks[0])
page_content='ray.tune.TuneConfig.search_alg#\nTuneConfig.search_alg: Optional[Union[ray.tune.search.searcher.Searcher, ray.tune.search.search_algorithm.SearchAlgorithm]] = None#' metadata={'source': 'https://docs.ray.io/en/master/tune/api/doc/ray.tune.TuneConfig.search_alg.html#ray-tune-tuneconfig-search-alg'}
雖然對數據集進行分塊相對較快,但讓我們將分塊邏輯包裝到一個函數中,以便我們可以大規模應用工作負載,從而使分塊速度與數據源的增長一樣快:
def chunk_section(section, chunk_size, chunk_overlap):
text_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", " ", ""],
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len)
chunks = text_splitter.create_documents(
texts=[sample_section["text"]],
metadatas=[{"source": sample_section["source"]}])
return [{"text": chunk.page_content, "source": chunk.metadata["source"]} for chunk in chunks]
# Scale chunking
chunks_ds = sections_ds.flat_map(partial(
chunk_section,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap))
print(f"{chunks_ds.count()} chunks")
chunks_ds.show(1)
5727 chunks
{'text': 'ray.tune.TuneConfig.search_alg#\nTuneConfig.search_alg: Optional[Union[ray.tune.search.searcher.Searcher, ray.tune.search.search_algorithm.SearchAlgorithm]] = None#', 'source': 'https://docs.ray.io/en/master/tune/api/doc/ray.tune.TuneConfig.search_alg.html#ray-tune-tuneconfig-search-alg'}
嵌入數據
現在我們已經從各個部分創建了小塊,我們需要一種方法來識別與給定查詢最相關的塊。一種非常有效且快速的方法是使用預訓練模型嵌入我們的數據,并使用相同的模型嵌入查詢。然后,我們可以計算所有塊嵌入和我們的查詢嵌入之間的距離,以確定前 k 個塊。有許多不同的預訓練模型可供選擇來嵌入我們的數據,但最受歡迎的模型可以通過 HuggingFace 的海量文本嵌入基準 (MTEB) 排行榜發現。這些模型通過諸如下一個/掩碼標記預測等任務在非常大的文本語料庫上進行了預訓練,這使它們能夠學習在 N 維中表示子標記并捕獲語義關系。我們可以利用這一點來表示我們的數據并確定用于回答給定查詢的最相關上下文。我們使用 Langchain 的嵌入包裝器(HuggingFaceEmbeddings 和 OpenAIEmbeddings)輕松加載模型并嵌入我們的文檔塊。
注意:嵌入并不是確定更相關塊的唯一方法。我們也可以使用 LLM 來決定!但是,由于 LLM 比這些嵌入模型大得多,并且具有最大上下文長度,因此最好使用嵌入來檢索前 k 個塊。然后,我們可以在較少的 k 個塊上使用 LLM 來確定要用作上下文來回答查詢的 <k 個塊。我們還可以使用重新排名(例如 Cohere Rerank)來進一步確定要使用的最相關塊。我們還可以將嵌入與傳統的信息檢索方法(例如關鍵字匹配)相結合,這對于匹配嵌入子標記時可能丟失的唯一標記很有用。
HuggingFace 的海量文本嵌入基準:
https://huggingface.co/spaces/mteb/leaderboard
HuggingFaceEmbeddings:
OpenAIEmbeddings:
Cohere Rerank:
https://txt.cohere.com/rerank/
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy
class EmbedChunks:
def __init__(self, model_name):
if model_name == "text-embedding-ada-002":
self.embedding_model = OpenAIEmbeddings(
model=model_name,
openai_api_base=os.environ["OPENAI_API_BASE"],
openai_api_key=os.environ["OPENAI_API_KEY"])
else:
self.embedding_model = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs={"device": "cuda"},
encode_kwargs={"device": "cuda", "batch_size": 100})
def __call__(self, batch):
embeddings = self.embedding_model.embed_documents(batch["text"])
return {"text": batch["text"], "source": batch["source"], "embeddings":
embeddings}
在這里,我們能夠使用 map_batches 按比例嵌入我們的塊。我們所要做的就是定義 batch_size 和計算(我們使用兩個工作器,每個工作器有 1 個 GPU)。
# Embed chunks
embedding_model_name = "thenlper/gte-base"
embedded_chunks = chunks_ds.map_batches(
EmbedChunks,
fn_constructor_kwargs={"model_name": embedding_model_name},
batch_size=100,
num_gpus=1,
compute=ActorPoolStrategy(size=2))
# Sample (text, source, embedding) triplet
[{'text': 'External library integrations for Ray Tune#',
'source': 'https://docs.ray.io/en/master/tune/api/integration.html#external-library-integrations-for-ray-tune',
'embeddings': [
0.012108353897929192,
0.009078810922801495,
0.030281754210591316,
-0.0029687234200537205,
…]
}
現在我們有了嵌入的塊,我們需要將它們索引(存儲)到某個地方,以便我們可以快速檢索它們進行推理。雖然有許多流行的向量數據庫選項,但我們將使用 Postgres 和 pgvector,因為它簡單且性能好。我們將創建一個表(文檔)并為每個嵌入的塊寫入(文本、源、嵌入)三元組。Postgres 和 pgvector:https://github.com/pgvector/pgvector
class StoreResults:
def __call__(self, batch):
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
register_vector(conn)
with conn.cursor() as cur:
for text, source, embedding in zip
(batch["text"], batch["source"], batch["embeddings"]):
cur.execute("INSERT INTO document (text, source, embedding)
VALUES (%s, %s, %s)", (text, source, embedding,),)
return {}
再次,我們可以利用 Ray Data map_batches 來并行執行此索引:
map_batches :
https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html?
# Index data
embedded_chunks.map_batches(
StoreResults,
batch_size=128,
num_cpus=1,
compute=ActorPoolStrategy(size=28),
).count()
在我們的向量數據庫中索引了嵌入的塊后,我們就可以針對給定的查詢執行檢索了。首先,我們將使用與嵌入文本塊相同的嵌入模型來嵌入傳入的查詢。
# Embed query
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
query = "What is the default batch size for map_batches?"
embedding = np.array(embedding_model.embed_query(query))
len(embedding)
768
然后,我們將通過提取與我們的嵌入式查詢最接近的嵌入塊來檢索前 k 個最相關的塊。我們使用余弦距離 (<=>),但有很多選項(https://github.com/pgvector/pgvector#vector-operators)可供選擇。一旦我們檢索到前 num_chunks,我們就可以收集每個塊的文本并將其用作上下文來生成響應。
# Get context
num_chunks = 5
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
register_vector(conn)
with conn.cursor() as cur:
cur.execute("SELECT * FROM document ORDER BY embedding <-> %s LIMIT %s", (embedding, num_chunks))
rows = cur.fetchall()
context = [{"text": row[1]} for row in rows]
sources = [row[2] for row in rows]
https://docs.ray.io/en/master/data/api/doc/ray.data.Dataset.map_batches.html#ray-data-dataset-map-batches
entire blocks as batches (blocks may contain different numbers of rows).
The actual size of the batch provided to fn may be smaller than
batch_size if batch_size doesn’t evenly divide the block(s) sent
to a given map task. Default batch_size is 4096 with “default”.
https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-size
The default batch size depends on your resource type. If you’re using CPUs,
the default batch size is 4096. If you’re using GPUs, you must specify an explicit batch size.
(cont…)
我們可以將所有這些組合成一個方便的函數:
def semantic_search(query, embedding_model, k):
embedding = np.array(embedding_model.embed_query(query))
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
register_vector(conn)
with conn.cursor() as cur:
cur.execute("SELECT * FROM document ORDER BY embedding <=> %s LIMIT %s", (embedding, k),)
rows = cur.fetchall()
semantic_context = [{"id": row[0], "text": row[1], "source": row[2]} for row in rows]
return semantic_context
現在,我們可以使用上下文從 LLM 生成響應。如果沒有檢索到的相關上下文,LLM 可能無法準確回答我們的問題。隨著數據的增長,我們可以輕松地嵌入和索引任何新數據,并能夠檢索它來回答問題。
from rag.generate import prepare_response
from rag.utils import get_client
def generate_response(
llm, temperature=0.0, stream=True,
system_content="", assistant_content="", user_content="",
max_retries=1, retry_interval=60):
"""Generate response from an LLM."""
retry_count = 0
client = get_client(llm=llm)
messages = [{"role": role, "content": content} for role, content in [
("system", system_content),
("assistant", assistant_content),
("user", user_content)] if content]
while retry_count <= max_retries:
try:
chat_completion = client.chat.completions.create(
model=llm,
temperature=temperature,
stream=stream,
messages=messages,
)
return prepare_response(chat_completion, stream=stream)
except Exception as e:
print(f"Exception: {e}")
time.sleep(retry_interval) # default is per-minute rate limits
retry_count += 1
return ""
注意:我們使用 0.0 的溫度來啟用可重復的實驗,但您應該根據您的用例進行調整。對于需要始終以事實為依據的用例,我們建議使用非常低的溫度值,而更具創造性的任務可以從更高的溫度下受益。
# Generate response
query = "What is the default batch size for map_batches?"
response = generate_response(
llm="meta-llama/Llama-2-70b-chat-hf",
temperature=0.0,
stream=True,
system_content="Answer the query using the context provided. Be succinct.",
user_content=f"query: {query}, context: {context}")
The default batch size for map_batches is 4096.
讓我們將上下文檢索和響應生成結合在一起,形成一個方便的查詢代理,我們可以使用它輕松生成響應。這將負責設置我們的代理(嵌入和 LLM 模型)以及上下文檢索,并將其傳遞給我們的 LLM 以生成響應。
class QueryAgent:
def __init__(self, embedding_model_name="thenlper/gte-base",
llm="meta-llama/Llama-2-70b-chat-hf", temperature=0.0,
max_context_length=4096, system_content="", assistant_content=""):
# Embedding model
self.embedding_model = get_embedding_model(
embedding_model_name=embedding_model_name,
model_kwargs={"device": "cuda"},
encode_kwargs={"device": "cuda", "batch_size": 100})
# Context length (restrict input length to 50% of total length)
max_context_length = int(0.5*max_context_length)
# LLM
self.llm = llm
self.temperature = temperature
self.context_length = max_context_length - get_num_tokens(system_content + assistant_content)
self.system_content = system_content
self.assistant_content = assistant_content
def __call__(self, query, num_chunks=5, stream=True):
# Get sources and context
context_results = semantic_search(
query=query,
embedding_model=self.embedding_model,
k=num_chunks)
# Generate response
context = [item["text"] for item in context_results]
sources = [item["source"] for item in context_results]
user_content = f"query: {query}, context: {context}"
answer = generate_response(
llm=self.llm,
temperature=self.temperature,
stream=stream,
system_content=self.system_content,
assistant_content=self.assistant_content,
user_content=trim(user_content, self.context_length))
# Result
result = {
"question": query,
"sources": sources,
"answer": answer,
"llm": self.llm,
}
return result
有了這個,我們只需幾行就可以使用我們的 RAG 應用程序:
llm = "meta-llama/Llama-2-7b-chat-hf"
agent = QueryAgent(
embedding_model_name="thenlper/gte-base",
llm=llm,
max_context_length=MAX_CONTEXT_LENGTHS[llm],
system_content="Answer the query using the context provided. Be succinct.")
result = agent(query="What is the default batch size for map_batches?")
print("\n\n", json.dumps(result, indent=2))
The default batch size for map_batches
is 4096
{
"question": "What is the default batch size for map_batches?",
"sources": [
"ray.data.Dataset.map_batches — Ray 2.7.1",
"Transforming Data — Ray 2.7.1",
"Ray Data Internals — Ray 2.7.1",
"Dynamic Request Batching — Ray 2.7.1",
"Image Classification Batch Inference with PyTorch — Ray 2.7.1"
],
"answer": "The default batch size for map_batches
is 4096",
"llm": "meta-llama/Llama-2-7b-chat-hf"
}
到目前為止,我們已經為 RAG 應用程序的各個部分選擇了典型/任意值。但是,如果我們要更改某些內容,例如分塊邏輯、嵌入模型、LLM 等,我們如何知道我們擁有比以前更好的配置?像這樣的生成任務很難進行定量評估,因此我們需要開發可靠的方法來進行評估。由于我們的應用程序中有許多可變組件,因此我們需要執行單元/組件和端到端評估。組件評估可能涉及單獨評估我們的檢索(是我們檢索到的一組塊中的最佳來源)和評估我們的 LLM 響應(給定最佳來源,LLM 是否能夠產生高質量的答案)。對于端到端評估,我們可以評估整個系統的質量(給定數據源,響應的質量如何)。我們將要求我們的評估員 LLM 使用上下文對回答的質量進行 1-5 之間的評分,但是,我們也可以讓它為其他維度生成分數,例如幻覺(僅使用提供的上下文中的信息生成的答案)、毒性等。注意:我們可以將分數限制為二進制(0/1),這可能更容易解釋(例如,回答要么正確要么不正確)。但是,我們在分數中引入了更高的方差,以便更深入、更細致地了解 LLM 如何對回答進行評分(例如,LLM 對回答的偏見)。
檢索系統和 LLM 的組件評估(左),總體評估(右)。
評估器我們將從確定評估器開始。給定查詢的響應和相關上下文,我們的評估器應該是一種值得信賴的方法來評分/評估響應的質量。但在確定評估器之前,我們需要一個問題數據集和答案的來源。我們可以使用此數據集要求不同的評估者提供答案,然后對他們的答案進行評分(例如,分數在 1-5 之間)。然后,我們可以檢查此數據集以確定我們的評估者是否公正,并且對分配的分數有合理的推理。注意:我們正在評估我們的 LLM 在給定相關上下文的情況下生成響應的能力。這是一個組件級評估(quality_score (LLM)),因為我們沒有使用檢索來獲取相關上下文。我們將從手動創建數據集開始(如果您無法手動創建數據集,請繼續閱讀)。我們有一個用戶查詢列表和回答查詢的理想來源 datasets/eval-dataset-v1.jsonl。我們將使用上面的 LLM 應用程序通過 GPT-4 為每個查詢/源對生成參考答案。
datasets/eval-dataset-v1.jsonl:
https://github.com/ray-project/llm-applications/blob/main/datasets/eval-dataset-v1.jsonl
with open(Path(ROOT_DIR, "datasets/eval-dataset-v1.jsonl"), "r") as f:
data = [json.loads(item) for item in list(f)]
[{'question': 'I’m struggling a bit with Ray Data type conversions when I do map_batches. Any advice?',
'source': 'https://docs.ray.io/en/master/data/transforming-data.html'},
…
{'question': 'Is Ray integrated with DeepSpeed?',
'source': 'https://docs.ray.io/en/master/ray-air/examples/gptj_deepspeed_fine_tuning.html#fine-tuning-the-model-with-ray-air-a-name-train-a'}]
每個數據點都有一個問題,并且標記的源具有與問題答案相關的精確上下文:
# Sample
uri = "https://docs.ray.io/en/master/data/transforming-data.html#configuring-batch-format"
fetch_text(uri=uri)
'\nConfiguring batch format#\nRay Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. …'
我們可以從此上下文中提取文本并將其傳遞給我們的 LLM 以生成問題的答案。我們還將要求它對查詢的響應質量進行評分。為此,我們定義了一個繼承自 QueryAgent 的 QueryAgentWithContext,不同之處在于我們提供上下文,它不需要檢索它。
class QueryAgentWithContext(QueryAgent):
def __call__(self, query, context):
user_content = f"query: {query}, context: {context}"
response = generate_response(
llm=self.llm,
temperature=self.temperature,
stream=True,
system_content=self.system_content,
assistant_content=self.assistant_content,
user_content=user_content[: self.context_length])
return response
現在,我們可以創建一個包含問題、來源、答案、分數和推理的數據集。我們可以檢查它以確定我們的評估器是否高質量。
根據它提供的分數和推理,我們發現 GPT-4 是一款高質量的評估器。我們對其他 LLM(例如 Llama-2-70b)進行了同樣的評估,發現它們缺乏適當的推理,并且非??犊亟o出了自己的答案。
EVALUATOR = "gpt-4"
注意:更徹底的評估還會通過要求評估者比較以下不同 LLM 的回答來測試以下內容:
冷啟動我們可能并不總是有準備好的問題數據集和隨時可用的最佳來源來回答該問題。為了解決這個冷啟動問題,我們可以使用 LLM 查看我們的文本塊并生成特定塊將回答的問題。這為我們提供了高質量的問題和答案的確切來源。但是,這種數據集生成方法可能會有點嘈雜。生成的問題可能并不總是與用戶可能提出的問題高度一致。我們所說的最佳來源的特定塊也可能在其他塊中具有該確切信息。盡管如此,在我們收集 + 手動標記高質量數據集的同時,這仍然是開始我們的開發過程的好方法。
# Prompt
num_questions = 3
system_content = f"""
Create {num_questions} questions using only the context provided.
End each question with a '?' character and then in a newline write the answer to that question using only the context provided.
Separate each question/answer pair by a newline.
"""
# Generate questions
synthetic_data = []
for chunk in chunks[:1]: # small samples
response = generate_response(
llm="gpt-4",
temperature=0.0,
system_content=system_content,
user_content=f"context: {chunk.page_content}")
entries = response.split("\n\n")
for entry in entries:
question, answer = entry.split("\n")
synthetic_data.append({"question": question, "source": chunk.metadata["source"], "answer": answer})
synthetic_data[:3]
[{'question': 'What can you use to monitor and debug your Ray applications and clusters?',
'source': 'https://docs.ray.io/en/master/ray-observability/reference/index.html#reference',
'answer': 'You can use the API and CLI documented in the references to monitor and debug your Ray applications and clusters.'},
{'question': 'What are the guides included in the references?',
'source': 'https://docs.ray.io/en/master/ray-observability/reference/index.html#reference',
'answer': 'The guides included in the references are State API, State CLI, and System Metrics.'},
{'question': 'What are the two types of interfaces mentioned for monitoring and debugging Ray applications and clusters?',
'source': 'https://docs.ray.io/en/master/ray-observability/reference/index.html#reference',
'answer': 'The two types of interfaces mentioned for monitoring and debugging Ray applications and clusters are API and CLI.'}]
本文章轉載微信公眾號@Ray中文社區