o
    }jh4                     @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
mZmZ d dlmZmZmZ d dlZd dlmZ d dlmZ d dlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZm Z m!Z!m"Z"m#Z# ddl$m%Z% ddl&m'Z'm(Z( dedefddZ)e*e+Z,G dd dZ-dS )    Nwraps)AnyCallableDictListOptionalUnion)	urlencodeurlparse
urlunparse)ValidationError)connect)ClientConnection   )NotConnectedError)MessageServerMessageAdapter)http_endpoint_url)DEFAULT_HEARTBEAT_INTERVALDEFAULT_TIMEOUTPHOENIX_CHANNELVSNChannelEvents)	is_ws_url   )AsyncRealtimeChannelRealtimeChannelOptionsfuncreturnc                    s   t   fdd}|S )Nc                     s"   t d j d  | i |S )Nz	Warning: z is deprecated.)loggerwarning__name__)argskwargsr    L/var/www/html/bot/env/lib/python3.10/site-packages/realtime/_async/client.pywrapper   s   zdeprecated.<locals>.wrapperr   )r   r(   r&   r%   r'   
deprecated   s   r)   c                   @   s  e Zd Zdddeddefdedee dedeeee	f  d	e
d
e
dede
ddfddZedefddZd<ddZd<ddZd<ddZedd Zd<ddZdejjddfddZdd  Zd<d!d"Zd<d#d$Z	d=d%edee defd&d'Zdee fd(d)Z d*eddfd+d,Z!d*eddfd-d.Z"d<d/d0Z#dee ddfd1d2Z$defd3d4Z%d5e&e'eee	f f ddfd6d7Z(d%efd8d9Z)defd:d;Z*dS )>AsyncRealtimeClientNT   g      ?urltokenauto_reconnectparamshb_intervalmax_retriesinitial_backofftimeoutr   c	           	   	   C   s   t |stdtjddtjdd|tjdtjd d| _|r*|  jd| 7  _t|| _|p2i | _|| _	|| _
g | _|| _d	| _d
| _|| _i | _|| _|| _|| _d	| _d	| _d	S )a  
        Initialize a RealtimeClient instance for WebSocket communication.

        :param url: WebSocket URL of the Realtime server. Starts with `ws://` or `wss://`.
                   Also accepts default Supabase URL: `http://` or `https://`.
        :param token: Authentication token for the WebSocket connection.
        :param auto_reconnect: If True, automatically attempt to reconnect on disconnection. Defaults to True.
        :param params: Optional parameters for the connection. Defaults to None.
        :param hb_interval: Interval (in seconds) for sending heartbeat messages to keep the connection alive. Defaults to 25.
        :param max_retries: Maximum number of reconnection attempts. Defaults to 5.
        :param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0.
        :param timeout: Connection timeout in seconds. Defaults to DEFAULT_TIMEOUT.
        z4url must be a valid WebSocket URL or HTTP URL stringzhttps://zwss://zhttp://zws://)flagsz
/websocketz?apikey=Nr   )r   
ValueErrorresub
IGNORECASEr,   r   http_endpointr/   apikeyaccess_tokensend_bufferr0   _ws_connectionrefr.   channelsr1   r2   r3   _listen_task_heartbeat_task)	selfr,   r-   r.   r/   r0   r1   r2   r3   r&   r&   r'   __init__)   s(   ,


zAsyncRealtimeClient.__init__c                 C   s
   | j d uS N)r=   rB   r&   r&   r'   is_connectedV   s   
z AsyncRealtimeClient.is_connectedc                    s   | j stdzQ| j 2 zI3 dH W }td| zt|}W n ty> } ztd|d|  W Y d}~qd}~ww td|  | j	|j
 }rU|| q6 W dS  tjjyv } z| |I dH  W Y d}~dS d}~ww )zN
        An infinite loop that keeps listening.
        :return: None
        _listenNz	receive: zUnrecognized message format 
