gRPC: getting CANCELED status after exceeding message size - Stack Overflow

I am playing with the gRPC HelloWorld (gRPC 1.66) example and message sizes. For unary streams, I am ge

I am playing with the gRPC HelloWorld (gRPC 1.66) example and message sizes. For unary RPCs, I correctly get RESOURCE_EXHAUSTED when the server response is too big. However, for

rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

I am getting status CANCELLED when subsequent server messages are too big. Is this the correct behaviour? I'd expect the same status as for the unary RPC.

EDIT: Added sample server and client

#include <iostream>
#include <memory>
#include <string>
#include <random>
#include <thread>
#include <grpcpp/grpcpp.h>
#include "helloworld.grpc.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerWriter;
using helloworld::Greeter;
using helloworld::HelloRequest;
using helloworld::HelloReply;

/// Builds a string of `length` characters, each drawn uniformly at random
/// from the 62 ASCII letters and digits.
///
/// @param length Number of characters to generate; 0 yields an empty string.
/// @return The generated string. The generator is seeded from
///         std::random_device on every call, so contents differ between calls.
std::string generateLongString(size_t length) {
    // Compile-time alphabet: avoids building a heap-allocated std::string on
    // every call. sizeof includes the terminating NUL, hence the "- 1".
    static constexpr char kAlphabet[] =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    static constexpr size_t kAlphabetSize = sizeof(kAlphabet) - 1;

    std::random_device random_device;
    std::mt19937 generator(random_device());
    std::uniform_int_distribution<size_t> pick_index(0, kAlphabetSize - 1);

    std::string result;
    result.reserve(length);  // One allocation instead of repeated regrowth.
    for (size_t i = 0; i < length; ++i) {
        result.push_back(kAlphabet[pick_index(generator)]);
    }
    return result;
}

/// Greeter service implementation with one unary and one server-streaming
/// handler.
class GreeterServiceImpl final : public Greeter::Service {
 public:
  /// Unary RPC: replies with "Hello " followed by the requested name.
  Status SayHello(ServerContext* context, const HelloRequest* request, HelloReply* reply) override {
    reply->set_message("Hello " + request->name());
    return Status::OK;
  }

  /// Server-streaming RPC: sends up to five replies, one second apart. Reply
  /// number i (0-based) carries i * 200 random padding characters, so each
  /// message is larger than the previous one.
  ///
  /// @return Status::OK after all five replies were written, or
  ///         Status::CANCELLED as soon as a write fails.
  Status SayHelloStreamReply(ServerContext* context, const HelloRequest* request, ServerWriter<HelloReply>* writer) override {
    for (int i = 0; i < 5; ++i) {
      HelloReply reply;
      reply.set_message("Hello " + generateLongString(i * 200)  + request->name() + " - message " + std::to_string(i + 1));
      if (!writer->Write(reply)) {
        // A failed write means the stream can no longer carry messages
        // (for example the client cancelled the call), so stop instead of
        // sleeping and writing into a dead stream.
        return Status::CANCELLED;
      }
      std::this_thread::sleep_for(std::chrono::seconds(1)); // Simulate delay
    }
    return Status::OK;
  }
};

/// Builds and starts the Greeter server on 0.0.0.0:50051 with insecure
/// credentials, then blocks in Server::Wait(). If the server cannot be
/// started, an error is printed to stderr and the function returns.
void RunServer() {
  const std::string server_address("0.0.0.0:50051");
  GreeterServiceImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);

  std::unique_ptr<Server> server(builder.BuildAndStart());
  if (!server) {
    // BuildAndStart() yields a null pointer when startup fails (for example
    // when the port is already in use); calling Wait() on it would crash.
    std::cerr << "Failed to start server on " << server_address << std::endl;
    return;
  }
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

/// Entry point of the sample server; command-line arguments are ignored.
int main(int, char**) {
  RunServer();  // Returns only after the server stops or fails to start.
  return 0;
}

Client code:

#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/check.h"

#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>

#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif

ABSL_FLAG(std::string, target, "localhost:50051", "Server address");

using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;

/// Completion-queue tag (pointer value 8) passed when the streaming call is
/// started.
void* tag_get() {
    void* const start_tag = reinterpret_cast<void*>(8);
    return start_tag;
}

/// Completion-queue tag (pointer value 16) passed with every Read request.
void* tag_read() {
    void* const read_tag = reinterpret_cast<void*>(16);
    return read_tag;
}

/// Completion-queue tag (pointer value 32) passed with the Finish request.
void* tag_finish() {
    void* const finish_tag = reinterpret_cast<void*>(32);
    return finish_tag;
}

class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  void SayHelloStream(const std::string& user) {
      HelloRequest request;
      request.set_name(user);

      ClientContext cx;
      CompletionQueue cq;

      auto stream = stub_->AsyncSayHelloStreamReply(&cx, request, &cq, tag_get());

      void *ptag{nullptr};
      bool ok = false;

