diff --git a/assets/package.json b/assets/package.json index dbdbeca2cb..8c64a33255 100644 --- a/assets/package.json +++ b/assets/package.json @@ -80,7 +80,7 @@ "kubernetes-resource-parser": "0.1.0", "lodash": "4.17.21", "moment": "2.29.4", - "phoenix": "1.7.3", + "phoenix": "1.7.14", "pluralize": "8.0.0", "pluralsh-absinthe-socket-apollo-link": "0.2.0", "query-string": "8.1.0", diff --git a/assets/src/components/ai/chatbot/AISuggestFix.tsx b/assets/src/components/ai/chatbot/AISuggestFix.tsx index fdbc8e23ad..84d0ef7d60 100644 --- a/assets/src/components/ai/chatbot/AISuggestFix.tsx +++ b/assets/src/components/ai/chatbot/AISuggestFix.tsx @@ -19,7 +19,6 @@ import { AiInsightFragment, AiRole, ChatMessage, - useAiChatStreamSubscription, useAiFixPrMutation, useAiSuggestedFixLazyQuery, } from '../../../generated/graphql.ts' @@ -28,6 +27,8 @@ import LoadingIndicator from '../../utils/LoadingIndicator.tsx' import AIPanel from '../AIPanel.tsx' import { AISuggestFixButton } from './AISuggestFixButton.tsx' import { ChatWithAIButton, insightMessage } from './ChatbotButton.tsx' +import { useStreamTopic } from '../useStreamTopic.tsx' +import { useChannel } from 'components/hooks/useChannel.tsx' interface AISuggestFixProps { insight: Nullable @@ -52,20 +53,22 @@ export function Loading({ setStreaming: Dispatch> }): ReactNode { const [streamedMessage, setStreamedMessage] = useState([]) - useAiChatStreamSubscription({ - variables: { insightId, scopeId }, - onData: ({ data: { data } }) => { + const topic = useStreamTopic({ insightId, scopeId }) + const callback = useCallback( + ({ content, seq }) => { setStreaming(true) - if ((data?.aiStream?.seq ?? 1) % 120 === 0) scrollToBottom() + if ((seq ?? 1) % 120 === 0) scrollToBottom() setStreamedMessage((streamedMessage) => [ ...streamedMessage, { - seq: data?.aiStream?.seq ?? 0, - content: data?.aiStream?.content ?? '', + seq: seq ?? 0, + content: content ?? '', }, ]) }, - }) + [setStreaming, setStreamedMessage, scrollToBottom] + ) + useChannel(topic, 'stream', callback) if (!streamedMessage.length) { return diff --git a/assets/src/components/ai/chatbot/ChatbotPanelThread.tsx b/assets/src/components/ai/chatbot/ChatbotPanelThread.tsx index 9472f46b80..4da2c36dbe 100644 --- a/assets/src/components/ai/chatbot/ChatbotPanelThread.tsx +++ b/assets/src/components/ai/chatbot/ChatbotPanelThread.tsx @@ -13,7 +13,6 @@ import { ChatThreadDetailsDocument, ChatThreadDetailsQuery, ChatThreadFragment, - useAiChatStreamSubscription, useChatMutation, useChatThreadDetailsQuery, } from 'generated/graphql' @@ -25,6 +24,8 @@ import { SendMessageForm, } from './ChatbotSendMessageForm.tsx' import { ChatMessage } from './ChatMessage.tsx' +import { useStreamTopic } from '../useStreamTopic.tsx' +import { useChannel } from 'components/hooks/useChannel.tsx' export function ChatbotPanelThread({ currentThread, @@ -44,20 +45,22 @@ export function ChatbotPanelThread({ }, [messageListRef]) const [streamedMessage, setStreamedMessage] = useState([]) - useAiChatStreamSubscription({ - variables: { threadId: currentThread.id }, - onData: ({ data: { data } }) => { + const topic = useStreamTopic({ threadId: currentThread.id }) + const callback = useCallback( + ({ content, seq }) => { setStreaming(true) - if ((data?.aiStream?.seq ?? 1) % 120 === 0) scrollToBottom() + if ((seq ?? 1) % 120 === 0) scrollToBottom() setStreamedMessage((streamedMessage) => [ ...streamedMessage, { - seq: data?.aiStream?.seq ?? 0, - content: data?.aiStream?.content ?? '', + seq: seq ?? 0, + content: content ?? '', }, ]) }, - }) + [setStreaming, setStreamedMessage, scrollToBottom] + ) + useChannel(topic, 'stream', callback) const { data } = useChatThreadDetailsQuery({ variables: { id: currentThread.id }, diff --git a/assets/src/components/ai/useStreamTopic.tsx b/assets/src/components/ai/useStreamTopic.tsx new file mode 100644 index 0000000000..d13ef43c22 --- /dev/null +++ b/assets/src/components/ai/useStreamTopic.tsx @@ -0,0 +1,25 @@ +import { LoginContext } from 'components/contexts' +import { useContext, useMemo } from 'react' + +export function useStreamTopic({ + insightId, + scopeId, + threadId, +}: { + insightId?: string + scopeId?: string + threadId?: string +}) { + const { me } = useContext(LoginContext) + return useMemo(() => { + if (insightId) { + return `ai:insight:${insightId}:${me?.id}` + } + + if (threadId) { + return `ai:thread:${threadId}:${me?.id}` + } + + return `ai:freeform:${scopeId}:${me?.id}` + }, [insightId, threadId, scopeId, me]) +} diff --git a/assets/src/components/hooks/useChannel.tsx b/assets/src/components/hooks/useChannel.tsx new file mode 100644 index 0000000000..6ae1dfa83e --- /dev/null +++ b/assets/src/components/hooks/useChannel.tsx @@ -0,0 +1,20 @@ +import { socket } from 'helpers/client' +import { useEffect } from 'react' + +export function useChannel(topic, event, callback) { + useEffect(() => { + const channel = socket.channel(topic) + + channel + .join() + .receive('ok', ({ messages }) => + console.log('successfully joined channel', messages || '') + ) + .receive('error', ({ reason }) => + console.error('failed to join channel', reason) + ) + + channel.on(event, callback) + return () => channel.leave() + }) +} diff --git a/assets/src/generated/graphql-kubernetes.ts b/assets/src/generated/graphql-kubernetes.ts index d1042cfb91..d29b0e2876 100644 --- a/assets/src/generated/graphql-kubernetes.ts +++ b/assets/src/generated/graphql-kubernetes.ts @@ -1,4 +1,4 @@ -/* eslint-disable */ + /* prettier-ignore */ import { gql } from '@apollo/client'; import * as Apollo from '@apollo/client'; diff --git a/assets/src/generated/graphql-plural.ts b/assets/src/generated/graphql-plural.ts index 694f1a631a..2a8b931afc 100644 --- a/assets/src/generated/graphql-plural.ts +++ b/assets/src/generated/graphql-plural.ts @@ -1,4 +1,4 @@ -/* eslint-disable */ + /* prettier-ignore */ import { gql } from '@apollo/client'; import * as Apollo from '@apollo/client'; diff --git a/assets/src/generated/graphql.ts b/assets/src/generated/graphql.ts index 79a23b1a14..a80471a2e7 100644 --- a/assets/src/generated/graphql.ts +++ b/assets/src/generated/graphql.ts @@ -1,4 +1,4 @@ -/* eslint-disable */ + /* prettier-ignore */ import { gql } from '@apollo/client'; import * as Apollo from '@apollo/client'; diff --git a/assets/src/helpers/client.ts b/assets/src/helpers/client.ts index 4cdf6dcf26..79dd7a1f7d 100644 --- a/assets/src/helpers/client.ts +++ b/assets/src/helpers/client.ts @@ -24,15 +24,15 @@ export const authlessClient = new ApolloClient({ cache: new InMemoryCache(), }) -// function maybeRejoin(chan) { -// const state = chan.state -// if (state === 'closed' || state === 'errored') { -// console.log('broken absinthe channel, rejoining') -// chan.rejoin() -// } +// function resetAbsinthe(absintheSocket) { +// absintheSocket.channelJoinCreated = false +// absintheSocket.channel.joinedOnce = false +// absintheSocket.channel.leave() // } -function maybeReconnect(socket) { +function maybeReconnect(absintheSocket) { + console.log('polled socket', absintheSocket) + const socket = absintheSocket.phoenixSocket // console.log('socket reconnect attempt', socket) if (socket.connectionState() === 'closed') { console.warn('found dead websocket, attempting a reconnect') @@ -44,6 +44,7 @@ function maybeReconnect(socket) { socket.reconnectTimer.reset() socket.reconnectTimer.scheduleTimeout() } + // resetAbsinthe(absintheSocket) return } @@ -121,9 +122,14 @@ export function buildClient(gqlUrl, wsUrl, fetchToken) { }), }) - // socket.onOpen(() => maybeRejoin(absintheSocket.channel)) - socket.onClose(() => maybeReconnect(socket)) - setInterval(() => maybeReconnect(socket), 5000) + // setInterval(() => { + // console.log('log absinthe socket', absintheSocket) + // maybeRejoin(absintheSocket.channel) + // }, 5000) + // socket.onClose(() => { + // maybeReconnect(absintheSocket) + // }) + setInterval(() => maybeReconnect(absintheSocket), 5000) return { client, socket } } diff --git a/assets/yarn.lock b/assets/yarn.lock index 614cee0e1f..667aadd628 100644 --- a/assets/yarn.lock +++ b/assets/yarn.lock @@ -10196,7 +10196,7 @@ __metadata: moment: 2.29.4 moment-timezone: 0.5.43 npm-run-all: 4.1.5 - phoenix: 1.7.3 + phoenix: 1.7.14 pluralize: 8.0.0 pluralsh-absinthe-socket-apollo-link: 0.2.0 prettier: 3.3.3 @@ -17199,10 +17199,10 @@ __metadata: languageName: node linkType: hard -"phoenix@npm:1.7.3": - version: 1.7.3 - resolution: "phoenix@npm:1.7.3" - checksum: 039270c919e269412cbc40e46cbe667073506d387406856c3b03aa7ec338a710615f9ff3a2156e08877b851f5b29eaefb71da55d66ea1c7738a294761db04b93 +"phoenix@npm:1.7.14": + version: 1.7.14 + resolution: "phoenix@npm:1.7.14" + checksum: c687f340380df1ba9af5f526c76a06a445cb819d366873bee228a2151f66c9732d1e579dda5371be3c80674e5fd8003bc19549dbadec1fd25220bc8081e30440 languageName: node linkType: hard diff --git a/charts/stateless/.helmignore b/charts/stateless/.helmignore new file mode 100644 index 0000000000..0e8a0eb36f --- /dev/null +++ b/charts/stateless/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/charts/stateless/Chart.yaml b/charts/stateless/Chart.yaml new file mode 100644 index 0000000000..d3eeb03d1c --- /dev/null +++ b/charts/stateless/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: stateless +description: A Helm chart to deploy a basic stateless application +type: application +version: 0.1.0 +appVersion: "1.16.0" diff --git a/charts/stateless/templates/NOTES.txt b/charts/stateless/templates/NOTES.txt new file mode 100644 index 0000000000..3ff5dbf807 --- /dev/null +++ b/charts/stateless/templates/NOTES.txt @@ -0,0 +1,22 @@ +1. Get the application URL by running these commands: +{{- if .Values.ingress.enabled }} +{{- range $host := .Values.ingress.hosts }} + {{- range .paths }} + http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }} + {{- end }} +{{- end }} +{{- else if contains "NodePort" .Values.service.type }} + export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "stateless.fullname" . }}) + export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}") + echo http://$NODE_IP:$NODE_PORT +{{- else if contains "LoadBalancer" .Values.service.type }} + NOTE: It may take a few minutes for the LoadBalancer IP to be available. + You can watch its status by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "stateless.fullname" . }}' + export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "stateless.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}") + echo http://$SERVICE_IP:{{ .Values.service.port }} +{{- else if contains "ClusterIP" .Values.service.type }} + export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "stateless.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}") + export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}") + echo "Visit http://127.0.0.1:8080 to use your application" + kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT +{{- end }} diff --git a/charts/stateless/templates/_helpers.tpl b/charts/stateless/templates/_helpers.tpl new file mode 100644 index 0000000000..c2bc5b9827 --- /dev/null +++ b/charts/stateless/templates/_helpers.tpl @@ -0,0 +1,62 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "stateless.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "stateless.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- if contains $name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "stateless.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "stateless.labels" -}} +helm.sh/chart: {{ include "stateless.chart" . }} +{{ include "stateless.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "stateless.selectorLabels" -}} +app.kubernetes.io/name: {{ include "stateless.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* +Create the name of the service account to use +*/}} +{{- define "stateless.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "stateless.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} diff --git a/charts/stateless/templates/deployment.yaml b/charts/stateless/templates/deployment.yaml new file mode 100644 index 0000000000..d404f00a52 --- /dev/null +++ b/charts/stateless/templates/deployment.yaml @@ -0,0 +1,76 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "stateless.fullname" . }} + labels: + {{- include "stateless.labels" . | nindent 4 }} +spec: + {{- if not .Values.autoscaling.enabled }} + replicas: {{ .Values.replicaCount }} + {{- end }} + selector: + matchLabels: + {{- include "stateless.selectorLabels" . | nindent 6 }} + template: + metadata: + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "stateless.labels" . | nindent 8 }} + {{- with .Values.podLabels }} + {{- toYaml . | nindent 8 }} + {{- end }} + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + serviceAccountName: {{ include "stateless.serviceAccountName" . }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + containers: + - name: {{ .Chart.Name }} + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + {{ if .Values.env }} + env: + {{ toYaml .Values.env | nindent 12 }} + {{ end }} + {{ if .Values.envFrom }} + envFrom: + {{ toYaml .Values.envFrom | nindent 12 }} + {{ end }} + ports: + - name: http + containerPort: {{ .Values.service.port }} + protocol: TCP + livenessProbe: + {{- toYaml .Values.livenessProbe | nindent 12 }} + readinessProbe: + {{- toYaml .Values.readinessProbe | nindent 12 }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + {{- with .Values.volumeMounts }} + volumeMounts: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with .Values.volumes }} + volumes: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/charts/stateless/templates/hpa.yaml b/charts/stateless/templates/hpa.yaml new file mode 100644 index 0000000000..8367078b9f --- /dev/null +++ b/charts/stateless/templates/hpa.yaml @@ -0,0 +1,32 @@ +{{- if .Values.autoscaling.enabled }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "stateless.fullname" . }} + labels: + {{- include "stateless.labels" . | nindent 4 }} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "stateless.fullname" . }} + minReplicas: {{ .Values.autoscaling.minReplicas }} + maxReplicas: {{ .Values.autoscaling.maxReplicas }} + metrics: + {{- if .Values.autoscaling.targetCPUUtilizationPercentage }} + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }} + {{- end }} + {{- if .Values.autoscaling.targetMemoryUtilizationPercentage }} + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }} + {{- end }} +{{- end }} diff --git a/charts/stateless/templates/ingress.yaml b/charts/stateless/templates/ingress.yaml new file mode 100644 index 0000000000..e885ccf1cb --- /dev/null +++ b/charts/stateless/templates/ingress.yaml @@ -0,0 +1,61 @@ +{{- if .Values.ingress.enabled -}} +{{- $fullName := include "stateless.fullname" . -}} +{{- $svcPort := .Values.service.port -}} +{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }} + {{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }} + {{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}} + {{- end }} +{{- end }} +{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}} +apiVersion: networking.k8s.io/v1 +{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}} +apiVersion: networking.k8s.io/v1beta1 +{{- else -}} +apiVersion: extensions/v1beta1 +{{- end }} +kind: Ingress +metadata: + name: {{ $fullName }} + labels: + {{- include "stateless.labels" . | nindent 4 }} + {{- with .Values.ingress.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + {{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }} + ingressClassName: {{ .Values.ingress.className }} + {{- end }} + {{- if .Values.ingress.tls }} + tls: + {{- range .Values.ingress.tls }} + - hosts: + {{- range .hosts }} + - {{ . | quote }} + {{- end }} + secretName: {{ .secretName }} + {{- end }} + {{- end }} + rules: + {{- range .Values.ingress.hosts }} + - host: {{ .host | quote }} + http: + paths: + {{- range .paths }} + - path: {{ .path }} + {{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }} + pathType: {{ .pathType }} + {{- end }} + backend: + {{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }} + service: + name: {{ $fullName }} + port: + number: {{ $svcPort }} + {{- else }} + serviceName: {{ $fullName }} + servicePort: {{ $svcPort }} + {{- end }} + {{- end }} + {{- end }} +{{- end }} diff --git a/charts/stateless/templates/service.yaml b/charts/stateless/templates/service.yaml new file mode 100644 index 0000000000..aed1f758e0 --- /dev/null +++ b/charts/stateless/templates/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ include "stateless.fullname" . }} + labels: + {{- include "stateless.labels" . | nindent 4 }} +spec: + type: {{ .Values.service.type }} + ports: + - port: {{ .Values.service.port }} + targetPort: http + protocol: TCP + name: http + selector: + {{- include "stateless.selectorLabels" . | nindent 4 }} diff --git a/charts/stateless/templates/serviceaccount.yaml b/charts/stateless/templates/serviceaccount.yaml new file mode 100644 index 0000000000..72849888da --- /dev/null +++ b/charts/stateless/templates/serviceaccount.yaml @@ -0,0 +1,13 @@ +{{- if .Values.serviceAccount.create -}} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "stateless.serviceAccountName" . }} + labels: + {{- include "stateless.labels" . | nindent 4 }} + {{- with .Values.serviceAccount.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +automountServiceAccountToken: {{ .Values.serviceAccount.automount }} +{{- end }} diff --git a/charts/stateless/values.yaml b/charts/stateless/values.yaml new file mode 100644 index 0000000000..f8ed1fd103 --- /dev/null +++ b/charts/stateless/values.yaml @@ -0,0 +1,109 @@ +# Default values for stateless. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +replicaCount: 1 + +image: + repository: nginx + pullPolicy: IfNotPresent + # Overrides the image tag whose default is the chart appVersion. + tag: "" + +env: [] + +imagePullSecrets: [] +nameOverride: "" +fullnameOverride: "" + +serviceAccount: + # Specifies whether a service account should be created + create: true + # Automatically mount a ServiceAccount's API credentials? + automount: true + # Annotations to add to the service account + annotations: {} + # The name of the service account to use. + # If not set and create is true, a name is generated using the fullname template + name: "" + +podAnnotations: {} +podLabels: {} + +podSecurityContext: {} + # fsGroup: 2000 + +securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + +service: + type: ClusterIP + port: 80 + +ingress: + enabled: false + className: "" + annotations: {} + # kubernetes.io/ingress.class: nginx + # kubernetes.io/tls-acme: "true" + hosts: + - host: chart-example.local + paths: + - path: / + pathType: ImplementationSpecific + tls: [] + # - secretName: chart-example-tls + # hosts: + # - chart-example.local + +resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +livenessProbe: + httpGet: + path: / + port: http +readinessProbe: + httpGet: + path: / + port: http + +autoscaling: + enabled: false + minReplicas: 1 + maxReplicas: 100 + targetCPUUtilizationPercentage: 80 + # targetMemoryUtilizationPercentage: 80 + +# Additional volumes on the output Deployment definition. +volumes: [] +# - name: foo +# secret: +# secretName: mysecret +# optional: false + +# Additional volumeMounts on the output Deployment definition. +volumeMounts: [] +# - name: foo +# mountPath: "/etc/foo" +# readOnly: true + +nodeSelector: {} + +tolerations: [] + +affinity: {} diff --git a/lib/console/ai/stream.ex b/lib/console/ai/stream.ex index ad94e4573b..d5b05b871b 100644 --- a/lib/console/ai/stream.ex +++ b/lib/console/ai/stream.ex @@ -1,5 +1,6 @@ defmodule Console.AI.Stream do alias Console.Schema.User + alias ConsoleWeb.AIChannel defstruct [:topic] @@ -10,9 +11,11 @@ defmodule Console.AI.Stream do def stream(), do: Process.get(@stream) def publish(%__MODULE__{topic: topic}, c, ind) when is_binary(topic) do + msg = %{content: c, seq: ind} + AIChannel.stream(topic, msg) Absinthe.Subscription.publish( ConsoleWeb.Endpoint, - %{content: c, seq: ind}, + msg, [ai_stream: topic] ) end diff --git a/lib/console/ai/stream/exec.ex b/lib/console/ai/stream/exec.ex index 6dd56eac8c..670d366e56 100644 --- a/lib/console/ai/stream/exec.ex +++ b/lib/console/ai/stream/exec.ex @@ -20,7 +20,6 @@ defmodule Console.AI.Stream.Exec do {%AIStream.SSE.Event{data: data}, ind}, acc -> case reducer.(data) do c when is_binary(c) -> - IO.inspect(c, label: "publishing text stream") AIStream.publish(stream, c, ind) {:cont, [c | acc]} _ -> {:cont, acc} diff --git a/lib/console_web/channels/ai_channel.ex b/lib/console_web/channels/ai_channel.ex new file mode 100644 index 0000000000..e4761b795d --- /dev/null +++ b/lib/console_web/channels/ai_channel.ex @@ -0,0 +1,17 @@ +defmodule ConsoleWeb.AIChannel do + use ConsoleWeb, :channel + alias Console.Schema.User + + @stream_event "stream" + + def stream(topic, payload), do: ConsoleWeb.Endpoint.broadcast(topic, @stream_event, payload) + + def join("ai:" <> rest, _, socket) do + send(self(), {:connect_ai, String.split(rest, ":")}) + {:ok, socket} + end + + def handle_info({:connect_ai, [_, _, user_id]}, %{assigns: %{user: %User{id: user_id}}} = socket), + do: {:noreply, socket} + def handle_info(_, socket), do: {:stop, {:error, :unauthorized}, socket} +end diff --git a/lib/console_web/channels/user_socket.ex b/lib/console_web/channels/user_socket.ex index 029e629826..1c0323d1df 100644 --- a/lib/console_web/channels/user_socket.ex +++ b/lib/console_web/channels/user_socket.ex @@ -4,6 +4,7 @@ defmodule ConsoleWeb.UserSocket do schema: Console.GraphQl channel "pod:*", ConsoleWeb.ShellChannel + channel "ai:*", ConsoleWeb.AIChannel def connect(params, socket) do case build_context(params) do diff --git a/test/console_web/channels/ai_channel_test.exs b/test/console_web/channels/ai_channel_test.exs new file mode 100644 index 0000000000..36850a7494 --- /dev/null +++ b/test/console_web/channels/ai_channel_test.exs @@ -0,0 +1,19 @@ +defmodule ConsoleWeb.AIChannelTest do + use ConsoleWeb.ChannelCase, async: false + alias Console.AI.Stream + + describe "AIChannel" do + test "it can broadcast ai stream events" do + insight = insert(:ai_insight) + user = insert(:user) + topic = Stream.topic(:insight, insight.id, user) + + {:ok, socket} = mk_socket(user) + {:ok, _, _socket} = subscribe_and_join(socket, topic, %{}) + + stream = %Stream{topic: topic} + Stream.publish(stream, "ai content", 0) + assert_push "stream", %{content: "ai content", seq: 0} + end + end +end