GCC Code Coverage Report


Directory: src/
File: src/msg-input-stream.c
Date: 2024-07-13 00:54:47
Exec Total Coverage
Lines: 139 180 77.2%
Functions: 23 25 92.0%
Branches: 38 70 54.3%

Line Branch Exec Source
1 /* Copyright 2022-2024 Jan-Michael Brummer
2 *
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Lesser General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16
17 #include <config.h>
18
19 #include <string.h>
20
21 #include <glib.h>
22 #include <glib/gi18n.h>
23 #include <gio/gio.h>
24
25 #include <libsoup/soup.h>
26
27 #include "msg-input-stream.h"
28 #include "msg-service.h"
29
30 static void msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface);
31
32 struct MsgInputStreamPrivate {
33 char *uri;
34 MsgService *service;
35 SoupMessage *msg;
36 GInputStream *stream;
37
38 char *range;
39 goffset request_offset;
40 goffset offset;
41 };
42
43
5/7
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 14 times.
44 G_DEFINE_TYPE_WITH_CODE (MsgInputStream, msg_input_stream, G_TYPE_INPUT_STREAM,
44 G_ADD_PRIVATE (MsgInputStream)
45 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
46 msg_input_stream_seekable_iface_init))
47
48 static void
49 2 msg_input_stream_init (MsgInputStream *stream)
50 {
51 2 stream->priv = msg_input_stream_get_instance_private (stream);
52 2 }
53
54 static void
55 2 msg_input_stream_finalize (GObject *object)
56 {
57 2 MsgInputStream *stream = MSG_INPUT_STREAM (object);
58 2 MsgInputStreamPrivate *priv = stream->priv;
59
60
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_pointer (&priv->uri, g_free);
61
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_object (&priv->service);
62
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_object (&priv->msg);
63
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 g_clear_object (&priv->stream);
64 2 g_free (priv->range);
65
66 2 G_OBJECT_CLASS (msg_input_stream_parent_class)->finalize (object);
67 2 }
68
69 /**
70 * msg_input_stream_new:
71 * @service: a #MsgService
72 * @uri: a #GUri
73 *
74 * Prepares to send a GET request for @uri on @session, and returns a
75 * #GInputStream that can be used to read the response.
76 *
77 * The request will not be sent until the first read call; if you need
78 * to look at the status code or response headers before reading the
79 * body, you can use msg_input_stream_send() or
80 * msg_input_stream_send_async() to force the message to be
81 * sent and the response headers read.
82 *
83 * Returns: (transfer full): a new #GInputStream.
84 **/
85 GInputStream *
86 2 msg_input_stream_new (MsgService *service,
87 char *uri)
88 {
89 MsgInputStream *stream;
90 MsgInputStreamPrivate *priv;
91
92 2 stream = g_object_new (MSG_TYPE_INPUT_STREAM, NULL);
93 2 priv = stream->priv;
94
95 2 priv->service = g_object_ref (service);
96 2 priv->uri = g_strdup (uri);
97
98 2 return G_INPUT_STREAM (stream);
99 }
100
101 static void
102 on_restarted (SoupMessage *msg,
103 __attribute__ ((unused)) gpointer user_data)
104 {
105 SoupMessageHeaders *headers = soup_message_get_request_headers (msg);
106 soup_message_headers_remove (headers, "Authorization");
107 }
108
109 static SoupMessage *
110 3 msg_input_stream_ensure_msg (GInputStream *stream)
111 {
112 3 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
113
114
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (!priv->msg){
115 2 priv->msg = msg_service_build_message (MSG_SERVICE (priv->service), "GET", priv->uri, NULL, FALSE);
116 2 g_signal_connect (G_OBJECT (priv->msg), "restarted", G_CALLBACK (on_restarted), NULL);
117 2 msg_authorizer_process_request (msg_service_get_authorizer (priv->service), priv->msg);
118
119 2 priv->offset = 0;
120 }
121
122
123
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (priv->range)
124 2 soup_message_headers_replace (soup_message_get_request_headers (priv->msg),
125 2 "Range", priv->range);
126
127 3 return priv->msg;
128 }
129
130 static void
131 read_callback (GObject *object,
132 GAsyncResult *result,
133 gpointer user_data)
134 {
135 GTask *task = user_data;
136 GInputStream *vfsstream = g_task_get_source_object (task);
137 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv;
138 GError *error = NULL;
139 gssize nread;
140
141 nread = g_input_stream_read_finish (G_INPUT_STREAM (object), result, &error);
142 if (nread >= 0){
143 priv->offset += nread;
144 g_task_return_int (task, nread);
145 } else
146 g_task_return_error (task, error);
147 g_object_unref (task);
148 }
149
150 typedef struct {
151 gpointer buffer;
152 gsize count;
153 } ReadAfterSendData;
154
155 static void
156 1 read_send_callback (GObject *object,
157 GAsyncResult *result,
158 gpointer user_data)
159 {
160 1 GTask *task = user_data;
161 1 GInputStream *vfsstream = g_task_get_source_object (task);
162 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv;
163 1 ReadAfterSendData *rasd = g_task_get_task_data (task);
164 1 GError *error = NULL;
165
166 1 priv->stream = soup_session_send_finish (SOUP_SESSION (object), result, &error);
167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!priv->stream){
168 g_task_return_error (task, error);
169 g_object_unref (task);
170 1 return;
171 }
172
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 if (!SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (priv->msg))){
173
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (soup_message_get_status (priv->msg) == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE){
174 g_input_stream_close (priv->stream, NULL, NULL);
175 g_task_return_int (task, 0);
176 g_clear_object (&priv->stream);
177 g_object_unref (task);
178 return;
179 }
180 2 g_task_return_new_error (task,
181 G_IO_ERROR,
182 1 soup_message_get_status (priv->msg),
183 1 _("HTTP Error: %s"), soup_message_get_reason_phrase (priv->msg));
184 1 g_object_unref (task);
185 1 return;
186 }
187 if (priv->range){
188 gboolean status;
189 goffset start, end;
190
191 status = soup_message_headers_get_content_range (soup_message_get_response_headers (priv->msg),
192 &start, &end, NULL);
193 if (!status || start != priv->request_offset){
194 g_task_return_new_error (task,
195 G_IO_ERROR,
196 G_IO_ERROR_FAILED,
197 _("Error seeking in stream"));
198 g_object_unref (task);
199 return;
200 }
201 }
202
203 g_input_stream_read_async (priv->stream, rasd->buffer, rasd->count,
204 g_task_get_priority (task),
205 g_task_get_cancellable (task),
206 read_callback, task);
207 }
208
209 static gssize
210 1 msg_input_stream_read_fn (GInputStream *stream,
211 void *buffer,
212 gsize count,
213 GCancellable *cancellable,
214 GError **error)
215 {
216 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
217
218
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!priv->stream) {
219 1 msg_input_stream_ensure_msg (stream);
220 1 priv->stream = soup_session_send (msg_service_get_session (priv->service), priv->msg, cancellable, error);
221 }
222
223 1 return g_input_stream_read (priv->stream, buffer, count, cancellable, error);
224 }
225
226 static void
227 1 msg_input_stream_read_async (GInputStream *stream,
228 void *buffer,
229 gsize count,
230 int io_priority,
231 GCancellable *cancellable,
232 GAsyncReadyCallback callback,
233 gpointer user_data)
234 {
235 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
236 GTask *task;
237
238 1 task = g_task_new (stream, cancellable, callback, user_data);
239 1 g_task_set_priority (task, io_priority);
240
241
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!priv->stream) {
242 ReadAfterSendData *rasd;
243
244 1 rasd = g_new (ReadAfterSendData, 1);
245 1 rasd->buffer = buffer;
246 1 rasd->count = count;
247 1 g_task_set_task_data (task, rasd, g_free);
248
249 1 msg_input_stream_ensure_msg (stream);
250 1 soup_session_send_async (msg_service_get_session (priv->service), priv->msg, G_PRIORITY_DEFAULT,
251 cancellable, read_send_callback, task);
252 1 return;
253 }
254
255 g_input_stream_read_async (priv->stream, buffer, count, io_priority,
256 cancellable, read_callback, task);
257 }
258
259 static gssize
260 1 msg_input_stream_read_finish (GInputStream *stream,
261 GAsyncResult *result,
262 GError **error)
263 {
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
265
266 1 return g_task_propagate_int (G_TASK (result), error);
267 }
268
269 static void
270 1 close_callback (GObject *object,
271 GAsyncResult *result,
272 gpointer user_data)
273 {
274 1 GTask *task = user_data;
275 1 GError *error = NULL;
276
277
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 if (g_input_stream_close_finish (G_INPUT_STREAM (object), result, &error))
278 1 g_task_return_boolean (task, TRUE);
279 else
280 g_task_return_error (task, error);
281 1 g_object_unref (task);
282 1 }
283
284 static void
285 1 msg_input_stream_close_async (GInputStream *stream,
286 int io_priority,
287 GCancellable *cancellable,
288 GAsyncReadyCallback callback,
289 gpointer user_data)
290 {
291 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
292 GTask *task;
293
294 1 task = g_task_new (stream, cancellable, callback, user_data);
295 1 g_task_set_priority (task, io_priority);
296
297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (priv->stream == NULL){
298 g_task_return_boolean (task, TRUE);
299 return;
300 }
301
302 1 g_input_stream_close_async (priv->stream, io_priority,
303 cancellable, close_callback, task);
304 }
305
306 static gboolean
307 1 msg_input_stream_close_finish (GInputStream *stream,
308 GAsyncResult *result,
309 GError **error)
310 {
311
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
312
313 1 return g_task_propagate_boolean (G_TASK (result), error);
314 }
315
316 static goffset
317 1 msg_input_stream_tell (GSeekable *seekable)
318 {
319 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv;
320
321 1 return priv->offset;
322 }
323
324 static gboolean
325 1 msg_input_stream_can_seek (__attribute__ ((unused)) GSeekable *seekable)
326 {
327 1 return TRUE;
328 }
329
330 static gboolean
331 2 msg_input_stream_seek (GSeekable *seekable,
332 goffset offset,
333 GSeekType type,
334 __attribute__ ((unused)) GCancellable *cancellable,
335 GError **error)
336 {
337 2 GInputStream *stream = G_INPUT_STREAM (seekable);
338 2 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv;
339
340
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 if (type == G_SEEK_END && priv->msg){
341 1 goffset content_length = soup_message_headers_get_content_length (soup_message_get_response_headers (priv->msg));
342
343
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (content_length){
344 1 type = G_SEEK_SET;
345 1 offset = priv->request_offset + content_length + offset;
346 }
347 }
348
349
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (type == G_SEEK_END){
350 /* We could send "bytes=-offset", but since we don't know the
351 * Content-Length, we wouldn't be able to answer a tell()
352 * properly after that. We could maybe find the Content-Length
353 * by doing a HEAD... but that would require blocking.
354 */
355 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
356 "G_SEEK_END not supported");
357 return FALSE;
358 }
359
360
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (!g_input_stream_set_pending (stream, error))
361 return FALSE;
362
363
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (priv->stream){
364
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (!g_input_stream_close (priv->stream, NULL, error))
365 return FALSE;
366
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 g_clear_object (&priv->stream);
367 }
368
369
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 g_clear_pointer (&priv->range, g_free);
370
371
2/3
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
2 switch (type){
372 1 case G_SEEK_CUR:
373 1 offset += priv->offset;
374 /* fall through */
375
376 2 case G_SEEK_SET:
377 2 priv->range = g_strdup_printf ("bytes=%"G_GUINT64_FORMAT "-", (guint64)offset);
378 2 priv->request_offset = offset;
379 2 priv->offset = offset;
380 2 break;
381
382 default:
383 g_return_val_if_reached (FALSE);
384 }
385
386 2 g_input_stream_clear_pending (stream);
387 2 return TRUE;
388 }
389
390 static gboolean
391 1 msg_input_stream_can_truncate (__attribute__ ((unused)) GSeekable *seekable)
392 {
393 1 return FALSE;
394 }
395
396 static gboolean
397 1 msg_input_stream_truncate (__attribute__ ((unused)) GSeekable *seekable,
398 __attribute__ ((unused)) goffset offset,
399 __attribute__ ((unused)) GCancellable *cancellable,
400 GError **error)
401 {
402 1 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
403 "Truncate not allowed on input stream");
404 1 return FALSE;
405 }
406
407 /**
408 * msg_input_stream_get_message:
409 * @stream: a #GInputStream
410 *
411 * Gets corresponding SoupMessage
412 *
413 * Returns: (transfer full): a #SoupMessage
414 */
415 SoupMessage *
416 1 msg_input_stream_get_message (GInputStream *stream)
417 {
418 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
419
420 1 msg_input_stream_ensure_msg (stream);
421 1 return g_object_ref (priv->msg);
422 }
423
424 static void
425 2 msg_input_stream_class_init (MsgInputStreamClass *klass)
426 {
427 2 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
428 2 GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
429
430 2 gobject_class->finalize = msg_input_stream_finalize;
431
432 2 stream_class->read_fn = msg_input_stream_read_fn;
433 2 stream_class->read_async = msg_input_stream_read_async;
434 2 stream_class->read_finish = msg_input_stream_read_finish;
435 2 stream_class->close_async = msg_input_stream_close_async;
436 2 stream_class->close_finish = msg_input_stream_close_finish;
437 2 }
438
439 static void
440 2 msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface)
441 {
442 2 seekable_iface->tell = msg_input_stream_tell;
443 2 seekable_iface->can_seek = msg_input_stream_can_seek;
444 2 seekable_iface->seek = msg_input_stream_seek;
445 2 seekable_iface->can_truncate = msg_input_stream_can_truncate;
446 2 seekable_iface->truncate_fn = msg_input_stream_truncate;
447 2 }
448