Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move AbortableAsyncIterator to index.ts #153

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/abort/specific-request.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import ollama from 'ollama'
import { AbortableAsyncIterator } from '../../src/utils'
import ollama, { AbortableAsyncIterator } from '../../src'

let stream: AbortableAsyncIterator<object>

// Set a timeout to abort the request after 1 second
setTimeout(() => {
console.log('\nAborting request...\n')
stream.abort()
// Note: This will error if Ollama doesn't start responding within 1 second!
}, 1000) // 1000 milliseconds = 1 second

ollama.generate({
Expand Down
3 changes: 2 additions & 1 deletion src/browser.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as utils from './utils.js'
import { AbortableAsyncIterator, parseJSON, post } from './utils.js'
import { parseJSON, post } from './utils.js'
import 'whatwg-fetch'

import type {
Expand All @@ -25,6 +25,7 @@
ShowResponse,
StatusResponse,
} from './interfaces.js'
import { AbortableAsyncIterator } from './index'

export class Ollama {
protected readonly config: Config
Expand Down Expand Up @@ -66,7 +67,7 @@
*/
protected async processStreamableRequest<T extends object>(
endpoint: string,
request: { stream?: boolean } & Record<string, any>,

Check warning on line 70 in src/browser.ts

View workflow job for this annotation

GitHub Actions / test (16)

Unexpected any. Specify a different type

Check warning on line 70 in src/browser.ts

View workflow job for this annotation

GitHub Actions / test (18)

Unexpected any. Specify a different type

Check warning on line 70 in src/browser.ts

View workflow job for this annotation

GitHub Actions / test (20)

Unexpected any. Specify a different type
): Promise<T | AbortableAsyncIterator<T>> {
request.stream = request.stream ?? false
const host = `${this.config.host}/api/${endpoint}`
Expand Down
38 changes: 36 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import * as utils from './utils.js'
import { AbortableAsyncIterator } from './utils.js'

import fs, { createReadStream, promises } from 'fs'
import { dirname, join, resolve } from 'path'
import { createHash } from 'crypto'
import { homedir } from 'os'
import { Ollama as OllamaBrowser } from './browser.js'
import { type ErrorResponse, Ollama as OllamaBrowser } from './browser.js'

import type { CreateRequest, ProgressResponse } from './interfaces.js'

Expand Down Expand Up @@ -171,5 +170,40 @@

export default new Ollama()

/**
* An AsyncIterator which can be aborted
*/
export class AbortableAsyncIterator<T extends object> {
private readonly abortController: AbortController
private readonly itr: AsyncGenerator<T | ErrorResponse>
private readonly doneCallback: () => void

constructor(abortController: AbortController, itr: AsyncGenerator<T | ErrorResponse>, doneCallback: () => void) {
this.abortController = abortController
this.itr = itr
this.doneCallback = doneCallback
}

abort() {
this.abortController.abort()
}

async *[Symbol.asyncIterator]() {
for await (const message of this.itr) {
if ('error' in message) {
throw new Error(message.error)
}
yield message
// message will be done in the case of chat and generate
// message will be success in the case of a progress response (pull, push, create)
if ((message as any).done || (message as any).status === 'success') {

Check warning on line 199 in src/index.ts

View workflow job for this annotation

GitHub Actions / test (16)

Unexpected any. Specify a different type

Check warning on line 199 in src/index.ts

View workflow job for this annotation

GitHub Actions / test (16)

Unexpected any. Specify a different type

Check warning on line 199 in src/index.ts

View workflow job for this annotation

GitHub Actions / test (18)

Unexpected any. Specify a different type

Check warning on line 199 in src/index.ts

View workflow job for this annotation

GitHub Actions / test (18)

Unexpected any. Specify a different type

Check warning on line 199 in src/index.ts

View workflow job for this annotation

GitHub Actions / test (20)

Unexpected any. Specify a different type

Check warning on line 199 in src/index.ts

View workflow job for this annotation

GitHub Actions / test (20)

Unexpected any. Specify a different type
this.doneCallback()
return
}
}
throw new Error('Did not receive done or success response in stream.')
}
}

// export all types from the main entry point so that packages importing types dont need to specify paths
export * from './interfaces.js'
35 changes: 0 additions & 35 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,6 @@
}
}

/**
* An AsyncIterator which can be aborted
*/
export class AbortableAsyncIterator<T extends object> {
private readonly abortController: AbortController
private readonly itr: AsyncGenerator<T | ErrorResponse>
private readonly doneCallback: () => void

constructor(abortController: AbortController, itr: AsyncGenerator<T | ErrorResponse>, doneCallback: () => void) {
this.abortController = abortController
this.itr = itr
this.doneCallback = doneCallback
}

abort() {
this.abortController.abort()
}

async *[Symbol.asyncIterator]() {
for await (const message of this.itr) {
if ('error' in message) {
throw new Error(message.error)
}
yield message
// message will be done in the case of chat and generate
// message will be success in the case of a progress response (pull, push, create)
if ((message as any).done || (message as any).status === 'success') {
this.doneCallback()
return
}
}
throw new Error('Did not receive done or success response in stream.')
}
}

/**
* Checks if the response is ok, if not throws an error.
* If the response is not ok, it will try to parse the response as JSON and use the error field as the error message.
Expand Down Expand Up @@ -171,7 +136,7 @@
data?: Record<string, unknown> | BodyInit,
options?: { signal?: AbortSignal, headers?: Headers },
): Promise<Response> => {
const isRecord = (input: any): input is Record<string, unknown> => {

Check warning on line 139 in src/utils.ts

View workflow job for this annotation

GitHub Actions / test (16)

Unexpected any. Specify a different type

Check warning on line 139 in src/utils.ts

View workflow job for this annotation

GitHub Actions / test (18)

Unexpected any. Specify a different type

Check warning on line 139 in src/utils.ts

View workflow job for this annotation

GitHub Actions / test (20)

Unexpected any. Specify a different type
return input !== null && typeof input === 'object' && !Array.isArray(input)
}

Expand Down
Loading