zparsed message as )r=   r   r    infor   validate_jsonr   errorr?   gettopic_handle_message
websockets
exceptionsConnectionClosedError_on_connect_error)rB   msgmessageechannelr&   r&   r'   rG   Z   s,   
zAsyncRealtimeClient._listenc                    sV   d | _ |  I d H  | jr'| j D ]\}}td|  | I d H  qd S d S )Nz&Rejoining channel after reconnection: )r=   r   rF   r?   itemsr    rI   _rejoin)rB   rM   rV   r&   r&   r'   
_reconnectr   s   zAsyncRealtimeClient._reconnectc                    sL  | j rtd dS d}| j}td| j  || jk rzt| jI dH }|| _td |  I dH W S  t	y } zU|d7 }t
dt|  || jksT| jsbt
d| d	t|   |d
|d   }td| d| j d|dd| d	 t|I dH  t|d
 d}W Y d}~nd}~ww || jk st	d| j d)a{  
        Establishes a WebSocket connection with exponential backoff retry mechanism.

        This method attempts to connect to the WebSocket server. If the connection fails,
        it will retry with an exponential backoff strategy up to a maximum number of retries.

        Returns:
            None

        Raises:
            Exception: If unable to establish a connection after max_retries attempts.

        Note:
            - The initial backoff time and maximum retries are set during RealtimeClient initialization.
            - The backoff time doubles after each failed attempt, up to a maximum of 60 seconds.
        z(WebSocket connection already establishedNr   z&Attempting to connect to WebSocket at z-WebSocket connection established successfullyr   zConnection attempt failed: z$Connection failed permanently after z attempts. Error: r   zRetry /z: Next attempt in z.2fzs (backoff=zs)<   z/Failed to establish WebSocket connection after z	 attempts)rF   r    rI   r2   r,   r1   r   r=   _on_connect	ExceptionrK   strr.   asynciosleepmin)rB   retriesbackoffwsrU   	wait_timer&   r&   r'   r   {   sB   


 
zAsyncRealtimeClient.connectc                    s   d S rD   r&   rE   r&   r&   r'   listen   s   zAsyncRealtimeClient.listenc                    s`   | j r| j   d | _ | jr| j  d | _t|  | _ t|  | _|  I d H  d S rD   )r@   cancelrA   r_   create_taskrG   
_heartbeat_flush_send_bufferrE   r&   r&   r'   r\      s   

zAsyncRealtimeClient._on_connectrU   c                    sL   t d|j d|j  | jrt d |  I d H  d S t d d S )Nz'WebSocket connection closed with code: z
, reason: z%Initiating auto-reconnect sequence...z/Auto-reconnect disabled, terminating connection)r    rK   codereasonr.   rI   rY   )rB   rU   r&   r&   r'   rR      s   
z%AsyncRealtimeClient._on_connect_errorc                    s@   | j rt| jdkr| jD ]}| I d H  qg | _d S d S d S )Nr   )rF   lenr<   )rB   callbackr&   r&   r'   rj      s   

z&AsyncRealtimeClient._flush_send_bufferc                    sR   | j r| j  I dH  d| _ | jr| j  d| _| jr'| j  d| _dS dS )z
        Close the WebSocket connection.

        Returns:
            None

        Raises:
            NotConnectedError: If the connection is not established when this method is called.
        N)r=   closer@   rg   rA   rE   r&   r&   r'   ro      s   


zAsyncRealtimeClient.closec              
      s   | j std| jr_ztttji d d}| |I d H  t	t
