From c6b0dc6a29ff6096b0febb2204b6d20d83f68ace Mon Sep 17 00:00:00 2001
From: Jyong <76649700+JohnJyong@users.noreply.github.com>
Date: Fri, 9 Aug 2024 16:47:15 +0800
Subject: [PATCH] update dataset embedding model, update document status to be indexing (#7145)

---
 api/tasks/deal_dataset_vector_index_task.py | 111 ++++++++++++--------
 1 file changed, 70 insertions(+), 41 deletions(-)

diff --git a/api/tasks/deal_dataset_vector_index_task.py b/api/tasks/deal_dataset_vector_index_task.py
index c1b0e7f1a4cf9..ce93e111e54aa 100644
--- a/api/tasks/deal_dataset_vector_index_task.py
+++ b/api/tasks/deal_dataset_vector_index_task.py
@@ -42,31 +42,42 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str):
             ).all()
 
             if dataset_documents:
-                documents = []
-                for dataset_document in dataset_documents:
-                    # delete from vector index
-                    segments = db.session.query(DocumentSegment).filter(
-                        DocumentSegment.document_id == dataset_document.id,
-                        DocumentSegment.enabled == True
-                    ) .order_by(DocumentSegment.position.asc()).all()
-                    for segment in segments:
-                        document = Document(
-                            page_content=segment.content,
-                            metadata={
-                                "doc_id": segment.index_node_id,
-                                "doc_hash": segment.index_node_hash,
-                                "document_id": segment.document_id,
-                                "dataset_id": segment.dataset_id,
-                            }
-                        )
+                dataset_documents_ids = [doc.id for doc in dataset_documents]
+                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \
+                    .update({"indexing_status": "indexing"}, synchronize_session=False)
+                db.session.commit()
 
-                        documents.append(document)
+                for dataset_document in dataset_documents:
+                    try:
+                        # add from vector index
+                        segments = db.session.query(DocumentSegment).filter(
+                            DocumentSegment.document_id == dataset_document.id,
+                            DocumentSegment.enabled == True
+                        ) .order_by(DocumentSegment.position.asc()).all()
+                        if segments:
+                            documents = []
+                            for segment in segments:
+                                document = Document(
+                                    page_content=segment.content,
+                                    metadata={
+                                        "doc_id": segment.index_node_id,
+                                        "doc_hash": segment.index_node_hash,
+                                        "document_id": segment.document_id,
+                                        "dataset_id": segment.dataset_id,
+                                    }
+                                )
 
-                # save vector index
-                index_processor.load(dataset, documents, with_keywords=False)
+                                documents.append(document)
+                            # save vector index
+                            index_processor.load(dataset, documents, with_keywords=False)
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "completed"}, synchronize_session=False)
+                        db.session.commit()
+                    except Exception as e:
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False)
+                        db.session.commit()
         elif action == 'update':
-            # clean index
-            index_processor.clean(dataset, None, with_keywords=False)
             dataset_documents = db.session.query(DatasetDocument).filter(
                 DatasetDocument.dataset_id == dataset_id,
                 DatasetDocument.indexing_status == 'completed',
@@ -75,28 +86,46 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str):
             ).all()
             # add new index
             if dataset_documents:
-                documents = []
+                # update document status
+                dataset_documents_ids = [doc.id for doc in dataset_documents]
+                db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \
+                    .update({"indexing_status": "indexing"}, synchronize_session=False)
+                db.session.commit()
+
+                # clean index
+                index_processor.clean(dataset, None, with_keywords=False)
+
                 for dataset_document in dataset_documents:
-                    # delete from vector index
-                    segments = db.session.query(DocumentSegment).filter(
-                        DocumentSegment.document_id == dataset_document.id,
-                        DocumentSegment.enabled == True
-                    ).order_by(DocumentSegment.position.asc()).all()
-                    for segment in segments:
-                        document = Document(
-                            page_content=segment.content,
-                            metadata={
-                                "doc_id": segment.index_node_id,
-                                "doc_hash": segment.index_node_hash,
-                                "document_id": segment.document_id,
-                                "dataset_id": segment.dataset_id,
-                            }
-                        )
+                    # update from vector index
+                    try:
+                        segments = db.session.query(DocumentSegment).filter(
+                            DocumentSegment.document_id == dataset_document.id,
+                            DocumentSegment.enabled == True
+                        ).order_by(DocumentSegment.position.asc()).all()
+                        if segments:
+                            documents = []
+                            for segment in segments:
+                                document = Document(
+                                    page_content=segment.content,
+                                    metadata={
+                                        "doc_id": segment.index_node_id,
+                                        "doc_hash": segment.index_node_hash,
+                                        "document_id": segment.document_id,
+                                        "dataset_id": segment.dataset_id,
+                                    }
+                                )
 
-                        documents.append(document)
+                                documents.append(document)
+                            # save vector index
+                            index_processor.load(dataset, documents, with_keywords=False)
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "completed"}, synchronize_session=False)
+                        db.session.commit()
+                    except Exception as e:
+                        db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \
+                            .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False)
+                        db.session.commit()
 
-                # save vector index
-                index_processor.load(dataset, documents, with_keywords=False)
 
         end_at = time.perf_counter()
         logging.info(
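The patch boils down to one pattern: bulk-mark every affected document as "indexing", rebuild the vector index document by document, and record "completed" or "error" on each row so a single failure no longer leaves the whole batch in a stale state. Below is a minimal, self-contained sketch of that status-handling pattern using plain SQLAlchemy against an in-memory SQLite database; the DatasetDocument model, the reindex() helper, and deal_documents() here are illustrative stand-ins, not the actual Dify models or Celery task.

# Sketch of per-document status handling: flag the batch as "indexing" up front,
# then flip each row to "completed" or "error" individually.
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DatasetDocument(Base):
    __tablename__ = "documents"
    id = Column(String, primary_key=True)
    indexing_status = Column(String, default="completed")
    error = Column(String, nullable=True)


def reindex(doc_id: str) -> None:
    # Placeholder for the real work (rebuilding the vector index for one document).
    if doc_id == "doc-2":
        raise RuntimeError("embedding model unavailable")


def deal_documents(session: Session, doc_ids: list[str]) -> None:
    # 1) Flag the whole batch as "indexing" in one bulk UPDATE.
    session.query(DatasetDocument).filter(DatasetDocument.id.in_(doc_ids)) \
        .update({"indexing_status": "indexing"}, synchronize_session=False)
    session.commit()

    # 2) Process documents one by one; record success or failure per document.
    for doc_id in doc_ids:
        try:
            reindex(doc_id)
            session.query(DatasetDocument).filter(DatasetDocument.id == doc_id) \
                .update({"indexing_status": "completed"}, synchronize_session=False)
        except Exception as e:
            session.query(DatasetDocument).filter(DatasetDocument.id == doc_id) \
                .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False)
        session.commit()


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add_all([DatasetDocument(id=f"doc-{i}") for i in range(1, 4)])
        session.commit()
        deal_documents(session, ["doc-1", "doc-2", "doc-3"])
        for doc in session.query(DatasetDocument).order_by(DatasetDocument.id):
            print(doc.id, doc.indexing_status, doc.error)

Committing inside the loop, as the patch does, keeps each document's status visible while later documents are still being processed, at the cost of one commit per row.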