from hyrex import HyrexRegistry
import psycopg2
import openai
import os
# Hyrex task registry; the @hy.task decorators below attach each function to it.
hy = HyrexRegistry()
@hy.task
def update_db_record(doc_id: str, embedding: list[float]):
    """Store a document's embedding in PostgreSQL.

    Runs ``UPDATE documents SET embedding = %s WHERE doc_id = %s`` using the
    connection string from the ``DB_CONN_STRING`` environment variable. If no
    row has this ``doc_id``, the UPDATE affects zero rows and no error is
    raised.

    Args:
        doc_id: Identifier of the ``documents`` row to update.
        embedding: Embedding vector to store. psycopg2 adapts a Python list to
            a SQL array; this assumes the ``embedding`` column accepts an array
            value (e.g. ``float8[]``, or a pgvector column via assignment
            cast) -- TODO confirm against the schema.
    """
    # NOTE(review): os.environ.get returns None when DB_CONN_STRING is unset;
    # psycopg2 would then connect using libpq defaults instead of failing --
    # confirm that fallback is intended.
    conn = psycopg2.connect(os.environ.get("DB_CONN_STRING"))
    try:
        # Using a psycopg2 connection as a context manager only commits (or
        # rolls back on error) when the block exits; it does NOT close the
        # connection. Close it explicitly so each task run doesn't leak one.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "UPDATE documents SET embedding = %s WHERE doc_id = %s",
                    (embedding, doc_id),
                )
    finally:
        conn.close()
@hy.task
def process_document(doc_id: str):
    """Embed one Google Drive document and queue the vector for storage.

    Fetches the document's content, asks OpenAI's ``text-embedding-3-small``
    model for an embedding, then enqueues ``update_db_record`` with the result
    instead of writing to the database directly.

    Args:
        doc_id: Google Drive identifier of the document to embed.
    """
    # NOTE(review): `gdrive_sdk` is not imported in the visible code --
    # confirm where it is defined.
    content = gdrive_sdk.get_document(doc_id)
    # NOTE(review): the embeddings endpoint rejects empty input and input
    # beyond the model's token limit; such documents will fail here -- confirm
    # whether they need special handling.
    vector = openai.embeddings.create(
        model="text-embedding-3-small",
        input=content,
    ).data[0].embedding
    update_db_record.send(doc_id, vector)
@hy.task
def sync_google_drive_documents():
    """Fan out one ``process_document`` task per Google Drive document.

    Lists every document ID from Google Drive and enqueues a separate
    ``process_document`` task for each via ``.send``. No embedding work is
    done inside this task itself.
    """
    # NOTE(review): `gdrive_sdk` is not imported in the visible code --
    # confirm where it is defined.
    for drive_doc_id in gdrive_sdk.list_document_ids():
        process_document.send(drive_doc_id)