Branch data Line data Source code
1 : : /* GIO - GLib Input, Output and Streaming Library
2 : : *
3 : : * Copyright © 2008 codethink
4 : : * Copyright © 2009 Red Hat, Inc.
5 : : *
6 : : * SPDX-License-Identifier: LGPL-2.1-or-later
7 : : *
8 : : * This library is free software; you can redistribute it and/or
9 : : * modify it under the terms of the GNU Lesser General Public
10 : : * License as published by the Free Software Foundation; either
11 : : * version 2.1 of the License, or (at your option) any later version.
12 : : *
13 : : * This library is distributed in the hope that it will be useful,
14 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 : : * Lesser General Public License for more details.
17 : : *
18 : : * You should have received a copy of the GNU Lesser General
19 : : * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
20 : : *
21 : : * Authors: Ryan Lortie <desrt@desrt.ca>
22 : : * Alexander Larsson <alexl@redhat.com>
23 : : */
24 : :
25 : : #include "config.h"
26 : : #include <glib.h>
27 : : #include "glibintl.h"
28 : :
29 : : #include "giostream.h"
30 : : #include "gasyncresult.h"
31 : : #include "gioprivate.h"
32 : : #include "gtask.h"
33 : :
34 : : /**
35 : : * GIOStream:
36 : : *
37 : : * `GIOStream` represents an object that has both read and write streams.
38 : : * Generally the two streams act as separate input and output streams,
39 : : * but they share some common resources and state. For instance, for
40 : : * seekable streams, both streams may use the same position.
41 : : *
42 : : * Examples of `GIOStream` objects are [class@Gio.SocketConnection], which represents
43 : : * a two-way network connection; and [class@Gio.FileIOStream], which represents a
44 : : * file handle opened in read-write mode.
45 : : *
46 : : * To do the actual reading and writing you need to get the substreams
47 : : * with [method@Gio.IOStream.get_input_stream] and
48 : : * [method@Gio.IOStream.get_output_stream].
49 : : *
50 : : * The `GIOStream` object owns the input and the output streams, not the other
51 : : * way around, so keeping the substreams alive will not keep the `GIOStream`
52 : : * object alive. If the `GIOStream` object is freed it will be closed, thus
53 : : * closing the substreams, so even if the substreams stay alive they will
54 : : * always return `G_IO_ERROR_CLOSED` for all operations.
55 : : *
56 : : * To close a stream use [method@Gio.IOStream.close] which will close the common
57 : : * stream object and also the individual substreams. You can also close
58 : : * the substreams themselves. In most cases this only marks the
59 : : * substream as closed, so further I/O on it fails but common state in the
60 : : * `GIOStream` may still be open. However, some streams may support
61 : : * ‘half-closed’ states where one direction of the stream is actually shut down.
62 : : *
63 : : * Operations on `GIOStream`s cannot be started while another operation on the
64 : : * `GIOStream` or its substreams is in progress. Specifically, an application can
65 : : * read from the [class@Gio.InputStream] and write to the
66 : : * [class@Gio.OutputStream] simultaneously (either in separate threads, or as
67 : : * asynchronous operations in the same thread), but an application cannot start
68 : : * any `GIOStream` operation while there is a `GIOStream`, `GInputStream` or
69 : : * `GOutputStream` operation in progress, and an application can’t start any
70 : : * `GInputStream` or `GOutputStream` operation while there is a `GIOStream`
71 : : * operation in progress.
72 : : *
73 : : * This is a product of individual stream operations being associated with a
74 : : * given [type@GLib.MainContext] (the thread-default context at the time the
75 : : * operation was started), rather than entire streams being associated with a
76 : : * single `GMainContext`.
77 : : *
78 : : * GIO may run operations on `GIOStream`s from other (worker) threads, and this
79 : : * may be exposed to application code in the behaviour of wrapper streams, such
80 : : * as [class@Gio.BufferedInputStream] or [class@Gio.TlsConnection]. With such
81 : : * wrapper APIs, application code may only run operations on the base (wrapped)
82 : : * stream when the wrapper stream is idle. Note that the semantics of such
83 : : * operations may not be well-defined due to the state the wrapper stream leaves
84 : : * the base stream in (though they are guaranteed not to crash).
85 : : *
86 : : * Since: 2.22
87 : : */
88 : :
89 : : enum
90 : : {
91 : : PROP_0,
92 : : PROP_INPUT_STREAM,
93 : : PROP_OUTPUT_STREAM,
94 : : PROP_CLOSED
95 : : };
96 : :
97 : : struct _GIOStreamPrivate {
98 : : guint closed : 1;
99 : : guint pending : 1;
100 : : };
101 : :
102 : : static gboolean g_io_stream_real_close (GIOStream *stream,
103 : : GCancellable *cancellable,
104 : : GError **error);
105 : : static void g_io_stream_real_close_async (GIOStream *stream,
106 : : int io_priority,
107 : : GCancellable *cancellable,
108 : : GAsyncReadyCallback callback,
109 : : gpointer user_data);
110 : : static gboolean g_io_stream_real_close_finish (GIOStream *stream,
111 : : GAsyncResult *result,
112 : : GError **error);
113 : :
114 : 22026 : G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GIOStream, g_io_stream, G_TYPE_OBJECT)
115 : :
116 : : static void
117 : 2212 : g_io_stream_dispose (GObject *object)
118 : : {
119 : : GIOStream *stream;
120 : :
121 : 2212 : stream = G_IO_STREAM (object);
122 : :
123 : 2212 : if (!stream->priv->closed)
124 : 164 : g_io_stream_close (stream, NULL, NULL);
125 : :
126 : 2212 : G_OBJECT_CLASS (g_io_stream_parent_class)->dispose (object);
127 : 2212 : }
128 : :
129 : : static void
130 : 2465 : g_io_stream_init (GIOStream *stream)
131 : : {
132 : 2465 : stream->priv = g_io_stream_get_instance_private (stream);
133 : 2465 : }
134 : :
135 : : static void
136 : 0 : g_io_stream_get_property (GObject *object,
137 : : guint prop_id,
138 : : GValue *value,
139 : : GParamSpec *pspec)
140 : : {
141 : 0 : GIOStream *stream = G_IO_STREAM (object);
142 : :
143 : 0 : switch (prop_id)
144 : : {
145 : 0 : case PROP_CLOSED:
146 : 0 : g_value_set_boolean (value, stream->priv->closed);
147 : 0 : break;
148 : :
149 : 0 : case PROP_INPUT_STREAM:
150 : 0 : g_value_set_object (value, g_io_stream_get_input_stream (stream));
151 : 0 : break;
152 : :
153 : 0 : case PROP_OUTPUT_STREAM:
154 : 0 : g_value_set_object (value, g_io_stream_get_output_stream (stream));
155 : 0 : break;
156 : :
157 : 0 : default:
158 : 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
159 : : }
160 : 0 : }
161 : :
162 : : static void
163 : 136 : g_io_stream_class_init (GIOStreamClass *klass)
164 : : {
165 : 136 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
166 : :
167 : 136 : gobject_class->dispose = g_io_stream_dispose;
168 : 136 : gobject_class->get_property = g_io_stream_get_property;
169 : :
170 : 136 : klass->close_fn = g_io_stream_real_close;
171 : 136 : klass->close_async = g_io_stream_real_close_async;
172 : 136 : klass->close_finish = g_io_stream_real_close_finish;
173 : :
174 : : /**
175 : : * GIOStream:closed:
176 : : *
177 : : * Whether the stream is closed.
178 : : *
179 : : * Since: 2.22
180 : : */
181 : 136 : g_object_class_install_property (gobject_class, PROP_CLOSED,
182 : : g_param_spec_boolean ("closed", NULL, NULL,
183 : : FALSE,
184 : : G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
185 : :
186 : : /**
187 : : * GIOStream:input-stream:
188 : : *
189 : : * The [class@Gio.InputStream] to read from.
190 : : *
191 : : * Since: 2.22
192 : : */
193 : 136 : g_object_class_install_property (gobject_class, PROP_INPUT_STREAM,
194 : : g_param_spec_object ("input-stream", NULL, NULL,
195 : : G_TYPE_INPUT_STREAM,
196 : : G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
197 : :
198 : : /**
199 : : * GIOStream:output-stream:
200 : : *
201 : : * The [class@Gio.OutputStream] to write to.
202 : : *
203 : : * Since: 2.22
204 : : */
205 : 136 : g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
206 : : g_param_spec_object ("output-stream", NULL, NULL,
207 : : G_TYPE_OUTPUT_STREAM,
208 : : G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
209 : 136 : }
210 : :
211 : : /**
212 : : * g_io_stream_is_closed:
213 : : * @stream: a #GIOStream
214 : : *
215 : : * Checks if a stream is closed.
216 : : *
217 : : * Returns: %TRUE if the stream is closed.
218 : : *
219 : : * Since: 2.22
220 : : */
221 : : gboolean
222 : 10 : g_io_stream_is_closed (GIOStream *stream)
223 : : {
224 : 10 : g_return_val_if_fail (G_IS_IO_STREAM (stream), TRUE);
225 : :
226 : 10 : return stream->priv->closed;
227 : : }
228 : :
229 : : /**
230 : : * g_io_stream_get_input_stream:
231 : : * @stream: a #GIOStream
232 : : *
233 : : * Gets the input stream for this object. This is used
234 : : * for reading.
235 : : *
236 : : * Returns: (transfer none): a #GInputStream, owned by the #GIOStream.
237 : : * Do not free.
238 : : *
239 : : * Since: 2.22
240 : : */
241 : : GInputStream *
242 : 7075 : g_io_stream_get_input_stream (GIOStream *stream)
243 : : {
244 : : GIOStreamClass *klass;
245 : :
246 : 7075 : klass = G_IO_STREAM_GET_CLASS (stream);
247 : :
248 : 7075 : g_assert (klass->get_input_stream != NULL);
249 : :
250 : 7075 : return klass->get_input_stream (stream);
251 : : }
252 : :
253 : : /**
254 : : * g_io_stream_get_output_stream:
255 : : * @stream: a #GIOStream
256 : : *
257 : : * Gets the output stream for this object. This is used for
258 : : * writing.
259 : : *
260 : : * Returns: (transfer none): a #GOutputStream, owned by the #GIOStream.
261 : : * Do not free.
262 : : *
263 : : * Since: 2.22
264 : : */
265 : : GOutputStream *
266 : 18000 : g_io_stream_get_output_stream (GIOStream *stream)
267 : : {
268 : : GIOStreamClass *klass;
269 : :
270 : 18000 : klass = G_IO_STREAM_GET_CLASS (stream);
271 : :
272 : 18000 : g_assert (klass->get_output_stream != NULL);
273 : 18000 : return klass->get_output_stream (stream);
274 : : }
275 : :
276 : : /**
277 : : * g_io_stream_has_pending:
278 : : * @stream: a #GIOStream
279 : : *
280 : : * Checks if a stream has pending actions.
281 : : *
282 : : * Returns: %TRUE if @stream has pending actions.
283 : : *
284 : : * Since: 2.22
285 : : **/
286 : : gboolean
287 : 0 : g_io_stream_has_pending (GIOStream *stream)
288 : : {
289 : 0 : g_return_val_if_fail (G_IS_IO_STREAM (stream), FALSE);
290 : :
291 : 0 : return stream->priv->pending;
292 : : }
293 : :
294 : : /**
295 : : * g_io_stream_set_pending:
296 : : * @stream: a #GIOStream
297 : : * @error: a #GError location to store the error occurring, or %NULL to
298 : : * ignore
299 : : *
300 : : * Sets @stream to have actions pending. If the pending flag is
301 : : * already set or @stream is closed, it will return %FALSE and set
302 : : * @error.
303 : : *
304 : : * Returns: %TRUE if pending was previously unset and is now set.
305 : : *
306 : : * Since: 2.22
307 : : */
308 : : gboolean
309 : 2230 : g_io_stream_set_pending (GIOStream *stream,
310 : : GError **error)
311 : : {
312 : 2230 : g_return_val_if_fail (G_IS_IO_STREAM (stream), FALSE);
313 : :
314 : 2230 : if (stream->priv->closed)
315 : : {
316 : 0 : g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
317 : : _("Stream is already closed"));
318 : 0 : return FALSE;
319 : : }
320 : :
321 : 2230 : if (stream->priv->pending)
322 : : {
323 : 0 : g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
324 : : /* Translators: This is an error you get if there is
325 : : * already an operation running against this stream when
326 : : * you try to start one */
327 : : _("Stream has outstanding operation"));
328 : 0 : return FALSE;
329 : : }
330 : :
331 : 2230 : stream->priv->pending = TRUE;
332 : 2230 : return TRUE;
333 : : }
334 : :
335 : : /**
336 : : * g_io_stream_clear_pending:
337 : : * @stream: a #GIOStream
338 : : *
339 : : * Clears the pending flag on @stream.
340 : : *
341 : : * Since: 2.22
342 : : */
343 : : void
344 : 2230 : g_io_stream_clear_pending (GIOStream *stream)
345 : : {
346 : 2230 : g_return_if_fail (G_IS_IO_STREAM (stream));
347 : :
348 : 2230 : stream->priv->pending = FALSE;
349 : : }
350 : :
351 : : static gboolean
352 : 9 : g_io_stream_real_close (GIOStream *stream,
353 : : GCancellable *cancellable,
354 : : GError **error)
355 : : {
356 : : gboolean res;
357 : :
358 : 9 : res = g_output_stream_close (g_io_stream_get_output_stream (stream),
359 : : cancellable, error);
360 : :
361 : : /* If this errored out, unset error so that we don't report
362 : : further errors, but still do the following ops */
363 : 9 : if (error != NULL && *error != NULL)
364 : 0 : error = NULL;
365 : :
366 : 9 : res &= g_input_stream_close (g_io_stream_get_input_stream (stream),
367 : : cancellable, error);
368 : :
369 : 9 : return res;
370 : : }
371 : :
372 : : /**
373 : : * g_io_stream_close:
374 : : * @stream: a #GIOStream
375 : : * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
376 : : * @error: location to store the error occurring, or %NULL to ignore
377 : : *
378 : : * Closes the stream, releasing resources related to it. This will also
379 : : * close the individual input and output streams, if they are not already
380 : : * closed.
381 : : *
382 : : * Once the stream is closed, all other operations will return
383 : : * %G_IO_ERROR_CLOSED. Closing a stream multiple times will not
384 : : * return an error.
385 : : *
386 : : * Closing a stream will automatically flush any outstanding buffers
387 : : * in the stream.
388 : : *
389 : : * Streams will be automatically closed when the last reference
390 : : * is dropped, but you might want to call this function to make sure
391 : : * resources are released as early as possible.
392 : : *
393 : : * Some streams might keep the backing store of the stream (e.g. a file
394 : : * descriptor) open after the stream is closed. See the documentation for
395 : : * the individual stream for details.
396 : : *
397 : : * On failure the first error that happened will be reported, but the
398 : : * close operation will finish as much as possible. A stream that failed
399 : : * to close will still return %G_IO_ERROR_CLOSED for all operations.
400 : : * Still, it is important to check and report the error to the user,
401 : : * otherwise there might be a loss of data as all data might not be written.
402 : : *
403 : : * If @cancellable is not NULL, then the operation can be cancelled by
404 : : * triggering the cancellable object from another thread. If the operation
405 : : * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
406 : : * Cancelling a close will still leave the stream closed, but some streams
407 : : * can use a faster close that doesn't block to e.g. check errors.
408 : : *
409 : : * The default implementation of this method just calls close on the
410 : : * individual input/output streams.
411 : : *
412 : : * Returns: %TRUE on success, %FALSE on failure
413 : : *
414 : : * Since: 2.22
415 : : */
416 : : gboolean
417 : 249 : g_io_stream_close (GIOStream *stream,
418 : : GCancellable *cancellable,
419 : : GError **error)
420 : : {
421 : : GIOStreamClass *class;
422 : : gboolean res;
423 : :
424 : 249 : g_return_val_if_fail (G_IS_IO_STREAM (stream), FALSE);
425 : :
426 : 249 : class = G_IO_STREAM_GET_CLASS (stream);
427 : :
428 : 249 : if (stream->priv->closed)
429 : 0 : return TRUE;
430 : :
431 : 249 : if (!g_io_stream_set_pending (stream, error))
432 : 0 : return FALSE;
433 : :
434 : 249 : if (cancellable)
435 : 0 : g_cancellable_push_current (cancellable);
436 : :
437 : 249 : res = TRUE;
438 : 249 : if (class->close_fn)
439 : 249 : res = class->close_fn (stream, cancellable, error);
440 : :
441 : 249 : if (cancellable)
442 : 0 : g_cancellable_pop_current (cancellable);
443 : :
444 : 249 : stream->priv->closed = TRUE;
445 : 249 : g_io_stream_clear_pending (stream);
446 : :
447 : 249 : return res;
448 : : }
449 : :
450 : : static void
451 : 1963 : async_ready_close_callback_wrapper (GObject *source_object,
452 : : GAsyncResult *res,
453 : : gpointer user_data)
454 : : {
455 : 1963 : GIOStream *stream = G_IO_STREAM (source_object);
456 : 1963 : GIOStreamClass *klass = G_IO_STREAM_GET_CLASS (stream);
457 : 1963 : GTask *task = user_data;
458 : 1963 : GError *error = NULL;
459 : : gboolean success;
460 : :
461 : 1963 : stream->priv->closed = TRUE;
462 : 1963 : g_io_stream_clear_pending (stream);
463 : :
464 : 1963 : if (g_async_result_legacy_propagate_error (res, &error))
465 : 0 : success = FALSE;
466 : : else
467 : 1963 : success = klass->close_finish (stream, res, &error);
468 : :
469 : 1963 : if (error)
470 : 0 : g_task_return_error (task, error);
471 : : else
472 : 1963 : g_task_return_boolean (task, success);
473 : :
474 : 1963 : g_object_unref (task);
475 : 1963 : }
476 : :
477 : : /**
478 : : * g_io_stream_close_async:
479 : : * @stream: a #GIOStream
480 : : * @io_priority: the io priority of the request
481 : : * @cancellable: (nullable): optional cancellable object
482 : : * @callback: (scope async): a #GAsyncReadyCallback
483 : : * to call when the request is satisfied
484 : : * @user_data: the data to pass to callback function
485 : : *
486 : : * Requests an asynchronous close of the stream, releasing resources
487 : : * related to it. When the operation is finished @callback will be
488 : : * called. You can then call g_io_stream_close_finish() to get
489 : : * the result of the operation.
490 : : *
491 : : * For behaviour details see g_io_stream_close().
492 : : *
493 : : * The asynchronous methods have a default fallback that uses threads
494 : : * to implement asynchronicity, so they are optional for inheriting
495 : : * classes. However, if you override one you must override all.
496 : : *
497 : : * Since: 2.22
498 : : */
499 : : void
500 : 2069 : g_io_stream_close_async (GIOStream *stream,
501 : : int io_priority,
502 : : GCancellable *cancellable,
503 : : GAsyncReadyCallback callback,
504 : : gpointer user_data)
505 : : {
506 : : GIOStreamClass *class;
507 : 2069 : GError *error = NULL;
508 : : GTask *task;
509 : :
510 : 2175 : g_return_if_fail (G_IS_IO_STREAM (stream));
511 : :
512 : 2069 : task = g_task_new (stream, cancellable, callback, user_data);
513 : 2069 : g_task_set_source_tag (task, g_io_stream_close_async);
514 : :
515 : 2069 : if (stream->priv->closed)
516 : : {
517 : 106 : g_task_return_boolean (task, TRUE);
518 : 106 : g_object_unref (task);
519 : 106 : return;
520 : : }
521 : :
522 : 1963 : if (!g_io_stream_set_pending (stream, &error))
523 : : {
524 : 0 : g_task_return_error (task, error);
525 : 0 : g_object_unref (task);
526 : 0 : return;
527 : : }
528 : :
529 : 1963 : class = G_IO_STREAM_GET_CLASS (stream);
530 : :
531 : 1963 : class->close_async (stream, io_priority, cancellable,
532 : : async_ready_close_callback_wrapper, task);
533 : : }
534 : :
535 : : /**
536 : : * g_io_stream_close_finish:
537 : : * @stream: a #GIOStream
538 : : * @result: a #GAsyncResult
539 : : * @error: a #GError location to store the error occurring, or %NULL to
540 : : * ignore
541 : : *
542 : : * Closes a stream.
543 : : *
544 : : * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
545 : : *
546 : : * Since: 2.22
547 : : */
548 : : gboolean
549 : 2067 : g_io_stream_close_finish (GIOStream *stream,
550 : : GAsyncResult *result,
551 : : GError **error)
552 : : {
553 : 2067 : g_return_val_if_fail (G_IS_IO_STREAM (stream), FALSE);
554 : 2067 : g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
555 : :
556 : 2067 : return g_task_propagate_boolean (G_TASK (result), error);
557 : : }
558 : :
559 : :
560 : : static void
561 : 6 : close_async_thread (GTask *task,
562 : : gpointer source_object,
563 : : gpointer task_data,
564 : : GCancellable *cancellable)
565 : : {
566 : 6 : GIOStream *stream = source_object;
567 : : GIOStreamClass *class;
568 : 6 : GError *error = NULL;
569 : : gboolean result;
570 : :
571 : 6 : class = G_IO_STREAM_GET_CLASS (stream);
572 : 6 : if (class->close_fn)
573 : : {
574 : 6 : result = class->close_fn (stream,
575 : : g_task_get_cancellable (task),
576 : : &error);
577 : 6 : if (!result)
578 : : {
579 : 0 : g_task_return_error (task, error);
580 : 0 : return;
581 : : }
582 : : }
583 : :
584 : 6 : g_task_return_boolean (task, TRUE);
585 : : }
586 : :
587 : : typedef struct
588 : : {
589 : : GError *error;
590 : : gint pending;
591 : : } CloseAsyncData;
592 : :
593 : : static void
594 : 806 : stream_close_complete (GObject *source,
595 : : GAsyncResult *result,
596 : : gpointer user_data)
597 : : {
598 : 806 : GTask *task = user_data;
599 : : CloseAsyncData *data;
600 : :
601 : 806 : data = g_task_get_task_data (task);
602 : 806 : data->pending--;
603 : :
604 : 806 : if (G_IS_OUTPUT_STREAM (source))
605 : : {
606 : 403 : GError *error = NULL;
607 : :
608 : : /* Match behaviour with the sync route and give precedent to the
609 : : * error returned from closing the output stream.
610 : : */
611 : 403 : g_output_stream_close_finish (G_OUTPUT_STREAM (source), result, &error);
612 : 403 : if (error)
613 : : {
614 : 0 : if (data->error)
615 : 0 : g_error_free (data->error);
616 : 0 : data->error = error;
617 : : }
618 : : }
619 : : else
620 : 403 : g_input_stream_close_finish (G_INPUT_STREAM (source), result, data->error ? NULL : &data->error);
621 : :
622 : 806 : if (data->pending == 0)
623 : : {
624 : 403 : if (data->error)
625 : 0 : g_task_return_error (task, data->error);
626 : : else
627 : 403 : g_task_return_boolean (task, TRUE);
628 : :
629 : 403 : g_slice_free (CloseAsyncData, data);
630 : 403 : g_object_unref (task);
631 : : }
632 : 806 : }
633 : :
634 : : static void
635 : 409 : g_io_stream_real_close_async (GIOStream *stream,
636 : : int io_priority,
637 : : GCancellable *cancellable,
638 : : GAsyncReadyCallback callback,
639 : : gpointer user_data)
640 : : {
641 : : GInputStream *input;
642 : : GOutputStream *output;
643 : : GTask *task;
644 : :
645 : 409 : task = g_task_new (stream, cancellable, callback, user_data);
646 : 409 : g_task_set_source_tag (task, g_io_stream_real_close_async);
647 : 409 : g_task_set_check_cancellable (task, FALSE);
648 : 409 : g_task_set_priority (task, io_priority);
649 : :
650 : 409 : input = g_io_stream_get_input_stream (stream);
651 : 409 : output = g_io_stream_get_output_stream (stream);
652 : :
653 : 409 : if (g_input_stream_async_close_is_via_threads (input) && g_output_stream_async_close_is_via_threads (output))
654 : : {
655 : : /* No sense in dispatching to the thread twice -- just do it all
656 : : * in one go.
657 : : */
658 : 6 : g_task_run_in_thread (task, close_async_thread);
659 : 6 : g_object_unref (task);
660 : : }
661 : : else
662 : : {
663 : : CloseAsyncData *data;
664 : :
665 : : /* We should avoid dispatching to another thread in case either
666 : : * object that would not do it for itself because it may not be
667 : : * threadsafe.
668 : : */
669 : 403 : data = g_slice_new (CloseAsyncData);
670 : 403 : data->error = NULL;
671 : 403 : data->pending = 2;
672 : :
673 : 403 : g_task_set_task_data (task, data, NULL);
674 : 403 : g_input_stream_close_async (input, io_priority, cancellable, stream_close_complete, task);
675 : 403 : g_output_stream_close_async (output, io_priority, cancellable, stream_close_complete, task);
676 : : }
677 : 409 : }
678 : :
679 : : static gboolean
680 : 409 : g_io_stream_real_close_finish (GIOStream *stream,
681 : : GAsyncResult *result,
682 : : GError **error)
683 : : {
684 : 409 : g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
685 : :
686 : 409 : return g_task_propagate_boolean (G_TASK (result), error);
687 : : }
688 : :
689 : : typedef struct
690 : : {
691 : : GIOStream *stream1;
692 : : GIOStream *stream2;
693 : : GIOStreamSpliceFlags flags;
694 : : gint io_priority;
695 : : GCancellable *cancellable;
696 : : gulong cancelled_id;
697 : : GCancellable *op1_cancellable;
698 : : GCancellable *op2_cancellable;
699 : : guint completed;
700 : : GError *error;
701 : : } SpliceContext;
702 : :
703 : : static void
704 : 1 : splice_context_free (SpliceContext *ctx)
705 : : {
706 : 1 : g_object_unref (ctx->stream1);
707 : 1 : g_object_unref (ctx->stream2);
708 : 1 : if (ctx->cancellable != NULL)
709 : 0 : g_object_unref (ctx->cancellable);
710 : 1 : g_object_unref (ctx->op1_cancellable);
711 : 1 : g_object_unref (ctx->op2_cancellable);
712 : 1 : g_clear_error (&ctx->error);
713 : 1 : g_slice_free (SpliceContext, ctx);
714 : 1 : }
715 : :
716 : : static void
717 : 1 : splice_complete (GTask *task,
718 : : SpliceContext *ctx)
719 : : {
720 : 1 : if (ctx->cancelled_id != 0)
721 : 0 : g_cancellable_disconnect (ctx->cancellable, ctx->cancelled_id);
722 : 1 : ctx->cancelled_id = 0;
723 : :
724 : 1 : if (ctx->error != NULL)
725 : : {
726 : 0 : g_task_return_error (task, ctx->error);
727 : 0 : ctx->error = NULL;
728 : : }
729 : : else
730 : 1 : g_task_return_boolean (task, TRUE);
731 : 1 : }
732 : :
733 : : static void
734 : 2 : splice_close_cb (GObject *iostream,
735 : : GAsyncResult *res,
736 : : gpointer user_data)
737 : : {
738 : 2 : GTask *task = user_data;
739 : 2 : SpliceContext *ctx = g_task_get_task_data (task);
740 : 2 : GError *error = NULL;
741 : :
742 : 2 : g_io_stream_close_finish (G_IO_STREAM (iostream), res, &error);
743 : :
744 : 2 : ctx->completed++;
745 : :
746 : : /* Keep the first error that occurred */
747 : 2 : if (error != NULL && ctx->error == NULL)
748 : 0 : ctx->error = error;
749 : : else
750 : 2 : g_clear_error (&error);
751 : :
752 : : /* If all operations are done, complete now */
753 : 2 : if (ctx->completed == 4)
754 : 1 : splice_complete (task, ctx);
755 : :
756 : 2 : g_object_unref (task);
757 : 2 : }
758 : :
759 : : static void
760 : 2 : splice_cb (GObject *ostream,
761 : : GAsyncResult *res,
762 : : gpointer user_data)
763 : : {
764 : 2 : GTask *task = user_data;
765 : 2 : SpliceContext *ctx = g_task_get_task_data (task);
766 : 2 : GError *error = NULL;
767 : :
768 : 2 : g_output_stream_splice_finish (G_OUTPUT_STREAM (ostream), res, &error);
769 : :
770 : 2 : ctx->completed++;
771 : :
772 : : /* ignore cancellation error if it was not requested by the user */
773 : 2 : if (error != NULL &&
774 : 0 : g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
775 : 0 : (ctx->cancellable == NULL ||
776 : 0 : !g_cancellable_is_cancelled (ctx->cancellable)))
777 : 0 : g_clear_error (&error);
778 : :
779 : : /* Keep the first error that occurred */
780 : 2 : if (error != NULL && ctx->error == NULL)
781 : 0 : ctx->error = error;
782 : : else
783 : 2 : g_clear_error (&error);
784 : :
785 : 2 : if (ctx->completed == 1 &&
786 : 1 : (ctx->flags & G_IO_STREAM_SPLICE_WAIT_FOR_BOTH) == 0)
787 : : {
788 : : /* We don't want to wait for the 2nd operation to finish, cancel it */
789 : 0 : g_cancellable_cancel (ctx->op1_cancellable);
790 : 0 : g_cancellable_cancel (ctx->op2_cancellable);
791 : : }
792 : 2 : else if (ctx->completed == 2)
793 : : {
794 : 1 : if (ctx->cancellable == NULL ||
795 : 0 : !g_cancellable_is_cancelled (ctx->cancellable))
796 : : {
797 : 1 : g_cancellable_reset (ctx->op1_cancellable);
798 : 1 : g_cancellable_reset (ctx->op2_cancellable);
799 : : }
800 : :
801 : : /* Close the IO streams if needed */
802 : 1 : if ((ctx->flags & G_IO_STREAM_SPLICE_CLOSE_STREAM1) != 0)
803 : : {
804 : 1 : g_io_stream_close_async (ctx->stream1,
805 : : g_task_get_priority (task),
806 : : ctx->op1_cancellable,
807 : : splice_close_cb, g_object_ref (task));
808 : : }
809 : : else
810 : 0 : ctx->completed++;
811 : :
812 : 1 : if ((ctx->flags & G_IO_STREAM_SPLICE_CLOSE_STREAM2) != 0)
813 : : {
814 : 1 : g_io_stream_close_async (ctx->stream2,
815 : : g_task_get_priority (task),
816 : : ctx->op2_cancellable,
817 : : splice_close_cb, g_object_ref (task));
818 : : }
819 : : else
820 : 0 : ctx->completed++;
821 : :
822 : : /* If all operations are done, complete now */
823 : 1 : if (ctx->completed == 4)
824 : 0 : splice_complete (task, ctx);
825 : : }
826 : :
827 : 2 : g_object_unref (task);
828 : 2 : }
829 : :
830 : : static void
831 : 0 : splice_cancelled_cb (GCancellable *cancellable,
832 : : GTask *task)
833 : : {
834 : : SpliceContext *ctx;
835 : :
836 : 0 : ctx = g_task_get_task_data (task);
837 : 0 : g_cancellable_cancel (ctx->op1_cancellable);
838 : 0 : g_cancellable_cancel (ctx->op2_cancellable);
839 : 0 : }
840 : :
841 : : /**
842 : : * g_io_stream_splice_async:
843 : : * @stream1: a #GIOStream.
844 : : * @stream2: a #GIOStream.
845 : : * @flags: a set of #GIOStreamSpliceFlags.
846 : : * @io_priority: the io priority of the request.
847 : : * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
848 : : * @callback: (scope async): a #GAsyncReadyCallback
849 : : * to call when the request is satisfied
850 : : * @user_data: the data to pass to callback function
851 : : *
852 : : * Asynchronously splice the output stream of @stream1 to the input stream of
853 : : * @stream2, and splice the output stream of @stream2 to the input stream of
854 : : * @stream1.
855 : : *
856 : : * When the operation is finished @callback will be called.
857 : : * You can then call g_io_stream_splice_finish() to get the
858 : : * result of the operation.
859 : : *
860 : : * Since: 2.28
861 : : **/
862 : : void
863 : 1 : g_io_stream_splice_async (GIOStream *stream1,
864 : : GIOStream *stream2,
865 : : GIOStreamSpliceFlags flags,
866 : : gint io_priority,
867 : : GCancellable *cancellable,
868 : : GAsyncReadyCallback callback,
869 : : gpointer user_data)
870 : : {
871 : : GTask *task;
872 : : SpliceContext *ctx;
873 : : GInputStream *istream;
874 : : GOutputStream *ostream;
875 : :
876 : 1 : if (cancellable != NULL && g_cancellable_is_cancelled (cancellable))
877 : : {
878 : 0 : g_task_report_new_error (NULL, callback, user_data,
879 : : g_io_stream_splice_async,
880 : : G_IO_ERROR, G_IO_ERROR_CANCELLED,
881 : : "Operation has been cancelled");
882 : 0 : return;
883 : : }
884 : :
885 : 1 : ctx = g_slice_new0 (SpliceContext);
886 : 1 : ctx->stream1 = g_object_ref (stream1);
887 : 1 : ctx->stream2 = g_object_ref (stream2);
888 : 1 : ctx->flags = flags;
889 : 1 : ctx->op1_cancellable = g_cancellable_new ();
890 : 1 : ctx->op2_cancellable = g_cancellable_new ();
891 : 1 : ctx->completed = 0;
892 : :
893 : 1 : task = g_task_new (NULL, cancellable, callback, user_data);
894 : 1 : g_task_set_source_tag (task, g_io_stream_splice_async);
895 : 1 : g_task_set_task_data (task, ctx, (GDestroyNotify) splice_context_free);
896 : :
897 : 1 : if (cancellable != NULL)
898 : : {
899 : 0 : ctx->cancellable = g_object_ref (cancellable);
900 : 0 : ctx->cancelled_id = g_cancellable_connect (cancellable,
901 : : G_CALLBACK (splice_cancelled_cb), g_object_ref (task),
902 : : g_object_unref);
903 : : }
904 : :
905 : 1 : istream = g_io_stream_get_input_stream (stream1);
906 : 1 : ostream = g_io_stream_get_output_stream (stream2);
907 : 1 : g_output_stream_splice_async (ostream, istream, G_OUTPUT_STREAM_SPLICE_NONE,
908 : : io_priority, ctx->op1_cancellable, splice_cb,
909 : : g_object_ref (task));
910 : :
911 : 1 : istream = g_io_stream_get_input_stream (stream2);
912 : 1 : ostream = g_io_stream_get_output_stream (stream1);
913 : 1 : g_output_stream_splice_async (ostream, istream, G_OUTPUT_STREAM_SPLICE_NONE,
914 : : io_priority, ctx->op2_cancellable, splice_cb,
915 : : g_object_ref (task));
916 : :
917 : 1 : g_object_unref (task);
918 : : }
919 : :
920 : : /**
921 : : * g_io_stream_splice_finish:
922 : : * @result: a #GAsyncResult.
923 : : * @error: a #GError location to store the error occurring, or %NULL to
924 : : * ignore.
925 : : *
926 : : * Finishes an asynchronous io stream splice operation.
927 : : *
928 : : * Returns: %TRUE on success, %FALSE otherwise.
929 : : *
930 : : * Since: 2.28
931 : : **/
932 : : gboolean
933 : 1 : g_io_stream_splice_finish (GAsyncResult *result,
934 : : GError **error)
935 : : {
936 : 1 : g_return_val_if_fail (g_task_is_valid (result, NULL), FALSE);
937 : :
938 : 1 : return g_task_propagate_boolean (G_TASK (result), error);
939 : : }
|