asio-grpc v3.2.0
Asynchronous gRPC with Asio/unified executors
V3 migration guide

The following headers have been removed:

bind_allocator.hpp, cancel_safe.hpp, default_completion_token.hpp, get_completion_queue.hpp, grpc_initiate.hpp, grpc_stream.hpp, notify_when_done.hpp, repeatedly_request_context.hpp, repeatedly_request.hpp, rpc.hpp, use_awaitable.hpp, wait.hpp

CMake

asio-grpc targets now link with gRPC::grpc++ instead of gRPC::grpc++_unsecure. To restore the old behavior use:

# Restore the pre-v3 behavior: disable asio-grpc's automatic gRPC linking ...
set(ASIO_GRPC_DISABLE_AUTOLINK on)
find_package(asio-grpc)
find_package(gRPC)
# ... and link the unsecure gRPC target explicitly instead of gRPC::grpc++.
target_link_libraries(your_app PUBLIC asio-grpc::asio-grpc gRPC::grpc++_unsecure)

asio-grpcConfig.cmake no longer finds and links backend libraries (like Boost.Asio and unifex) to their respective asio-grpc::asio-grpc targets. Example on how to restore the old behavior for the Boost.Asio backend:

# In v3 the backend library (here Boost.Asio) must be found and linked by the
# consuming project; asio-grpcConfig.cmake no longer does this.
find_package(asio-grpc)
# New in v3:
find_package(Boost)
target_link_libraries(your_app PUBLIC
asio-grpc::asio-grpc
# New in v3:
Boost::headers
)

Alarms

The free function agrpc::wait has been replaced with a new I/O-object-like class called agrpc::Alarm:

V2 V3
/// V2: waits on a grpc::Alarm through the free function agrpc::wait.
/// The deadline is one second after the current system_clock time.
asio::awaitable<void> agrpc_wait()
{
    grpc::Alarm timer;
    const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
    // agrpc::wait implicitly uses the GrpcContext stored in the asio::awaitable.
    const bool expired_ok = co_await agrpc::wait(timer, deadline);
    (void)expired_ok;
}
/// V3: waits on an agrpc::Alarm constructed from an explicit GrpcContext.
/// The deadline is one second after the current system_clock time.
asio::awaitable<void> agrpc_alarm(agrpc::GrpcContext& grpc_context)
{
    // The GrpcContext to use is now named explicitly at construction.
    agrpc::Alarm timer{grpc_context};
    const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
    const bool expired_ok = co_await timer.wait(deadline, asio::use_awaitable);
    (void)expired_ok;
}
agrpc::Alarm: I/O object for grpc::Alarm. Definition: alarm.hpp:47
agrpc::GrpcContext: Execution context based on grpc::CompletionQueue. Definition: grpc_context.hpp:53

Client RPCs

Migration of client rpc types based on example.proto:

Unary

V2 V3
/// V2: unary client RPC using agrpc::request and agrpc::finish.
/// Prints the response integer to std::cout, or the status error message to
/// std::cerr when the returned status is not ok.
asio::awaitable<void> unary_rpc(agrpc::GrpcContext& grpc_context,
                                example::v1::Example::Stub& stub)
{
    using Reader = grpc::ClientAsyncResponseReader<example::v1::Response>;

    grpc::ClientContext ctx;
    ctx.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));

    example::v1::Request req;
    std::unique_ptr<Reader> reader = agrpc::request(
        &example::v1::Example::Stub::AsyncUnary, stub, ctx, req, grpc_context);

    example::v1::Response reply;
    grpc::Status result;
    co_await agrpc::finish(reader, reply, result);

    if (result.ok())
    {
        std::cout << "Response: " << reply.integer();
        co_return;
    }
    std::cerr << "Rpc failed: " << result.error_message();
}
/// V3: unary client RPC using agrpc::ClientRPC with a five second deadline.
/// Prints the response integer to std::cout, or the status error message to
/// std::cerr when the returned status is not ok.
asio::awaitable<void> client_rpc_unary(agrpc::GrpcContext& grpc_context,
                                       example::v1::Example::Stub& stub)
{
    // ClientRPC is selected by the stub's PrepareAsync method; as_default_on_t
    // makes asio::use_awaitable the default completion token.
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncUnary>>;
    grpc::ClientContext client_context;
    client_context.set_deadline(std::chrono::system_clock::now() +
                                std::chrono::seconds(5));
    RPC::Request request;
    RPC::Response response;
    const grpc::Status status =
        co_await RPC::request(grpc_context, stub, client_context, request, response);
    if (!status.ok())
    {
        std::cerr << "Rpc failed: " << status.error_message();
        co_return;
    }
    std::cout << "Response: " << response.integer();
}
agrpc::ClientRPC: Primary ClientRPC template. Definition: forward.hpp:57

