React Query and Server Side Events

Published:

Using server side events (SSE) is a great solution to one way streaming updates from the server. If you need two way communication, there's better tech such as sockets. But, sockets require protocol changes and your deployment target might prevent their use. SSE is HTTP so it's pretty safe to rely on anywhere. It might just be your next required solution.
A note on support
Their are some slightly differences between clients and support for things like Authorization
headers HTTP verbs etc. I've chosen to use eventsource
to bridge the gaps and get a comfortable interface for this example. Even that package suggests an alternative, so consider your options!
Code
A general note, the actual types for this example were generated through the wonderful openapi-typescript
module, but are largely irrelevant.
In case you decide to use it, you might care for the npm script:
export NODE_TLS_REJECT_UNAUTHORIZED=0 && openapi-typescript --alphabetize --export-type --empty-objects-unknown --root-types --default-non-nullable=false && npm run-script lint:fix
And a config file like:
apis: api@v2: root: http://127.0.0.1:8000/openapi.json x-openapi-ts: output: ./src/api/openapi.ts
API hooks using React Query
In this example we're listening for updates to a conversation sent via SSE and sending new messages using a simple POST.
To setup the listener we'll need to create a rather special hook. It's goal is:
- Return a sensible default for initial rendering
- Create a listener, resuming from the last event id we cached
- Update the cache directly after some messages
In addition, we'll need a mutation that can send messages. I'll include a direct POST method (mostly for testing), and a mutation hook around it.
First some generic API src/api/utils.ts
:
/** Applies JSON headers and stringifies the body */ export const apiPost = async ( input: string, init?: ApiRequestInit, config: APIRequestConfig = defaultConfig ): Promise<Response> => apiCall( input, { ...init, method: "POST", headers: { "Content-Type": "application/json", ...init?.headers, }, body: JSON.stringify(init?.body), }, config ); const apiCall = async ( input: string, init: RequestInit, config: APIRequestConfig ): Promise<Response> => { const response = await fetch(input, { ...defaultFetchOptions, ...init, headers: { ...defaultFetchOptions.headers, Accept: "application/json", ...init?.headers, }, }); if (config.throwErrors) await throwApiErrors(response); return response; }; export const replaceApiUriPathParameters = ( path: string, params: | Record<string | number, string | number | undefined | null> | undefined | null ): string => !params ? path : Object.entries(params).reduce( (uri, [key, value]) => value === undefined || value === null ? uri : uri.replace("{" + key + "}", String(value)), path ); /** Returns an empty string, or string encoded url query string parameters preceded by '?' if params are present */ export const getUrlSearchParamsString = ( params: Record<string | number, string | number> | undefined | null ): string => !params ? "" : "?" + new URLSearchParams( Object.entries(params).reduce((params, [key, value]) => { params[key] = String(value); return params; }, {} as Record<string, string>) ).toString();
And the important bits from src/api/conversation.ts
.
import { EventSource } from "eventsource"; import { useMutation, useQuery, useQueryClient, UseQueryOptions, } from "@tanstack/react-query"; // Sending a message can cause some backend processing. // The agent will keep us abreast of the status. type SchemaAgentStatus = "processing" | "idle"; // Largely irrelevant for example. Your types will vary: type SchemaConversationItemPublic = { content: string }; // We'll use this as the general shape of the event stream. export type ConversationState = { lastEventId: number; items: SchemaConversationItemPublic[]; status: SchemaAgentStatus; }; const defaultConversationState: ConversationState = { lastEventId: 0, items: [], // If we don't know, assume we're processing until told otherwise. status: "processing", }; export type UseConversationStateParams = { controller: AbortController; }; export const getUseConversationStateQueryKey = ( id: SchemaConversationItemPublic["id"] | undefined ) => ["useConversationState", id]; /** * A special hook that uses React Query as a caching mechanism in front of fetch event source. * Unlike other hooks, this has an ongoing effect and might be changed by the server instead of client. * * This hooks is not safe to call multiple times. Attach through `useContext`. */ export const useConversationState = ( id: SchemaConversationItemPublic["id"] | undefined, { controller }: UseConversationStateParams ) => { const queryClient = useQueryClient(); return useQuery({ enabled: !!id, // A convenience for the calling code to avoid conditional hooks while loading. refetchOnWindowFocus: false, // Absolutely critical or you get duplicates queryKey: getUseConversationStateQueryKey(id), queryFn: (): ConversationState => { if (!id) throw new Error("id is undefined"); const queryKey = getUseConversationStateQueryKey(id); // Access the global query client cache looking for state const state = (queryClient.getQueryCache().find({ queryKey })?.state .data as ConversationState) || { // And fill in our defaults when it's inevitably not found on first pass. ...defaultConversationState, }; const params: ConversationHistoryEndpoint["get"]["parameters"] = { path: { id }, // Include our last cached event id as we resubscribe // This is 0 initially, but if users leave the page and return // the cache will correctly resume at a higher number. header: { "last-event-id": state.lastEventId }, }; const uri = replaceApiUriPathParameters( conversationHistoryPath, params.path ); const es = new EventSource(uri, { // We need to provide our own implementation of fetch here // in order to attach header parameters such as lastEventId // and Authorization fetch: (input, init) => fetch(input, { ...init, ...defaultFetchOptions, headers: Object.entries(params.header || {}).reduce( (headers, [key, value]) => { headers[key] = String(value); return headers; }, {} as Record<string, string> ), }), }); // Allowing the calling code to pass an abort controller will let // us close the connection with the server as we unload the caller. controller.signal.addEventListener("abort", () => { es.close(); }); // A global handler for the special 'untyped' messages. // Your subscription design *should* provide you messages with `type: 'XYZ'` instead. // This is still required to catch errors, and more. es.addEventListener("message", console.warn); // We'll need to update these pieces often enough to warrant a utility. const getQueryAndState = () => { const query = queryClient .getQueryCache() .find<ConversationState>({ queryKey }); if (!query) throw new Error("query is undefined"); const state = query.state.data || defaultConversationState; return { query, state }; }; // Status events provide us the processing or idle signal es.addEventListener("status", (event) => { const { query, state } = getQueryAndState(); query.setData({ ...state, status: event.data as SchemaAgentStatus, // It's highly unlikely the event contains an actual event id, those // are sent with new items in the conversation. Safety first though. lastEventId: event.lastEventId ? Number(event.lastEventId) : state.lastEventId, } satisfies ConversationState); }); // When we do get new conversation items, they need appended to our cache. const addConversationItem = ( item: SchemaConversationItemPublic, newState?: Partial<Pick<ConversationState, "status" | "lastEventId">> ) => { const { query, state } = getQueryAndState(); query.setData({ ...state, ...newState, items: [...state.items, item], } satisfies ConversationState); }; es.addEventListener("item", (event) => { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument const item = JSON.parse(event.data) as SchemaConversationItemPublic; addConversationItem(item, { lastEventId: Number(event.lastEventId) }); }); es.addEventListener("error", (error) => { let status: ConversationState["status"] | undefined; switch (error.code) { // Server error, we're done here. Close it. case 500: es.close(); status = "idle"; break; } console.error(error); const now = new Date().toISOString(); addConversationItem({ content: error.message }, { status }); }); // The server might also cut our connection, as it shuts down. es.addEventListener("closed", () => { es.close(); }); state.status = defaultConversationState.status; return { ...defaultConversationState, ...state, }; }, }); }; // Add conversation message export const conversationMessageAddPath = "/api/conversations/{id}/message/"; type ConversationMessageAddEndpoint = paths[typeof conversationMessageAddPath]; type ConversationsMessagePostEndpointRequestBody = ConversationMessageAddEndpoint["post"]["requestBody"]["content"]["application/json"]; /** Actual api call. For functionality, you probably prefer useConversationUpdateMutation() */ export const postConversationMessage = async ( conversationId: string, body: ConversationsMessagePostEndpointRequestBody ) => { const uri = replaceApiUriPathParameters(conversationMessageAddPath, { id: conversationId, }); const response = await apiPost(uri, { body }); const data = (await response.json()) as ConversationMessageAddEndpoint["post"]["responses"]["202"]["content"]["application/json"]; return data; }; export const usePostConversationMessageMutation = ( id: SchemaConversationPublic["id"] | undefined | null ) => { const queryClient = useQueryClient(); return useMutation({ mutationFn: (body: ConversationsMessagePostEndpointRequestBody) => { if (!id) { throw new Error("id is undefined"); } const result = postConversationMessage(id, body); // Update the modified time of the conversation as well across other hooks. // It's updated on the backend, but refetching is overkill. const now = new Date().toISOString(); // Conversation lists queryClient .getQueryCache() .findAll({ queryKey: [...getUseConversationsQueryKey()] }) .forEach((query) => { if (query?.state.data) { const data = query.state.data as ConversationsListResponse; query.setData({ ...data, results: data.results.map((result) => result.id === id ? { ...result, modified_at: now } : result ), }); } }); // This specific conversation's details const conversationQuery = queryClient .getQueryCache() .find<ConversationEndpointResponse>({ queryKey: getUseConversationQueryKey(id), }); if (conversationQuery?.state.data) { conversationQuery.setData({ ...conversationQuery.state.data, modified_at: now, }); } return result; }, }); };
API integration using Context
With hooks in place for communication, we just need to create a wrapper around them for the rest of the app to access since it can't be called multiple times safely:
/* eslint-disable react-refresh/only-export-components */ import { useConversationQuery, useConversationState, UseConversationStateParams, } from "@/api"; import { createContext, useContext, useEffect, useRef, type FC, type ReactNode, } from "react"; export interface IConversationStateContext { stateQuery: ReturnType<typeof useConversationState>; } export const ConversationStateContext = createContext<IConversationStateContext>({} as IConversationStateContext); type ConversationStateContextProviderProps = { children: ReactNode; params?: Omit<UseConversationStateParams, "controller">; query: ReturnType<typeof useConversationQuery>; }; export const ConversationStateContextProvider: FC< ConversationStateContextProviderProps > = ({ children, params, query: conversationQuery }) => { const controller = useRef(new AbortController()); useEffect(() => { return () => { controller.current.abort(); controller.current = new AbortController(); }; }, [controller]); const stateQuery = useConversationState(conversationQuery.data?.id, { controller: controller.current, ...params, }); return ( <ConversationStateContext.Provider value={{ stateQuery }}> {children} </ConversationStateContext.Provider> ); }; export const useConversationStateContext = () => useContext(ConversationStateContext);