asio-grpc v3.4.1
Asynchronous gRPC with Asio/unified executors
|
The following headers have been removed:
bind_allocator.hpp
, cancel_safe.hpp
, default_completion_token.hpp
, get_completion_queue.hpp
, grpc_initiate.hpp
, grpc_stream.hpp
, notify_when_done.hpp
, repeatedly_request_context.hpp
, repeatedly_request.hpp
, rpc.hpp
, use_awaitable.hpp
, wait.hpp
asio-grpc
targets now link with gRPC::grpc++
instead of gRPC::grpc++_unsecure
. To restore the old behavior use:
asio-grpcConfig.cmake
no longer finds and links backend libraries (like Boost.Asio and unifex) to their respective asio-grpc::asio-grpc
targets. Example on how to restore the old behavior for the Boost.Asio backend:
The free function agrpc::wait
has been replaced with a new I/O-object like class called agrpc::Alarm
:
V2 | V3 |
---|---|
asio::awaitable<void> agrpc_wait()
{
grpc::Alarm alarm;
// implicitly uses GrpcContext stored in asio::awaitable:
bool wait_ok = co_await agrpc::wait(
alarm, std::chrono::system_clock::now() + std::chrono::seconds(1));
(void)wait_ok;
}
| asio::awaitable<void> agrpc_alarm(agrpc::GrpcContext& grpc_context)
{
// used GrpcContext is explicit:
agrpc::Alarm alarm{grpc_context};
std::chrono::system_clock::now() + std::chrono::seconds(1), asio::use_awaitable);
(void)wait_ok;
}
|
Migration of client rpc types based on example.proto:
V2 | V3 |
---|---|
asio::awaitable<void> unary_rpc(agrpc::GrpcContext& grpc_context,
example::v1::Example::Stub& stub)
{
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
example::v1::Request request;
std::unique_ptr<grpc::ClientAsyncResponseReader<example::v1::Response>> reader =
agrpc::request(&example::v1::Example::Stub::AsyncUnary, stub, client_context,
request, grpc_context);
example::v1::Response response;
grpc::Status status;
co_await agrpc::finish(reader, response, status);
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
std::cout << "Response: " << response.integer();
}
| asio::awaitable<void> client_rpc_unary(agrpc::GrpcContext& grpc_context,
example::v1::Example::Stub& stub)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
RPC::Request request;
RPC::Response response;
const grpc::Status status =
co_await RPC::request(grpc_context, stub, client_context, request, response);
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
std::cout << "Response: " << response.integer();
}
|
V2 | V3 |
---|---|
asio::awaitable<void> client_streaming_rpc(example::v1::Example::Stub& stub)
{
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
example::v1::Response response;
std::unique_ptr<grpc::ClientAsyncWriter<example::v1::Request>> writer;
if (!co_await agrpc::request(&example::v1::Example::Stub::PrepareAsyncClientStreaming,
stub, client_context, writer, response))
{
grpc::Status status;
co_await agrpc::finish(writer, status);
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
example::v1::Request request;
request.set_integer(1);
while (co_await agrpc::write(writer, request) && request.integer() < 42)
{
request.set_integer(request.integer() + 1);
}
co_await agrpc::writes_done(writer);
grpc::Status status;
co_await agrpc::finish(writer, status);
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
std::cout << "Response: " << response.integer();
}
| asio::awaitable<void> client_rpc_client_streaming(agrpc::GrpcContext& grpc_context,
example::v1::Example::Stub& stub)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
RPC rpc{grpc_context};
rpc.context().set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
RPC::Response response;
if (!co_await rpc.start(stub, response))
{
const grpc::Status status = co_await rpc.finish();
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
RPC::Request request;
request.set_integer(1);
while (co_await rpc.write(request) && request.integer() < 42)
{
request.set_integer(request.integer() + 1);
}
const grpc::Status status = co_await rpc.finish();
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
std::cout << "Response: " << response.integer();
}
|
V2 | V3 |
---|---|
asio::awaitable<void> server_streaming_rpc(example::v1::Example::Stub& stub)
{
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
example::v1::Request request;
request.set_integer(42);
std::unique_ptr<grpc::ClientAsyncReader<example::v1::Response>> reader;
if (!co_await agrpc::request(&example::v1::Example::Stub::PrepareAsyncServerStreaming,
stub, client_context, request, reader))
{
grpc::Status status;
co_await agrpc::finish(reader, status);
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
example::v1::Response response;
{
std::cout << "Response: " << response.integer() << '\n';
}
grpc::Status status;
co_await agrpc::finish(reader, status);
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
}
| asio::awaitable<void> client_rpc_server_streaming(agrpc::GrpcContext& grpc_context,
example::v1::Example::Stub& stub)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
RPC rpc{grpc_context};
rpc.context().set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
RPC::Request request;
request.set_integer(42);
if (!co_await rpc.start(stub, request))
{
const grpc::Status status = co_await rpc.finish();
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
RPC::Response response;
while (co_await rpc.read(response))
{
std::cout << "Response: " << response.integer() << '\n';
}
const grpc::Status status = co_await rpc.finish();
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
}
|
V2 | V3 |
---|---|
asio::awaitable<void> bidirectional_streaming_rpc(example::v1::Example::Stub& stub)
{
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
std::unique_ptr<
grpc::ClientAsyncReaderWriter<example::v1::Request, example::v1::Response>>
reader_writer;
if (!co_await agrpc::request(
&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming, stub,
client_context, reader_writer))
{
grpc::Status status;
co_await agrpc::finish(reader_writer, status);
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
example::v1::Request request;
bool write_ok{true};
example::v1::Response response;
{
request.set_integer(response.integer() + 1);
write_ok = co_await agrpc::write(reader_writer, request);
}
co_await agrpc::writes_done(reader_writer);
grpc::Status status;
co_await agrpc::finish(reader_writer, status);
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
}
| asio::awaitable<void> client_rpc_bidirectional_streaming(agrpc::GrpcContext& grpc_context,
example::v1::Example::Stub& stub)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<agrpc::ClientRPC<
&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming>>;
RPC rpc{grpc_context};
rpc.context().set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(5));
if (!co_await rpc.start(stub))
{
const grpc::Status status = co_await rpc.finish();
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
RPC::Request request;
request.set_integer(42);
bool write_ok{true};
RPC::Response response;
while (co_await rpc.read(response) && write_ok)
{
request.set_integer(response.integer() + 1);
write_ok = co_await rpc.write(request);
}
const grpc::Status status = co_await rpc.finish();
if (!status.ok())
{
std::cerr << "Rpc failed: " << status.error_message();
co_return;
}
}
|
Migration of server rpc types based on example.proto:
V2 | V3 |
---|---|
void unary_rpc(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
agrpc::repeatedly_request(
&example::v1::Example::AsyncService::RequestUnary, service,
asio::bind_executor(
grpc_context,
[](grpc::ServerContext& /*server_context*/, example::v1::Request& /*request*/,
grpc::ServerAsyncResponseWriter<example::v1::Response>& writer)
-> asio::awaitable<void>
{
example::v1::Response response;
co_await agrpc::finish(writer, response, grpc::Status::OK);
// Alternatively finish with an error.
co_await agrpc::finish_with_error(writer, grpc::Status::CANCELLED);
}));
}
| void server_rpc_unary(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
grpc_context, service,
[](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
{
RPC::Response response;
response.set_integer(request.integer());
co_await rpc.finish(response, grpc::Status::OK);
// Alternatively finish with an error:
co_await rpc.finish_with_error(grpc::Status::CANCELLED);
},
asio::detached);
}
|
V2 | V3 |
---|---|
void client_streaming_rpc(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
agrpc::repeatedly_request(
&example::v1::Example::AsyncService::RequestClientStreaming, service,
asio::bind_executor(
grpc_context,
[](grpc::ServerContext& /*server_context*/,
grpc::ServerAsyncReader<example::v1::Response, example::v1::Request>&
reader) -> asio::awaitable<void>
{
example::v1::Request request;
{
std::cout << "Request: " << request.integer() << std::endl;
}
example::v1::Response response;
response.set_integer(42);
co_await agrpc::finish(reader, response, grpc::Status::OK);
// Alternatively finish with an error.
co_await agrpc::finish_with_error(reader, grpc::Status::CANCELLED);
}));
}
| void server_rpc_client_streaming(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
grpc_context, service,
[](RPC& rpc) -> asio::awaitable<void>
{
RPC::Request request;
while (co_await rpc.read(request))
{
std::cout << "Request: " << request.integer() << std::endl;
}
RPC::Response response;
response.set_integer(42);
co_await rpc.finish(response, grpc::Status::OK);
// Alternatively finish with an error:
co_await rpc.finish_with_error(grpc::Status::CANCELLED);
},
asio::detached);
}
|
V2 | V3 |
---|---|
void server_streaming_rpc(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
agrpc::repeatedly_request(
&example::v1::Example::AsyncService::RequestServerStreaming, service,
asio::bind_executor(
grpc_context,
[](grpc::ServerContext& /*server_context*/, example::v1::Request& request,
grpc::ServerAsyncWriter<example::v1::Response>& writer)
-> asio::awaitable<void>
{
example::v1::Response response;
for (int i{}; i != request.integer(); ++i)
{
response.set_integer(i);
if (!co_await agrpc::write(writer, response))
{
co_return;
}
}
co_await agrpc::finish(writer, grpc::Status::OK);
}));
}
| void server_rpc_server_streaming(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<
grpc_context, service,
[](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
{
RPC::Response response;
for (int i{}; i != request.integer(); ++i)
{
response.set_integer(i);
if (!co_await rpc.write(response))
{
co_return;
}
}
co_await rpc.finish(grpc::Status::OK);
},
asio::detached);
}
|
V2 | V3 |
---|---|
void bidirectional_streaming_rpc(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
agrpc::repeatedly_request(
&example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
asio::bind_executor(
grpc_context,
[](grpc::ServerContext& /*server_context*/,
grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request>&
reader_writer) -> asio::awaitable<void>
{
example::v1::Request request;
example::v1::Response response;
{
response.set_integer(request.integer());
if (!co_await agrpc::write(reader_writer, response))
{
co_return;
}
}
response.set_integer(42);
co_await agrpc::write_last(reader_writer, response, grpc::WriteOptions{});
co_await agrpc::finish(reader_writer, grpc::Status::OK);
}));
}
| void server_rpc_bidirectional_streaming(agrpc::GrpcContext& grpc_context,
example::v1::Example::AsyncService& service)
{
using RPC = asio::use_awaitable_t<>::as_default_on_t<agrpc::ServerRPC<
&example::v1::Example::AsyncService::RequestBidirectionalStreaming>>;
grpc_context, service,
[](RPC& rpc) -> asio::awaitable<void>
{
RPC::Request request;
RPC::Response response;
while (co_await rpc.read(request))
{
response.set_integer(request.integer());
if (!co_await rpc.write(response))
{
co_return;
}
}
response.set_integer(42);
co_await rpc.write(response, grpc::WriteOptions{}.set_last_message());
co_await rpc.finish(grpc::Status::OK);
},
asio::detached);
}
|