Translate Streaming STT Transcripts with LLM Gateway

In this guide, you’ll learn how to implement real-time translation of final transcripts using AssemblyAI’s Streaming API and LLM Gateway.

Quickstart

1import pyaudio
2import websocket
3import json
4import threading
5import time
6import requests
7from urllib.parse import urlencode
8
9YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key
10
11CONNECTION_PARAMS = {
12 "sample_rate": 16000,
13 "speech_model": "u3-rt-pro",
14}
15API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
16API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
17
18FRAMES_PER_BUFFER = 800
19SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
20CHANNELS = 1
21FORMAT = pyaudio.paInt16
22
23audio = None
24stream = None
25ws_app = None
26audio_thread = None
27stop_event = threading.Event()
28
29def translate_text(text):
30 """Called when translating final transcripts."""
31 headers = {
32 "authorization": YOUR_API_KEY
33 }
34
35 llm_gateway_data = {
36 "model": "gemini-2.5-flash-lite",
37 "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}",
38 "max_tokens": 1000
39 }
40
41 result = requests.post(
42 "https://llm-gateway.assemblyai.com/v1/chat/completions",
43 headers=headers,
44 json=llm_gateway_data
45 )
46 return result.json()["choices"][0]["message"]["content"]
47
48def on_open(ws):
49 print("WebSocket connection opened.")
50 def stream_audio():
51 global stream
52 while not stop_event.is_set():
53 try:
54 audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
55 ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
56 except Exception as e:
57 print(f"Error streaming audio: {e}")
58 break
59
60 global audio_thread
61 audio_thread = threading.Thread(target=stream_audio)
62 audio_thread.daemon = True
63 audio_thread.start()
64
65def on_message(ws, message):
66 try:
67 data = json.loads(message)
68 msg_type = data.get("type")
69
70 if msg_type == "Begin":
71 print(f"Session began: ID={data.get('id')}")
72 elif msg_type == "Turn":
73 transcript = data.get("transcript", "")
74 if data.get("end_of_turn"):
75 print(f"\r{' ' * 80}\r", end="")
76 print(translate_text(transcript))
77 else:
78 print(f"\r{transcript}", end="")
79 elif msg_type == "Termination":
80 print(f"\nSession terminated: {data.get('audio_duration_seconds', 0)}s of audio")
81 except Exception as e:
82 print(f"Error handling message: {e}")
83
84def on_error(ws, error):
85 print(f"\nWebSocket Error: {error}")
86 stop_event.set()
87
88def on_close(ws, close_status_code, close_msg):
89 print(f"\nWebSocket Disconnected: Status={close_status_code}")
90 global stream, audio
91 stop_event.set()
92 if stream:
93 if stream.is_active():
94 stream.stop_stream()
95 stream.close()
96 if audio:
97 audio.terminate()
98
99def run():
100 global audio, stream, ws_app
101
102 audio = pyaudio.PyAudio()
103 stream = audio.open(
104 input=True,
105 frames_per_buffer=FRAMES_PER_BUFFER,
106 channels=CHANNELS,
107 format=FORMAT,
108 rate=SAMPLE_RATE,
109 )
110 print("Speak into your microphone. Press Ctrl+C to stop.")
111
112 ws_app = websocket.WebSocketApp(
113 API_ENDPOINT,
114 header={"Authorization": YOUR_API_KEY},
115 on_open=on_open,
116 on_message=on_message,
117 on_error=on_error,
118 on_close=on_close,
119 )
120
121 ws_thread = threading.Thread(target=ws_app.run_forever)
122 ws_thread.daemon = True
123 ws_thread.start()
124
125 try:
126 while ws_thread.is_alive():
127 time.sleep(0.1)
128 except KeyboardInterrupt:
129 print("\nStopping...")
130 stop_event.set()
131 if ws_app and ws_app.sock and ws_app.sock.connected:
132 ws_app.send(json.dumps({"type": "Terminate"}))
133 time.sleep(2)
134 if ws_app:
135 ws_app.close()
136 ws_thread.join(timeout=2.0)
137
138if __name__ == "__main__":
139 run()

Step-by-Step Instructions

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Install Dependencies

$pip install websocket-client pyaudio requests

Import Packages & Set API Key

1import pyaudio
2import websocket
3import json
4import threading
5import time
6import requests
7from urllib.parse import urlencode
8
9YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key

