
Universal Streaming

speech_model is required

You must include the speech_model parameter in every streaming transcription request. There is no default model. If you omit speech_model, the request will fail. See Model selection to learn about available models.

Streaming is now available in EU-West via streaming.eu.assemblyai.com. To use the EU streaming endpoint, replace streaming.assemblyai.com with streaming.eu.assemblyai.com in your connection configuration.
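For example, switching to the EU endpoint is only a change to the host in the base URL; the path and query parameters stay the same (a minimal sketch using the endpoint from the quickstart below):

```python
US_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"

# Swap the host to use the EU-West region; everything else is unchanged.
EU_ENDPOINT_BASE_URL = US_ENDPOINT_BASE_URL.replace(
    "streaming.assemblyai.com", "streaming.eu.assemblyai.com"
)
print(EU_ENDPOINT_BASE_URL)  # wss://streaming.eu.assemblyai.com/v3/ws
```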

Quickstart

In this quick guide you will learn how to use AssemblyAI’s Streaming Speech-to-Text feature to transcribe audio from your microphone.

To run this quickstart you will need:

  • Python or JavaScript installed
  • A valid AssemblyAI API key

Connection parameters

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| speech_model | string | Yes | — | The speech model to use. See Model selection for available models. |
| sample_rate | integer | No | 16000 | The sample rate of the audio stream in Hz. |
| format_turns | boolean | No | false | Set to true to enable formatted final transcripts with punctuation, casing, and inverse text normalization (e.g. dates, times, phone numbers). |
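These parameters are passed as a query string on the WebSocket URL. As a sketch, this is how they translate into the connection URL used below (parameter names from the table above):

```python
from urllib.parse import urlencode

# Connection parameters from the table above; speech_model is required,
# the others fall back to their defaults if omitted.
params = {
    "speech_model": "universal-streaming-english",
    "sample_rate": 16000,
    "format_turns": "true",
}

url = f"wss://streaming.assemblyai.com/v3/ws?{urlencode(params)}"
print(url)
```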

To run the quickstart:

1. Create a new Python file (for example, main.py) and paste the code provided below inside.

2. Insert your API key into the YOUR_API_KEY variable near the top of the file.

3. Install the necessary libraries:

   pip install websocket-client pyaudio

4. Run the script with python main.py.

import pyaudio
import websocket
import json
import threading
import time
import wave
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "universal-streaming-english",
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If the stream read fails, it is likely closed; stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # Allow main thread to exit even if this thread is running
    audio_thread.start()


def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            session_id = data.get("id")
            expires_at = data.get("expires_at")
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get("transcript", "")
            if data.get("end_of_turn"):
                print("\r" + " " * 80 + "\r", end="")
                print(transcript)
            else:
                print(f"\r{transcript}", end="")
        elif msg_type == "Termination":
            audio_duration = data.get("audio_duration_seconds", 0)
            session_duration = data.get("session_duration_seconds", 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Save recorded audio to WAV file
    save_wav_file()

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


def save_wav_file():
    """Save recorded audio frames to a WAV file."""
    if not recorded_frames:
        print("No audio data recorded.")
        return

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recorded_audio_{timestamp}.wav"

    try:
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(2)  # 16-bit = 2 bytes
            wf.setframerate(SAMPLE_RATE)

            # Write all recorded frames
            with recording_lock:
                wf.writeframes(b"".join(recorded_frames))

        print(f"Audio saved to: {filename}")
        print(f"Duration: {len(recorded_frames) * FRAMES_PER_BUFFER / SAMPLE_RATE:.2f} seconds")

    except Exception as e:
        print(f"Error saving WAV file: {e}")


# --- Main Execution ---
def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Core concepts

For a message-by-message breakdown of a turn, see our Streaming API: Message Sequence Breakdown guide.

Universal-Streaming is built upon two core concepts: Turn objects and immutable transcriptions.

Turn object

A Turn object is intended to correspond to a speaking turn in the context of voice agent applications; in a broader context, it roughly corresponds to an utterance. We assign a unique ID to each Turn object, which is included in our response. Specifically, the Universal-Streaming response is formatted as follows:

{
  "turn_order": 1,
  "end_of_turn": false,
  "transcript": "modern medicine is",
  "end_of_turn_confidence": 0.7,
  "words": [
    { "text": "modern", "word_is_final": true, ... },
    { "text": "medicine", "word_is_final": true, ... },
    { "text": "is", "word_is_final": true, ... },
    { "text": "amazing", "word_is_final": false, ... }
  ]
}
  • turn_order: Integer that increments with each new turn
  • turn_is_formatted: Boolean indicating if the text in the transcript field has been formatted with punctuation, casing, and inverse text normalization (e.g. dates, times, phone numbers). This field is false by default. Set format_turns=true to enable formatting. Use end_of_turn to detect end of turn, not turn_is_formatted.
  • end_of_turn: Boolean indicating if this is the end of the current turn
  • transcript: String containing only finalized words
  • end_of_turn_confidence: Floating number (0-1) representing the confidence that the current turn has finished, i.e., the current speaker has completed their turn
  • words: List of Word objects with individual metadata

Each Word object in the words array includes:

  • text: The string representation of the word
  • word_is_final: Boolean indicating if the word is finalized, where a finalized word means the word won’t be altered in future transcription responses
  • start: Timestamp for word start
  • end: Timestamp for word end
  • confidence: Confidence score for the word

Do not use turn_is_formatted to detect end of turn. Use end_of_turn to determine when a speaker’s turn has completed.
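As a sketch under these rules, a minimal handler for Turn messages might look like the following. Only end_of_turn is used to detect the turn boundary; turn_is_formatted merely indicates whether the final transcript has been formatted. The handle_turn helper name is illustrative, not part of the API:

```python
import json


def handle_turn(message: str):
    """Return the final transcript when a turn ends, else None.

    Uses end_of_turn to detect the turn boundary; turn_is_formatted only
    tells you whether the transcript text has been formatted.
    """
    data = json.loads(message)
    if data.get("type") != "Turn":
        return None
    if data.get("end_of_turn"):
        return data.get("transcript", "")
    return None


# Example: a Turn message with end_of_turn=true yields its transcript.
msg = json.dumps({"type": "Turn", "transcript": "Hello, my name is Zack.",
                  "end_of_turn": True, "turn_is_formatted": True})
print(handle_turn(msg))  # Hello, my name is Zack.
```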

Immutable transcription

As AssemblyAI’s streaming system receives audio in a streaming fashion, it returns transcription responses in real time using the format specified above. Unlike many other streaming speech-to-text models that implement the concept of partial/variable transcriptions to show transcripts in an ongoing manner, Universal-Streaming transcriptions are immutable. In other words, text that has already been produced will not be overwritten in future transcription responses. With Universal-Streaming, transcriptions are therefore delivered in the following way:

→ Hello my na
→ Hello my name
→ Hello my name
→ Hello my name is
→ Hello my name is Zac
→ Hello my name is Zack

When the end of the current turn is detected, you receive a message with end_of_turn set to true. If you enable text formatting by setting format_turns=true, you will also receive a transcription response with turn_is_formatted set to true.

→ Hello my name is Zack
→ Hello, my name is Zack. (end_of_turn: true)

You may have noticed that the last word of a transcript can occasionally be a subword (“Zac” in the example above). Each Word object has the word_is_final field to indicate whether the model is confident that the last word is a completed word. Note that, except for the last word, word_is_final is always true.
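A client that only wants stable text can therefore keep the words whose word_is_final is true and ignore the trailing unfinalized word. A minimal sketch, using the field names from the response format above (the finalized_text helper name is illustrative):

```python
def finalized_text(turn: dict) -> str:
    """Join only the words the model has finalized; the trailing
    unfinalized word may still change in later responses."""
    return " ".join(w["text"] for w in turn.get("words", []) if w["word_is_final"])


turn = {
    "transcript": "modern medicine is",
    "words": [
        {"text": "modern", "word_is_final": True},
        {"text": "medicine", "word_is_final": True},
        {"text": "is", "word_is_final": True},
        {"text": "amazing", "word_is_final": False},
    ],
}
print(finalized_text(turn))  # modern medicine is
```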