"""
Actions module providing high-level functions to interact with Nostr relays.
This module contains functions for fetching events, streaming data, testing
relay capabilities, and computing comprehensive relay metadata. These functions
provide a high-level interface for common Nostr protocol operations.
The main categories of actions include:
Event Operations:
- fetch_events: Retrieve stored events matching filter criteria
- stream_events: Continuously stream events as they arrive
Relay Information:
- fetch_nip11: Retrieve NIP-11 relay information document
- check_connectivity: Test basic WebSocket connection capability
- check_readability: Test ability to subscribe and receive events
- check_writability: Test ability to publish events
- fetch_nip66: Retrieve comprehensive connection metrics
- fetch_relay_metadata: Generate comprehensive relay metadata
All functions work with existing Client instances and handle errors gracefully.
Connection testing functions automatically detect proof-of-work requirements
from relay NIP-11 metadata when available.
Example:
Basic usage of action functions:
>>> # Create relay and client
>>> relay = Relay("wss://relay.damus.io")
>>> client = Client(relay)
>>> # Test relay capabilities
>>> metadata = await fetch_relay_metadata(client, private_key, public_key)
>>> print(f"Relay is {'readable' if metadata.readable else 'not readable'}")
>>> # Fetch events
>>> async with client:
... filter = Filter(kinds=[1], limit=10)
... events = await fetch_events(client, filter)
... print(f"Retrieved {len(events)} events")
>>> # Stream events continuously
>>> async with client:
... filter = Filter(kinds=[1])
... async for event in stream_events(client, filter):
... print(f"New event: {event.content}")
"""
import json
import logging
import time
from asyncio import TimeoutError
from collections.abc import AsyncGenerator
from typing import Optional
from aiohttp import ClientError
from ..core.client import Client
from ..core.event import Event
from ..core.filter import Filter
from ..core.relay_metadata import RelayMetadata
from ..exceptions.errors import ClientConnectionError
from ..exceptions.errors import ClientPublicationError
from ..utils import generate_event
logger = logging.getLogger(__name__)
[docs]
async def fetch_events(
    client: Client,
    filter: Filter,
) -> list[Event]:
    """
    Fetch events matching the filter using an existing client connection.

    This function subscribes to events matching the filter criteria, collects
    all matching events, and returns them as a list. The subscription is
    automatically closed when all stored events have been received, and is
    also released if listening fails partway through.

    Args:
        client (Client): An instance of Client already connected to a relay
        filter (Filter): A Filter instance defining the criteria for fetching events

    Returns:
        list[Event]: A list of Event instances matching the filter

    Raises:
        ClientConnectionError: If client is not connected

    Examples:
        Fetch recent text notes:

        >>> async with Client(relay) as client:
        ...     filter = Filter(kinds=[1], limit=10)
        ...     events = await fetch_events(client, filter)
        ...     print(f"Found {len(events)} events")

        Fetch events from specific author:

        >>> filter = Filter(authors=["abc123..."], kinds=[1, 3])
        >>> events = await fetch_events(client, filter)
        >>> for event in events:
        ...     print(f"Event {event.id}: {event.content[:50]}...")
    """
    if not client.is_connected:
        raise ClientConnectionError("Client is not connected")

    events: list[Event] = []
    subscription_id = await client.subscribe(filter)
    try:
        # Listen for events until end of stored events (EOSE)
        async for event_message in client.listen_events(subscription_id):
            try:
                events.append(Event.from_dict(event_message[2]))
            except (TypeError, KeyError, ValueError) as e:
                # Malformed event payloads are skipped, not fatal
                logger.debug(f"Failed to parse event: {e}")
            except Exception as e:
                logger.exception(f"Unexpected error parsing event: {e}")
    finally:
        # Always release the relay-side subscription, even if listening
        # raised; skip the CLOSE message when the connection is already gone
        # so we don't mask the original error with a second failure.
        if client.is_connected:
            await client.unsubscribe(subscription_id)
    return events
