| 1 | import pyaudio |
| 2 | import websocket |
| 3 | import json |
| 4 | import threading |
| 5 | import time |
| 6 | import requests |
| 7 | from urllib.parse import urlencode |
| 8 | |
# Credential sent to both the streaming endpoint and LLM Gateway.
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

# Query parameters appended to the streaming WebSocket URL.
CONNECTION_PARAMS = dict(sample_rate=16000, speech_model="u3-rt-pro")
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# Microphone capture settings; the capture rate is tied to the rate
# announced to the server so the two can never drift apart.
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
FRAMES_PER_BUFFER = 800  # 800 frames at 16000 Hz = 50 ms of audio per chunk
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Shared state, populated by run() and the WebSocket callbacks.
audio = stream = ws_app = audio_thread = None
stop_event = threading.Event()  # set to ask the audio sender thread to stop
conversation_data = ""  # finalized turns, one per line, for the LLM analysis
| 29 | |
def analyze_with_llm_gateway(text):
    """Send the transcript to LLM Gateway and return the model's analysis.

    Called by run() after the streaming session has ended, with the
    accumulated end-of-turn transcripts.

    Args:
        text: Transcript text to be analyzed.

    Returns:
        The message content of the first choice in the gateway response.

    Raises:
        requests.RequestException: On a network failure, a timeout, or a
            non-2xx HTTP status.
        KeyError, IndexError: If a successful response does not have the
            expected ``choices[0].message.content`` shape.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json"
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000
    }

    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        # (connect, read) seconds: without a timeout requests waits forever
        # on a stalled connection; the read budget is generous because the
        # model may generate up to 4000 tokens.
        timeout=(10, 120),
    )
    # Surface auth/quota/server errors as an HTTPError instead of letting an
    # error payload fail below with an uninformative KeyError.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]
| 53 | |
def on_open(ws):
    """Start forwarding microphone audio once the WebSocket is connected.

    Registered as the WebSocketApp ``on_open`` callback. Launches a daemon
    thread (stored in the module-level ``audio_thread``) that reads chunks of
    FRAMES_PER_BUFFER frames from the input stream and sends each one as a
    binary frame.

    Args:
        ws: The connected WebSocket used to send the audio.
    """
    global audio_thread

    print("WebSocket connection opened.")

    def stream_audio():
        # Pump audio until stop_event is set or a read/send fails.
        while True:
            if stop_event.is_set():
                return
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                return

    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
| 70 | |
def on_message(ws, message):
    """Handle one JSON message received from the streaming endpoint.

    "Begin" and "Termination" messages are reported on stdout. For "Turn"
    messages the partial transcript is redrawn in place on the current line;
    when ``end_of_turn`` is set, the line is cleared, the final transcript is
    printed, and it is appended to the module-level ``conversation_data``.
    Any other message type is ignored. Errors are printed, never raised.

    Args:
        ws: The WebSocket the message arrived on (unused).
        message: The raw JSON text of the message.
    """
    global conversation_data

    try:
        payload = json.loads(message)
        kind = payload.get("type")

        if kind == "Begin":
            print(f"Session began: ID={payload.get('id')}")
            return
        if kind == "Termination":
            print(f"\nSession terminated: {payload.get('audio_duration_seconds', 0)}s of audio")
            return
        if kind != "Turn":
            return

        text = payload.get("transcript", "")
        if not payload.get("end_of_turn"):
            # Partial result: overwrite the current line without a newline.
            print(f"\r{text}", end="")
            return

        # Final result: blank out the partial line before printing it.
        print(f"\r{' ' * 80}\r{text}")
        conversation_data += f"{text}\n"
    except Exception as e:
        print(f"Error handling message: {e}")
| 90 | |
def on_error(ws, error):
    """Report a WebSocket error and signal the audio sender thread to stop.

    Args:
        ws: The WebSocket that raised the error (unused).
        error: The error to report.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()
| 94 | |
def on_close(ws, close_status_code, close_msg):
    """Release the microphone once the WebSocket connection has closed.

    Registered as the WebSocketApp ``on_close`` callback. Signals the audio
    sender thread to stop, waits briefly for it to finish, then stops and
    closes the input stream and terminates the PyAudio instance. The
    module-level handles are reset to ``None`` so a repeated call is a no-op.

    Args:
        ws: The WebSocket that closed (unused).
        close_status_code: Close status code, printed for diagnostics.
        close_msg: Close reason (unused).
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    stop_event.set()

    # Give the sender thread a chance to leave stream.read() before the
    # stream is closed underneath it; bounded so shutdown can never hang.
    sender = audio_thread
    if sender is not None and sender.is_alive():
        sender.join(timeout=1.0)

    if stream:
        try:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
        finally:
            # Drop the handle even if stopping failed so it is not reused.
            stream = None
    if audio:
        try:
            audio.terminate()
        finally:
            audio = None
| 105 | |
def run():
    """Stream microphone audio for transcription, then analyze the result.

    Opens the default input device, connects to the streaming endpoint on a
    background thread, and blocks until the connection ends or the user
    presses Ctrl+C. On Ctrl+C a Terminate message is sent and the server is
    given up to two seconds to finish before the socket is closed. Finally,
    any transcript collected in ``conversation_data`` is sent to LLM Gateway
    and the analysis is printed.

    Raises:
        Exception: Whatever ``audio.open`` raises when the input stream
            cannot be opened; the PyAudio instance is released first.
    """
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
    except Exception:
        # on_close never runs if we fail here, so release PyAudio ourselves.
        audio.terminate()
        audio = None
        raise
    print("Speak into your microphone. Press Ctrl+C to stop.")

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Poll instead of a bare join() so Ctrl+C is noticed promptly.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        if ws_app.sock and ws_app.sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
            except Exception as e:
                # The connection can drop between the check and the send.
                print(f"Error sending termination message: {e}")
            else:
                # Wait up to 2 s for the session to wind down; this returns
                # as soon as the WebSocket thread exits rather than always
                # sleeping the full two seconds.
                ws_thread.join(timeout=2.0)
        ws_app.close()
        ws_thread.join(timeout=2.0)

    if conversation_data.strip():
        print("Analyzing conversation with LLM Gateway...")
        try:
            print(analyze_with_llm_gateway(conversation_data))
        except Exception as e:
            # Top-level boundary: report the failure instead of a traceback.
            print(f"LLM Gateway analysis failed: {e}")
    else:
        print("No conversation data to analyze.")
| 150 | |
# Start a streaming session only when executed as a script, not on import.
if __name__ == "__main__":
    run()