MockAgentTransport

MockAgentTransport is a deterministic AgentTransport implementation for tests. It records stream calls, lets tests emit LangGraph stream events manually, and removes the need for a running LangGraph server.

Complete test example

The pattern below covers the full lifecycle: provide a transport instance, create the component, emit stream events, and assert signal state.

import { Component } from '@angular/core';
import { TestBed } from '@angular/core/testing';
import {
  provideAgent,
  MockAgentTransport,
  agent,
} from '@ngaf/langgraph';
 
@Component({ template: '' })
class ChatComponent {
  readonly chat = agent({
    apiUrl: '',
    assistantId: 'test-agent',
    threadId: 'thread-1',
  });
}
 
describe('ChatComponent', () => {
  let transport: MockAgentTransport;
 
  beforeEach(() => {
    transport = new MockAgentTransport();
    TestBed.configureTestingModule({
      imports: [ChatComponent],
      providers: [provideAgent({ apiUrl: '', transport })],
    });
  });
 
  it('reflects streamed messages', async () => {
    const fixture = TestBed.createComponent(ChatComponent);
    fixture.detectChanges();
 
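    // Start a run; the mock emits nothing until the test drives it.
    const stream = fixture.componentInstance.chat.submit({ message: 'Hello' });
    // Push a single `values` event carrying the assistant reply.
    transport.emit([
      {
        type: 'values',
        values: {
          messages: [{ type: 'ai', id: 'a1', content: 'Hi there' }],
        },
      },
    ]);
    // Close the stream so the awaited submit() result can settle.
    transport.close();
    await stream;
    fixture.detectChanges();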
 
    expect(fixture.componentInstance.chat.messages()).toEqual([
      expect.objectContaining({ role: 'assistant', content: 'Hi there' }),
    ]);
    expect(fixture.componentInstance.chat.status()).toBe('idle');
  });
 
  it('surfaces errors through the error signal', async () => {
    const fixture = TestBed.createComponent(ChatComponent);
    fixture.detectChanges();
 
    const stream = fixture.componentInstance.chat.submit({ message: 'Hello' });
    transport.emitError(new Error('not found'));
    await expect(stream).rejects.toThrow('not found');
    fixture.detectChanges();
 
    expect(fixture.componentInstance.chat.status()).toBe('error');
    expect(fixture.componentInstance.chat.error()).toBeInstanceOf(Error);
  });
});

MockAgentTransport API

| Method | Description |
| --- | --- |
| `constructor(script?)` | Optionally seeds scripted event batches for manual stepping. |
| `nextBatch()` | Returns the next scripted event batch. |
| `emit(events)` | Pushes one or more StreamEvent objects into the active stream. |
| `emitError(err)` | Causes the active stream to reject with `err`. |
| `close()` | Closes the active stream after queued events drain. |
| `isStreaming()` | Returns whether a stream is currently active. |

The transport also records calls in streams, createdQueuedRuns, cancelledRuns, joinedRuns, and historyCalls, which is useful for asserting payloads and options.
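
The recording pattern can be sketched without the library. The stand-in below is hypothetical: `RecordingTransport` and the `RecordedStreamCall` shape are assumptions made for illustration, since the real entries are typed only as `object[]`. It shows the general idea of capturing call arguments so a test can assert on them afterwards.

```typescript
// Hypothetical entry shape; the real `streams` entries are typed as object[].
interface RecordedStreamCall {
  assistantId: string;
  threadId: string | null;
  payload: unknown;
}

// Hypothetical stand-in, not the @ngaf/langgraph class.
class RecordingTransport {
  readonly streams: RecordedStreamCall[] = [];

  // Records the arguments instead of contacting a server.
  stream(assistantId: string, threadId: string | null, payload: unknown): void {
    this.streams.push({ assistantId, threadId, payload });
  }
}

const recorder = new RecordingTransport();
recorder.stream('test-agent', 'thread-1', { message: 'Hello' });

// A test can now inspect exactly what was sent.
const [call] = recorder.streams;
```

With the real transport, inspect an entry of `streams` in a debugger or log first to confirm its fields before asserting on them.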

Deterministic tests

MockAgentTransport emits only when your test calls emit(), emitError(), or close(). This makes stream state and payload assertions deterministic.
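
To see why manual emission gives deterministic results, here is a minimal sketch of a test-driven async stream. `ManualStream`, `DemoEvent`, and `collect` are hypothetical names and not the library's implementation; the sketch only assumes that a consumer iterates events and that nothing is produced until the test acts.

```typescript
// Hypothetical event shape used only in this sketch.
type DemoEvent = { type: string; [key: string]: unknown };

// Hypothetical stand-in for a manually driven stream.
class ManualStream {
  private queue: DemoEvent[] = [];
  private closed = false;
  private failure: Error | null = null;
  private wake: (() => void) | null = null;

  // Queue events; nothing is produced unless the test calls this.
  emit(events: DemoEvent[]): void {
    this.queue.push(...events);
    this.notify();
  }

  // Mark the stream as finished; queued events are still delivered first.
  close(): void {
    this.closed = true;
    this.notify();
  }

  // Make the consumer's iteration reject with the given error.
  emitError(err: Error): void {
    this.failure = err;
    this.notify();
  }

  // Wake a parked consumer, if there is one.
  private notify(): void {
    const wake = this.wake;
    this.wake = null;
    if (wake) wake();
  }

  async *events(): AsyncGenerator<DemoEvent> {
    while (true) {
      while (this.queue.length > 0) yield this.queue.shift()!;
      if (this.failure) throw this.failure;
      if (this.closed) return;
      // Park until the test calls emit(), emitError(), or close().
      await new Promise<void>((resolve) => (this.wake = resolve));
    }
  }
}

// Collects every event until the stream closes or fails.
async function collect(stream: ManualStream): Promise<DemoEvent[]> {
  const seen: DemoEvent[] = [];
  for await (const event of stream.events()) seen.push(event);
  return seen;
}
```

Because the consumer only advances when the test calls `emit()`, `emitError()`, or `close()`, the order and content of what it observes is fixed by the test itself rather than by timers or network latency.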

MockAgentTransport class

Test transport for deterministic agent testing without a real LangGraph server. Script event batches upfront, then emit them manually or step through them in your test specs. Supports error injection and close control.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `script` | `StreamEvent[][]` | Array of event batches. Each batch is emitted as a group. |

Properties

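| Property | Type | Description |
| --- | --- | --- |
| `cancelledRuns` | `object[]` | |
| `createdQueuedRuns` | `AgentQueueEntry<unknown>[]` | |
| `history` | `ThreadState<DefaultValues>[]` | |
| `historyCalls` | `string[]` | |
| `joinedRuns` | `object[]` | |
| `streams` | `object[]` | |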

Methods

cancelRun(threadId: string, runId: string, signal: AbortSignal)

Optional: cancel a server-side run.

| Parameter | Type | Description |
| --- | --- | --- |
| `threadId` | `string` | |
| `runId` | `string` | |
| `signal` | `AbortSignal` | |

close()

Close the stream. Remaining queued events are drained before completion.

createQueuedRun(_assistantId: string, threadId: string, payload: unknown, signal: AbortSignal, options: LangGraphSubmitOptions)

Optional: create a server-side queued run without joining it immediately.

| Parameter | Type | Description |
| --- | --- | --- |
| `_assistantId` | `string` | |
| `threadId` | `string` | |
| `payload` | `unknown` | |
| `signal` | `AbortSignal` | |
| `options?` | `LangGraphSubmitOptions` | |

emit(events: StreamEvent[])

Manually emit events into the stream.

| Parameter | Type | Description |
| --- | --- | --- |
| `events` | `StreamEvent[]` | |

emitError(err: Error)

Inject an error into the stream.

| Parameter | Type | Description |
| --- | --- | --- |
| `err` | `Error` | |

getHistory(threadId: string, signal: AbortSignal)

Optional: load persisted checkpoint history for a thread.

| Parameter | Type | Description |
| --- | --- | --- |
| `threadId` | `string` | |
| `signal` | `AbortSignal` | |

isStreaming()

Returns true if a stream is currently active.

joinStream(threadId: string, runId: string, lastEventId: string | undefined, signal: AbortSignal)

Optional: join an already-started run without creating a new one.

| Parameter | Type | Description |
| --- | --- | --- |
| `threadId` | `string` | |
| `runId` | `string` | |
| `lastEventId` | `string \| undefined` | |
| `signal` | `AbortSignal` | |

nextBatch()

Advance to the next scripted batch. Pass the returned events to `emit()`.
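
The stepping flow can be illustrated with a small stand-in. `ScriptedBatches` and `ScriptEvent` are hypothetical names, and returning an empty batch once the script is exhausted is an assumption of this sketch, not documented behavior of the real class.

```typescript
// Hypothetical event shape used only in this sketch.
type ScriptEvent = { type: string; [key: string]: unknown };

// Hypothetical stand-in, not the @ngaf/langgraph class.
class ScriptedBatches {
  private cursor = 0;
  private script: ScriptEvent[][];
  readonly emitted: ScriptEvent[] = [];

  constructor(script: ScriptEvent[][] = []) {
    this.script = script;
  }

  // Returns the next batch and advances the cursor.
  // Assumption: an exhausted script yields an empty batch.
  nextBatch(): ScriptEvent[] {
    return this.script[this.cursor++] ?? [];
  }

  // Stand-in for emit(): just remembers what was pushed.
  emit(events: ScriptEvent[]): void {
    this.emitted.push(...events);
  }
}

const scripted = new ScriptedBatches([
  [{ type: 'values', step: 1 }],
  [{ type: 'values', step: 2 }, { type: 'values', step: 3 }],
]);

scripted.emit(scripted.nextBatch()); // first batch only
const afterFirst = scripted.emitted.length;

scripted.emit(scripted.nextBatch()); // second batch
const afterSecond = scripted.emitted.length;
```

Stepping one batch at a time lets a test assert on intermediate state between batches instead of only on the final result.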

stream(_assistantId: string, _threadId: string | null, _payload: unknown, signal: AbortSignal, options: LangGraphSubmitOptions)

Open a streaming connection to an agent and yield events.

| Parameter | Type | Description |
| --- | --- | --- |
| `_assistantId` | `string` | |
| `_threadId` | `string \| null` | |
| `_payload` | `unknown` | |
| `signal` | `AbortSignal` | |
| `options?` | `LangGraphSubmitOptions` | |

Examples

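// Hypothetical helper, not part of the library: builds an AI message
// shaped like the one in the complete test example.
const aiMsg = (content: string) => ({ type: 'ai', content });

const transport = new MockAgentTransport([
  [{ type: 'values', messages: [aiMsg('Hello')] }],
  [{ type: 'values', messages: [aiMsg('Done')] }],
]);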