Not able to make websocket connection in Rust - Stack Overflow

#[tokio::main]async fn main() {println!("Starting Tokio runtime...");if tokio::runtime::Hand

#[tokio::main]
async fn main() {
    println!("Starting Tokio runtime...");
    if tokio::runtime::Handle::try_current().is_ok() {
        println!("Tokio runtime is active.");
    } else {
        println!("No active Tokio runtime.");
    }
    let local = tokio::task::LocalSet::new(); 
    println!("LocalSet created.");
    dotenv().ok();
    let services = Arc::new(services::new_services());
    let connected_clients: ConnectedClients = Arc::new(Mutex::new(HashMap::new()));

    let make_svc = make_service_fn(|_conn| {
        let services = Arc::clone(&services);
        let clients = Arc::clone(&connected_clients);
        async move {
            Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
                handle_request(req, services.clone(), clients.clone())
            }))
        }
    });

    let port: u16 = env::var("PORT")
        .unwrap_or_else(|_| "8080".to_string())
        .parse()
        .expect("PORT must be a valid number");

    let addr = format!("127.0.0.1:{}", port).parse().expect("Invalid address");

    local.run_until(async {
        let server = Server::bind(&addr).serve(make_svc);

        println!("Server listening on {}", addr);
        if let Err(e) = server.await {
            eprintln!("Server error: {}", e);
        }
    }).await;
}


