(EAI-152) check for change to chunkAlgoHash when updating embeddings #580

Draft · wants to merge 8 commits into main
11 changes: 10 additions & 1 deletion packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts
@@ -1,4 +1,4 @@
import { Page } from ".";
import { Page, PersistedPage } from ".";
import { VectorStore } from "../VectorStore";

/**
@@ -84,6 +84,15 @@ export type EmbeddedContentStore = VectorStore<EmbeddedContent> & {
   */
  loadEmbeddedContent(args: { page: Page }): Promise<EmbeddedContent[]>;

  /**
    Get the Pages of the embedded content that meet the filter requirements.
   */
  getPagesFromEmbeddedContent(args: {
    dataSources?: string[];
    chunkAlgoHash: string;
    inverseChunkAlgoHash?: boolean;
  }): Promise<PersistedPage[]>;

  /**
    Delete all embedded content for the given page and/or data sources.
   */
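A minimal usage sketch for the new interface method (not part of this diff; the helper name and hash argument are illustrative): with inverseChunkAlgoHash set, it returns the pages whose embedded content was produced by a chunking algorithm other than the current one, i.e. the pages that need re-chunking and re-embedding.

async function findPagesNeedingRechunk(
  store: EmbeddedContentStore,
  currentChunkAlgoHash: string
): Promise<PersistedPage[]> {
  // inverseChunkAlgoHash: true selects embedded content whose stored hash
  // differs from the current one.
  return store.getPagesFromEmbeddedContent({
    chunkAlgoHash: currentChunkAlgoHash,
    inverseChunkAlgoHash: true,
  });
}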
@@ -1,4 +1,4 @@
import { pageIdentity } from ".";
import { pageIdentity, PersistedPage } from ".";
import { DatabaseConnection } from "../DatabaseConnection";
import { EmbeddedContent, EmbeddedContentStore } from "./EmbeddedContent";
import { FindNearestNeighborsOptions, WithScore } from "../VectorStore";
@@ -95,6 +95,68 @@ export function makeMongoDbEmbeddedContentStore({
      return await embeddedContentCollection.find(pageIdentity(page)).toArray();
    },

    async getPagesFromEmbeddedContent({
      dataSources,
      chunkAlgoHash,
      inverseChunkAlgoHash = false,
    }): Promise<PersistedPage[]> {
      const pipeline = [
        // Match embedded content by chunkAlgoHash (or, when
        // inverseChunkAlgoHash is set, by any hash *other* than the one
        // given), optionally restricted to the given data sources.
        {
          $match: {
            ...(dataSources ? { sourceName: { $in: dataSources } } : undefined),
            chunkAlgoHash: inverseChunkAlgoHash
              ? { $ne: chunkAlgoHash }
              : chunkAlgoHash,
          },
        },
        // Join each matching embedded-content document to its page via
        // (url, sourceName).
        {
          $lookup: {
            from: "pages",
            let: {
              url: "$url",
              sourceName: "$sourceName",
            },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $eq: ["$url", "$$url"] },
                      { $eq: ["$sourceName", "$$sourceName"] },
                    ],
                  },
                },
              },
            ],
            as: "pages",
          },
        },
        // Keep only the joined pages, then flatten so each output document
        // is a single page (one per matching embedded-content document).
        {
          $project: {
            _id: 0,
            pages: 1,
          },
        },
        {
          $unwind: {
            path: "$pages",
          },
        },
        {
          $replaceRoot: {
            newRoot: "$pages",
          },
        },
      ];
      return await embeddedContentCollection
        .aggregate<PersistedPage>(pipeline)
        .toArray();
    },
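    // Note (assumption, not in this diff): the $match stage above filters on
    // sourceName and chunkAlgoHash, so a compound index on the
    // embedded_content collection, e.g.
    //   { sourceName: 1, chunkAlgoHash: 1 },
    // would presumably be needed to keep this aggregation efficient at scale.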

    async deleteEmbeddedContent({
      page,
      dataSources,
@@ -2,12 +2,16 @@ import {
  updateEmbeddedContent,
  updateEmbeddedContentForPage,
} from "./updateEmbeddedContent";
import { persistPages } from ".";
import {
  makeMongoDbEmbeddedContentStore,
  makeMongoDbPageStore,
  MongoDbEmbeddedContentStore,
  MongoDbPageStore,
  PageStore,
  persistPages,
  updatePages,
} from ".";
import { makeMockPageStore } from "../test/MockPageStore";
import * as chunkPageModule from "../chunk/chunkPage";
import { EmbeddedContentStore, EmbeddedContent } from "./EmbeddedContent";
import { Embedder } from "../embed";
import { Page, PersistedPage } from ".";
import { strict as assert } from "assert";
import { MongoMemoryReplSet } from "mongodb-memory-server";
import { DataSource } from "../dataSources";
import { MongoClient } from "mongodb";

export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
  const content: Map<string /* page url */, EmbeddedContent[]> = new Map();
@@ -26,6 +30,9 @@ export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
    async updateEmbeddedContent({ embeddedContent, page }) {
      content.set(page.url, [...embeddedContent]);
    },
    async getPagesFromEmbeddedContent(args) {
      return [];
    },
    metadata: {
      embeddingName: "test",
    },
@@ -207,6 +214,7 @@ describe("updateEmbeddedContent", () => {
      store: embeddedContentStore,
      page,
      concurrencyOptions: { createChunks: 2 },
      chunkAlgoHash: "testchunkalgohash",
    });

    const embeddedContent = await embeddedContentStore.loadEmbeddedContent({
@@ -276,3 +284,157 @@ });
});
});
});


describe("updateEmbeddedContent", () => {
let mongod: MongoMemoryReplSet | undefined;
let pageStore: MongoDbPageStore;
let embedStore: MongoDbEmbeddedContentStore;
let uri: string;
let databaseName: string;
let mongoClient: MongoClient;
let page1Embedding: EmbeddedContent[], page2Embedding: EmbeddedContent[]
let pages: PersistedPage[] = [];

const embedder = {
async embed() {
return { embedding: [1, 2, 3] };
},
};
  const mockDataSources: DataSource[] = [
    {
      name: "source1",
      fetchPages: async () => [
        {
          url: "test1.com",
          format: "html",
          sourceName: "source1",
          body: "hello source 1",
        },
      ],
    },
    {
      name: "source2",
      fetchPages: async () => [
        {
          url: "test2.com",
          format: "html",
          sourceName: "source2",
          body: "hello source 2",
        },
      ],
    },
  ];
  const mockDataSourceNames = mockDataSources.map(
    (dataSource) => dataSource.name
  );

  beforeEach(async () => {
    databaseName = "test-all-command";
    mongod = await MongoMemoryReplSet.create();
    uri = mongod.getUri();
    mongoClient = await MongoClient.connect(mongod.getUri(), {});
    embedStore = makeMongoDbEmbeddedContentStore({
      connectionUri: uri,
      databaseName,
      searchIndex: { embeddingName: "test-embedding" },
    });
    pageStore = makeMongoDbPageStore({
      connectionUri: uri,
      databaseName,
    });
    // Create pages and verify that they have been created.
    await updatePages({ sources: mockDataSources, pageStore });
    pages = await pageStore.loadPages();
    assert(pages.length === 2);
    // Create embeddings for the pages and verify that they have been created.
    await updateEmbeddedContent({
      since: new Date(0),
      embeddedContentStore: embedStore,
      pageStore,
      sourceNames: mockDataSourceNames,
      embedder,
    });
    page1Embedding = await embedStore.loadEmbeddedContent({
      page: pages[0],
    });
    page2Embedding = await embedStore.loadEmbeddedContent({
      page: pages[1],
    });
    assert(page1Embedding.length);
    assert(page2Embedding.length);
  });

  afterEach(async () => {
    await pageStore.drop();
    await pageStore.close();
    await embedStore.drop();
    await embedStore.close();
    await mongoClient.close();
    await mongod?.stop();
  });

it("updates embedded content for pages that have been updated after the 'since' date provided", async () => {
// Modify dates of pages and embedded content for testing
const sinceDate = new Date("2024-01-01")
const beforeSinceDate = new Date("2023-01-01")
const afterSinceDate = new Date("2025-01-01")
// set pages[0] to be last updated before sinceDate (should not be modified)
await mongoClient.db(databaseName).collection('pages').updateOne({...pages[0]}, { $set: { updated: beforeSinceDate } });
await mongoClient.db(databaseName).collection('embedded_content').updateOne({sourceName: mockDataSourceNames[0]}, { $set: { updated: beforeSinceDate } });
// set pages[1] to be last updated after sinceDate (should be re-chunked)
await mongoClient.db(databaseName).collection('pages').updateOne({...pages[1]}, { $set: { updated: afterSinceDate } });
await mongoClient.db(databaseName).collection('embedded_content').updateOne({sourceName: mockDataSourceNames[1]}, { $set: { updated: afterSinceDate } });
const originalPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
const originalPage2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
await updateEmbeddedContent({
since: sinceDate,
embeddedContentStore: embedStore,
pageStore,
sourceNames: mockDataSourceNames,
embedder,
});
const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
assert(updatedPage1Embedding.length);
assert(updatedPage2Embedding.length);
expect(updatedPage1Embedding[0].updated.getTime()).toBe(originalPage1Embedding[0].updated.getTime());
expect(updatedPage2Embedding[0].updated.getTime()).not.toBe(originalPage2Embedding[0].updated.getTime());
});
it("updates embedded content when page has not changed, but chunk algo has, ignoring since date", async () => {
// change the chunking algo for the second page, but not the first
await updateEmbeddedContent({
since: new Date(),
embeddedContentStore: embedStore,
pageStore,
sourceNames: [mockDataSourceNames[0]],
embedder,
});
await updateEmbeddedContent({
since: new Date(),
embeddedContentStore: embedStore,
pageStore,
sourceNames: [mockDataSourceNames[1]],
embedder,
chunkOptions: { chunkOverlap: 2 },
});
const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
assert(updatedPage1Embedding.length);
assert(updatedPage2Embedding.length);
expect(updatedPage1Embedding[0].chunkAlgoHash).toBe(page1Embedding[0].chunkAlgoHash);
expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(page2Embedding[0].chunkAlgoHash);
});
});
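For context on what the last test exercises: the chunkAlgoHash is presumably a deterministic digest of the chunking function and its options, so passing different chunkOptions produces a new hash and forces re-embedding even when the page itself is unchanged. A rough sketch of such a derivation, with hypothetical names (the real one lives in updateEmbeddedContent and is not shown in this diff):

import { createHash } from "crypto";

// Digest the chunk function's source and its options so that a change to
// either yields a different hash.
function computeChunkAlgoHash(
  chunkFn: (...args: unknown[]) => unknown,
  chunkOptions: Record<string, unknown> = {}
): string {
  return createHash("sha256")
    .update(chunkFn.toString())
    .update(JSON.stringify(chunkOptions))
    .digest("hex");
}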