本笔记本演示了如何:
- 将 OpenAI Wikipedia 向量数据集索引到 Elasticsearch 中
- 运用 Streamlit 构建一个简略的 Gen AI 运用程序,该运用程序运用 Elasticsearch 检索上下文并运用 OpenAI 拟定答案
装置
装置 Elasticsearch 及 Kibana
如果你还没有装置好自己的 Elasticsearch 及 Kibana,那么请参阅一下的文章来进行装置:
在装置的时候,请挑选 Elastic Stack 8.x进行装置。在装置的时候,咱们能够看到如下的装置信息:
环境变量
在发动 Jupyter 之前,咱们设置如下的环境变量:
1. export ES_USER="elastic"
2. export ES_PASSWORD="xnLj56lTrH98Lf_6n76y"
3. export ES_ENDPOINT="localhost"
4. export OPENAI_API_KEY="YourOpenAiKey"
请在上面修正相应的变量的值。这个需要在发动 jupyter 之前运行。
复制 Elasticsearch 证书
咱们把 Elasticsearch 的证书复制到当时的目录下:
1. $ pwd
2. /Users/liuxg/python/elser
3. $ cp ~/elastic/elasticsearch-8.12.0/config/certs/http_ca.crt .
4. $ ls http_ca.crt
5. http_ca.crt
装置 Python 依靠包
python3 -m pip install -qU openai pandas==1.5.3 wget elasticsearch streamlit tqdm load_dotenv
准备数据
咱们能够运用如下的指令来下载数据:
wget https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip
1. $ pwd
2. /Users/liuxg/python/elser
3. $ wget https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip
4. --2024-02-09 12:06:36-- https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip
5. Resolving cdn.openai.com (cdn.openai.com)... 13.107.213.69
6. Connecting to cdn.openai.com (cdn.openai.com)|13.107.213.69|:443... connected.
7. HTTP request sent, awaiting response... 200 OK
8. Length: 698933052 (667M) [application/zip]
9. Saving to: ‘vector_database_wikipedia_articles_embedded.zip’
11. vector_database_wikipedi 100%[==================================>] 666.55M 1.73MB/s in 3m 2s
13. 2024-02-09 12:09:40 (3.66 MB/s) - ‘vector_database_wikipedia_articles_embedded.zip’ saved [698933052/698933052]
创立运用并展示
咱们在当时的目录下打入如下的指令来创立 notebook:
1. $ pwd
2. /Users/liuxg/python/elser
3. $ jupyter notebook
导入包及衔接到 Elasticsearch
1. import os
2. from getpass import getpass
3. from elasticsearch import Elasticsearch, helpers
4. import wget, zipfile, pandas as pd, json, openai
5. import streamlit as st
6. from tqdm.notebook import tqdm
7. from dotenv import load_dotenv
9. load_dotenv()
11. openai_api_key=os.getenv('OPENAI_API_KEY')
12. elastic_user=os.getenv('ES_USER')
13. elastic_password=os.getenv('ES_PASSWORD')
14. elastic_endpoint=os.getenv("ES_ENDPOINT")
16. url = f"https://{elastic_user}:{elastic_password}@{elastic_endpoint}:9200"
17. client = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
19. print(client.info())
装备 OpenAI 衔接
咱们的示例将运用 OpenAI 来拟定答案,因此请在此处供给有用的 OpenAI Api 密钥。
你能够按照本指南检索你的 API 密钥。然后测试与OpenAI的衔接,检查该笔记本运用的型号是否可用。
1. from openai import OpenAI
3. openai = OpenAI()
4. openai.models.retrieve("text-embedding-ada-002")
1. $ pip3 list | grep openai
2. langchain-openai 0.0.5
3. openai 1.12.0
下载数据集
1. with zipfile.ZipFile("vector_database_wikipedia_articles_embedded.zip",
2. "r") as zip_ref:
3. zip_ref.extractall("data")
运行上面的代码后,咱们能够在如下地址找到解压缩的文件 vector_database_wikipedia_articles_embedded.csv:
1. $ pwd
2. /Users/liuxg/python/elser
3. $ ls ./data
4. __MACOSX vector_database_wikipedia_articles_embedded.csv
5. paul_graham
将 CSV 文件读入 Pandas DataFrame
接下来,咱们运用 Pandas 库将解压的 CSV 文件读入 DataFrame。 此步骤能够更轻松地将数据批量索引到 Elasticsearch 中。
wikipedia_dataframe = pd.read_csv("data/vector_database_wikipedia_articles_embedded.csv")
运用映射创立索引
现在咱们需要运用必要的映射创立一个 Elasticsearch 索引。 这将使咱们能够将数据索引到 Elasticsearch 中。
咱们对 title_vector 和 content_vector 字段运用密集向量字段类型。 这是一种特别的字段类型,允许咱们在 Elasticsearch 中存储密集向量。
稍后,咱们需要以密集向量字段为目标进行 kNN 查找。
1. index_mapping= {
2. "properties": {
3. "title_vector": {
4. "type": "dense_vector",
5. "dims": 1536,
6. "index": "true",
7. "similarity": "cosine"
8. },
9. "content_vector": {
10. "type": "dense_vector",
11. "dims": 1536,
12. "index": "true",
13. "similarity": "cosine"
14. },
15. "text": {"type": "text"},
16. "title": {"type": "text"},
17. "url": { "type": "keyword"},
18. "vector_id": {"type": "long"}
20. }
21. }
22. client.indices.create(index="wikipedia_vector_index", mappings=index_mapping)
将数据索引到 Elasticsearch
以下函数生成所需的批量操作,这些操作能够传递到 Elasticsearch 的 bulk API,因此咱们能够在单个恳求中有用地索引多个文档。
对于 DataFrame 中的每一行,该函数都会生成一个字典,表明要索引的单个文档。
1. def dataframe_to_bulk_actions(df):
2. for index, row in df.iterrows():
3. yield {
4. "_index": 'wikipedia_vector_index',
5. "_id": row['id'],
6. "_source": {
7. 'url' : row["url"],
8. 'title' : row["title"],
9. 'text' : row["text"],
10. 'title_vector' : json.loads(row["title_vector"]),
11. 'content_vector' : json.loads(row["content_vector"]),
12. 'vector_id' : row["vector_id"]
13. }
14. }
因为数据帧很大,咱们将以 100 个为一组对数据进行索引。咱们运用 Python 客户端的 bulkAPI 帮助程序将数据索引到 Elasticsearch 中。
1. total_documents = len(wikipedia_dataframe)
3. progress_bar = tqdm(total=total_documents, unit="documents")
4. success_count = 0
6. for ok, info in helpers.streaming_bulk(client, actions=dataframe_to_bulk_actions(wikipedia_dataframe), raise_on_error=False, chunk_size=100):
7. if ok:
8. success_count += 1
9. else:
10. print(f"Unable to index {info['index']['_id']}: {info['index']['error']}")
11. progress_bar.update(1)
12. progress_bar.set_postfix(success=success_count)
等上面的代码运行结束后,咱们能够在 Kibana 中进行检查:
运用 Streamlit 构建运用程序
鄙人一节中, 你将运用 Streamlit 构建一个简略的界面。
该运用程序将显现一个简略的查找栏,用户能够在其中提出问题。 Elasticsearch 用于检索与问题匹配的相关文档(上下文),然后 OpenAI 运用上下文拟定答案。
装置依靠项以在运行后访问运用程序。
!npm install localtunnel
1. %%writefile app.py
3. import os
4. import streamlit as st
5. import openai
6. from elasticsearch import Elasticsearch
7. from dotenv import load_dotenv
9. from openai import OpenAI
11. openai = OpenAI()
13. load_dotenv()
15. openai_api_key=os.getenv('OPENAI_API_KEY')
16. elastic_user=os.getenv('ES_USER')
17. elastic_password=os.getenv('ES_PASSWORD')
18. elastic_endpoint=os.getenv("ES_ENDPOINT")
20. url = f"https://{elastic_user}:{elastic_password}@{elastic_endpoint}:9200"
21. client = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True)
23. # Define model
24. EMBEDDING_MODEL = "text-embedding-ada-002"
27. def openai_summarize(query, response):
28. context = response['hits']['hits'][0]['_source']['text']
29. summary = openai.chat.completions.create(
30. model="gpt-3.5-turbo",
31. messages=[
32. {"role": "system", "content": "You are a helpful assistant."},
33. {"role": "user", "content": "Answer the following question:" + query + "by using the following text: " + context},
34. ]
35. )
37. print(summary)
38. return summary.choices[0].message.content
41. def search_es(query):
42. # Create embedding
43. question_embedding = openai.embeddings.create(input=query, model=EMBEDDING_MODEL)
45. # Define Elasticsearch query
46. response = client.search(
47. index = "wikipedia_vector_index",
48. knn={
49. "field": "content_vector",
50. "query_vector": question_embedding.data[0].embedding,
51. "k": 10,
52. "num_candidates": 100
53. }
54. )
55. return response
58. def main():
59. st.title("Gen AI Application")
61. # Input for user search query
62. user_query = st.text_input("Enter your question:")
64. if st.button("Search"):
65. if user_query:
67. st.write(f"Searching for: {user_query}")
68. result = search_es(user_query)
70. # print(result)
71. openai_summary = openai_summarize(user_query, result)
72. st.write(f"OpenAI Summary: {openai_summary}")
74. # Display search results
75. if result['hits']['total']['value'] > 0:
76. st.write("Search Results:")
77. for hit in result['hits']['hits']:
78. st.write(hit['_source']['title'])
79. st.write(hit['_source']['text'])
80. else:
81. st.write("No results found.")
83. if __name__ == "__main__":
84. main()
运行运用
运行运用程序并检查您的隧道 IP:
!streamlit run app.py
整个 notebook 的源码能够在地址下载:github.com/liu-xiao-gu…