async fn handle_request(
    mut req: Request<Body>, 
    _services: Arc<Services>, 
    clients: ConnectedClients
) -> Result<Response<Body>, hyper::Error> {
    if req.uri().path() == "/ws" {
        println!("/ws path entered");

        // Check if it's a WebSocket request
        if !is_websocket_request(&req) {
            return Ok(Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body(Body::from("Not a WebSocket request"))
                .unwrap());
        }
        println!("ITS A WS CONNECTION!");
        // Extract token from query parameters
        let token = extract_token_from_query(req.uri());
        let (mut user_id, 
            mut _id, 
            mut stores, 
            mut first_name, 
            mut last_name, 
            mut is_admin, 
            mut route,
            mut device_id
        ) = (
            String::new(),
            String::new(),
            Vec::new(),
            String::new(),
            String::new(),
            false,
            String::new(),
            None,
        );

        match token {
            Some(ref t) => {
                // Token validation logic
                println!("Entered into match token");
                let secret_key = std::env::var("JWT_SECRET").unwrap_or_else(|_| "default_secret".to_string());
                let decoding_key = DecodingKey::from_secret(secret_key.as_ref());
                let validation = Validation::default();

     // TOKEN VALIDATION AND PARSING


}
                
        // Get the WebSocket key from headers
        let ws_key = req.headers().get(SEC_WEBSOCKET_KEY).and_then(|v| v.to_str().ok()).unwrap();

        // Generate accept key
        let accept_key = derive_accept_key(ws_key.as_bytes());

        println!("WebSocket Key: {:?}", ws_key);
        println!("Accept Key: {}", accept_key);

        let mut response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(CONNECTION, "Upgrade")
            .header(UPGRADE, "websocket")
            .header(SEC_WEBSOCKET_ACCEPT, accept_key);

        // Add protocol if present
        if let Some(protocol) = req.headers().get(SEC_WEBSOCKET_PROTOCOL) {
            response = response.header(SEC_WEBSOCKET_PROTOCOL, protocol);
        }

        let response = response.body(Body::empty()).unwrap();
        println!("response : {:#?}", response);

        
    // Perform WebSocket upgrade and wrapping outside `tokio::spawn`
   
    let device_id_clone = device_id.clone(); 
    
    let local = LocalSet::new();    

    println!("local {:#?}", local);
    if tokio::runtime::Handle::try_current().is_err() {
        println!("No active Tokio runtime");
    }
    println!("Incoming request: {:?}", req);
   
    
    
    local.spawn_local(async  move{
        println!("Entered into local spawn");
        match on(req).await {
            Ok(upgraded) => {
                println!("Connection upgraded");
    
         
                let ws_stream: WebSocketStream<Upgraded> = WebSocketStream::from_raw_socket(
                    upgraded,
                    Role::Server,
                    Some(WebSocketConfig::default())
                ).await;

               

                let shared_ws_stream = Arc::new(RwLock::new(ws_stream));
                let shared_ws_stream_clone = Arc::clone(&shared_ws_stream);
               
                
                let user_connection: Connection = Connection::new(
                    None, 
                    None, 
                    shared_ws_stream,
                    device_id.clone(), 
                    None, 
                    None,
                );
    
                println!("WebSocket stream created");
            
            if let Err(e) = handle_websocket_stream(shared_ws_stream_clone, clients).await {
                eprintln!("WebSocket handler error: {}", e);
            }
            }
            Err(e) => eprintln!("Failed to upgrade connection: {}", e),
        }
    });
   
    Ok(response)
} else {
    Ok(Response::new(Body::from("404 not found!")))
}
}

Trying to make a websocket connection in rust, but the connection breaks abruptly after connecting for 1-2 milliseconds Below are the terminal logs:

Starting Tokio runtime...
Tokio runtime is active.
LocalSet created.
Server listening on 127.0.0.1:8080
/ws path entered
WebSocket request validation:
  Upgrade header present: true
  Connection upgrade: true
  Has WebSocket key: true
  Has correct version: true
ITS A WS CONNECTION!
Entered into match token
Token validated successfully
CALLED
WebSocket Key: "rTNBeHrwfawHZ9uhlw3M1Q=="
Accept Key: pJhyS7Aq49YTSEt5997ylXf8Z7g=
response : Response {
    status: 101,
    version: HTTP/1.1,
    headers: {
        "connection": "Upgrade",
        "upgrade": "websocket",
        "sec-websocket-accept": "pJhyS7Aq49YTSEt5997ylXf8Z7g=",
    },
    body: Body(
        Empty,
    ),
}
local LocalSet
Incoming request: Request { method: GET, uri: /ws?token=JWT_TOKEN, version: HTTP/1.1, headers: {"sec-websocket-version": "13", "sec-websocket-key": "rTNBeHrwfawHZ9uhlw3M1Q==", "connection": "Upgrade", "upgrade": "websocket", "sec-websocket-extensions": "permessage-deflate; client_max_window_bits", "host": "localhost:8080"}, body: Body(Empty) }

**The Code is not entering into local.spawn_local block so its not able to make a connection, as per me. If anyone has any idea how to solve this, please share. **

Here's the full error after adding the local.await before Ok(response)

future cannot be sent between threads safely
within `hyper::proto::h2::server::H2Stream<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<_, _>`
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`

Second error at server.await in main function:

the trait bound `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>` is not satisfied
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`
required for `NewSvcTask<AddrStream, {async block@src/main.rs:114:9: 118:10}, ServiceFn<..., ...>, ..., ...>` to implement `futures::Future`
required for `hyper::common::exec::Exec` to implement `hyper::common::exec::NewSvcExec<AddrStream, {async block@src/main.rs:114:9: 118:10}, hyper::service::util::ServiceFn<{closure@src/main.rs:115:46: 115:71}, Body>, hyper::common::exec::Exec, hyper::server::server::NoopWatcher>`
1 redundant requirement hidden
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `futures::Future`
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `std::future::IntoFuture`
consider using `--verbose` to print the full type name to the console
#[tokio::main]
async fn main() {
    println!("Starting Tokio runtime...");
    if tokio::runtime::Handle::try_current().is_ok() {
        println!("Tokio runtime is active.");
    } else {
        println!("No active Tokio runtime.");
    }
    let local = tokio::task::LocalSet::new(); 
    println!("LocalSet created.");
    dotenv().ok();
    let services = Arc::new(services::new_services());
    let connected_clients: ConnectedClients = Arc::new(Mutex::new(HashMap::new()));

    let make_svc = make_service_fn(|_conn| {
        let services = Arc::clone(&services);
        let clients = Arc::clone(&connected_clients);
        async move {
            Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
                handle_request(req, services.clone(), clients.clone())
            }))
        }
    });

    let port: u16 = env::var("PORT")
        .unwrap_or_else(|_| "8080".to_string())
        .parse()
        .expect("PORT must be a valid number");

    let addr = format!("127.0.0.1:{}", port).parse().expect("Invalid address");

    local.run_until(async {
        let server = Server::bind(&addr).serve(make_svc);

        println!("Server listening on {}", addr);
        if let Err(e) = server.await {
            eprintln!("Server error: {}", e);
        }
    }).await;
}


