I am playing with the gRPC HelloWorld example (gRPC 1.66) and message size limits. For unary RPCs, I correctly get RESOURCE_EXHAUSTED when the server response is too big. However, for
rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
I get status CANCELLED when a subsequent server message is too big. Is this the correct behaviour? I'd expect the same status as for a unary RPC.
EDIT: Added sample server and client
#include <iostream>
#include <memory>
#include <string>
#include <random>
#include <thread>
#include <grpcpp/grpcpp.h>
#include "helloworld.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerWriter;
using helloworld::Greeter;
using helloworld::HelloRequest;
using helloworld::HelloReply;
/// Builds a pseudo-random alphanumeric string of exactly `length` characters.
///
/// Every character is drawn uniformly from [a-zA-Z0-9] with a Mersenne Twister
/// that is freshly seeded from std::random_device on each call.
///
/// @param length Number of characters to generate; 0 yields an empty string.
/// @return The generated string.
std::string generateLongString(size_t length) {
  static constexpr char kAlphabet[] =
      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  // sizeof counts the terminating NUL, which must never be selected.
  static constexpr std::size_t kAlphabetSize = sizeof(kAlphabet) - 1;

  std::random_device random_device;
  std::mt19937 generator(random_device());
  // Index type matches the alphabet size type, so no implicit narrowing occurs.
  std::uniform_int_distribution<std::size_t> distribution(0, kAlphabetSize - 1);

  std::string longString;
  longString.reserve(length);  // one allocation instead of repeated regrowth
  for (std::size_t i = 0; i < length; ++i) {
    longString += kAlphabet[distribution(generator)];
  }
  return longString;
}
/// Greeter service used to reproduce the max-message-size behaviour.
class GreeterServiceImpl final : public Greeter::Service {
 public:
  /// Unary RPC: answers with "Hello <name>".
  Status SayHello(ServerContext* /*context*/, const HelloRequest* request,
                  HelloReply* reply) override {
    reply->set_message("Hello " + request->name());
    return Status::OK;
  }

  /// Server-streaming RPC: sends up to five replies, one per second, where
  /// reply number i (0-based) carries i * 200 random padding characters.
  ///
  /// @return Status::OK once all replies were written, or Status::CANCELLED as
  ///         soon as a write fails.
  Status SayHelloStreamReply(ServerContext* /*context*/,
                             const HelloRequest* request,
                             ServerWriter<HelloReply>* writer) override {
    constexpr int kMessageCount = 5;
    constexpr size_t kPaddingStep = 200;

    for (int i = 0; i < kMessageCount; ++i) {
      HelloReply reply;
      reply.set_message("Hello " +
                        generateLongString(static_cast<size_t>(i) * kPaddingStep) +
                        request->name() + " - message " + std::to_string(i + 1));
      // Write() reports false when the message could not be sent (e.g. the
      // peer went away or the call was cancelled); further writes are
      // pointless, so stop instead of generating and sleeping for nothing.
      if (!writer->Write(reply)) {
        return Status::CANCELLED;
      }
      std::this_thread::sleep_for(std::chrono::seconds(1));  // Simulate delay
    }
    return Status::OK;
  }
};
/// Starts the Greeter server on 0.0.0.0:50051 without transport security and
/// blocks in Server::Wait(). Returns early, after logging to stderr, when the
/// server cannot be started.
void RunServer() {
  const std::string server_address("0.0.0.0:50051");
  GreeterServiceImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);

  std::unique_ptr<Server> server(builder.BuildAndStart());
  // BuildAndStart() yields a null pointer on failure (for example when the
  // port is already in use); calling Wait() on it would dereference null.
  if (!server) {
    std::cerr << "Failed to start server on " << server_address << std::endl;
    return;
  }

  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}
// Server entry point: command-line arguments are not used; the process simply
// runs the server and exits with 0 once RunServer() returns.
int main(int argc, char** argv) {
  static_cast<void>(argc);
  static_cast<void>(argv);
  RunServer();
  return 0;
}
Client code:
#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(std::string, target, "localhost:50051", "Server address");
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
// Completion-queue tag value 8, encoded as an opaque pointer.
void* tag_get() {
  void* const tag = reinterpret_cast<void*>(8);
  return tag;
}
// Completion-queue tag value 16, encoded as an opaque pointer.
void* tag_read() {
  void* const tag = reinterpret_cast<void*>(16);
  return tag;
}
// Completion-queue tag value 32, encoded as an opaque pointer.
void* tag_finish() {
  void* const tag = reinterpret_cast<void*>(32);
  return tag;
}
/// Minimal asynchronous client for the server-streaming SayHelloStreamReply RPC.
class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  /// Starts the streaming call for `user`, prints the size of every reply
  /// received, and finally prints the message, details and code of the RPC
  /// status. Polls the completion queue with a 5 second timeout per
  /// iteration and keeps polling after a timeout.
  void SayHelloStream(const std::string& user) {
    HelloRequest request;
    request.set_name(user);

    ClientContext cx;
    CompletionQueue cq;
    auto stream = stub_->AsyncSayHelloStreamReply(&cx, request, &cq, tag_get());

    HelloReply reply;
    grpc::Status rpc_status;  // was `grpc:Status`, which parsed as a label
    const std::chrono::milliseconds poll_timeout{5000};

    while (true) {
      void* ptag = nullptr;
      bool ok = false;
      const auto deadline = std::chrono::system_clock::now() + poll_timeout;

      switch (cq.AsyncNext(&ptag, &ok, deadline)) {
        case grpc::CompletionQueue::NextStatus::SHUTDOWN:
          // A shut-down queue never delivers another event, so looping on
          // would spin forever.
          std::cout << "Shutdown" << std::endl;
          return;

        case grpc::CompletionQueue::NextStatus::TIMEOUT:
          std::cout << "Request timeout" << std::endl;
          break;

        case grpc::CompletionQueue::NextStatus::GOT_EVENT:
          if (!ok) {
            // The start or a read did not succeed: ask for the final status.
            std::cout << "Closing stream..." << std::endl;
            stream->Finish(&rpc_status, tag_finish());
          } else if (ptag == tag_finish()) {
            std::cout << ":: error: " << rpc_status.error_message() << std::endl;
            std::cout << ":: details: " << rpc_status.error_details() << std::endl;
            std::cout << ":: code: " << rpc_status.error_code() << std::endl;
            // A queue that was polled has to be shut down and drained before
            // it is destroyed; no operations are outstanding at this point.
            cq.Shutdown();
            while (cq.Next(&ptag, &ok)) {
            }
            return;
          } else {
            if (ptag == tag_read()) {
              std::cout << ":: received message size: " << reply.message().size()
                        << std::endl;
            }
            // Request the next message (this is the first read after tag_get).
            stream->Read(&reply, tag_read());
          }
          break;
      }
    }
  }

 private:
  std::unique_ptr<Greeter::Stub> stub_;
};
std::shared_ptr<grpc::Channel> create_channel(const std::string& addr) {
grpc::ChannelArguments args{};
args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 2 * 60 * 1000);
args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10 * 1000);
args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
args.SetMaxReceiveMessageSize(256 /* * 1024 * 1024*/);
return grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(), args);
}
// Client entry point: parses the command line, connects to the address given
// by the --target flag and runs the streaming call for the user "world".
int main(int argc, char** argv) {
  absl::ParseCommandLine(argc, argv);

  const std::string target_str = absl::GetFlag(FLAGS_target);
  GreeterClient greeter(create_channel(target_str));

  const std::string user("world");
  greeter.SayHelloStream(user);
  return 0;
}
Sample output:
:: received message size: 23
:: received message size: 223
Closing stream...
:: error: CANCELLED
:: details:
:: code: 1
I am playing with the gRPC HelloWorld example (gRPC 1.66) and message size limits. For unary RPCs, I correctly get RESOURCE_EXHAUSTED when the server response is too big. However, for
rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
I get status CANCELLED when a subsequent server message is too big. Is this the correct behaviour? I'd expect the same status as for a unary RPC.
EDIT: Added sample server and client
#include <iostream>
#include <memory>
#include <string>
#include <random>
#include <thread>
#include <grpcpp/grpcpp.h>
#include "helloworld.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerWriter;
using helloworld::Greeter;
using helloworld::HelloRequest;
using helloworld::HelloReply;
/// Builds a pseudo-random alphanumeric string of exactly `length` characters.
///
/// Every character is drawn uniformly from [a-zA-Z0-9] with a Mersenne Twister
/// that is freshly seeded from std::random_device on each call.
///
/// @param length Number of characters to generate; 0 yields an empty string.
/// @return The generated string.
std::string generateLongString(size_t length) {
  static constexpr char kAlphabet[] =
      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  // sizeof counts the terminating NUL, which must never be selected.
  static constexpr std::size_t kAlphabetSize = sizeof(kAlphabet) - 1;

  std::random_device random_device;
  std::mt19937 generator(random_device());
  // Index type matches the alphabet size type, so no implicit narrowing occurs.
  std::uniform_int_distribution<std::size_t> distribution(0, kAlphabetSize - 1);

  std::string longString;
  longString.reserve(length);  // one allocation instead of repeated regrowth
  for (std::size_t i = 0; i < length; ++i) {
    longString += kAlphabet[distribution(generator)];
  }
  return longString;
}
/// Greeter service used to reproduce the max-message-size behaviour.
class GreeterServiceImpl final : public Greeter::Service {
 public:
  /// Unary RPC: answers with "Hello <name>".
  Status SayHello(ServerContext* /*context*/, const HelloRequest* request,
                  HelloReply* reply) override {
    reply->set_message("Hello " + request->name());
    return Status::OK;
  }

  /// Server-streaming RPC: sends up to five replies, one per second, where
  /// reply number i (0-based) carries i * 200 random padding characters.
  ///
  /// @return Status::OK once all replies were written, or Status::CANCELLED as
  ///         soon as a write fails.
  Status SayHelloStreamReply(ServerContext* /*context*/,
                             const HelloRequest* request,
                             ServerWriter<HelloReply>* writer) override {
    constexpr int kMessageCount = 5;
    constexpr size_t kPaddingStep = 200;

    for (int i = 0; i < kMessageCount; ++i) {
      HelloReply reply;
      reply.set_message("Hello " +
                        generateLongString(static_cast<size_t>(i) * kPaddingStep) +
                        request->name() + " - message " + std::to_string(i + 1));
      // Write() reports false when the message could not be sent (e.g. the
      // peer went away or the call was cancelled); further writes are
      // pointless, so stop instead of generating and sleeping for nothing.
      if (!writer->Write(reply)) {
        return Status::CANCELLED;
      }
      std::this_thread::sleep_for(std::chrono::seconds(1));  // Simulate delay
    }
    return Status::OK;
  }
};
/// Starts the Greeter server on 0.0.0.0:50051 without transport security and
/// blocks in Server::Wait(). Returns early, after logging to stderr, when the
/// server cannot be started.
void RunServer() {
  const std::string server_address("0.0.0.0:50051");
  GreeterServiceImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);

  std::unique_ptr<Server> server(builder.BuildAndStart());
  // BuildAndStart() yields a null pointer on failure (for example when the
  // port is already in use); calling Wait() on it would dereference null.
  if (!server) {
    std::cerr << "Failed to start server on " << server_address << std::endl;
    return;
  }

  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}
