61 changes: 0 additions & 61 deletions examples/07-transcription-live-websocket.py

This file was deleted.

132 changes: 132 additions & 0 deletions examples/13-transcription-live-websocket.py
@@ -0,0 +1,132 @@
"""
Example: Live Transcription with WebSocket (Listen V1)

This example demonstrates how to use WebSocket for real-time audio transcription.
In production, you would stream audio from a microphone or other live source
(a commented microphone sketch appears near the end of this file).
This example uses an audio file to demonstrate the streaming pattern.
"""

import os
import threading
import time
from typing import Union

from dotenv import load_dotenv

load_dotenv()

from deepgram import DeepgramClient
from deepgram.core.events import EventType
from deepgram.listen.v1.types import (
ListenV1Finalize,
ListenV1Metadata,
ListenV1Results,
ListenV1SpeechStarted,
ListenV1UtteranceEnd,
)

ListenV1SocketClientResponse = Union[ListenV1Results, ListenV1Metadata, ListenV1UtteranceEnd, ListenV1SpeechStarted]

# Audio streaming configuration
CHUNK_SIZE = 8192 # Bytes to send at a time
SAMPLE_RATE = 44100 # Hz (typical for WAV files)
SAMPLE_WIDTH = 2 # 16-bit audio = 2 bytes per sample
CHANNELS = 1 # Mono audio

# Calculate delay between chunks to simulate real-time streaming
# This makes the audio stream at its natural playback rate
CHUNK_DELAY = CHUNK_SIZE / (SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS)
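# e.g. 8192 / (44100 * 2 * 1) ≈ 0.093 s of audio per chunk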

client = DeepgramClient()

try:
with client.listen.v1.connect(model="nova-3") as connection:

def on_message(message: ListenV1SocketClientResponse) -> None:
# Extract transcription from Results events
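# (With interim_results enabled on the connection, Results events can be
# partial hypotheses; the payload's is_final flag distinguishes them from
# finalized transcripts.)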
if isinstance(message, ListenV1Results):
if message.channel and message.channel.alternatives:
transcript = message.channel.alternatives[0].transcript
if transcript:
print(f"Transcript: {transcript}")

def on_open(_) -> None:
print("Connection opened")

def on_close(_) -> None:
print("Connection closed")

def on_error(error) -> None:
print(f"Error: {error}")

# Register event handlers
connection.on(EventType.OPEN, on_open)
connection.on(EventType.MESSAGE, on_message)
connection.on(EventType.CLOSE, on_close)
connection.on(EventType.ERROR, on_error)

# Define a function to send audio in a background thread
def send_audio():
audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav")

with open(audio_path, "rb") as audio_file:
print(f"Streaming audio from {audio_path}")

while True:
chunk = audio_file.read(CHUNK_SIZE)
if not chunk:
break

connection.send_media(chunk)

# Simulate real-time streaming by adding delay between chunks
time.sleep(CHUNK_DELAY)

print("Finished sending audio")

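# Finalize forces the server to flush buffered audio and return any
# pending final transcripts before the stream ends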
connection.send_finalize(ListenV1Finalize(type="Finalize"))

# Start sending audio in a background thread
threading.Thread(target=send_audio, daemon=True).start()

# Start listening; this blocks until the server closes the connection or it times out
connection.start_listening()

# For the async version:
# import asyncio
# from deepgram import AsyncDeepgramClient
#
# client = AsyncDeepgramClient()
# async with client.listen.v1.connect(model="nova-3") as connection:
# async def on_message(message):
# if isinstance(message, ListenV1Results):
# if message.channel and message.channel.alternatives:
# transcript = message.channel.alternatives[0].transcript
# if transcript:
# print(f"Transcript: {transcript}")
#
# connection.on(EventType.MESSAGE, on_message)
#
# # Define coroutine to send audio
# async def send_audio():
# audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav")
# with open(audio_path, "rb") as audio_file:
# while chunk := audio_file.read(CHUNK_SIZE):
# await connection.send_media(chunk)
# # Simulate real-time streaming
# await asyncio.sleep(CHUNK_DELAY)
# print("Finished sending audio")
# await connection.send_finalize(ListenV1Finalize(type="Finalize"))
#
# # Start both tasks
# listen_task = asyncio.create_task(connection.start_listening())
# send_task = asyncio.create_task(send_audio())
#
# # Wait for send to complete
# await send_task
#
# # Continue listening until connection closes or times out
# await listen_task
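
# For live microphone input, a minimal sketch (assuming the third-party
# pyaudio package, which these examples do not otherwise use):
# import pyaudio
#
# audio = pyaudio.PyAudio()
# stream = audio.open(format=pyaudio.paInt16, channels=CHANNELS,
#                     rate=SAMPLE_RATE, input=True)
# try:
#     while True:
#         # read() takes a frame count; mono 16-bit frames are 2 bytes each.
#         # No sleep needed: read() blocks until the audio has been captured.
#         connection.send_media(stream.read(CHUNK_SIZE // SAMPLE_WIDTH))
# except KeyboardInterrupt:
#     pass
# finally:
#     stream.stop_stream()
#     stream.close()
#     audio.terminate()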

except Exception as e:
print(f"Error: {e}")
132 changes: 132 additions & 0 deletions examples/14-transcription-live-websocket-v2.py
@@ -0,0 +1,132 @@
"""
Example: Live Transcription with WebSocket V2 (Listen V2)

This example demonstrates how to use Listen V2 for advanced conversational speech
recognition with contextual turn detection.

Note: Listen V2 requires 16kHz linear16 PCM audio format.
In production, you would stream audio from a microphone or other live source.
This example uses an audio file to demonstrate the streaming pattern.
"""

import os
import threading
import time
from typing import Union

from dotenv import load_dotenv

load_dotenv()

from deepgram import DeepgramClient
from deepgram.core.events import EventType
from deepgram.listen.v2.types import (
ListenV2Connected,
ListenV2FatalError,
ListenV2TurnInfo,
)

ListenV2SocketClientResponse = Union[ListenV2Connected, ListenV2TurnInfo, ListenV2FatalError]

# Audio streaming configuration
# IMPORTANT: Listen V2 requires 16kHz linear16 PCM audio
CHUNK_SIZE = 8192 # Bytes to send at a time
SAMPLE_RATE = 16000 # Hz (required for Listen V2)
SAMPLE_WIDTH = 2 # 16-bit audio = 2 bytes per sample
CHANNELS = 1 # Mono audio

# Calculate delay between chunks to simulate real-time streaming
# This makes the audio stream at its natural playback rate
CHUNK_DELAY = CHUNK_SIZE / (SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS)
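# e.g. 8192 / (16000 * 2 * 1) = 0.256 s of audio per chunk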

client = DeepgramClient()

try:
# Listen V2 requires specific audio format: 16kHz linear16 PCM
with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate="16000") as connection:

def on_message(message: ListenV2SocketClientResponse) -> None:
# Handle TurnInfo events containing transcription and turn metadata
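# The event field indicates where in the turn lifecycle this message falls
# (for example, a mid-turn update versus the end of a completed turn)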
if isinstance(message, ListenV2TurnInfo):
print(f"Turn {message.turn_index}: {message.transcript}")
print(f" Event: {message.event}")

def on_open(_) -> None:
print("Connection opened")

def on_close(_) -> None:
print("Connection closed")

def on_error(error) -> None:
print(f"Error: {error}")

# Register event handlers
connection.on(EventType.OPEN, on_open)
connection.on(EventType.MESSAGE, on_message)
connection.on(EventType.CLOSE, on_close)
connection.on(EventType.ERROR, on_error)

# Define a function to send audio in a background thread
def send_audio():
# IMPORTANT: Audio must be 16kHz linear16 PCM for Listen V2
audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav")

with open(audio_path, "rb") as audio_file:
print(f"Streaming audio from {audio_path}")

while True:
chunk = audio_file.read(CHUNK_SIZE)
if not chunk:
break

connection.send_media(chunk)

# Simulate real-time streaming by adding delay between chunks
time.sleep(CHUNK_DELAY)

print("Finished sending audio")

# Start sending audio in a background thread
threading.Thread(target=send_audio, daemon=True).start()

# Start listening; this blocks until the server closes the connection or it times out
connection.start_listening()

# For the async version:
# import asyncio
# from deepgram import AsyncDeepgramClient
#
# client = AsyncDeepgramClient()
# async with client.listen.v2.connect(
#     model="flux-general-en",
#     encoding="linear16",
#     sample_rate="16000"
# ) as connection:
# async def on_message(message):
# if isinstance(message, ListenV2TurnInfo):
# print(f"Turn {message.turn_index}: {message.transcript}")
#
# connection.on(EventType.MESSAGE, on_message)
#
# # Define coroutine to send audio
# async def send_audio():
# audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav")
# with open(audio_path, "rb") as audio_file:
# while chunk := audio_file.read(CHUNK_SIZE):
# await connection.send_media(chunk)
# # Simulate real-time streaming
# await asyncio.sleep(CHUNK_DELAY)
# print("Finished sending audio")
#
# # Start both tasks
# listen_task = asyncio.create_task(connection.start_listening())
# send_task = asyncio.create_task(send_audio())
#
# # Wait for send to complete
# await send_task
#
# # Continue listening until connection closes or times out
# await listen_task
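
# Note: Listen V2 requires 16 kHz, mono, 16-bit linear PCM. If your source
# audio differs, one way to convert it (assuming the ffmpeg CLI is installed):
#
#   ffmpeg -i input.wav -ar 16000 -ac 1 -c:a pcm_s16le fixtures/audio.wav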

except Exception as e:
print(f"Error: {e}")
@@ -40,15 +40,15 @@ def on_message(message: SpeakV1SocketClientResponse) -> None:
# Note: start_listening() blocks, so send all messages first
# For better control with bidirectional communication, use the async version
text_message = SpeakV1Text(text="Hello, this is a text to speech example.")
- connection.send_speak_v_1_text(text_message)
+ connection.send_text(text_message)

# Flush to ensure all text is processed
flush_message = SpeakV1Flush()
- connection.send_speak_v_1_flush(flush_message)
+ connection.send_flush(flush_message)

# Close the connection when done
close_message = SpeakV1Close()
- connection.send_speak_v_1_close(close_message)
+ connection.send_close(close_message)

# Start listening - this blocks until the connection closes
# All messages should be sent before calling this in sync mode
@@ -58,9 +58,9 @@ def on_message(message: SpeakV1SocketClientResponse) -> None:
# from deepgram import AsyncDeepgramClient
# async with client.speak.v1.connect(...) as connection:
# listen_task = asyncio.create_task(connection.start_listening())
- # await connection.send_speak_v_1_text(SpeakV1Text(text="..."))
- # await connection.send_speak_v_1_flush(SpeakV1Flush())
- # await connection.send_speak_v_1_close(SpeakV1Close())
+ # await connection.send_text(SpeakV1Text(text="..."))
+ # await connection.send_flush(SpeakV1Flush())
+ # await connection.send_close(SpeakV1Close())
# await listen_task

except Exception as e: