Use LLM Gateway with Streaming Speech-to-Text (STT)

In this guide, you’ll learn how to use LLM Gateway with AssemblyAI’s Streaming API.

This script accumulates transcribed text in the on_message function using a global conversation_data (Python) / conversationData (JavaScript) variable. Once the transcription session is closed, the accumulated transcript is sent to LLM Gateway for analysis.

Quickstart

1import pyaudio
2import websocket
3import json
4import threading
5import time
6import requests
7from urllib.parse import urlencode
8
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

# Query parameters appended to the streaming WebSocket URL.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Microphone capture settings.
FRAMES_PER_BUFFER = 800  # 800 frames / 16000 Hz = 50 ms of audio per read
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Module-level handles shared between run() and the WebSocket callbacks.
audio = None          # pyaudio.PyAudio instance, created in run()
stream = None         # microphone input stream, created in run()
ws_app = None         # websocket.WebSocketApp instance
audio_thread = None   # background thread started in on_open()
stop_event = threading.Event()  # signals the audio thread to stop streaming
conversation_data = ""  # accumulates end-of-turn transcripts for LLM analysis
29
def analyze_with_llm_gateway(text):
    """Send the accumulated transcript to LLM Gateway and return the analysis text.

    Called after the WebSocket transcription session has closed.

    Args:
        text: The full accumulated transcript to analyze.

    Returns:
        The model's analysis as a string.

    Raises:
        requests.HTTPError: If the gateway responds with a non-2xx status.
        requests.Timeout: If the gateway does not respond within the timeout.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json"
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000
    }

    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=120,  # don't hang forever if the gateway is unreachable
    )
    # Surface HTTP errors explicitly instead of a confusing KeyError below.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]
53
def on_open(ws):
    """Launch a daemon thread that forwards microphone audio to the socket."""
    print("WebSocket connection opened.")

    def stream_audio():
        # Read fixed-size chunks from the mic and ship them as binary frames
        # until stop_event is set or a read/send error occurs.
        global stream
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
70
def on_message(ws, message):
    """Handle a server message, accumulating end-of-turn transcripts."""
    try:
        payload = json.loads(message)
        kind = payload.get("type")

        if kind == "Begin":
            print(f"Session began: ID={payload.get('id')}")
        elif kind == "Turn":
            text = payload.get("transcript", "")
            if not payload.get("end_of_turn"):
                # Partial result: overwrite the current console line in place.
                print(f"\r{text}", end="")
            else:
                # Final result for this turn: clear the line, print it, and
                # append it to the running conversation transcript.
                global conversation_data
                print(f"\r{' ' * 80}\r{text}")
                conversation_data += f"{text}\n"
        elif kind == "Termination":
            print(f"\nSession terminated: {payload.get('audio_duration_seconds', 0)}s of audio")
    except Exception as exc:
        print(f"Error handling message: {exc}")
90
def on_error(ws, error):
    """Report a WebSocket error and tell the audio thread to stop."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()
94
def on_close(ws, close_status_code, close_msg):
    """Stop streaming and release PyAudio resources when the socket closes."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    global stream, audio
    stop_event.set()
    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
    if audio is not None:
        audio.terminate()
105
def run():
    """Open the microphone, stream audio to the STT WebSocket, then analyze.

    Runs until the socket thread exits or the user presses Ctrl+C, then sends
    the accumulated transcript to LLM Gateway and prints the analysis.
    """
    global audio, stream, ws_app

    # Open the default input device with the configured capture settings.
    audio = pyaudio.PyAudio()
    stream = audio.open(
        input=True,
        frames_per_buffer=FRAMES_PER_BUFFER,
        channels=CHANNELS,
        format=FORMAT,
        rate=SAMPLE_RATE,
    )
    print("Speak into your microphone. Press Ctrl+C to stop.")

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the socket on a daemon thread so Ctrl+C reaches the main thread.
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep the main thread alive while the socket thread is running.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        if ws_app and ws_app.sock and ws_app.sock.connected:
            # Ask the server to end the session, then give it time to
            # flush any final Turn/Termination messages.
            ws_app.send(json.dumps({"type": "Terminate"}))
            time.sleep(2)
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    # Session is over; send whatever transcript accumulated to LLM Gateway.
    if conversation_data.strip():
        print("Analyzing conversation with LLM Gateway...")
        print(analyze_with_llm_gateway(conversation_data))
    else:
        print("No conversation data to analyze.")

if __name__ == "__main__":
    run()

Step-by-Step Instructions

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Install Dependencies

$pip install websocket-client pyaudio requests

Import Packages & Set API Key

1import pyaudio
2import websocket
3import json
4import threading
5import time
6import requests
7from urllib.parse import urlencode
8
9YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key

Audio Configuration & Global Variables

Set all of your audio configurations and global variables. Initialize the conversation_data / conversationData variable as an empty string to accumulate final transcripts.

# Query parameters appended to the streaming WebSocket URL.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Microphone capture settings.
FRAMES_PER_BUFFER = 800  # 800 frames / 16000 Hz = 50 ms of audio per read
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Module-level handles shared between run() and the WebSocket callbacks.
audio = None          # pyaudio.PyAudio instance, created in run()
stream = None         # microphone input stream, created in run()
ws_app = None         # websocket.WebSocketApp instance
audio_thread = None   # background thread started in on_open()
stop_event = threading.Event()  # signals the audio thread to stop streaming
conversation_data = ""  # accumulates end-of-turn transcripts for LLM analysis

Define Analyze With LLM Gateway Function

Define a function called analyze_with_llm_gateway (Python) or analyzeWithLlmGateway (JavaScript), which uses LLM Gateway to analyze the complete final transcript text. The prompt can be modified to suit your individual requirements.

def analyze_with_llm_gateway(text):
    """Send the accumulated transcript to LLM Gateway and return the analysis text.

    Called after the WebSocket transcription session has closed.

    Args:
        text: The full accumulated transcript to analyze.

    Returns:
        The model's analysis as a string.

    Raises:
        requests.HTTPError: If the gateway responds with a non-2xx status.
        requests.Timeout: If the gateway does not respond within the timeout.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json"
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000
    }

    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=120,  # don't hang forever if the gateway is unreachable
    )
    # Surface HTTP errors explicitly instead of a confusing KeyError below.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]