// Server entry point: command-line arguments are not used; the process simply
// runs the server and exits with 0 once RunServer() returns.
int main(int argc, char** argv) {
  static_cast<void>(argc);
  static_cast<void>(argv);
  RunServer();
  return 0;
}
Client code:
#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(std::string, target, "localhost:50051", "Server address");
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
// Completion-queue tag value 8, encoded as an opaque pointer.
void* tag_get() {
  void* const tag = reinterpret_cast<void*>(8);
  return tag;
}
// Completion-queue tag value 16, encoded as an opaque pointer.
void* tag_read() {
  void* const tag = reinterpret_cast<void*>(16);
  return tag;
}
// Completion-queue tag value 32, encoded as an opaque pointer.
void* tag_finish() {
  void* const tag = reinterpret_cast<void*>(32);
  return tag;
}
/// Minimal asynchronous client for the server-streaming SayHelloStreamReply RPC.
class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  /// Starts the streaming call for `user`, prints the size of every reply
  /// received, and finally prints the message, details and code of the RPC
  /// status. Polls the completion queue with a 5 second timeout per
  /// iteration and keeps polling after a timeout.
  void SayHelloStream(const std::string& user) {
    HelloRequest request;
    request.set_name(user);

    ClientContext cx;
    CompletionQueue cq;
    auto stream = stub_->AsyncSayHelloStreamReply(&cx, request, &cq, tag_get());

    HelloReply reply;
    grpc::Status rpc_status;  // was `grpc:Status`, which parsed as a label
    const std::chrono::milliseconds poll_timeout{5000};

    while (true) {
      void* ptag = nullptr;
      bool ok = false;
      const auto deadline = std::chrono::system_clock::now() + poll_timeout;

      switch (cq.AsyncNext(&ptag, &ok, deadline)) {
        case grpc::CompletionQueue::NextStatus::SHUTDOWN:
          // A shut-down queue never delivers another event, so looping on
          // would spin forever.
          std::cout << "Shutdown" << std::endl;
          return;

        case grpc::CompletionQueue::NextStatus::TIMEOUT:
          std::cout << "Request timeout" << std::endl;
          break;

        case grpc::CompletionQueue::NextStatus::GOT_EVENT:
          if (!ok) {
            // The start or a read did not succeed: ask for the final status.
            std::cout << "Closing stream..." << std::endl;
            stream->Finish(&rpc_status, tag_finish());
          } else if (ptag == tag_finish()) {
            std::cout << ":: error: " << rpc_status.error_message() << std::endl;
            std::cout << ":: details: " << rpc_status.error_details() << std::endl;
            std::cout << ":: code: " << rpc_status.error_code() << std::endl;
            // A queue that was polled has to be shut down and drained before
            // it is destroyed; no operations are outstanding at this point.
            cq.Shutdown();
            while (cq.Next(&ptag, &ok)) {
            }
            return;
          } else {
            if (ptag == tag_read()) {
              std::cout << ":: received message size: " << reply.message().size()
                        << std::endl;
            }
            // Request the next message (this is the first read after tag_get).
            stream->Read(&reply, tag_read());
          }
          break;
      }
    }
  }

 private:
  std::unique_ptr<Greeter::Stub> stub_;
};
std::shared_ptr<grpc::Channel> create_channel(const std::string& addr) {
grpc::ChannelArguments args{};
args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 2 * 60 * 1000);
args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10 * 1000);
args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
args.SetMaxReceiveMessageSize(256 /* * 1024 * 1024*/);
return grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(), args);
}
// Client entry point: parses the command line, connects to the address given
// by the --target flag and runs the streaming call for the user "world".
int main(int argc, char** argv) {
  absl::ParseCommandLine(argc, argv);

  const std::string target_str = absl::GetFlag(FLAGS_target);
  GreeterClient greeter(create_channel(target_str));

  const std::string user("world");
  greeter.SayHelloStream(user);
  return 0;
}
Sample output:
:: received message size: 23
:: received message size: 223
Closing stream...
:: error: CANCELLED
:: details:
:: code: 1
Share
Improve this question
edited Mar 14 at 8:45
Patrik Polakovic
asked Mar 10 at 12:37
Patrik PolakovicPatrik Polakovic
5691 gold badge8 silver badges21 bronze badges
1 Answer
You should get the RESOURCE_EXHAUSTED status code if the RPC fails due to exceeding the max message size, for both unary and streaming RPCs, and the status message should indicate that the failure was caused by exceeding the max message size. If you're seeing CANCELLED instead, it may be that the failure is not actually caused by exceeding the max message size but rather by something cancelling the RPC.
It's hard to say for sure what's happening here without seeing the exact code you're using to reproduce the problem.
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744846318a4596859.html
评论列表(0条)