TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_source.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/read_source.hpp>
21 : #include <boost/capy/error.hpp>
22 : #include <boost/capy/ex/io_env.hpp>
23 : #include <boost/capy/io_result.hpp>
24 : #include <boost/capy/io_task.hpp>
25 :
26 : #include <concepts>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <exception>
30 : #include <new>
31 : #include <span>
32 : #include <stop_token>
33 : #include <system_error>
34 : #include <utility>
35 :
36 : namespace boost {
37 : namespace capy {
38 :
39 : /** Type-erased wrapper for any BufferSource.
40 :
41 : This class provides type erasure for any type satisfying the
42 : @ref BufferSource concept, enabling runtime polymorphism for
43 : buffer pull operations. It uses cached awaitable storage to achieve
44 : zero steady-state allocation after construction.
45 :
46 : The wrapper also satisfies @ref ReadSource. When the wrapped type
47 : satisfies only @ref BufferSource, the read operations are
48 : synthesized using @ref pull and @ref consume with an extra
49 : buffer copy. When the wrapped type satisfies both @ref BufferSource
50 : and @ref ReadSource, the native read operations are forwarded
51 : directly across the virtual boundary, avoiding the copy.
52 :
53 : The wrapper supports two construction modes:
54 : - **Owning**: Pass by value to transfer ownership. The wrapper
55 : allocates storage and owns the source.
56 : - **Reference**: Pass a pointer to wrap without ownership. The
57 : pointed-to source must outlive this wrapper.
58 :
59 : Within each mode, the vtable is populated at compile time based
60 : on whether the wrapped type also satisfies @ref ReadSource:
61 : - **BufferSource only**: @ref read_some and @ref read are
62 : synthesized from @ref pull and @ref consume, incurring one
63 : buffer copy per operation.
64 : - **BufferSource + ReadSource**: All read operations are
65 : forwarded natively through the type-erased boundary with
66 : no extra copy.
67 :
68 : @par Awaitable Preallocation
69 : The constructor preallocates storage for the type-erased awaitable.
70 : This reserves all virtual address space at server startup
71 : so memory usage can be measured up front, rather than
72 : allocating piecemeal as traffic arrives.
73 :
74 : @par Thread Safety
75 : Not thread-safe. Concurrent operations on the same wrapper
76 : are undefined behavior.
77 :
78 : @par Example
79 : @code
80 : // Owning - takes ownership of the source
81 : any_buffer_source abs(some_buffer_source{args...});
82 :
83 : // Reference - wraps without ownership
84 : some_buffer_source src;
85 : any_buffer_source abs(&src);
86 :
87 : const_buffer arr[16];
88 : auto [ec, bufs] = co_await abs.pull(arr);
89 :
90 : // ReadSource interface also available
91 : char buf[64];
92 : auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
93 : @endcode
94 :
95 : @see any_buffer_sink, BufferSource, ReadSource
96 : */
97 : class any_buffer_source
98 : {
99 : struct vtable;
100 : struct awaitable_ops;
101 : struct read_awaitable_ops;
102 :
103 : template<BufferSource S>
104 : struct vtable_for_impl;
105 :
106 : // hot-path members first for cache locality
// The wrapped source object (owned or borrowed). Null for a
// default-constructed or moved-from wrapper; has_value() tests this.
107 : void* source_ = nullptr;
// Function-pointer table generated for the concrete wrapped type.
108 : vtable const* vt_ = nullptr;
// Raw heap block, allocated once by the converting constructors, into
// which each operation placement-constructs its underlying awaitable.
109 : void* cached_awaitable_ = nullptr;
// Ops table of the pull awaitable currently living in cached_awaitable_;
// set when a pull is awaited and reset to null after await_resume.
110 : awaitable_ops const* active_ops_ = nullptr;
// Same as active_ops_, but for the native read_some / read awaitables.
111 : read_awaitable_ops const* active_read_ops_ = nullptr;
// Heap block holding the owned source. Only the owning constructor sets
// it, so the destructor and move assignment use it as the "owning" flag.
112 : void* storage_ = nullptr;
113 :
114 : public:
115 : /** Destructor.
116 :
117 : Destroys the owned source (if any) and releases the cached
118 : awaitable storage.
119 : */
120 : ~any_buffer_source();
121 :
122 : /** Construct a default instance.
123 :
124 : Constructs an empty wrapper. Operations on a default-constructed
125 : wrapper result in undefined behavior.
126 : */
127 : any_buffer_source() = default;
128 :
129 : /** Non-copyable.
130 :
131 : The awaitable cache is per-instance and cannot be shared.
132 : */
133 : any_buffer_source(any_buffer_source const&) = delete;
134 : any_buffer_source& operator=(any_buffer_source const&) = delete;
135 :
136 : /** Construct by moving.
137 :
138 : Transfers ownership of the wrapped source (if owned) and
139 : cached awaitable storage from `other`. After the move, `other` is
140 : in a default-constructed state.
141 :
142 : @param other The wrapper to move from.
143 : */
144 HIT 2 : any_buffer_source(any_buffer_source&& other) noexcept
145 2 : : source_(std::exchange(other.source_, nullptr))
146 2 : , vt_(std::exchange(other.vt_, nullptr))
147 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
148 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
149 2 : , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
150 2 : , storage_(std::exchange(other.storage_, nullptr))
151 : {
152 2 : }
153 :
154 : /** Assign by moving.
155 :
156 : Destroys any owned source and releases existing resources,
157 : then transfers ownership from `other`.
158 :
159 : @param other The wrapper to move from.
160 : @return Reference to this wrapper.
161 : */
162 : any_buffer_source&
163 : operator=(any_buffer_source&& other) noexcept;
164 :
165 : /** Construct by taking ownership of a BufferSource.
166 :
167 : Allocates storage and moves the source into this wrapper.
168 : The wrapper owns the source and will destroy it. If `S` also
169 : satisfies @ref ReadSource, native read operations are
170 : forwarded through the virtual boundary.
171 :
172 : @param s The source to take ownership of.
173 : */
174 : template<BufferSource S>
175 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
176 : any_buffer_source(S s);
177 :
178 : /** Construct by wrapping a BufferSource without ownership.
179 :
180 : Wraps the given source by pointer. The source must remain
181 : valid for the lifetime of this wrapper. If `S` also
182 : satisfies @ref ReadSource, native read operations are
183 : forwarded through the virtual boundary.
184 :
185 : @param s Pointer to the source to wrap.
186 : */
187 : template<BufferSource S>
188 : any_buffer_source(S* s);
189 :
190 : /** Check if the wrapper contains a valid source.
191 :
192 : @return `true` if wrapping a source, `false` if default-constructed
193 : or moved-from.
194 : */
195 : bool
196 16 : has_value() const noexcept
197 : {
198 16 : return source_ != nullptr;
199 : }
200 :
201 : /** Check if the wrapper contains a valid source.
202 :
203 : @return `true` if wrapping a source, `false` if default-constructed
204 : or moved-from.
205 : */
206 : explicit
207 2 : operator bool() const noexcept
208 : {
209 2 : return has_value();
210 : }
211 :
212 : /** Consume bytes from the source.
213 :
214 : Advances the internal read position of the underlying source
215 : by the specified number of bytes. The next call to @ref pull
216 : returns data starting after the consumed bytes.
217 :
218 : @param n The number of bytes to consume. Must not exceed the
219 : total size of buffers returned by the previous @ref pull.
220 :
221 : @par Preconditions
222 : The wrapper must contain a valid source (`has_value() == true`).
223 : */
224 : void
225 : consume(std::size_t n) noexcept;
226 :
227 : /** Pull buffer data from the source.
228 :
229 : Fills the provided span with buffer descriptors from the
230 : underlying source. The operation completes when data is
231 : available, the source is exhausted, or an error occurs.
232 :
233 : @param dest Span of const_buffer to fill.
234 :
235 : @return An awaitable that await-returns `(error_code,std::span<const_buffer>)`.
236 : On success with data, a non-empty span of filled buffers.
237 : On EOF, `ec == cond::eof` and span is empty.
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid source (`has_value() == true`).
241 : The caller must not call this function again after a prior
242 : call returned an error.
243 : */
244 : auto
245 : pull(std::span<const_buffer> dest);
246 :
247 : /** Read some data into a mutable buffer sequence.
248 :
249 : Attempt to read up to `buffer_size( buffers )` bytes into
250 : the caller's buffers. May fill less than the full sequence.
251 :
252 : When the wrapped type provides native @ref ReadSource support,
253 : the operation forwards directly. Otherwise it is synthesized
254 : from @ref pull, @ref buffer_copy, and @ref consume.
255 :
256 : @param buffers The buffer sequence to fill.
257 :
258 : @return An awaitable that await-returns `(error_code,std::size_t)`.
259 :
260 : @par Preconditions
261 : The wrapper must contain a valid source (`has_value() == true`).
262 : The caller must not call this function again after a prior
263 : call returned an error (including EOF).
264 :
265 : @see pull, consume
266 : */
267 : template<MutableBufferSequence MB>
268 : io_task<std::size_t>
269 : read_some(MB buffers);
270 :
271 : /** Read data into a mutable buffer sequence.
272 :
273 : Fills the provided buffer sequence completely. When the
274 : wrapped type provides native @ref ReadSource support, each
275 : window is forwarded directly. Otherwise the data is
276 : synthesized from @ref pull, @ref buffer_copy, and @ref consume.
277 :
278 : @param buffers The buffer sequence to fill.
279 :
280 : @return An awaitable that await-returns `(error_code,std::size_t)`.
281 : On success, `n == buffer_size(buffers)`.
282 : On EOF, `ec == error::eof` and `n` is bytes transferred.
283 :
284 : @par Preconditions
285 : The wrapper must contain a valid source (`has_value() == true`).
286 : The caller must not call this function again after a prior
287 : call returned an error (including EOF).
288 :
289 : @see pull, consume
290 : */
291 : template<MutableBufferSequence MB>
292 : io_task<std::size_t>
293 : read(MB buffers);
294 :
295 : protected:
296 : /** Rebind to a new source after move.
297 :
298 : Updates the internal pointer to reference a new source object.
299 : Used by owning wrappers after move assignment when the owned
300 : object has moved to a new location.
301 :
302 : @param new_source The new source to bind to. Must be the same
303 : type as the original source.
304 :
305 : @note Terminates if called with a source of different type
306 : than the original.
307 : */
308 : template<BufferSource S>
309 : void
310 : rebind(S& new_source) noexcept
311 : {
// vtable_for_impl<S>::value is a distinct object for each S, so comparing
// its address against vt_ checks that S is the type this wrapper was
// built with. An empty wrapper (vt_ == nullptr) also fails the check.
312 : if(vt_ != &vtable_for_impl<S>::value)
313 : std::terminate();
// NOTE(review): storage_ is left untouched. If this wrapper was created by
// the owning constructor, the destructor would run ~S() on new_source
// instead of the object in storage_ - presumably rebind is only meant for
// wrappers built from a pointer; confirm against the derived classes.
314 : source_ = &new_source;
315 : }
316 :
317 : private:
318 : /** Forward a partial read through the vtable.
319 :
320 : Constructs the underlying `read_some` awaitable in
321 : cached storage and returns a type-erased awaitable.
322 : */
323 : auto
324 : read_some_(std::span<mutable_buffer const> buffers);
325 :
326 : /** Forward a complete read through the vtable.
327 :
328 : Constructs the underlying `read` awaitable in
329 : cached storage and returns a type-erased awaitable.
330 : */
331 : auto
332 : read_(std::span<mutable_buffer const> buffers);
333 : };
334 :
335 : /** Type-erased ops for awaitables that await-return `io_result<std::span<const_buffer>>`. */
336 : struct any_buffer_source::awaitable_ops
337 : {
// Every entry receives the address of the concrete awaitable as its first
// argument and casts it back to the real type before forwarding.
338 : bool (*await_ready)(void*);
339 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
340 : io_result<std::span<const_buffer>> (*await_resume)(void*);
// Runs the awaitable's destructor in place; the storage itself is not freed.
341 : void (*destroy)(void*) noexcept;
342 : };
343 :
344 : /** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. */
345 : struct any_buffer_source::read_awaitable_ops
346 : {
// Same layout as awaitable_ops; only the await_resume result type differs.
// The void* argument is the address of the concrete read awaitable.
347 : bool (*await_ready)(void*);
348 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
349 : io_result<std::size_t> (*await_resume)(void*);
// Runs the awaitable's destructor in place; the storage itself is not freed.
350 : void (*destroy)(void*) noexcept;
351 : };
352 :
// Per-type dispatch table. One constant instance exists for each wrapped
// type (see vtable_for_impl<S>::value); the wrapper stores a pointer to it.
353 : struct any_buffer_source::vtable
354 : {
355 : // BufferSource ops (always populated)
// Runs ~S() on the wrapped source; does not free its memory.
356 : void (*destroy)(void*) noexcept;
357 : void (*do_consume)(void* source, std::size_t n) noexcept;
// Largest size / alignment among the awaitable types that may be
// constructed in the wrapper's cached storage for this source type.
// NOTE(review): the constructors in this file allocate that storage with
// plain ::operator new(awaitable_size) and never read awaitable_align, so
// an over-aligned awaitable would depend on the default new alignment.
358 : std::size_t awaitable_size;
359 : std::size_t awaitable_align;
// Placement-constructs the source's pull awaitable in `storage` and returns
// the ops table used to drive and later destroy it.
360 : awaitable_ops const* (*construct_awaitable)(
361 : void* source,
362 : void* storage,
363 : std::span<const_buffer> dest);
364 :
365 : // ReadSource forwarding (null when wrapped type is BufferSource-only)
// read_some() and read() test these pointers at run time to choose between
// the native path and the pull + buffer_copy + consume fallback.
366 : read_awaitable_ops const* (*construct_read_some_awaitable)(
367 : void* source,
368 : void* storage,
369 : std::span<mutable_buffer const> buffers);
370 : read_awaitable_ops const* (*construct_read_awaitable)(
371 : void* source,
372 : void* storage,
373 : std::span<mutable_buffer const> buffers);
374 : };
375 :
// Generates the vtable for a concrete source type S. Every static function
// below receives the source (and, where relevant, the awaitable storage) as
// void* and casts it back to the real type.
376 : template<BufferSource S>
377 : struct any_buffer_source::vtable_for_impl
378 : {
// The awaitable type produced by S::pull for a span of const_buffer.
379 : using PullAwaitable = decltype(std::declval<S&>().pull(
380 : std::declval<std::span<const_buffer>>()));
381 :
// Destroys the source in place. Freeing the memory is left to the caller.
382 : static void
383 7 : do_destroy_impl(void* source) noexcept
384 : {
385 7 : static_cast<S*>(source)->~S();
386 7 : }
387 :
// Forwards consume(n) to the concrete source.
388 : static void
389 45 : do_consume_impl(void* source, std::size_t n) noexcept
390 : {
391 45 : static_cast<S*>(source)->consume(n);
392 45 : }
393 :
// Starts a pull: placement-constructs the awaitable returned by
// s.pull(dest) into `storage` and returns the ops table for it.
// `storage` must be large enough for PullAwaitable; the caller passes the
// wrapper's cached block, sized from awaitable_size.
394 : static awaitable_ops const*
395 110 : construct_awaitable_impl(
396 : void* source,
397 : void* storage,
398 : std::span<const_buffer> dest)
399 : {
400 110 : auto& s = *static_cast<S*>(source);
401 110 : ::new(storage) PullAwaitable(s.pull(dest));
402 :
// Unary + converts each captureless lambda to a plain function pointer so
// the table can be a constexpr aggregate of pointers.
403 : static constexpr awaitable_ops ops = {
404 110 : +[](void* p) {
405 110 : return static_cast<PullAwaitable*>(p)->await_ready();
406 : },
// Suspension goes through detail::call_await_suspend (declared elsewhere),
// which is handed the awaitable, the coroutine handle and the io_env.
407 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
408 0 : return detail::call_await_suspend(
409 0 : static_cast<PullAwaitable*>(p), h, env);
410 : },
411 HIT 110 : +[](void* p) {
412 110 : return static_cast<PullAwaitable*>(p)->await_resume();
413 : },
414 110 : +[](void* p) noexcept {
415 110 : static_cast<PullAwaitable*>(p)->~PullAwaitable();
416 : }
417 : };
418 110 : return &ops;
419 : }
420 :
// Native read_some: same scheme as construct_awaitable_impl, but for the
// awaitable returned by s.read_some(buffers). Only exists when S also
// satisfies ReadSource.
421 : static read_awaitable_ops const*
422 48 : construct_read_some_awaitable_impl(
423 : void* source,
424 : void* storage,
425 : std::span<mutable_buffer const> buffers)
426 : requires ReadSource<S>
427 : {
428 : using Aw = decltype(std::declval<S&>().read_some(
429 : std::span<mutable_buffer const>{}));
430 48 : auto& s = *static_cast<S*>(source);
431 48 : ::new(storage) Aw(s.read_some(buffers));
432 :
433 : static constexpr read_awaitable_ops ops = {
434 48 : +[](void* p) {
435 48 : return static_cast<Aw*>(p)->await_ready();
436 : },
437 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
438 0 : return detail::call_await_suspend(
439 0 : static_cast<Aw*>(p), h, env);
440 : },
441 HIT 48 : +[](void* p) {
442 48 : return static_cast<Aw*>(p)->await_resume();
443 : },
444 48 : +[](void* p) noexcept {
445 48 : static_cast<Aw*>(p)->~Aw();
446 : }
447 : };
448 48 : return &ops;
449 : }
450 :
// Native read: identical to the read_some variant except that it wraps the
// awaitable returned by s.read(buffers).
451 : static read_awaitable_ops const*
452 18 : construct_read_awaitable_impl(
453 : void* source,
454 : void* storage,
455 : std::span<mutable_buffer const> buffers)
456 : requires ReadSource<S>
457 : {
458 : using Aw = decltype(std::declval<S&>().read(
459 : std::span<mutable_buffer const>{}));
460 18 : auto& s = *static_cast<S*>(source);
461 18 : ::new(storage) Aw(s.read(buffers));
462 :
463 : static constexpr read_awaitable_ops ops = {
464 18 : +[](void* p) {
465 18 : return static_cast<Aw*>(p)->await_ready();
466 : },
467 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
468 0 : return detail::call_await_suspend(
469 0 : static_cast<Aw*>(p), h, env);
470 : },
471 HIT 18 : +[](void* p) {
472 18 : return static_cast<Aw*>(p)->await_resume();
473 : },
474 18 : +[](void* p) noexcept {
475 18 : static_cast<Aw*>(p)->~Aw();
476 : }
477 : };
478 18 : return &ops;
479 : }
480 :
// Size of the cached awaitable block: sizeof(PullAwaitable), raised to the
// size of the read_some / read awaitables when S is also a ReadSource,
// because all three share the same storage.
481 : static consteval std::size_t
482 : compute_max_size() noexcept
483 : {
484 : std::size_t s = sizeof(PullAwaitable);
485 : if constexpr (ReadSource<S>)
486 : {
487 : using RS = decltype(std::declval<S&>().read_some(
488 : std::span<mutable_buffer const>{}));
489 : using R = decltype(std::declval<S&>().read(
490 : std::span<mutable_buffer const>{}));
491 :
492 : if(sizeof(RS) > s) s = sizeof(RS);
493 : if(sizeof(R) > s) s = sizeof(R);
494 : }
495 : return s;
496 : }
497 :
// Strictest alignment among the same set of awaitable types.
498 : static consteval std::size_t
499 : compute_max_align() noexcept
500 : {
501 : std::size_t a = alignof(PullAwaitable);
502 : if constexpr (ReadSource<S>)
503 : {
504 : using RS = decltype(std::declval<S&>().read_some(
505 : std::span<mutable_buffer const>{}));
506 : using R = decltype(std::declval<S&>().read(
507 : std::span<mutable_buffer const>{}));
508 :
509 : if(alignof(RS) > a) a = alignof(RS);
510 : if(alignof(R) > a) a = alignof(R);
511 : }
512 : return a;
513 : }
514 :
// Assembles the table at compile time. The two read entries stay null
// unless S satisfies ReadSource; that null is what selects the synthesized
// path in read_some() and read().
515 : static consteval vtable
516 : make_vtable() noexcept
517 : {
518 : vtable v{};
519 : v.destroy = &do_destroy_impl;
520 : v.do_consume = &do_consume_impl;
521 : v.awaitable_size = compute_max_size();
522 : v.awaitable_align = compute_max_align();
523 : v.construct_awaitable = &construct_awaitable_impl;
524 : v.construct_read_some_awaitable = nullptr;
525 : v.construct_read_awaitable = nullptr;
526 :
527 : if constexpr (ReadSource<S>)
528 : {
529 : v.construct_read_some_awaitable =
530 : &construct_read_some_awaitable_impl;
531 : v.construct_read_awaitable =
532 : &construct_read_awaitable_impl;
533 : }
534 : return v;
535 : }
536 :
// The table for S. Its address also identifies S (see rebind()).
537 : static constexpr vtable value = make_vtable();
538 : };
539 :
// Destroys the owned source, if there is one, and frees both heap blocks.
// NOTE(review): an awaitable still alive in cached_awaitable_ (active_ops_
// or active_read_ops_ non-null) is not destroyed here - presumably a wrapper
// is never destroyed in the middle of an operation; confirm.
540 : inline
541 124 : any_buffer_source::~any_buffer_source()
542 : {
// storage_ is only set by the owning constructor (or moved in from such a
// wrapper), so a non-null value means source_ lives inside it and is ours.
543 124 : if(storage_)
544 : {
545 7 : vt_->destroy(source_);
546 7 : ::operator delete(storage_);
547 : }
548 124 : if(cached_awaitable_)
549 119 : ::operator delete(cached_awaitable_);
550 124 : }
551 :
// Move assignment: releases whatever this wrapper holds (same steps as the
// destructor), then takes over every member of `other`, leaving `other` in
// the default-constructed state. Self-assignment is a no-op.
552 : inline any_buffer_source&
553 2 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
554 : {
555 2 : if(this != &other)
556 : {
// Non-null storage_ means the current source is owned and must be destroyed.
557 2 : if(storage_)
558 : {
559 MIS 0 : vt_->destroy(source_);
560 0 : ::operator delete(storage_);
561 : }
562 HIT 2 : if(cached_awaitable_)
563 MIS 0 : ::operator delete(cached_awaitable_);
// std::exchange both steals the value and nulls the member in `other`, so
// its destructor will not release anything that now belongs to *this.
564 HIT 2 : source_ = std::exchange(other.source_, nullptr);
565 2 : vt_ = std::exchange(other.vt_, nullptr);
566 2 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
567 2 : storage_ = std::exchange(other.storage_, nullptr);
568 2 : active_ops_ = std::exchange(other.active_ops_, nullptr);
569 2 : active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
570 : }
571 2 : return *this;
572 : }
573 :
// Owning constructor: allocates a heap block for S, move-constructs the
// source into it, then allocates the reusable awaitable storage.
//
// Exception safety: the class destructor does not run when a constructor
// throws, so the local guard below undoes whatever has been done so far.
// NOTE(review): both allocations use plain ::operator new, so an S or an
// awaitable with extended alignment relies on the default new alignment.
574 : template<BufferSource S>
575 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
576 7 : any_buffer_source::any_buffer_source(S s)
577 7 : : vt_(&vtable_for_impl<S>::value)
578 : {
579 : struct guard {
580 : any_buffer_source* self;
581 : bool committed = false;
582 7 : ~guard() {
583 7 : if(!committed && self->storage_) {
// source_ is still null if S's move constructor threw: the block was
// allocated but no object exists in it, so only the raw memory is freed.
// Calling destroy() with a null pointer would run ~S() on nothing.
584 MIS 0 : if(self->source_) self->vt_->destroy(self->source_);
585 0 : ::operator delete(self->storage_);
586 0 : self->storage_ = nullptr;
587 0 : self->source_ = nullptr;
588 : }
589 HIT 7 : }
590 7 : } g{this};
591 :
// storage_ is assigned before the object is constructed, which is why the
// guard has to tell "allocated" and "constructed" apart.
592 7 : storage_ = ::operator new(sizeof(S));
593 7 : source_ = ::new(storage_) S(std::move(s));
594 :
// One block sized for the largest awaitable this source type can produce.
595 7 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
596 :
597 7 : g.committed = true;
598 7 : }
599 :
// Reference constructor: stores the pointer without taking ownership
// (storage_ stays null, so the destructor never destroys *s) and allocates
// the reusable awaitable storage.
// A null `s` is not rejected; the wrapper then reports has_value() == false.
600 : template<BufferSource S>
601 112 : any_buffer_source::any_buffer_source(S* s)
602 112 : : source_(s)
603 112 : , vt_(&vtable_for_impl<S>::value)
604 : {
605 112 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
606 112 : }
607 :
// Forwards consume(n) to the wrapped source through the vtable.
// vt_ is dereferenced without a check, hence the has_value() precondition.
608 : inline void
609 45 : any_buffer_source::consume(std::size_t n) noexcept
610 : {
611 45 : vt_->do_consume(source_, n);
612 45 : }
613 :
// Returns a small awaitable that drives the wrapped source's own pull
// awaitable through the type-erased ops table. Calling pull() by itself only
// captures `this` and `dest`; the source is first touched in await_ready().
614 : inline auto
615 110 : any_buffer_source::pull(std::span<const_buffer> dest)
616 : {
617 : struct awaitable
618 : {
619 : any_buffer_source* self_;
620 : std::span<const_buffer> dest_;
621 :
622 : bool
623 110 : await_ready()
624 : {
// Build the underlying awaitable inside the cached storage and remember
// its ops table so the later protocol steps know how to reach it.
625 220 : self_->active_ops_ = self_->vt_->construct_awaitable(
626 110 : self_->source_,
627 110 : self_->cached_awaitable_,
628 : dest_);
629 110 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
630 : }
631 :
632 : std::coroutine_handle<>
633 MIS 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
634 : {
635 0 : return self_->active_ops_->await_suspend(
636 0 : self_->cached_awaitable_, h, env);
637 : }
638 :
639 : io_result<std::span<const_buffer>>
640 HIT 110 : await_resume()
641 : {
// The guard's destructor runs after the result has been produced (or while
// an exception propagates), so the cached awaitable is always destroyed and
// active_ops_ reset before control returns to the caller.
642 : struct guard {
643 : any_buffer_source* self;
644 110 : ~guard() {
645 110 : self->active_ops_->destroy(self->cached_awaitable_);
646 110 : self->active_ops_ = nullptr;
647 110 : }
648 110 : } g{self_};
649 110 : return self_->active_ops_->await_resume(
650 195 : self_->cached_awaitable_);
651 110 : }
652 : };
653 110 : return awaitable{this, dest};
654 : }
655 :
// Native partial read. Returns an awaitable that builds the wrapped source's
// own read_some awaitable in the cached storage and drives it through
// active_read_ops_. read_some() only calls this after checking that
// vt_->construct_read_some_awaitable is non-null.
656 : inline auto
657 48 : any_buffer_source::read_some_(
658 : std::span<mutable_buffer const> buffers)
659 : {
660 : struct awaitable
661 : {
662 : any_buffer_source* self_;
663 : std::span<mutable_buffer const> buffers_;
664 :
// Always reports "not ready": the underlying awaitable is created in
// await_suspend instead, where its own readiness is then checked.
665 : bool
666 48 : await_ready() const noexcept
667 : {
668 48 : return false;
669 : }
670 :
671 : std::coroutine_handle<>
672 48 : await_suspend(std::coroutine_handle<> h, io_env const* env)
673 : {
674 96 : self_->active_read_ops_ =
675 96 : self_->vt_->construct_read_some_awaitable(
676 48 : self_->source_,
677 48 : self_->cached_awaitable_,
678 : buffers_);
679 :
// If the underlying awaitable is already complete, hand back the caller's
// own handle so it is resumed straight away and goes on to await_resume.
680 48 : if(self_->active_read_ops_->await_ready(
681 48 : self_->cached_awaitable_))
682 48 : return h;
683 :
684 MIS 0 : return self_->active_read_ops_->await_suspend(
685 0 : self_->cached_awaitable_, h, env);
686 : }
687 :
688 : io_result<std::size_t>
689 HIT 48 : await_resume()
690 : {
// Destroys the cached awaitable and clears active_read_ops_ once the result
// has been obtained, including when await_resume exits by exception.
691 : struct guard {
692 : any_buffer_source* self;
693 48 : ~guard() {
694 48 : self->active_read_ops_->destroy(
695 48 : self->cached_awaitable_);
696 48 : self->active_read_ops_ = nullptr;
697 48 : }
698 48 : } g{self_};
699 48 : return self_->active_read_ops_->await_resume(
700 88 : self_->cached_awaitable_);
701 48 : }
702 : };
703 48 : return awaitable{this, buffers};
704 : }
705 :
// Native complete read. Same structure as read_some_(), except that the
// cached awaitable is the one returned by the wrapped source's read().
// read() only calls this after checking that vt_->construct_read_awaitable
// is non-null.
706 : inline auto
707 18 : any_buffer_source::read_(
708 : std::span<mutable_buffer const> buffers)
709 : {
710 : struct awaitable
711 : {
712 : any_buffer_source* self_;
713 : std::span<mutable_buffer const> buffers_;
714 :
// Always reports "not ready": the underlying awaitable is created in
// await_suspend instead, where its own readiness is then checked.
715 : bool
716 18 : await_ready() const noexcept
717 : {
718 18 : return false;
719 : }
720 :
721 : std::coroutine_handle<>
722 18 : await_suspend(std::coroutine_handle<> h, io_env const* env)
723 : {
724 36 : self_->active_read_ops_ =
725 36 : self_->vt_->construct_read_awaitable(
726 18 : self_->source_,
727 18 : self_->cached_awaitable_,
728 : buffers_);
729 :
// If the underlying awaitable is already complete, hand back the caller's
// own handle so it is resumed straight away and goes on to await_resume.
730 18 : if(self_->active_read_ops_->await_ready(
731 18 : self_->cached_awaitable_))
732 18 : return h;
733 :
734 MIS 0 : return self_->active_read_ops_->await_suspend(
735 0 : self_->cached_awaitable_, h, env);
736 : }
737 :
738 : io_result<std::size_t>
739 HIT 18 : await_resume()
740 : {
// Destroys the cached awaitable and clears active_read_ops_ once the result
// has been obtained, including when await_resume exits by exception.
741 : struct guard {
742 : any_buffer_source* self;
743 18 : ~guard() {
744 18 : self->active_read_ops_->destroy(
745 18 : self->cached_awaitable_);
746 18 : self->active_read_ops_ = nullptr;
747 18 : }
748 18 : } g{self_};
749 18 : return self_->active_read_ops_->await_resume(
750 30 : self_->cached_awaitable_);
751 18 : }
752 : };
753 18 : return awaitable{this, buffers};
754 : }
755 :
// Partial read. Uses the wrapped source's native read_some when the vtable
// provides one; otherwise performs a single pull, copies what fits into the
// caller's buffers and consumes exactly the number of bytes copied.
756 : template<MutableBufferSequence MB>
757 : io_task<std::size_t>
758 58 : any_buffer_source::read_some(MB buffers)
759 : {
// Only the first window returned by bp.data() is used; there is no loop.
760 : buffer_param<MB> bp(buffers);
761 : auto dest = bp.data();
// Nothing to fill: complete immediately with success and zero bytes.
762 : if(dest.empty())
763 : co_return {{}, 0};
764 :
765 : // Native ReadSource path
766 : if(vt_->construct_read_some_awaitable)
767 : co_return co_await read_some_(dest);
768 :
769 : // Synthesized path: pull + buffer_copy + consume
770 : const_buffer arr[detail::max_iovec_];
771 : auto [ec, bufs] = co_await pull(arr);
// On error (EOF included) nothing is copied or consumed.
772 : if(ec)
773 : co_return {ec, 0};
774 :
// buffer_copy's result is used as the byte count to consume, so data that
// did not fit in dest stays in the source for the next call.
775 : auto n = buffer_copy(dest, bufs);
776 : consume(n);
777 : co_return {{}, n};
778 116 : }
779 :
// Complete read. Walks the caller's buffer sequence window by window until
// bp.data() returns an empty window, returning the running byte total. Each
// window is filled either by the wrapped source's native read or by repeated
// pull + buffer_copy + consume steps.
780 : template<MutableBufferSequence MB>
781 : io_task<std::size_t>
782 24 : any_buffer_source::read(MB buffers)
783 : {
784 : buffer_param<MB> bp(buffers);
785 : std::size_t total = 0;
786 :
787 : // Native ReadSource path
788 : if(vt_->construct_read_awaitable)
789 : {
790 : for(;;)
791 : {
792 : auto dest = bp.data();
793 : if(dest.empty())
794 : break;
795 :
796 : auto [ec, n] = co_await read_(dest);
// n is added before the error check, so bytes delivered by a read that
// ended in an error are still counted in the returned total.
797 : total += n;
798 : if(ec)
799 : co_return {ec, total};
800 : bp.consume(n);
801 : }
802 : co_return {{}, total};
803 : }
804 :
805 : // Synthesized path: pull + buffer_copy + consume
// NOTE(review): progress relies on a successful pull yielding at least one
// byte. If bufs were empty, n would be 0 and this loop would spin; the pull
// documentation promises a non-empty span on success - confirm for all sources.
806 : for(;;)
807 : {
808 : auto dest = bp.data();
809 : if(dest.empty())
810 : break;
811 :
812 : const_buffer arr[detail::max_iovec_];
813 : auto [ec, bufs] = co_await pull(arr);
814 :
815 : if(ec)
816 : co_return {ec, total};
817 :
// The same count advances both sides: the source by consume(n) and the
// destination window by bp.consume(n).
818 : auto n = buffer_copy(dest, bufs);
819 : consume(n);
820 : total += n;
821 : bp.consume(n);
822 : }
823 :
824 : co_return {{}, total};
825 48 : }
826 :
// Compile-time checks that the wrapper itself models both concepts, so an
// any_buffer_source can be used wherever a BufferSource or ReadSource is
// required.
827 : static_assert(BufferSource<any_buffer_source>);
828 : static_assert(ReadSource<any_buffer_source>);
829 :
830 : } // namespace capy
831 : } // namespace boost
832 :
833 : #endif
|