Client-streaming

V2 V3
/// V2: client-streaming RPC driven by the agrpc free functions.
/// Writes requests with increasing integers until a write fails or the value
/// reaches 42, then signals writes-done and finishes the RPC.
asio::awaitable<void> client_streaming_rpc(example::v1::Example::Stub& stub)
{
    grpc::ClientContext ctx;
    ctx.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));

    example::v1::Response reply;
    std::unique_ptr<grpc::ClientAsyncWriter<example::v1::Request>> writer;
    const bool started = co_await agrpc::request(
        &example::v1::Example::Stub::PrepareAsyncClientStreaming, stub, ctx, writer, reply);
    if (!started)
    {
        // Starting failed: retrieve the status and report it.
        grpc::Status start_failure;
        co_await agrpc::finish(writer, start_failure);
        std::cerr << "Rpc failed: " << start_failure.error_message();
        co_return;
    }

    example::v1::Request msg;
    msg.set_integer(1);
    for (;;)
    {
        const bool written = co_await agrpc::write(writer, msg);
        if (!written || msg.integer() >= 42)
        {
            break;
        }
        msg.set_integer(msg.integer() + 1);
    }
    co_await agrpc::writes_done(writer);

    grpc::Status result;
    co_await agrpc::finish(writer, result);
    if (result.ok())
    {
        std::cout << "Response: " << reply.integer();
        co_return;
    }
    std::cerr << "Rpc failed: " << result.error_message();
}
/// V3: client-streaming RPC using agrpc::ClientRPC with a five second deadline.
/// Writes requests with increasing integers until a write fails or the value
/// reaches 42, then finishes the RPC and prints the response integer or the
/// status error message.
asio::awaitable<void> client_rpc_client_streaming(agrpc::GrpcContext& grpc_context,
                                                  example::v1::Example::Stub& stub)
{
    // ClientRPC is selected by the stub's PrepareAsync method; as_default_on_t
    // makes asio::use_awaitable the default completion token.
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncClientStreaming>>;
    RPC rpc{grpc_context};
    rpc.context().set_deadline(std::chrono::system_clock::now() +
                               std::chrono::seconds(5));
    RPC::Response response;
    if (!co_await rpc.start(stub, response))
    {
        // Starting failed: retrieve the status and report it.
        const grpc::Status status = co_await rpc.finish();
        std::cerr << "Rpc failed: " << status.error_message();
        co_return;
    }
    RPC::Request request;
    request.set_integer(1);
    while (co_await rpc.write(request) && request.integer() < 42)
    {
        request.set_integer(request.integer() + 1);
    }
    const grpc::Status status = co_await rpc.finish();
    if (!status.ok())
    {
        std::cerr << "Rpc failed: " << status.error_message();
        co_return;
    }
    std::cout << "Response: " << response.integer();
}

Server-streaming