      HelloReply reply;
      grpc:Status rpc_status;
      std::chrono::milliseconds _5s{5000};

      while (true) {
          const auto deadline{std::chrono::system_clock::now() + _5s};

          switch (cq.AsyncNext(&ptag, &ok, deadline)) {
              case grpc::CompletionQueue::NextStatus::SHUTDOWN:
                  std::cout << "Shutdown" << std::endl;
                  break;
              case grpc::CompletionQueue::NextStatus::TIMEOUT:
                  std::cout << "Request timeout" << std::endl;
                  break;
              case grpc::CompletionQueue::NextStatus::GOT_EVENT:
                  if (ok) {
                      if (ptag == tag_finish()) {
                          std::cout << "::   error: " << rpc_status.error_message() << std::endl;
                          std::cout << ":: details: " << rpc_status.error_details() << std::endl;
                          std::cout << "::    code: " << rpc_status.error_code() << std::endl;
                          return;
                      } else {
                          if (ptag == tag_read()) {
                              std::cout << ":: received message size: " << reply.message().size() << std::endl;
                          }

                          stream->Read(&reply, tag_read());
                      }
                  } else {
                      std::cout << "Closing stream..." << std::endl;
                      stream->Finish(&rpc_status, tag_finish());
                  }
                  break;
          }
      }
  }

 private:
  std::unique_ptr<Greeter::Stub> stub_;
};

std::shared_ptr<grpc::Channel> create_channel(const std::string& addr) {
    grpc::ChannelArguments args{};
    args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 2 * 60 * 1000);
    args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10 * 1000);
    args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
    args.SetMaxReceiveMessageSize(256 /* * 1024 * 1024*/);
    return grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(), args);
}

int main(int argc, char** argv) {
  absl::ParseCommandLine(argc, argv);
  std::string target_str = absl::GetFlag(FLAGS_target);
  GreeterClient greeter(create_channel(target_str));
  std::string user("world");
  greeter.SayHelloStream(user);
  return 0;
}

Sample output:

:: received message size: 23
:: received message size: 223
Closing stream...
::   error: CANCELLED
:: details:
::    code: 1

I am playing with the gRPC HelloWorld (gRPC 1.66) example and message sizes. For unary RPCs, I correctly get RESOURCE_EXHAUSTED when the server response is too big. However, for

rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

I am getting status CANCELLED when subsequent server messages are too big. Is this the correct behaviour? I'd expect the same status as for the unary RPC.

EDIT: Added sample server and client

#include <iostream>
#include <memory>
#include <string>
#include <random>
#include <thread>
#include <grpcpp/grpcpp.h>
#include "helloworld.grpc.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerWriter;
using helloworld::Greeter;
using helloworld::HelloRequest;
using helloworld::HelloReply;

/// Builds a string of `length` characters, each drawn uniformly at random
/// from the 62 ASCII letters and digits.
///
/// @param length Number of characters to generate; 0 yields an empty string.
/// @return The generated string. The generator is seeded from
///         std::random_device on every call, so contents differ between calls.
std::string generateLongString(size_t length) {
    // Compile-time alphabet: avoids building a heap-allocated std::string on
    // every call. sizeof includes the terminating NUL, hence the "- 1".
    static constexpr char kAlphabet[] =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    static constexpr size_t kAlphabetSize = sizeof(kAlphabet) - 1;

    std::random_device random_device;
    std::mt19937 generator(random_device());
    std::uniform_int_distribution<size_t> pick_index(0, kAlphabetSize - 1);

    std::string result;
    result.reserve(length);  // One allocation instead of repeated regrowth.
    for (size_t i = 0; i < length; ++i) {
        result.push_back(kAlphabet[pick_index(generator)]);
    }
    return result;
}

/// Greeter service implementation with one unary and one server-streaming
/// handler.
class GreeterServiceImpl final : public Greeter::Service {
 public:
  /// Unary RPC: replies with "Hello " followed by the requested name.
  Status SayHello(ServerContext* context, const HelloRequest* request, HelloReply* reply) override {
    reply->set_message("Hello " + request->name());
    return Status::OK;
  }

  /// Server-streaming RPC: sends up to five replies, one second apart. Reply
  /// number i (0-based) carries i * 200 random padding characters, so each
  /// message is larger than the previous one.
  ///
  /// @return Status::OK after all five replies were written, or
  ///         Status::CANCELLED as soon as a write fails.
  Status SayHelloStreamReply(ServerContext* context, const HelloRequest* request, ServerWriter<HelloReply>* writer) override {
    for (int i = 0; i < 5; ++i) {
      HelloReply reply;
      reply.set_message("Hello " + generateLongString(i * 200)  + request->name() + " - message " + std::to_string(i + 1));
      if (!writer->Write(reply)) {
        // A failed write means the stream can no longer carry messages
        // (for example the client cancelled the call), so stop instead of
        // sleeping and writing into a dead stream.
        return Status::CANCELLED;
      }
      std::this_thread::sleep_for(std::chrono::seconds(1)); // Simulate delay
    }
    return Status::OK;
  }
};

