GCC Code Coverage Report


Directory: src/
File: src/msg-input-stream.c
Date: 2024-05-18 00:53:33
Exec Total Coverage
Lines: 138 175 78.9%
Functions: 23 24 95.8%
Branches: 38 70 54.3%

Line Branch Exec Source
1 /* Copyright 2022-2024 Jan-Michael Brummer
2 *
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Lesser General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16
17 #include <config.h>
18
19 #include <string.h>
20
21 #include <glib.h>
22 #include <glib/gi18n.h>
23 #include <gio/gio.h>
24
25 #include <libsoup/soup.h>
26
27 #include "msg-input-stream.h"
28 #include "msg-service.h"
29
30 static void msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface);
31
32 struct MsgInputStreamPrivate {
33 char *uri;
34 MsgService *service;
35 SoupMessage *msg;
36 GInputStream *stream;
37
38 char *range;
39 goffset request_offset;
40 goffset offset;
41 };
42
43
5/7
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 14 times.
44 G_DEFINE_TYPE_WITH_CODE (MsgInputStream, msg_input_stream, G_TYPE_INPUT_STREAM,
44 G_ADD_PRIVATE (MsgInputStream)
45 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
46 msg_input_stream_seekable_iface_init))
47
48 static void
49 2 msg_input_stream_init (MsgInputStream *stream)
50 {
51 2 stream->priv = msg_input_stream_get_instance_private (stream);
52 2 }
53
54 static void
55 2 msg_input_stream_finalize (GObject *object)
56 {
57 2 MsgInputStream *stream = MSG_INPUT_STREAM (object);
58 2 MsgInputStreamPrivate *priv = stream->priv;
59
60
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_pointer (&priv->uri, g_free);
61
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_object (&priv->service);
62
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 g_clear_object (&priv->msg);
63
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 g_clear_object (&priv->stream);
64 2 g_free (priv->range);
65
66 2 G_OBJECT_CLASS (msg_input_stream_parent_class)->finalize (object);
67 2 }
68
69 /**
70 * msg_input_stream_new:
71 * @service: a #MsgService
72 * @uri: a #GUri
73 *
74 * Prepares to send a GET request for @uri on @session, and returns a
75 * #GInputStream that can be used to read the response.
76 *
77 * The request will not be sent until the first read call; if you need
78 * to look at the status code or response headers before reading the
79 * body, you can use msg_input_stream_send() or
80 * msg_input_stream_send_async() to force the message to be
81 * sent and the response headers read.
82 *
83 * Returns: (transfer full): a new #GInputStream.
84 **/
85 GInputStream *
86 2 msg_input_stream_new (MsgService *service,
87 char *uri)
88 {
89 MsgInputStream *stream;
90 MsgInputStreamPrivate *priv;
91
92 2 stream = g_object_new (MSG_TYPE_INPUT_STREAM, NULL);
93 2 priv = stream->priv;
94
95 2 priv->service = g_object_ref (service);
96 2 priv->uri = g_strdup (uri);
97
98 2 return G_INPUT_STREAM (stream);
99 }
100
101 static SoupMessage *
102 3 msg_input_stream_ensure_msg (GInputStream *stream)
103 {
104 3 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
105
106
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (!priv->msg){
107 2 priv->msg = msg_service_build_message (MSG_SERVICE (priv->service), "GET", priv->uri, NULL, FALSE);
108 2 msg_authorizer_process_request (msg_service_get_authorizer (priv->service), priv->msg);
109
110 2 priv->offset = 0;
111 }
112
113
114
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
3 if (priv->range)
115 2 soup_message_headers_replace (soup_message_get_request_headers (priv->msg),
116 2 "Range", priv->range);
117
118 3 return priv->msg;
119 }
120
121 static void
122 read_callback (GObject *object,
123 GAsyncResult *result,
124 gpointer user_data)
125 {
126 GTask *task = user_data;
127 GInputStream *vfsstream = g_task_get_source_object (task);
128 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv;
129 GError *error = NULL;
130 gssize nread;
131
132 nread = g_input_stream_read_finish (G_INPUT_STREAM (object), result, &error);
133 if (nread >= 0){
134 priv->offset += nread;
135 g_task_return_int (task, nread);
136 } else
137 g_task_return_error (task, error);
138 g_object_unref (task);
139 }
140
141 typedef struct {
142 gpointer buffer;
143 gsize count;
144 } ReadAfterSendData;
145
146 static void
147 1 read_send_callback (GObject *object,
148 GAsyncResult *result,
149 gpointer user_data)
150 {
151 1 GTask *task = user_data;
152 1 GInputStream *vfsstream = g_task_get_source_object (task);
153 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv;
154 1 ReadAfterSendData *rasd = g_task_get_task_data (task);
155 1 GError *error = NULL;
156
157 1 priv->stream = soup_session_send_finish (SOUP_SESSION (object), result, &error);
158
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!priv->stream){
159 g_task_return_error (task, error);
160 g_object_unref (task);
161 1 return;
162 }
163
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 if (!SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (priv->msg))){
164
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (soup_message_get_status (priv->msg) == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE){
165 g_input_stream_close (priv->stream, NULL, NULL);
166 g_task_return_int (task, 0);
167 g_clear_object (&priv->stream);
168 g_object_unref (task);
169 return;
170 }
171 2 g_task_return_new_error (task,
172 G_IO_ERROR,
173 1 soup_message_get_status (priv->msg),
174 1 _("HTTP Error: %s"), soup_message_get_reason_phrase (priv->msg));
175 1 g_object_unref (task);
176 1 return;
177 }
178 if (priv->range){
179 gboolean status;
180 goffset start, end;
181
182 status = soup_message_headers_get_content_range (soup_message_get_response_headers (priv->msg),
183 &start, &end, NULL);
184 if (!status || start != priv->request_offset){
185 g_task_return_new_error (task,
186 G_IO_ERROR,
187 G_IO_ERROR_FAILED,
188 _("Error seeking in stream"));
189 g_object_unref (task);
190 return;
191 }
192 }
193
194 g_input_stream_read_async (priv->stream, rasd->buffer, rasd->count,
195 g_task_get_priority (task),
196 g_task_get_cancellable (task),
197 read_callback, task);
198 }
199
200 static gssize
201 1 msg_input_stream_read_fn (GInputStream *stream,
202 void *buffer,
203 gsize count,
204 GCancellable *cancellable,
205 GError **error)
206 {
207 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
208
209
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!priv->stream) {
210 1 msg_input_stream_ensure_msg (stream);
211 1 priv->stream = soup_session_send (msg_service_get_session (priv->service), priv->msg, cancellable, error);
212 }
213
214 1 return g_input_stream_read (priv->stream, buffer, count, cancellable, error);
215 }
216
217 static void
218 1 msg_input_stream_read_async (GInputStream *stream,
219 void *buffer,
220 gsize count,
221 int io_priority,
222 GCancellable *cancellable,
223 GAsyncReadyCallback callback,
224 gpointer user_data)
225 {
226 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
227 GTask *task;
228
229 1 task = g_task_new (stream, cancellable, callback, user_data);
230 1 g_task_set_priority (task, io_priority);
231
232
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!priv->stream) {
233 ReadAfterSendData *rasd;
234
235 1 rasd = g_new (ReadAfterSendData, 1);
236 1 rasd->buffer = buffer;
237 1 rasd->count = count;
238 1 g_task_set_task_data (task, rasd, g_free);
239
240 1 msg_input_stream_ensure_msg (stream);
241 1 soup_session_send_async (msg_service_get_session (priv->service), priv->msg, G_PRIORITY_DEFAULT,
242 cancellable, read_send_callback, task);
243 1 return;
244 }
245
246 g_input_stream_read_async (priv->stream, buffer, count, io_priority,
247 cancellable, read_callback, task);
248 }
249
250 static gssize
251 1 msg_input_stream_read_finish (GInputStream *stream,
252 GAsyncResult *result,
253 GError **error)
254 {
255
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
256
257 1 return g_task_propagate_int (G_TASK (result), error);
258 }
259
260 static void
261 1 close_callback (GObject *object,
262 GAsyncResult *result,
263 gpointer user_data)
264 {
265 1 GTask *task = user_data;
266 1 GError *error = NULL;
267
268
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 if (g_input_stream_close_finish (G_INPUT_STREAM (object), result, &error))
269 1 g_task_return_boolean (task, TRUE);
270 else
271 g_task_return_error (task, error);
272 1 g_object_unref (task);
273 1 }
274
275 static void
276 1 msg_input_stream_close_async (GInputStream *stream,
277 int io_priority,
278 GCancellable *cancellable,
279 GAsyncReadyCallback callback,
280 gpointer user_data)
281 {
282 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
283 GTask *task;
284
285 1 task = g_task_new (stream, cancellable, callback, user_data);
286 1 g_task_set_priority (task, io_priority);
287
288
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (priv->stream == NULL){
289 g_task_return_boolean (task, TRUE);
290 return;
291 }
292
293 1 g_input_stream_close_async (priv->stream, io_priority,
294 cancellable, close_callback, task);
295 }
296
297 static gboolean
298 1 msg_input_stream_close_finish (GInputStream *stream,
299 GAsyncResult *result,
300 GError **error)
301 {
302
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
303
304 1 return g_task_propagate_boolean (G_TASK (result), error);
305 }
306
307 static goffset
308 1 msg_input_stream_tell (GSeekable *seekable)
309 {
310 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv;
311
312 1 return priv->offset;
313 }
314
315 static gboolean
316 1 msg_input_stream_can_seek (__attribute__ ((unused)) GSeekable *seekable)
317 {
318 1 return TRUE;
319 }
320
321 static gboolean
322 2 msg_input_stream_seek (GSeekable *seekable,
323 goffset offset,
324 GSeekType type,
325 __attribute__ ((unused)) GCancellable *cancellable,
326 GError **error)
327 {
328 2 GInputStream *stream = G_INPUT_STREAM (seekable);
329 2 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv;
330
331
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 if (type == G_SEEK_END && priv->msg){
332 1 goffset content_length = soup_message_headers_get_content_length (soup_message_get_response_headers (priv->msg));
333
334
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (content_length){
335 1 type = G_SEEK_SET;
336 1 offset = priv->request_offset + content_length + offset;
337 }
338 }
339
340
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (type == G_SEEK_END){
341 /* We could send "bytes=-offset", but since we don't know the
342 * Content-Length, we wouldn't be able to answer a tell()
343 * properly after that. We could maybe find the Content-Length
344 * by doing a HEAD... but that would require blocking.
345 */
346 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
347 "G_SEEK_END not supported");
348 return FALSE;
349 }
350
351
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (!g_input_stream_set_pending (stream, error))
352 return FALSE;
353
354
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (priv->stream){
355
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 if (!g_input_stream_close (priv->stream, NULL, error))
356 return FALSE;
357
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 g_clear_object (&priv->stream);
358 }
359
360
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 g_clear_pointer (&priv->range, g_free);
361
362
2/3
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
2 switch (type){
363 1 case G_SEEK_CUR:
364 1 offset += priv->offset;
365 /* fall through */
366
367 2 case G_SEEK_SET:
368 2 priv->range = g_strdup_printf ("bytes=%"G_GUINT64_FORMAT "-", (guint64)offset);
369 2 priv->request_offset = offset;
370 2 priv->offset = offset;
371 2 break;
372
373 default:
374 g_return_val_if_reached (FALSE);
375 }
376
377 2 g_input_stream_clear_pending (stream);
378 2 return TRUE;
379 }
380
381 static gboolean
382 1 msg_input_stream_can_truncate (__attribute__ ((unused)) GSeekable *seekable)
383 {
384 1 return FALSE;
385 }
386
387 static gboolean
388 1 msg_input_stream_truncate (__attribute__ ((unused)) GSeekable *seekable,
389 __attribute__ ((unused)) goffset offset,
390 __attribute__ ((unused)) GCancellable *cancellable,
391 GError **error)
392 {
393 1 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
394 "Truncate not allowed on input stream");
395 1 return FALSE;
396 }
397
398 /**
399 * msg_input_stream_get_message:
400 * @stream: a #GInputStream
401 *
402 * Gets corresponding SoupMessage
403 *
404 * Returns: (transfer full): a #SoupMessage
405 */
406 SoupMessage *
407 1 msg_input_stream_get_message (GInputStream *stream)
408 {
409 1 MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv;
410
411 1 msg_input_stream_ensure_msg (stream);
412 1 return g_object_ref (priv->msg);
413 }
414
415 static void
416 2 msg_input_stream_class_init (MsgInputStreamClass *klass)
417 {
418 2 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
419 2 GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
420
421 2 gobject_class->finalize = msg_input_stream_finalize;
422
423 2 stream_class->read_fn = msg_input_stream_read_fn;
424 2 stream_class->read_async = msg_input_stream_read_async;
425 2 stream_class->read_finish = msg_input_stream_read_finish;
426 2 stream_class->close_async = msg_input_stream_close_async;
427 2 stream_class->close_finish = msg_input_stream_close_finish;
428 2 }
429
430 static void
431 2 msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface)
432 {
433 2 seekable_iface->tell = msg_input_stream_tell;
434 2 seekable_iface->can_seek = msg_input_stream_can_seek;
435 2 seekable_iface->seek = msg_input_stream_seek;
436 2 seekable_iface->can_truncate = msg_input_stream_can_truncate;
437 2 seekable_iface->truncate_fn = msg_input_stream_truncate;
438 2 }
439