Websocket Event Handlers

Open Websocket

def on_open(ws):
    """Launch a daemon thread that forwards microphone audio to the socket."""
    print("WebSocket connection opened.")

    def stream_audio():
        # Read fixed-size chunks from the mic and ship them as binary frames
        # until stop_event is set or a read/send error occurs.
        global stream
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()

Handle Websocket Messages

In this function, use the previously defined conversation_data / conversationData to store all final transcripts together for later analysis.

def on_message(ws, message):
    """Handle a server message, accumulating end-of-turn transcripts."""
    try:
        payload = json.loads(message)
        kind = payload.get("type")

        if kind == "Begin":
            print(f"Session began: ID={payload.get('id')}")
        elif kind == "Turn":
            text = payload.get("transcript", "")
            if not payload.get("end_of_turn"):
                # Partial result: overwrite the current console line in place.
                print(f"\r{text}", end="")
            else:
                # Final result for this turn: clear the line, print it, and
                # append it to the running conversation transcript.
                global conversation_data
                print(f"\r{' ' * 80}\r{text}")
                conversation_data += f"{text}\n"
        elif kind == "Termination":
            print(f"\nSession terminated: {payload.get('audio_duration_seconds', 0)}s of audio")
    except Exception as exc:
        print(f"Error handling message: {exc}")

Close Websocket

def on_close(ws, close_status_code, close_msg):
    """Stop streaming and release PyAudio resources when the socket closes."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    global stream, audio
    stop_event.set()
    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
    if audio is not None:
        audio.terminate()

Websocket Error Handling

def on_error(ws, error):
    """Report a WebSocket error and tell the audio thread to stop."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()

Begin Streaming STT Transcription

After the socket is closed, conversation_data / conversationData is sent to the analyze_with_llm_gateway / analyzeWithLlmGateway function and the LLM Gateway results are printed out.

def run():
    """Open the microphone, stream audio to the STT WebSocket, then analyze.

    Runs until the socket thread exits or the user presses Ctrl+C, then sends
    the accumulated transcript to LLM Gateway and prints the analysis.
    """
    global audio, stream, ws_app

    # Open the default input device with the configured capture settings.
    audio = pyaudio.PyAudio()
    stream = audio.open(
        input=True,
        frames_per_buffer=FRAMES_PER_BUFFER,
        channels=CHANNELS,
        format=FORMAT,
        rate=SAMPLE_RATE,
    )
    print("Speak into your microphone. Press Ctrl+C to stop.")

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the socket on a daemon thread so Ctrl+C reaches the main thread.
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep the main thread alive while the socket thread is running.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        if ws_app and ws_app.sock and ws_app.sock.connected:
            # Ask the server to end the session, then give it time to
            # flush any final Turn/Termination messages.
            ws_app.send(json.dumps({"type": "Terminate"}))
            time.sleep(2)
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    # Session is over; send whatever transcript accumulated to LLM Gateway.
    if conversation_data.strip():
        print("Analyzing conversation with LLM Gateway...")
        print(analyze_with_llm_gateway(conversation_data))
    else:
        print("No conversation data to analyze.")

if __name__ == "__main__":
    run()