| jdI d H  W n/ tjjyF } z| |I d H  W Y d }~nd }~w tjjyY } zW Y d }~nd }~ww | jsd S d S )Nri   )rM   eventpayloadr>      )r=   r   rF   r   r   r   	heartbeatsendr_   r`   maxr0   rO   rP   rQ   rR   ConnectionClosedOK)rB   datarU   r&   r&   r'   ri      s*   zAsyncRealtimeClient._heartbeatrM   c                 C   s$   d| }t | ||}|| j|< |S )z
        Initialize a channel and create a two-way association with the socket.

        :param topic: The topic to subscribe to
        :param params: Optional channel parameters
        :return: AsyncRealtimeChannel instance
        z	realtime:)r   r?   )rB   rM   r/   chanr&   r&   r'   rV     s   


zAsyncRealtimeClient.channelc                 C   s   t | j S rD   )listr?   valuesrE   r&   r&   r'   get_channels     z AsyncRealtimeClient.get_channelsrV   c                 C   s   | j |j= d S rD   )r?   rM   rB   rV   r&   r&   r'   _remove_channel  r|   z#AsyncRealtimeClient._remove_channelc                    sH   |j | jv r| j|j   I dH  t| jdkr"|  I dH  dS dS )z
        Unsubscribes and removes a channel from the socket
        :param channel: Channel to remove
        :return: None
        Nr   )rM   r?   unsubscriberm   ro   r}   r&   r&   r'   remove_channel  s   z"AsyncRealtimeClient.remove_channelc                    s6   | j  D ]\}}| I dH  q|  I dH  dS )z]
        Unsubscribes and removes all channels from the socket
        :return: None
        N)r?   rW   r   ro   )rB   _rV   r&   r&   r'   remove_all_channels#  s   z'AsyncRealtimeClient.remove_all_channelsc                    sD   || _ | j D ]\}}|jr|jr|tj d|iI dH  q	dS )a  
        Set the authentication token for the connection and update all joined channels.

        This method updates the access token for the current connection and sends the new token
        to all joined channels. This is useful for refreshing authentication or changing users.

        Args:
            token (Optional[str]): The new authentication token. Can be None to remove authentication.

        Returns:
            None
        r;   N)r;   r?   rW   _joined_once	is_joinedpushr   )rB   r-   r   rV   r&   r&   r'   set_auth-  s   zAsyncRealtimeClient.set_authc                 C   s   |  j d7  _ | j  S )Nr   )r>   rE   r&   r&   r'   	_make_ref@  s   zAsyncRealtimeClient._make_refrT   c                    sv   t |tr	|}ntd tdi |}|  td    fdd}jr3| I dH  dS j| dS )a  
        Send a message through the WebSocket connection.

        This method serializes the given message dictionary to JSON,
        and sends it through the WebSocket connection. If the connection
        is not currently established, the message will be buffered and sent
        once the connection is re-established.

        Args:
            message (Dict[str, Any]): The message to be sent, as a dictionary.

        Returns:
            None
        zWarning: calling AsyncRealtimeClient.send with a dictionary is deprecated. Please call it with a Message object instead. This will be a hard error in the future.zsend: c               
      sz   j stdzj  I d H  W d S  tjjy1 }  z| I d H  W Y d } ~ d S d } ~ w tjjy<   Y d S w )N_send)r=   r   rt   rO   rP   rQ   rR   rv   )rU   message_strrB   r&   r'   send_message]  s   z.AsyncRealtimeClient.send.<locals>.send_messageNr&   )	
isinstancer   r    r!   model_dump_jsonrI   rF   r<   append)rB   rT   rS   r   r&   r   r'   rt   D  s   
zAsyncRealtimeClient.sendc                    s6    fdd| j  D }|D ]	}| I d H  qd S )Nc                    s&   g | ]}|j  kr|js|jr|qS r&   )rM   r   
is_joining).0chrM   r&   r'   
<listcomp>n  s
    z9AsyncRealtimeClient._leave_open_topic.<locals>.<listcomp>)r?   rz   r   )rB   rM   dup_channelsr   r&   r   r'   _leave_open_topicm  s   
z%AsyncRealtimeClient._leave_open_topicc                 C   sB   t | j}ti | jdtidd}t|j|j|j|j||j	fS )NvsnT)doseq)
r   r,   r
   r/   r   r   schemenetlocpathfragment)rB   
parsed_urlqueryr&   r&   r'   endpoint_urlw  s   
z AsyncRealtimeClient.endpoint_url)r   NrD   )+r"   
__module____qualname__r   r   r^   r   boolr   r   intfloatrC   propertyrF   rG   rY   r   r)   rf   r\   rO   rP   rQ   rR   rj   ro   ri   r   r   rV   r   r{   r~   r   r   r   r   r	   r   rt   r   r   r&   r&   r&   r'   r*   (   sx    	

-


	6







")
r*   ).r_   jsonloggingr6   	functoolsr   typingr   r   r   r   r   r	   urllib.parser
   r   r   rO   pydanticr   r   websockets.asyncio.clientr   rP   r   rT   r   r   transformersr   typesr   r   r   r   r   utilsr   rV   r   r   r)   	getLoggerr"   r    r*   r&   r&   r&   r'   <module>   s(     
	