AI Process Multiple Documents
Purpose
Need to Process Many Documents with AI? I did.
There could be many ways to do this, and for many reasons. My reason was to not write up 50 summaries for various prompts. I wanted a markdown table that summarized all of them.
You could modify the prompt of this script to extract any type of data from any type of content and format it any way you want.
This is how you extract a summary from each document using the OpenAI API. The basic use case is to run an AI command on more than one input file, like … thousands of court documents, for example, where you are extracting a vexatious litigant’s lies and keeping a tally of the proven times he’s perjured himself.
Get API Key
You will need an API key to use the python openai module. It’s inexpensive. Pennies for multiple AI queries.
https://platform.openai.com/api-keys
requirements.txt
I recommend using a requirements.txt file: create a venv and install these modules into that venv. Do not install them into your core Python install in case they clobber some existing modules. Virtual environments are the only way I recommend.
annotated-types==0.7.0
anyio==4.4.0
beautifulsoup4==4.12.3
certifi==2024.2.2
charset-normalizer==3.3.2
colorama==0.4.6
distro==1.9.0
h11==0.14.0
httpcore==1.0.5
httpx==0.27.0
idna==3.7
openai==1.30.4
prettytable==3.10.0
pydantic==2.7.2
pydantic_core==2.18.3
python-dotenv==1.0.1
requests==2.32.3
sniffio==1.3.1
soupsieve==2.5
tqdm==4.66.4
typing_extensions==4.12.0
urllib3==2.2.1
wcwidth==0.2.13
Using a .env File
Place the .env file in your venv. The script reads it with the python-dotenv library. I did this on Windows and am not sure whether it’s required to do it this particular way on other operating systems.
# this file is named .env in your python project folder
# This key won't work, just showing you what it should look like
OPENAI_API_KEY="sk-proj-11239781379801307913709813709123789123"
yourscript.py
import os
import sqlite3
import time
from openai import OpenAI
from dotenv import load_dotenv
import httpx
# Read KEY=value pairs from the local .env file into the process environment
load_dotenv()

# OpenAI client authenticated with the key found in the environment
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

# Root folder holding one sub-folder per document to summarize
# (a WSL share in this example)
BASE_DIR = r'\\wsl.localhost\Debian\home\james\fabric\patterns'
DB_NAME = 'summaries.db'
SLEEP_TIME = 1  # pause (seconds) between API calls to stay clear of rate limits
TIMEOUT = 30  # per-request timeout (seconds) for API calls
MODEL = "gpt-4o"  # any chat model works here, e.g. "gpt-3.5-turbo"

# Tune this prompt on a single document first; once it gives the output
# you want, paste it here and run the script over the whole folder.
AI_QUESTION = """
Summarize the following markdown content into two sentences and provide.
The first sentence is a succinct summary. The second sentence is a list
of use cases. Both sentences will total no more than 150 characters
each for a total fo 300 characters max. If the information can be summarized
in less that is better.
Input:
{text}
"""
def init_db():
    """Create the ``summaries`` table in ``DB_NAME`` if it does not exist.

    The table maps ``pattern_name`` (primary key) to its ``summary`` text.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS summaries (
                pattern_name TEXT PRIMARY KEY,
                summary TEXT
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even when the statement fails
        conn.close()
def get_pattern_folders():
    """Return the names of the directories directly inside ``BASE_DIR``."""
    folders = []
    for entry in os.listdir(BASE_DIR):
        # Plain files sitting next to the pattern folders are ignored
        if os.path.isdir(os.path.join(BASE_DIR, entry)):
            folders.append(entry)
    return folders
def fetch_markdown(pattern_name):
    """Read ``system.md`` from the pattern's folder under ``BASE_DIR``.

    Returns the file content as a string, or ``None`` (after printing a
    message) when the file is missing or cannot be decoded as UTF-8.
    """
    source = os.path.join(BASE_DIR, pattern_name, 'system.md')
    try:
        with open(source, 'r', encoding='utf-8') as handle:
            content = handle.read()
    except FileNotFoundError:
        print(f"File not found for {pattern_name}.")
        content = None
    except UnicodeDecodeError as e:
        print(f"Error reading {pattern_name}: {e}")
        content = None
    return content
def summarize_text(pattern_name, text):
    """Ask the chat model to summarize ``text`` using the ``AI_QUESTION`` prompt.

    Args:
        pattern_name: Name of the pattern being summarized. Not used by the
            request itself; kept so callers can pass it unchanged.
        text: The document content to summarize.

    Returns:
        The model's reply with surrounding whitespace stripped, or one of
        the fixed strings ``"Summarization timed out."`` /
        ``"Summarization failed."`` when the request does not succeed.
    """
    # Function-scope import: the openai v1 client reports request timeouts
    # as APITimeoutError rather than letting httpx's exception through.
    from openai import APITimeoutError

    # AI_QUESTION carries a literal "{text}" placeholder. Substitute the
    # document there instead of sending the placeholder verbatim; fall back
    # to appending when the prompt has no placeholder.
    if "{text}" in AI_QUESTION:
        prompt = AI_QUESTION.replace("{text}", text)
    else:
        prompt = f"{AI_QUESTION}{text}"

    try:
        response = client.chat.completions.create(
            messages=[{
                "role": "user",
                "content": prompt,
            }],
            model=MODEL,
            timeout=TIMEOUT,  # give up on requests that hang
        )
        return response.choices[0].message.content.strip()
    except (APITimeoutError, httpx.TimeoutException):
        print("Request timed out. Skipping this pattern.")
        return "Summarization timed out."
    except Exception as e:
        # Best effort: one bad document must not abort the whole batch
        print(f"Error during summarization: {e}")
        return "Summarization failed."
def save_summary(pattern_name, summary):
    """Insert or overwrite the summary stored for ``pattern_name``.

    Args:
        pattern_name: Primary-key value identifying the row.
        summary: Summary text to store for that pattern.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(
            'INSERT OR REPLACE INTO summaries (pattern_name, summary) VALUES (?, ?)',
            (pattern_name, summary),
        )
        conn.commit()
    finally:
        # Close the connection even when the insert fails
        conn.close()