V2 V3
/// V2: server-streaming RPC (client side) driven by the agrpc free functions.
/// Sends one request with integer 42 and prints every response until a read
/// fails, then finishes the RPC.
asio::awaitable<void> server_streaming_rpc(example::v1::Example::Stub& stub)
{
    grpc::ClientContext ctx;
    ctx.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));

    example::v1::Request req;
    req.set_integer(42);

    std::unique_ptr<grpc::ClientAsyncReader<example::v1::Response>> reader;
    const bool started = co_await agrpc::request(
        &example::v1::Example::Stub::PrepareAsyncServerStreaming, stub, ctx, req, reader);
    if (!started)
    {
        // Starting failed: retrieve the status and report it.
        grpc::Status start_failure;
        co_await agrpc::finish(reader, start_failure);
        std::cerr << "Rpc failed: " << start_failure.error_message();
        co_return;
    }

    example::v1::Response reply;
    for (;;)
    {
        const bool got_message = co_await agrpc::read(reader, reply);
        if (!got_message)
        {
            break;
        }
        std::cout << "Response: " << reply.integer() << '\n';
    }

    grpc::Status result;
    co_await agrpc::finish(reader, result);
    if (!result.ok())
    {
        std::cerr << "Rpc failed: " << result.error_message();
    }
}
agrpc::read (constexpr detail::ReadFn read): Read from a streaming RPC. Definition: read.hpp:74
/// V3: server-streaming RPC (client side) using agrpc::ClientRPC with a five
/// second deadline. Sends one request with integer 42 and prints every
/// response until a read fails, then finishes the RPC.
asio::awaitable<void> client_rpc_server_streaming(agrpc::GrpcContext& grpc_context,
                                                  example::v1::Example::Stub& stub)
{
    // ClientRPC is selected by the stub's PrepareAsync method; as_default_on_t
    // makes asio::use_awaitable the default completion token.
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncServerStreaming>>;
    RPC rpc{grpc_context};
    rpc.context().set_deadline(std::chrono::system_clock::now() +
                               std::chrono::seconds(5));
    RPC::Request request;
    request.set_integer(42);
    if (!co_await rpc.start(stub, request))
    {
        // Starting failed: retrieve the status and report it.
        const grpc::Status status = co_await rpc.finish();
        std::cerr << "Rpc failed: " << status.error_message();
        co_return;
    }
    RPC::Response response;
    while (co_await rpc.read(response))
    {
        std::cout << "Response: " << response.integer() << '\n';
    }
    const grpc::Status status = co_await rpc.finish();
    if (!status.ok())
    {
        std::cerr << "Rpc failed: " << status.error_message();
        co_return;
    }
}

Bidirectional-streaming

V2 V3
/// V2: bidirectional-streaming RPC (client side) driven by the agrpc free
/// functions. For each response read, writes back a request carrying the
/// response integer plus one; stops when a read fails or the previous write
/// failed, then signals writes-done and finishes the RPC.
asio::awaitable<void> bidirectional_streaming_rpc(example::v1::Example::Stub& stub)
{
    using ReaderWriter =
        grpc::ClientAsyncReaderWriter<example::v1::Request, example::v1::Response>;

    grpc::ClientContext ctx;
    ctx.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));

    std::unique_ptr<ReaderWriter> stream;
    const bool started = co_await agrpc::request(
        &example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming, stub, ctx, stream);
    if (!started)
    {
        // Starting failed: retrieve the status and report it.
        grpc::Status start_failure;
        co_await agrpc::finish(stream, start_failure);
        std::cerr << "Rpc failed: " << start_failure.error_message();
        co_return;
    }

    example::v1::Request outgoing;
    bool last_write_ok{true};
    example::v1::Response incoming;
    for (;;)
    {
        // The read is always attempted before the previous write result is checked.
        const bool read_ok = co_await agrpc::read(stream, incoming);
        if (!read_ok || !last_write_ok)
        {
            break;
        }
        outgoing.set_integer(incoming.integer() + 1);
        last_write_ok = co_await agrpc::write(stream, outgoing);
    }
    co_await agrpc::writes_done(stream);

    grpc::Status result;
    co_await agrpc::finish(stream, result);
    if (!result.ok())
    {
        std::cerr << "Rpc failed: " << result.error_message();
    }
}
/// V3: bidirectional-streaming RPC (client side) using agrpc::ClientRPC with a
/// five second deadline. For each response read, writes back a request
/// carrying the response integer plus one; stops when a read fails or the
/// previous write failed, then finishes the RPC.
asio::awaitable<void> client_rpc_bidirectional_streaming(agrpc::GrpcContext& grpc_context,
                                                         example::v1::Example::Stub& stub)
{
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming>>;

    RPC rpc{grpc_context};
    rpc.context().set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));

    const bool started = co_await rpc.start(stub);
    if (!started)
    {
        // Starting failed: retrieve the status and report it.
        const grpc::Status start_failure = co_await rpc.finish();
        std::cerr << "Rpc failed: " << start_failure.error_message();
        co_return;
    }

    RPC::Request outgoing;
    outgoing.set_integer(42);
    bool last_write_ok{true};
    RPC::Response incoming;
    for (;;)
    {
        // The read is always attempted before the previous write result is checked.
        const bool read_ok = co_await rpc.read(incoming);
        if (!read_ok || !last_write_ok)
        {
            break;
        }
        outgoing.set_integer(incoming.integer() + 1);
        last_write_ok = co_await rpc.write(outgoing);
    }

    const grpc::Status result = co_await rpc.finish();
    if (!result.ok())
    {
        std::cerr << "Rpc failed: " << result.error_message();
    }
}

Server RPCs

Migration of server rpc types based on example.proto:

Unary