[docs]
async def stream_events(
    client: Client,
    filter: Filter,
) -> AsyncGenerator[Event, None]:
    """
    Stream events matching the filter using an existing client connection.

    This function subscribes to events and yields them as they arrive from
    the relay. Unlike fetch_events, this continues indefinitely and yields
    both stored and new events. The relay-side subscription is released when
    the generator finishes for any reason — exhaustion, an error, or the
    consumer breaking out of its ``async for`` loop.

    Args:
        client (Client): An instance of Client already connected to a relay
        filter (Filter): A Filter instance defining the criteria for streaming events

    Yields:
        Event: Event instances matching the filter as they arrive

    Raises:
        ClientConnectionError: If client is not connected

    Examples:
        Stream text notes in real-time:

        >>> async with Client(relay) as client:
        ...     filter = Filter(kinds=[1], limit=10)
        ...     async for event in stream_events(client, filter):
        ...         print(f"New note: {event.content}")
        ...         if event.content.startswith("STOP"):
        ...             break

        Stream events from specific authors:

        >>> filter = Filter(authors=["abc123..."], kinds=[1])
        >>> async for event in stream_events(client, filter):
        ...     process_event(event)
    """
    if not client.is_connected:
        raise ClientConnectionError("Client is not connected")

    subscription_id = await client.subscribe(filter)
    try:
        # Stream events continuously
        async for event_message in client.listen_events(subscription_id):
            try:
                event = Event.from_dict(event_message[2])
            except (TypeError, KeyError, ValueError) as e:
                logger.debug(f"Failed to parse event: {e}")
                continue
            except Exception as e:
                logger.exception(f"Unexpected error parsing event: {e}")
                continue
            # Yield outside the parse try-block so exceptions thrown into the
            # generator at the suspension point are not swallowed as "parse
            # errors" and propagate to the consumer as intended.
            yield event
    finally:
        # Runs on normal exit, errors, and generator close (GeneratorExit
        # raised when the consumer stops iterating); only send CLOSE while
        # the connection is still up.
        if client.is_connected:
            await client.unsubscribe(subscription_id)
[docs]
def _process_nip11_data(data: dict) -> dict:
    """
    Validate and normalize a raw NIP-11 JSON document.

    Known NIP-11 fields are copied through (missing ones become None) and all
    unknown keys are collected into ``extra_fields``. Fields with unexpected
    types are coerced to None; list/dict fields are filtered down to entries
    of the expected, JSON-serializable types.

    Args:
        data (dict): Parsed JSON body of the relay information document

    Returns:
        dict: Normalized document suitable for RelayMetadata.Nip11.from_dict
    """
    # Single source of truth for the recognized NIP-11 top-level keys.
    known_fields = [
        "name",
        "description",
        "banner",
        "icon",
        "pubkey",
        "contact",
        "supported_nips",
        "software",
        "version",
        "privacy_policy",
        "terms_of_service",
        "limitation",
    ]
    processed: dict = {key: data.get(key) for key in known_fields}
    processed["extra_fields"] = {
        key: value for key, value in data.items() if key not in known_fields
    }

    # Scalar fields must be str or None; anything else is dropped.
    for key in known_fields:
        if key in ("supported_nips", "limitation"):
            continue
        if not (isinstance(processed[key], str) or processed[key] is None):
            processed[key] = None

    # supported_nips must be a list; keep only int/str entries.
    if not isinstance(processed["supported_nips"], list):
        processed["supported_nips"] = None
    else:
        processed["supported_nips"] = [
            nip for nip in processed["supported_nips"] if isinstance(nip, (int, str))
        ]

    # Dict fields must have string keys and JSON-serializable values.
    for key in ("limitation", "extra_fields"):
        field_value = processed[key]
        if not isinstance(field_value, dict):
            processed[key] = None
        else:
            cleaned: dict = {}
            for dict_key, value in field_value.items():
                if not isinstance(dict_key, str):
                    continue
                try:
                    json.dumps(value)  # drop values JSON cannot encode
                except (TypeError, ValueError):
                    continue
                cleaned[dict_key] = value
            processed[key] = cleaned
    return processed