/// Builds and starts the Greeter server on 0.0.0.0:50051 with insecure
/// credentials, then blocks in Server::Wait(). If the server cannot be
/// started, an error is printed to stderr and the function returns.
void RunServer() {
  const std::string server_address("0.0.0.0:50051");
  GreeterServiceImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);

  std::unique_ptr<Server> server(builder.BuildAndStart());
  if (!server) {
    // BuildAndStart() yields a null pointer when startup fails (for example
    // when the port is already in use); calling Wait() on it would crash.
    std::cerr << "Failed to start server on " << server_address << std::endl;
    return;
  }
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

/// Entry point of the sample server; command-line arguments are ignored.
int main(int, char**) {
  RunServer();  // Returns only after the server stops or fails to start.
  return 0;
}

Client code:

#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/check.h"

#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>

#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif

ABSL_FLAG(std::string, target, "localhost:50051", "Server address");

using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;

/// Completion-queue tag (pointer value 8) passed when the streaming call is
/// started.
void* tag_get() {
    void* const start_tag = reinterpret_cast<void*>(8);
    return start_tag;
}

/// Completion-queue tag (pointer value 16) passed with every Read request.
void* tag_read() {
    void* const read_tag = reinterpret_cast<void*>(16);
    return read_tag;
}

/// Completion-queue tag (pointer value 32) passed with the Finish request.
void* tag_finish() {
    void* const finish_tag = reinterpret_cast<void*>(32);
    return finish_tag;
}

class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  void SayHelloStream(const std::string& user) {
      HelloRequest request;
      request.set_name(user);

      ClientContext cx;
      CompletionQueue cq;

      auto stream = stub_->AsyncSayHelloStreamReply(&cx, request, &cq, tag_get());

      void *ptag{nullptr};
      bool ok = false;

      HelloReply reply;
      grpc:Status rpc_status;
      std::chrono::milliseconds _5s{5000};

      while (true) {
          const auto deadline{std::chrono::system_clock::now() + _5s};

          switch (cq.AsyncNext(&ptag, &ok, deadline)) {
              case grpc::CompletionQueue::NextStatus::SHUTDOWN:
                  std::cout << "Shutdown" << std::endl;
                  break;
              case grpc::CompletionQueue::NextStatus::TIMEOUT:
                  std::cout << "Request timeout" << std::endl;
                  break;
              case grpc::CompletionQueue::NextStatus::GOT_EVENT:
                  if (ok) {
                      if (ptag == tag_finish()) {
                          std::cout << "::   error: " << rpc_status.error_message() << std::endl;
                          std::cout << ":: details: " << rpc_status.error_details() << std::endl;
                          std::cout << "::    code: " << rpc_status.error_code() << std::endl;
                          return;
                      } else {
                          if (ptag == tag_read()) {
                              std::cout << ":: received message size: " << reply.message().size() << std::endl;
                          }

                          stream->Read(&reply, tag_read());
                      }
                  } else {
                      std::cout << "Closing stream..." << std::endl;
                      stream->Finish(&rpc_status, tag_finish());
                  }
                  break;
          }
      }
  }

 private:
  std::unique_ptr<Greeter::Stub> stub_;
};

std::shared_ptr<grpc::Channel> create_channel(const std::string& addr) {
    grpc::ChannelArguments args{};
    args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 2 * 60 * 1000);
    args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10 * 1000);
    args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
    args.SetMaxReceiveMessageSize(256 /* * 1024 * 1024*/);
    return grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(), args);
}

int main(int argc, char** argv) {
  absl::ParseCommandLine(argc, argv);
  std::string target_str = absl::GetFlag(FLAGS_target);
  GreeterClient greeter(create_channel(target_str));
  std::string user("world");
  greeter.SayHelloStream(user);
  return 0;
}

Sample output:

:: received message size: 23
:: received message size: 223
Closing stream...
::   error: CANCELLED
:: details:
::    code: 1
Share Improve this question edited Mar 14 at 8:45 Patrik Polakovic asked Mar 10 at 12:37 Patrik PolakovicPatrik Polakovic 5691 gold badge8 silver badges21 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

You should get RESOURCE_EXHAUSTED status code if the RPC fails due to exceeding the max message size, for both unary and streaming, and the status message should indicate that the failure was because of exceeding the max message size. If you're seeing CANCELLED instead, it may be that the failure is not actually caused by exceeding the max message size but rather by something cancelling the RPC.

It's hard to say for sure what's happening here without seeing the exact code you're using to reproduce the problem.

发布者:admin,转载请注明出处:http://www.yc00.com/questions/1744846318a4596859.html

相关推荐

  • gRPC: getting CANCELED status after exceeding message size - Stack Overflow

    I am playing with the gRPC HelloWorld (gRPC 1.66) example and message sizes. For unary streams, I am ge

    2天前
    50

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信