def get_existing_summaries():
    """Return the set of ``pattern_name`` values already stored in the database."""
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute('SELECT pattern_name FROM summaries').fetchall()
    finally:
        # Close the connection even when the query fails
        conn.close()
    return {row[0] for row in rows}
def generate_markdown_table():
    """Render every stored summary as a two-column markdown table.

    Returns:
        The table as a string: a header row, a separator row, then one row
        per record, each terminated by a newline.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute('SELECT pattern_name, summary FROM summaries').fetchall()
    finally:
        # Close the connection even when the query fails
        conn.close()

    def _cell(value):
        # A raw newline or "|" inside a cell would split the table row, so
        # collapse whitespace runs to single spaces and escape the pipes.
        return " ".join(str(value).split()).replace("|", "\\|")

    lines = [
        "| Pattern Name | Description |",
        "|--------------|-------------|",
    ]
    lines.extend(
        f"| {_cell(pattern_name)} | {_cell(summary)} |"
        for pattern_name, summary in rows
    )
    return "\n".join(lines) + "\n"
def main():
    """Summarize every pattern not yet in the database, then write ``summaries.md``.

    Patterns that already have a stored summary are skipped, so the script
    can be re-run to pick up where a previous run stopped.
    """
    # Strings summarize_text returns instead of a real summary. Storing them
    # would mark the pattern as done and it would never be retried.
    failure_markers = ("Summarization timed out.", "Summarization failed.")

    init_db()
    pattern_names = get_pattern_folders()
    existing_summaries = get_existing_summaries()
    for pattern_name in pattern_names:
        if pattern_name in existing_summaries:
            print(f"Skipping {pattern_name}, already summarized.")
            continue
        md_content = fetch_markdown(pattern_name)
        if not md_content:
            # Missing, unreadable or empty file: nothing to send to the API
            continue
        summary = summarize_text(pattern_name, md_content)
        if summary in failure_markers:
            print(f"Not saving {pattern_name}: {summary}")
        else:
            save_summary(pattern_name, summary)
            print(f"Summarized and saved {pattern_name}: {summary}")
        time.sleep(SLEEP_TIME)
    md_table = generate_markdown_table()
    # Explicit UTF-8: the platform default encoding may not be able to
    # represent every character the model returns.
    with open('summaries.md', 'w', encoding='utf-8') as f:
        f.write(md_table)
    print("Markdown table generated successfully.")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C: stop quietly; patterns already saved are skipped next run
        print("\nProcess interrupted by user. Exiting gracefully.")
dbdumper.py
In case you want to review your db
import sqlite3
from prettytable import PrettyTable
# SQLite file to read; must be the same file the summarizer script writes
DB_NAME = "summaries.db"
def fetch_summaries():
    """Return all ``(pattern_name, summary)`` rows from the ``summaries`` table."""
    conn = sqlite3.connect(DB_NAME)
    try:
        return conn.execute('SELECT pattern_name, summary FROM summaries').fetchall()
    finally:
        # Close the connection even when the query fails
        conn.close()
def display_summaries(rows):
    """Print ``rows`` as a two-column PrettyTable (pattern name, summary)."""
    table = PrettyTable(["Pattern Name", "Summary"])
    for record in rows:
        table.add_row(record)
    print(table)
def main():
    """Print every stored summary, or a notice when there are none."""
    rows = fetch_summaries()
    if not rows:
        print("No records found in the database.")
        return
    display_summaries(rows)


if __name__ == "__main__":
    main()