async def fetch_nip11(client: Client) -> Optional[RelayMetadata.Nip11]:
    """
    Fetch NIP-11 metadata from the relay.

    This function attempts to retrieve the NIP-11 relay information document
    by making HTTP requests to the relay's information endpoint with the
    ``Accept: application/nostr+json`` header. It tries HTTPS first and falls
    back to HTTP only when the HTTPS request fails at the transport level.

    Args:
        client (Client): An instance of Client (connection not required)

    Returns:
        Optional[RelayMetadata.Nip11]: NIP-11 metadata, or None if the
        document is unavailable or contains no usable fields
    """
    relay_id = client.relay.url.removeprefix("wss://")
    headers = {"Accept": "application/nostr+json"}
    # Try both HTTPS and HTTP protocols
    for schema in ["https://", "http://"]:
        try:
            session = client.session()
            async with session:
                async with session.get(
                    schema + relay_id, headers=headers, timeout=client.timeout
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        if not isinstance(data, dict):
                            return None
                        data_processed = _process_nip11_data(data)
                        # Only build metadata when at least one field carries
                        # a usable value.
                        if any(value is not None for value in data_processed.values()):
                            return RelayMetadata.Nip11.from_dict(data_processed)
                    else:
                        logger.debug(
                            f"NIP-11 not found at {schema + relay_id} (status {response.status})"
                        )
                    return None
        except (ClientError, TimeoutError) as e:
            # Transport failure: fall through to the next scheme.
            logger.debug(f"Failed to fetch NIP-11 from {schema + relay_id}: {e}")
        except Exception as e:
            logger.exception(f"Unexpected error fetching NIP-11: {e}")
    return None
[docs]
async def check_connectivity(client: Client) -> tuple[Optional[int], bool]:
    """
    Check if the relay is connectable and measure connection time.

    Attempts to establish a WebSocket connection to the relay and measures
    how long the connection handshake takes.

    Args:
        client (Client): An instance of Client (must not be already connected)

    Returns:
        Tuple[Optional[int], bool]: (rtt_open in ms or None, openable as bool)
            - rtt_open: Connection time in milliseconds, or None if failed
            - openable: True if connection succeeded, False otherwise

    Raises:
        ClientConnectionError: If client is already connected

    Examples:
        Test relay connectivity:

        >>> client = Client(relay)
        >>> rtt, is_openable = await check_connectivity(client)
        >>> if is_openable:
        ...     print(f"Relay is reachable in {rtt}ms")
        ... else:
        ...     print("Relay is not reachable")
    """
    if client.is_connected:
        raise ClientConnectionError("Client is already connected")

    rtt_ms: Optional[int] = None
    reachable = False
    try:
        started = time.perf_counter()
        async with client:
            # Stop the clock as soon as the connection is established.
            rtt_ms = int((time.perf_counter() - started) * 1000)
            reachable = True
    except ClientConnectionError as e:
        logger.debug(f"Relay connection error: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error during connectivity check: {e}")

    # A failed attempt must never report a round-trip time.
    return (rtt_ms, True) if reachable else (None, False)
[docs]
async def check_readability(client: Client) -> tuple[Optional[int], bool]:
    """
    Check if the relay allows reading events and measure read response time.

    Subscribes with a minimal filter and measures the time until the relay's
    first response of any kind; the relay counts as readable once an EVENT or
    EOSE arrives for our subscription.

    Args:
        client (Client): An instance of Client (must be connected)

    Returns:
        Tuple[Optional[int], bool]: (rtt_read in ms or None, readable as bool)

    Raises:
        ClientConnectionError: If client is not connected
    """
    if not client.is_connected:
        raise ClientConnectionError("Client is not connected")

    rtt_read: Optional[int] = None
    readable = False
    try:
        probe = Filter(limit=1)
        started = time.perf_counter()
        sub_id = await client.subscribe(probe)
        async for message in client.listen():
            # RTT is time-to-first-message, whatever its type.
            if rtt_read is None:
                rtt_read = int((time.perf_counter() - started) * 1000)
            kind = message[0]
            if kind == "NOTICE":
                continue  # Informational only; keep waiting
            if kind in ("EVENT", "EOSE") and message[1] == sub_id:
                readable = True  # Got data (or end of stored events)
                break
            if kind == "CLOSED" and message[1] == sub_id:
                break  # Relay refused/closed the subscription
        await client.unsubscribe(sub_id)
    except ClientConnectionError as e:
        logger.debug(f"Relay connection error: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error during readability check: {e}")

    # A failed check must never report a round-trip time.
    if not readable:
        rtt_read = None
    return rtt_read, readable
[docs]
async def check_writability(
    client: Client,
    sec: str,
    pub: str,
    target_difficulty: Optional[int] = None,
    event_creation_timeout: Optional[int] = None,
) -> tuple[Optional[int], bool]:
    """
    Check if the relay allows writing events and measure write response time.

    Creates and publishes a test event (kind 30166, identified by the relay
    URL in its 'd' tag) and measures how long the publish round-trip takes.

    Args:
        client (Client): An instance of Client (must be connected)
        sec (str): Private key for signing the test event
        pub (str): Public key corresponding to the private key
        target_difficulty (Optional[int]): Proof-of-work difficulty for the event
        event_creation_timeout (Optional[int]): Timeout for event creation

    Returns:
        Tuple[Optional[int], bool]: (rtt_write in ms or None, writable as bool)

    Raises:
        ClientConnectionError: If client is not connected
    """
    if not client.is_connected:
        raise ClientConnectionError("Client is not connected")

    rtt_write: Optional[int] = None
    writable = False
    try:
        # Fall back to the client's own timeout (or 10s) for event creation,
        # which may involve proof-of-work mining.
        creation_timeout = event_creation_timeout
        if creation_timeout is None:
            creation_timeout = client.timeout or 10
        test_event = Event.from_dict(
            generate_event(
                sec,
                pub,
                30166,  # Parameterized replaceable event kind
                [["d", client.relay.url]],  # 'd' tag for identifier
                "{}",  # Empty JSON content
                target_difficulty=target_difficulty,
                timeout=creation_timeout,
            )
        )
        # Time only the publish round-trip, not event creation.
        started = time.perf_counter()
        # Now raises ClientPublicationError on failure
        await client.publish(test_event)
        rtt_write = int((time.perf_counter() - started) * 1000)
        writable = True  # Publish succeeded
    except ClientPublicationError as e:
        logger.debug(f"Publish error: {e}")
        writable = False
    except ClientConnectionError as e:
        logger.debug(f"Relay connection error: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error during writability check: {e}")

    # A failed check must never report a round-trip time.
    if not writable:
        rtt_write = None
    return rtt_write, writable
[docs]
async def fetch_nip66(
    client: Client,
    sec: str,
    pub: str,
    target_difficulty: Optional[int] = None,
    event_creation_timeout: Optional[int] = None,
) -> Optional[RelayMetadata.Nip66]:
    """
    Fetch comprehensive connection metrics from the relay.

    Performs a complete capability test: connection establishment, then read
    and write checks over a single connection. Returns None as soon as the
    relay proves unreachable.

    Args:
        client (Client): An instance of Client (must not be already connected)
        sec (str): Private key for signing test events
        pub (str): Public key corresponding to the private key
        target_difficulty (Optional[int]): Proof-of-work difficulty for test events
        event_creation_timeout (Optional[int]): Timeout for event creation

    Returns:
        Optional[RelayMetadata.Nip66]: NIP-66 metadata or None if not available

    Raises:
        ClientConnectionError: If client is already connected
    """
    if client.is_connected:
        raise ClientConnectionError("Client is already connected")

    try:
        # Basic connectivity gates everything else.
        rtt_open, openable = await check_connectivity(client)
        if not openable:
            return None
        # Reuse one connection for both read and write probes.
        async with client:
            rtt_read, readable = await check_readability(client)
            rtt_write, writable = await check_writability(
                client, sec, pub, target_difficulty, event_creation_timeout
            )
        return RelayMetadata.Nip66.from_dict(
            {
                "rtt_open": rtt_open,
                "rtt_read": rtt_read,
                "rtt_write": rtt_write,
                "openable": openable,
                "writable": writable,
                "readable": readable,
            }
        )
    except ClientConnectionError as e:
        logger.debug(f"Relay connection error: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error fetching NIP-66: {e}")
    return None