async fn handle_request(
    mut req: Request<Body>, 
    _services: Arc<Services>, 
    clients: ConnectedClients
) -> Result<Response<Body>, hyper::Error> {
    if req.uri().path() == "/ws" {
        println!("/ws path entered");

        // Check if it's a WebSocket request
        if !is_websocket_request(&req) {
            return Ok(Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body(Body::from("Not a WebSocket request"))
                .unwrap());
        }
        println!("ITS A WS CONNECTION!");
        // Extract token from query parameters
        let token = extract_token_from_query(req.uri());
        let (mut user_id, 
            mut _id, 
            mut stores, 
            mut first_name, 
            mut last_name, 
            mut is_admin, 
            mut route,
            mut device_id
        ) = (
            String::new(),
            String::new(),
            Vec::new(),
            String::new(),
            String::new(),
            false,
            String::new(),
            None,
        );

        match token {
            Some(ref t) => {
                // Token validation logic
                println!("Entered into match token");
                let secret_key = std::env::var("JWT_SECRET").unwrap_or_else(|_| "default_secret".to_string());
                let decoding_key = DecodingKey::from_secret(secret_key.as_ref());
                let validation = Validation::default();

     // TOKEN VALIDATION AND PARSING


}
                
        // Get the WebSocket key from headers
        let ws_key = req.headers().get(SEC_WEBSOCKET_KEY).and_then(|v| v.to_str().ok()).unwrap();

        // Generate accept key
        let accept_key = derive_accept_key(ws_key.as_bytes());

        println!("WebSocket Key: {:?}", ws_key);
        println!("Accept Key: {}", accept_key);

        let mut response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(CONNECTION, "Upgrade")
            .header(UPGRADE, "websocket")
            .header(SEC_WEBSOCKET_ACCEPT, accept_key);

        // Add protocol if present
        if let Some(protocol) = req.headers().get(SEC_WEBSOCKET_PROTOCOL) {
            response = response.header(SEC_WEBSOCKET_PROTOCOL, protocol);
        }

        let response = response.body(Body::empty()).unwrap();
        println!("response : {:#?}", response);

        
    // Perform WebSocket upgrade and wrapping outside `tokio::spawn`
   
    let device_id_clone = device_id.clone(); 
    
    let local = LocalSet::new();    

    println!("local {:#?}", local);
    if tokio::runtime::Handle::try_current().is_err() {
        println!("No active Tokio runtime");
    }
    println!("Incoming request: {:?}", req);
   
    
    
    local.spawn_local(async  move{
        println!("Entered into local spawn");
        match on(req).await {
            Ok(upgraded) => {
                println!("Connection upgraded");
    
         
                let ws_stream: WebSocketStream<Upgraded> = WebSocketStream::from_raw_socket(
                    upgraded,
                    Role::Server,
                    Some(WebSocketConfig::default())
                ).await;

               

                let shared_ws_stream = Arc::new(RwLock::new(ws_stream));
                let shared_ws_stream_clone = Arc::clone(&shared_ws_stream);
               
                
                let user_connection: Connection = Connection::new(
                    None, 
                    None, 
                    shared_ws_stream,
                    device_id.clone(), 
                    None, 
                    None,
                );
    
                println!("WebSocket stream created");
            
            if let Err(e) = handle_websocket_stream(shared_ws_stream_clone, clients).await {
                eprintln!("WebSocket handler error: {}", e);
            }
            }
            Err(e) => eprintln!("Failed to upgrade connection: {}", e),
        }
    });
   
    Ok(response)
} else {
    Ok(Response::new(Body::from("404 not found!")))
}
}

Trying to make a websocket connection in rust, but the connection breaks abruptly after connecting for 1-2 milliseconds Below are the terminal logs:

Starting Tokio runtime...
Tokio runtime is active.
LocalSet created.
Server listening on 127.0.0.1:8080
/ws path entered
WebSocket request validation:
  Upgrade header present: true
  Connection upgrade: true
  Has WebSocket key: true
  Has correct version: true
ITS A WS CONNECTION!
Entered into match token
Token validated successfully
CALLED
WebSocket Key: "rTNBeHrwfawHZ9uhlw3M1Q=="
Accept Key: pJhyS7Aq49YTSEt5997ylXf8Z7g=
response : Response {
    status: 101,
    version: HTTP/1.1,
    headers: {
        "connection": "Upgrade",
        "upgrade": "websocket",
        "sec-websocket-accept": "pJhyS7Aq49YTSEt5997ylXf8Z7g=",
    },
    body: Body(
        Empty,
    ),
}
local LocalSet
Incoming request: Request { method: GET, uri: /ws?token=JWT_TOKEN, version: HTTP/1.1, headers: {"sec-websocket-version": "13", "sec-websocket-key": "rTNBeHrwfawHZ9uhlw3M1Q==", "connection": "Upgrade", "upgrade": "websocket", "sec-websocket-extensions": "permessage-deflate; client_max_window_bits", "host": "localhost:8080"}, body: Body(Empty) }

**The Code is not entering into local.spawn_local block so its not able to make a connection, as per me. If anyone has any idea how to solve this, please share. **

Here's the full error after adding the local.await before Ok(response)

future cannot be sent between threads safely
within `hyper::proto::h2::server::H2Stream<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<_, _>`
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`

Second error at server.await in main function:

the trait bound `hyper::common::exec::Exec: hyper::common::exec::ConnStreamExec<impl futures::Future<Output = Result<hyper::Response<Body>, hyper::Error>>, Body>` is not satisfied
the trait `hyper::common::exec::ConnStreamExec<F, B>` is implemented for `hyper::common::exec::Exec`
required for `NewSvcTask<AddrStream, {async block@src/main.rs:114:9: 118:10}, ServiceFn<..., ...>, ..., ...>` to implement `futures::Future`
required for `hyper::common::exec::Exec` to implement `hyper::common::exec::NewSvcExec<AddrStream, {async block@src/main.rs:114:9: 118:10}, hyper::service::util::ServiceFn<{closure@src/main.rs:115:46: 115:71}, Body>, hyper::common::exec::Exec, hyper::server::server::NoopWatcher>`
1 redundant requirement hidden
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `futures::Future`
required for `hyper::Server<AddrIncoming, hyper::service::make::MakeServiceFn<{closure@src/main.rs:111:36: 111:43}>>` to implement `std::future::IntoFuture`
consider using `--verbose` to print the full type name to the console
Share Improve this question edited Nov 21, 2024 at 12:32 Sagar Maheshwari asked Nov 21, 2024 at 5:46 Sagar MaheshwariSagar Maheshwari 72 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

A LocalSet doesn't do anything until you actually await it somehow. local.spawn_local doesn't start driving the provided future if the LocalSet itself isn't being driven itself. This is why the code doesn't run -- you send the future to the LocalSet and then you drop it. This is not too different from writing let foo = async move { ... };, never awaiting foo, and then wondering why the code in the async block doesn't run.

You could add local.await; before Ok(response) for example. Note that this will block the outer function until the future given to the LocalSet finishes. It's not clear from your (long and convoluted) example code whether this is actually the correct solution to the problem, however it would cause the code to run, which fixes the primary issue that you're seeing.

It's not clear why you're even using LocalSet here since you could just await the future directly. I don't see any utility LocalSet is giving you whatsoever.

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1742311784a4420034.html

相关推荐

  • Not able to make websocket connection in Rust - Stack Overflow

    #[tokio::main]async fn main() {println!("Starting Tokio runtime...");if tokio::runtime::Hand

    13小时前
    20

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信