-
Notifications
You must be signed in to change notification settings - Fork 0
/
indexing.py
349 lines (286 loc) · 13.5 KB
/
indexing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
from typing import List
from docx import Document
from ml.documents import Document as Doc
from PIL import Image
from io import BytesIO
import zipfile
from loguru import logger
from ml.embedders import EmbeddingGenerator
from ml.models import Paragraph, Chunk
from repositories.clickhouse import ClickhouseRepository
from ml.chunkers import RecursiveChunker
from schemas.clickhouse import CreateChunkOpts, CreateParagraphOpts
import os
from services.minio import MinioService
from utils.types import MinioContentType
def extract_images_to_memory(docx_path: str) -> List[BytesIO]:
"""
Извлекает изображения из файла .docx и сохраняет их в памяти в виде объектов BytesIO.
- docx_path (str): Путь к файлу .docx.
- List[BytesIO]: Список изображений в памяти (объекты BytesIO).
"""
image_files = []
with zipfile.ZipFile(docx_path, "r") as docx_zip:
for file in docx_zip.namelist():
if file.startswith("word/media/"):
image_data = BytesIO(docx_zip.read(file))
image_files.append(image_data)
return image_files
def parse_docx(
repo: ClickhouseRepository, static_storage: MinioService, docx_path: str
) -> List[Paragraph]:
"""
Парсит документ .docx и извлекает параграфы, а также сохраняет их в ClickhouseRepository.
Параметры:
- repo (ClickhouseRepository): Репозиторий для сохранения объектов Paragraph.
- docx_path (str): Путь к файлу .docx.
Возвращает:
- List[Paragraph]: Список объектов Paragraph.
"""
document = Document(docx_path)
paragraphs = []
current_section_num = 0
current_section_name = None
current_section_text = []
current_section_images = []
image_files = extract_images_to_memory(docx_path)
image_counter = 0
for para in document.paragraphs:
style_name = para.style.name
text = para.text.strip()
if style_name == "Heading 2":
if current_section_name:
section_text_combined = "\n".join(current_section_text).strip()
paragraph_obj = Paragraph(
name=current_section_name,
text=section_text_combined,
num=str(current_section_num),
image_binaries=current_section_images,
)
paragraphs.append(paragraph_obj)
current_section_num += 1
current_section_name = text
current_section_text = []
current_section_images = []
elif style_name != "Heading 2" and text:
if current_section_name:
current_section_text.append(text)
# Associate images with paragraphs
if "Рисунок".lower() in text.lower():
if image_counter < len(image_files):
current_section_images.append(image_files[image_counter])
image_counter += 1
if current_section_name:
section_text_combined = "\n".join(current_section_text).strip()
paragraph_obj = Paragraph(
name=current_section_name,
text=section_text_combined,
num=str(current_section_num),
image_binaries=current_section_images,
)
paragraphs.append(paragraph_obj)
for paragraph in paragraphs:
for index, image in enumerate(paragraph.image_binaries):
path = static_storage.create_object_from_byte(
object_path=f"images/{paragraph.id}/{index}",
file=image,
content_type=MinioContentType.PNG,
)
paragraph.image_paths.append(path)
repo.create_paragraph(
CreateParagraphOpts(
id=paragraph.id,
name=paragraph.name,
text=paragraph.text,
num=paragraph.num,
images={
f"Image_{i+1}": image
for i, image in enumerate(paragraph.image_paths)
},
)
)
return paragraphs
def append_to_clickhouse(repo: ClickhouseRepository, chunks: List[Chunk]):
"""
Сохраняет данные в ClickHouse.
Параметры:
- chunks (List[Chunk]): Список чанков для сохранения.
"""
# Мокированная функция - требуется реализация пользователем
logger.info("Сохранение данных в ClickHouse.")
for chunk in chunks:
repo.create_chunk(
CreateChunkOpts(
id=chunk.id,
emb=chunk.emb,
text=chunk.text,
paragraph_id=chunk.paragraph_uuid,
)
)
def chunk_paragraphs(paragraphs: List[Paragraph]) -> List[Chunk]:
"""
Разбивает параграфы на чанки с помощью RecursiveChunker и добавляет UUID параграфа в метаданные.
Параметры:
- paragraphs (List[Paragraph]): Список параграфов для обработки.
Возвращает:
- List[Chunk]: Список созданных чанков.
"""
if not paragraphs:
logger.error("Список параграфов пуст.")
raise ValueError("Список параграфов пуст.")
chunks = []
# Инициализируем RecursiveChunker
try:
recursive_splitter = RecursiveChunker(chunk_overlap=32, chunk_size=256)
logger.info("RecursiveChunker успешно инициализирован.")
except Exception as e:
logger.error(f"Ошибка при инициализации RecursiveChunker: {e}")
raise RuntimeError(f"Не удалось инициализировать RecursiveChunker: {e}")
for paragraph in paragraphs:
if not paragraph.text:
logger.warning(f"Параграф с UUID {paragraph.id} не содержит текста.")
continue
# Создаем документ для параграфа
doc = Doc(
page_content=paragraph.text,
metadata={
"paragraph_uuid": str(paragraph.id),
"name_paragraph": paragraph.name,
"num_paragraph": paragraph.num,
},
)
# Разбиваем документ на чанки
try:
paragraph_chunks = recursive_splitter.split_documents([doc])
logger.debug(
f"Параграф {paragraph.id} разбит на {len(paragraph_chunks)} чанков."
)
except Exception as e:
logger.error(f"Ошибка при разбиении параграфа {paragraph.id} на чанки: {e}")
continue
# Создаем объекты Chunk из полученных чанков
for chunk_doc in paragraph_chunks:
chunk = Chunk(text=chunk_doc.page_content, paragraph_uuid=paragraph.id)
chunks.append(chunk)
logger.debug(f"Создан текстовый чанк для параграфа {paragraph.id}")
# Обрабатываем изображения в параграфе
for idx, image_bin in enumerate(paragraph.image_binaries):
chunk = Chunk(image=True, binary=image_bin, paragraph_uuid=paragraph.id)
chunks.append(chunk)
logger.debug(f"Создан визуальный чанк для параграфа {paragraph.id}")
if not chunks:
logger.error("Не удалось создать чанки из предоставленных параграфов.")
raise RuntimeError("Чанки не были созданы из параграфов.")
logger.info(f"Всего создано {len(chunks)} чанков из параграфов.")
return chunks
def generate_embeddings_for_chunks(
chunks: List[Chunk], embedding_generator: EmbeddingGenerator
) -> None:
"""
Генерирует эмбеддинги для каждого чанка (текст или изображение) и сохраняет их в объекте Chunk.
Параметры:
- chunks (List[Chunk]): Список чанков.
- embedding_generator (EmbeddingGenerator): Экземпляр класса для генерации эмбеддингов.
"""
if not chunks:
logger.error("Список чанков пуст.")
raise ValueError("Chunk list is empty.")
text_chunks = []
image_chunks = []
for chunk in chunks:
if chunk.image:
image_chunks.append(chunk)
else:
text_chunks.append(chunk)
# Генерируем эмбеддинги для текстовых чанков
if text_chunks:
texts = [chunk.text for chunk in text_chunks]
try:
embeddings = embedding_generator.get_text_embedding(texts)
for idx, chunk in enumerate(text_chunks):
chunk.emb = embeddings[idx].tolist()
logger.info(
f"Эмбеддинги для {len(text_chunks)} текстовых чанков сгенерированы."
)
except Exception as e:
logger.error(f"Ошибка при генерации эмбеддингов текстовых чанков: {e}")
raise
# Генерируем эмбеддинги для чанков изображений
if image_chunks:
for idx, chunk in enumerate(image_chunks):
image = Image.open(chunk.binary)
embeddings = embedding_generator.get_image_embedding([image])
embeddings = embeddings.squeeze().tolist()
chunk.emb = embeddings
chunk.text = "image"
logger.info(
f"Эмбеддинги для {len(image_chunks)} визуальных чанков сгенерированы."
)
def docs2clickhouse(
repo: ClickhouseRepository, static_storage: MinioService, docx_path: str
):
"""
Основная функция для обработки документа .docx и сохранения данных в ClickHouse.
Параметры:
- docx_path (str): Путь к файлу .docx.
"""
if not os.path.exists(docx_path):
logger.error(f"Файл документа '{docx_path}' не найден.")
raise FileNotFoundError(f"Document file '{docx_path}' not found.")
# Шаг 1: Парсинг документа
try:
paragraphs = parse_docx(repo, static_storage, docx_path)
logger.info(
f"Парсинг документа завершен. Найдено {len(paragraphs)} параграфов."
)
except Exception as e:
logger.error(f"Ошибка при парсинге документа: {e}")
raise RuntimeError(f"Error parsing document: {e}")
if not paragraphs:
logger.error("Парсинг документа не вернул ни одного параграфа.")
raise RuntimeError("No paragraphs were parsed from the document.")
# Шаг 2: Разбиваем параграфы на чанки и добавляем UUID параграфа в метаданные
try:
chunks = chunk_paragraphs(paragraphs)
logger.info(
f"Разбиение параграфов на чанки завершено. Всего чанков: {len(chunks)}."
)
except Exception as e:
logger.error(f"Ошибка при разбиении параграфов на чанки: {e}")
raise RuntimeError(f"Error chunking paragraphs: {e}")
# Шаг 3: Генерируем эмбеддинги для чанков (как текстовых, так и изображений)
try:
embedding_generator = EmbeddingGenerator()
generate_embeddings_for_chunks(chunks, embedding_generator)
logger.info("Генерация эмбеддингов для всех чанков завершена.")
except Exception as e:
logger.error(f"Ошибка при генерации эмбеддингов: {e}")
raise RuntimeError(f"Error generating embeddings: {e}")
# Шаг 4: Сохранение данных в ClickHouse
try:
append_to_clickhouse(repo, chunks)
logger.info("Данные успешно сохранены в ClickHouse.")
except Exception as e:
logger.error(f"Ошибка при сохранении данных в ClickHouse: {e}")
raise RuntimeError(f"Error saving data to ClickHouse: {e}")
logger.info("Обработка документа завершена успешно.")
# Пример использования
if __name__ == "__main__":
from services.minio import MinioService
from configs.Environment import get_environment_variables
from minio import Minio
docx_path = "ml/test_data/data.docx"
repo = ClickhouseRepository()
env = get_environment_variables()
minio_client = Minio(
env.MINIO_HOST,
access_key=env.MINIO_ACCESS,
secret_key=env.MINIO_SECRET,
secure=False if env.ENV == "LOCAL" else True,
)
static_storage = MinioService(minio_client)
embedding_generator = EmbeddingGenerator()
try:
docs2clickhouse(repo, static_storage, docx_path)
except Exception as e:
logger.error(f"Произошла ошибка при обработке: {e}")