Audio Configuration & Global Variables

Set all of your audio configurations and global variables.

1CONNECTION_PARAMS = {
2 "sample_rate": 16000,
3 "speech_model": "u3-rt-pro",
4}
5API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
6API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
7
8FRAMES_PER_BUFFER = 800
9SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
10CHANNELS = 1
11FORMAT = pyaudio.paInt16
12
13audio = None
14stream = None
15ws_app = None
16audio_thread = None
17stop_event = threading.Event()

Define Translate Text Function

Define a function called translate_text (Python) or translateText (JavaScript), which uses LLM Gateway to translate the English final transcripts into another language. This example is translating the text into Spanish. To set this to a different language, just replace “Spanish” in the prompt with your language of choice.

1def translate_text(text):
2 """Called when translating final transcripts."""
3 headers = {
4 "authorization": YOUR_API_KEY
5 }
6
7 llm_gateway_data = {
8 "model": "gemini-2.5-flash-lite",
9 "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}",
10 "max_tokens": 1000
11 }
12
13 result = requests.post(
14 "https://llm-gateway.assemblyai.com/v1/chat/completions",
15 headers=headers,
16 json=llm_gateway_data
17 )
18 return result.json()["choices"][0]["message"]["content"]

Websocket Event Handlers

Open Websocket

1def on_open(ws):
2 print("WebSocket connection opened.")
3 def stream_audio():
4 global stream
5 while not stop_event.is_set():
6 try:
7 audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
8 ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
9 except Exception as e:
10 print(f"Error streaming audio: {e}")
11 break
12
13 global audio_thread
14 audio_thread = threading.Thread(target=stream_audio)
15 audio_thread.daemon = True
16 audio_thread.start()

Handle Websocket Messages

In this function, use the previously defined translate_text / translateText to translate all final transcripts.

1def on_message(ws, message):
2 try:
3 data = json.loads(message)
4 msg_type = data.get("type")
5
6 if msg_type == "Begin":
7 print(f"Session began: ID={data.get('id')}")
8 elif msg_type == "Turn":
9 transcript = data.get("transcript", "")
10 if data.get("end_of_turn"):
11 print(f"\r{' ' * 80}\r", end="")
12 print(translate_text(transcript))
13 else:
14 print(f"\r{transcript}", end="")
15 elif msg_type == "Termination":
16 print(f"\nSession terminated: {data.get('audio_duration_seconds', 0)}s of audio")
17 except Exception as e:
18 print(f"Error handling message: {e}")

Close Websocket

1def on_close(ws, close_status_code, close_msg):
2 print(f"\nWebSocket Disconnected: Status={close_status_code}")
3 global stream, audio
4 stop_event.set()
5 if stream:
6 if stream.is_active():
7 stream.stop_stream()
8 stream.close()
9 if audio:
10 audio.terminate()

Websocket Error Handling

1def on_error(ws, error):
2 print(f"\nWebSocket Error: {error}")
3 stop_event.set()

Begin Streaming STT Transcription

1def run():
2 global audio, stream, ws_app
3
4 audio = pyaudio.PyAudio()
5 stream = audio.open(
6 input=True,
7 frames_per_buffer=FRAMES_PER_BUFFER,
8 channels=CHANNELS,
9 format=FORMAT,
10 rate=SAMPLE_RATE,
11 )
12 print("Speak into your microphone. Press Ctrl+C to stop.")
13
14 ws_app = websocket.WebSocketApp(
15 API_ENDPOINT,
16 header={"Authorization": YOUR_API_KEY},
17 on_open=on_open,
18 on_message=on_message,
19 on_error=on_error,
20 on_close=on_close,
21 )
22
23 ws_thread = threading.Thread(target=ws_app.run_forever)
24 ws_thread.daemon = True
25 ws_thread.start()
26
27 try:
28 while ws_thread.is_alive():
29 time.sleep(0.1)
30 except KeyboardInterrupt:
31 print("\nStopping...")
32 stop_event.set()
33 if ws_app and ws_app.sock and ws_app.sock.connected:
34 ws_app.send(json.dumps({"type": "Terminate"}))
35 time.sleep(2)
36 if ws_app:
37 ws_app.close()
38 ws_thread.join(timeout=2.0)
39
40if __name__ == "__main__":
41 run()