V2 V3
/// V2: registers a unary server handler through agrpc::repeatedly_request.
/// The handler coroutine is bound to grpc_context via asio::bind_executor.
void unary_rpc(agrpc::GrpcContext& grpc_context,
               example::v1::Example::AsyncService& service)
{
    using Writer = grpc::ServerAsyncResponseWriter<example::v1::Response>;

    agrpc::repeatedly_request(
        &example::v1::Example::AsyncService::RequestUnary, service,
        asio::bind_executor(
            grpc_context,
            // Server context and request are unused by this handler.
            [](grpc::ServerContext&, example::v1::Request&,
               Writer& writer) -> asio::awaitable<void>
            {
                example::v1::Response reply;
                co_await agrpc::finish(writer, reply, grpc::Status::OK);
                // Alternatively finish with an error.
                co_await agrpc::finish_with_error(writer, grpc::Status::CANCELLED);
            }));
}
/// V3: registers a unary server handler through
/// agrpc::register_awaitable_rpc_handler. The handler echoes the request's
/// integer back in the response. asio::detached is passed as the completion
/// token of the registration.
void server_rpc_unary(agrpc::GrpcContext& grpc_context,
                      example::v1::Example::AsyncService& service)
{
    // ServerRPC is selected by the service's Request method; as_default_on_t
    // makes asio::use_awaitable the default completion token.
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ServerRPC<&example::v1::Example::AsyncService::RequestUnary>>;
    agrpc::register_awaitable_rpc_handler<RPC>(
        grpc_context, service,
        [](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
        {
            RPC::Response response;
            response.set_integer(request.integer());
            co_await rpc.finish(response, grpc::Status::OK);
            // Alternatively finish with an error:
            co_await rpc.finish_with_error(grpc::Status::CANCELLED);
        },
        asio::detached);
}
agrpc::ServerRPC: Primary ServerRPC template. Definition: forward.hpp:76

Client-streaming

V2 V3
/// V2: registers a client-streaming server handler through
/// agrpc::repeatedly_request. The handler prints every request until a read
/// fails, then finishes with a response whose integer is 42.
void client_streaming_rpc(agrpc::GrpcContext& grpc_context,
                          example::v1::Example::AsyncService& service)
{
    using Reader = grpc::ServerAsyncReader<example::v1::Response, example::v1::Request>;

    agrpc::repeatedly_request(
        &example::v1::Example::AsyncService::RequestClientStreaming, service,
        asio::bind_executor(
            grpc_context,
            // The server context is unused by this handler.
            [](grpc::ServerContext&, Reader& reader) -> asio::awaitable<void>
            {
                example::v1::Request incoming;
                for (;;)
                {
                    const bool got_message = co_await agrpc::read(reader, incoming);
                    if (!got_message)
                    {
                        break;
                    }
                    std::cout << "Request: " << incoming.integer() << std::endl;
                }
                example::v1::Response reply;
                reply.set_integer(42);
                co_await agrpc::finish(reader, reply, grpc::Status::OK);
                // Alternatively finish with an error.
                co_await agrpc::finish_with_error(reader, grpc::Status::CANCELLED);
            }));
}
/// V3: registers a client-streaming server handler through
/// agrpc::register_awaitable_rpc_handler. The handler prints every request
/// until a read fails, then finishes with a response whose integer is 42.
void server_rpc_client_streaming(agrpc::GrpcContext& grpc_context,
                                 example::v1::Example::AsyncService& service)
{
    // ServerRPC is selected by the service's Request method; as_default_on_t
    // makes asio::use_awaitable the default completion token.
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ServerRPC<&example::v1::Example::AsyncService::RequestClientStreaming>>;
    agrpc::register_awaitable_rpc_handler<RPC>(
        grpc_context, service,
        [](RPC& rpc) -> asio::awaitable<void>
        {
            RPC::Request request;
            while (co_await rpc.read(request))
            {
                std::cout << "Request: " << request.integer() << std::endl;
            }
            RPC::Response response;
            response.set_integer(42);
            co_await rpc.finish(response, grpc::Status::OK);
            // Alternatively finish with an error:
            co_await rpc.finish_with_error(grpc::Status::CANCELLED);
        },
        asio::detached);
}

Server-streaming

V2 V3
/// V2: registers a server-streaming server handler through
/// agrpc::repeatedly_request. The handler writes responses 0 .. request.integer()-1
/// and returns early, without finishing, as soon as a write fails.
void server_streaming_rpc(agrpc::GrpcContext& grpc_context,
                          example::v1::Example::AsyncService& service)
{
    using Writer = grpc::ServerAsyncWriter<example::v1::Response>;

    agrpc::repeatedly_request(
        &example::v1::Example::AsyncService::RequestServerStreaming, service,
        asio::bind_executor(
            grpc_context,
            // The server context is unused by this handler.
            [](grpc::ServerContext&, example::v1::Request& request,
               Writer& writer) -> asio::awaitable<void>
            {
                example::v1::Response reply;
                int next{};
                while (next != request.integer())
                {
                    reply.set_integer(next);
                    const bool written = co_await agrpc::write(writer, reply);
                    if (!written)
                    {
                        co_return;
                    }
                    ++next;
                }
                co_await agrpc::finish(writer, grpc::Status::OK);
            }));
}
/// V3: registers a server-streaming server handler through
/// agrpc::register_awaitable_rpc_handler. The handler writes responses
/// 0 .. request.integer()-1 and returns early, without finishing, as soon as a
/// write fails.
void server_rpc_server_streaming(agrpc::GrpcContext& grpc_context,
                                 example::v1::Example::AsyncService& service)
{
    // ServerRPC is selected by the service's Request method; as_default_on_t
    // makes asio::use_awaitable the default completion token.
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ServerRPC<&example::v1::Example::AsyncService::RequestServerStreaming>>;
    agrpc::register_awaitable_rpc_handler<RPC>(
        grpc_context, service,
        [](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
        {
            RPC::Response response;
            for (int i{}; i != request.integer(); ++i)
            {
                response.set_integer(i);
                if (!co_await rpc.write(response))
                {
                    co_return;
                }
            }
            co_await rpc.finish(grpc::Status::OK);
        },
        asio::detached);
}

Bidirectional-streaming

V2 V3
/// V2: registers a bidirectional-streaming server handler through
/// agrpc::repeatedly_request. The handler echoes each request's integer back
/// until a read fails, returns early if a write fails, and otherwise sends a
/// final response with integer 42 via write_last before finishing.
void bidirectional_streaming_rpc(agrpc::GrpcContext& grpc_context,
                                 example::v1::Example::AsyncService& service)
{
    using ReaderWriter =
        grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request>;

    agrpc::repeatedly_request(
        &example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
        asio::bind_executor(
            grpc_context,
            // The server context is unused by this handler.
            [](grpc::ServerContext&, ReaderWriter& stream) -> asio::awaitable<void>
            {
                example::v1::Request incoming;
                example::v1::Response outgoing;
                for (;;)
                {
                    const bool read_ok = co_await agrpc::read(stream, incoming);
                    if (!read_ok)
                    {
                        break;
                    }
                    outgoing.set_integer(incoming.integer());
                    const bool write_ok = co_await agrpc::write(stream, outgoing);
                    if (!write_ok)
                    {
                        co_return;
                    }
                }
                outgoing.set_integer(42);
                co_await agrpc::write_last(stream, outgoing, grpc::WriteOptions{});
                co_await agrpc::finish(stream, grpc::Status::OK);
            }));
}
/// V3: registers a bidirectional-streaming server handler through
/// agrpc::register_awaitable_rpc_handler. The handler echoes each request's
/// integer back until a read fails, returns early if a write fails, and
/// otherwise sends a final response with integer 42 (write options marked as
/// last message) before finishing.
void server_rpc_bidirectional_streaming(agrpc::GrpcContext& grpc_context,
                                        example::v1::Example::AsyncService& service)
{
    using RPC = asio::use_awaitable_t<>::as_default_on_t<
        agrpc::ServerRPC<&example::v1::Example::AsyncService::RequestBidirectionalStreaming>>;

    agrpc::register_awaitable_rpc_handler<RPC>(
        grpc_context, service,
        [](RPC& rpc) -> asio::awaitable<void>
        {
            RPC::Request incoming;
            RPC::Response outgoing;
            for (;;)
            {
                const bool read_ok = co_await rpc.read(incoming);
                if (!read_ok)
                {
                    break;
                }
                outgoing.set_integer(incoming.integer());
                const bool write_ok = co_await rpc.write(outgoing);
                if (!write_ok)
                {
                    co_return;
                }
            }
            outgoing.set_integer(42);
            co_await rpc.write(outgoing, grpc::WriteOptions{}.set_last_message());
            co_await rpc.finish(grpc::Status::OK);
        },
        asio::detached);
}