GCC Code Coverage Report


Directory: src/
File: msg-input-stream.c
Date: 2025-04-26 00:53:43
Exec Total Coverage
Lines: 141 183 77.0%
Functions: 23 25 92.0%
Branches: 39 71 54.9%

Line Branch Exec Source
1 /* Copyright 2022-2024 Jan-Michael Brummer
2 *
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Lesser General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16
17 #include <config.h>
18
19 #include <string.h>
20
21 #include <glib.h>
22 #include <glib/gi18n.h>
23 #include <gio/gio.h>
24
25 #include <libsoup/soup.h>
26
27 #include "msg-input-stream.h"
28 #include "msg-service.h"
29
30 static void msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface);
31
32 struct MsgInputStreamPrivate {
33 char *uri;
34 MsgService *service;
35 SoupMessage *msg;
36 GInputStream *stream;
37
38 char *range;
39 goffset request_offset;
40 goffset offset;
41 };
42
43
5/6
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 14 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
✓ Branch 5 taken 14 times.
22 G_DEFINE_TYPE_WITH_CODE (MsgInputStream, msg_input_stream, G_TYPE_INPUT_STREAM,
44 G_ADD_PRIVATE (MsgInputStream)
45 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
46 msg_input_stream_seekable_iface_init))
47
48 static void
49 2 msg_input_stream_init (MsgInputStream *stream)
50 {
51 2 stream->priv = msg_input_stream_get_instance_private (stream);
52 2 }
53
54 static void
55 2 msg_input_stream_finalize (GObject *object)
56 {
57 2 MsgInputStream *stream = MSG_INPUT_STREAM (object);
58 2 MsgInputStreamPrivate *priv = stream->priv;
59
60
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_pointer (&priv->uri, g_free);
61
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_object (&priv->service);
62
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_object (&priv->msg);
63
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 g_clear_object (&priv->stream);
64 2 g_free (priv->range);
65
66 2 G_OBJECT_CLASS (msg_input_stream_parent_class)->finalize (object);
67 2 }
68
69 /**
70 * msg_input_stream_new:
71 * @service: a #MsgService
72 * @uri: a #GUri
73 *
74 * Prepares to send a GET request for @uri on @session, and returns a
75 * #GInputStream that can be used to read the response.
76 *
77 * The request will not be sent until the first read call; if you need
78 * to look at the status code or response headers before reading the
79 * body, you can use msg_input_stream_send() or
80 * msg_input_stream_send_async() to force the message to be
81 * sent and the response headers read.
82 *
83 * Returns: (transfer full): a new #GInputStream.
84 **/
85 GInputStream *
86 2 msg_input_stream_new (MsgService *service,
87 char *uri)
88 {
89 MsgInputStream *stream;
90 MsgInputStreamPrivate *priv;
91
92 2 stream = g_object_new (MSG_TYPE_INPUT_STREAM, NULL);
93 2 priv = stream->priv;
94
95 2 priv->service = g_object_ref (service);
96 2 priv->uri = g_strdup (uri);
97
98 2 return G_INPUT_STREAM (stream);
99 }
100
101 static void
102 on_restarted (SoupMessage *msg,
103 __attribute__ ((unused)) gpointer user_data)
104 {
105 SoupMessageHeaders *headers = soup_message_get_request_headers (msg);
106 soup_message_headers_remove (headers, "Authorization");
107 }
108
109 static SoupMessage *
110 3 msg_input_stream_ensure_msg (GInputStream *stream)
111 {
112 3 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
113
114
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (!priv->msg){
115 2 priv->msg = msg_service_build_message (MSG_SERVICE (priv->service), "GET", priv->uri, NULL, FALSE);
116 2 g_signal_connect (G_OBJECT (priv->msg), "restarted", G_CALLBACK (on_restarted), NULL);
117 2 msg_authorizer_process_request (msg_service_get_authorizer (priv->service), priv->msg);
118
119 2 priv->offset = 0;
120 }
121
122
123
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (priv->range)
124 2 soup_message_headers_replace (soup_message_get_request_headers (priv->msg),
125 2 "Range", priv->range);
126
127 3 return priv->msg;
128 }
129
130 static void
131 read_callback (GObject *object,
132 GAsyncResult *result,
133 gpointer user_data)
134 {
135 GTask *task = user_data;
136 GInputStream *vfsstream = g_task_get_source_object (task);
137 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv;
138 GError *error = NULL;
139 gssize nread;
140
141 nread = g_input_stream_read_finish (G_INPUT_STREAM (object), result, &error);
142 if (nread >= 0){
143 priv->offset += nread;
144 g_task_return_int (task, nread);
145 } else
146 g_task_return_error (task, error);
147 g_object_unref (task);
148 }
149
150 typedef struct {
151 gpointer buffer;
152 gsize count;
153 } ReadAfterSendData;
154
155 static void
156 1 read_send_callback (GObject *object,
157 GAsyncResult *result,
158 gpointer user_data)
159 {
160 1 GTask *task = user_data;
161 1 GInputStream *vfsstream = g_task_get_source_object (task);
162 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv;
163 1 ReadAfterSendData *rasd = g_task_get_task_data (task);
164 1 GError *error = NULL;
165
166 1 priv->stream = soup_session_send_finish (SOUP_SESSION (object), result, &error);
167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!priv->stream){
168 g_task_return_error (task, error);
169 g_object_unref (task);
170 1 return;
171 }
172
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 if (!SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (priv->msg))){
173
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (soup_message_get_status (priv->msg) == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE){
174 g_input_stream_close (priv->stream, NULL, NULL);
175 g_task_return_int (task, 0);
176 g_clear_object (&priv->stream);
177 g_object_unref (task);
178 return;
179 }
180 2 g_task_return_new_error (task,
181 G_IO_ERROR,
182 1 soup_message_get_status (priv->msg),
183 1 _("HTTP Error: %s"), soup_message_get_reason_phrase (priv->msg));
184 1 g_object_unref (task);
185 1 return;
186 }
187 if (priv->range){
188 gboolean status;
189 goffset start, end;
190
191 status = soup_message_headers_get_content_range (soup_message_get_response_headers (priv->msg),
192 &start, &end, NULL);
193 if (!status || start != priv->request_offset){
194 g_task_return_new_error (task,
195 G_IO_ERROR,
196 G_IO_ERROR_FAILED,
197 _("Error seeking in stream"));
198 g_object_unref (task);
199 return;
200 }
201 }
202
203 g_input_stream_read_async (priv->stream, rasd->buffer, rasd->count,
204 g_task_get_priority (task),
205 g_task_get_cancellable (task),
206 read_callback, task);
207 }
208
209 static gssize
210 1 msg_input_stream_read_fn (GInputStream *stream,
211 void *buffer,
212 gsize count,
213 GCancellable *cancellable,
214 GError **error)
215 {
216 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
217
218
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!priv->stream) {
219 1 msg_input_stream_ensure_msg (stream);
220
221 1 retry:
222 1 priv->stream = soup_session_send (msg_service_get_session (priv->service), priv->msg, cancellable, error);
223
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (msg_service_handle_rate_limiting (priv->msg))
224 goto retry;
225 }
226
227 1 return g_input_stream_read (priv->stream, buffer, count, cancellable, error);
228 }
229
230 static void
231 1 msg_input_stream_read_async (GInputStream *stream,
232 void *buffer,
233 gsize count,
234 int io_priority,
235 GCancellable *cancellable,
236 GAsyncReadyCallback callback,
237 gpointer user_data)
238 {
239 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
240 GTask *task;
241
242 1 task = g_task_new (stream, cancellable, callback, user_data);
243 1 g_task_set_priority (task, io_priority);
244
245
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!priv->stream) {
246 ReadAfterSendData *rasd;
247
248 1 rasd = g_new (ReadAfterSendData, 1);
249 1 rasd->buffer = buffer;
250 1 rasd->count = count;
251 1 g_task_set_task_data (task, rasd, g_free);
252
253 1 msg_input_stream_ensure_msg (stream);
254 1 soup_session_send_async (msg_service_get_session (priv->service), priv->msg, G_PRIORITY_DEFAULT,
255 cancellable, read_send_callback, task);
256 1 return;
257 }
258
259 g_input_stream_read_async (priv->stream, buffer, count, io_priority,
260 cancellable, read_callback, task);
261 }
262
263 static gssize
264 1 msg_input_stream_read_finish (GInputStream *stream,
265 GAsyncResult *result,
266 GError **error)
267 {
268
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
269
270 1 return g_task_propagate_int (G_TASK (result), error);
271 }
272
273 static void
274 1 close_callback (GObject *object,
275 GAsyncResult *result,
276 gpointer user_data)
277 {
278 1 GTask *task = user_data;
279 1 GError *error = NULL;
280
281
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (g_input_stream_close_finish (G_INPUT_STREAM (object), result, &error))
282 1 g_task_return_boolean (task, TRUE);
283 else
284 g_task_return_error (task, error);
285 1 g_object_unref (task);
286 1 }
287
288 static void
289 1 msg_input_stream_close_async (GInputStream *stream,
290 int io_priority,
291 GCancellable *cancellable,
292 GAsyncReadyCallback callback,
293 gpointer user_data)
294 {
295 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
296 GTask *task;
297
298 1 task = g_task_new (stream, cancellable, callback, user_data);
299 1 g_task_set_priority (task, io_priority);
300
301
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (priv->stream == NULL){
302 g_task_return_boolean (task, TRUE);
303 return;
304 }
305
306 1 g_input_stream_close_async (priv->stream, io_priority,
307 cancellable, close_callback, task);
308 }
309
310 static gboolean
311 1 msg_input_stream_close_finish (GInputStream *stream,
312 GAsyncResult *result,
313 GError **error)
314 {
315
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
316
317 1 return g_task_propagate_boolean (G_TASK (result), error);
318 }
319
320 static goffset
321 1 msg_input_stream_tell (GSeekable *seekable)
322 {
323 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv;
324
325 1 return priv->offset;
326 }
327
328 static gboolean
329 1 msg_input_stream_can_seek (__attribute__ ((unused)) GSeekable *seekable)
330 {
331 1 return TRUE;
332 }
333
334 static gboolean
335 2 msg_input_stream_seek (GSeekable *seekable,
336 goffset offset,
337 GSeekType type,
338 __attribute__ ((unused)) GCancellable *cancellable,
339 GError **error)
340 {
341 2 GInputStream *stream = G_INPUT_STREAM (seekable);
342 2 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv;
343
344
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 if (type == G_SEEK_END && priv->msg){
345 1 goffset content_length = soup_message_headers_get_content_length (soup_message_get_response_headers (priv->msg));
346
347
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (content_length){
348 1 type = G_SEEK_SET;
349 1 offset = priv->request_offset + content_length + offset;
350 }
351 }
352
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (type == G_SEEK_END){
354 /* We could send "bytes=-offset", but since we don't know the
355 * Content-Length, we wouldn't be able to answer a tell()
356 * properly after that. We could maybe find the Content-Length
357 * by doing a HEAD... but that would require blocking.
358 */
359 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
360 "G_SEEK_END not supported");
361 return FALSE;
362 }
363
364
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!g_input_stream_set_pending (stream, error))
365 return FALSE;
366
367
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (priv->stream){
368
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!g_input_stream_close (priv->stream, NULL, error))
369 return FALSE;
370
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 g_clear_object (&priv->stream);
371 }
372
373
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 g_clear_pointer (&priv->range, g_free);
374
375
2/3
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
2 switch (type){
376 1 case G_SEEK_CUR:
377 1 offset += priv->offset;
378 /* fall through */
379
380 2 case G_SEEK_SET:
381 2 priv->range = g_strdup_printf ("bytes=%"G_GUINT64_FORMAT "-", (guint64)offset);
382 2 priv->request_offset = offset;
383 2 priv->offset = offset;
384 2 break;
385
386 default:
387 g_return_val_if_reached (FALSE);
388 }
389
390 2 g_input_stream_clear_pending (stream);
391 2 return TRUE;
392 }
393
394 static gboolean
395 1 msg_input_stream_can_truncate (__attribute__ ((unused)) GSeekable *seekable)
396 {
397 1 return FALSE;
398 }
399
400 static gboolean
401 1 msg_input_stream_truncate (__attribute__ ((unused)) GSeekable *seekable,
402 __attribute__ ((unused)) goffset offset,
403 __attribute__ ((unused)) GCancellable *cancellable,
404 GError **error)
405 {
406 1 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
407 "Truncate not allowed on input stream");
408 1 return FALSE;
409 }
410
411 /**
412 * msg_input_stream_get_message:
413 * @stream: a #GInputStream
414 *
415 * Gets corresponding SoupMessage
416 *
417 * Returns: (transfer full): a #SoupMessage
418 */
419 SoupMessage *
420 1 msg_input_stream_get_message (GInputStream *stream)
421 {
422 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
423
424 1 msg_input_stream_ensure_msg (stream);
425 1 return g_object_ref (priv->msg);
426 }
427
428 static void
429 2 msg_input_stream_class_init (MsgInputStreamClass *klass)
430 {
431 2 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
432 2 GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
433
434 2 gobject_class->finalize = msg_input_stream_finalize;
435
436 2 stream_class->read_fn = msg_input_stream_read_fn;
437 2 stream_class->read_async = msg_input_stream_read_async;
438 2 stream_class->read_finish = msg_input_stream_read_finish;
439 2 stream_class->close_async = msg_input_stream_close_async;
440 2 stream_class->close_finish = msg_input_stream_close_finish;
441 2 }
442
443 static void
444 2 msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface)
445 {
446 2 seekable_iface->tell = msg_input_stream_tell;
447 2 seekable_iface->can_seek = msg_input_stream_can_seek;
448 2 seekable_iface->seek = msg_input_stream_seek;
449 2 seekable_iface->can_truncate = msg_input_stream_can_truncate;
450 2 seekable_iface->truncate_fn = msg_input_stream_truncate;
451 2 }
452