Line | Branch | Exec | Source |
---|---|---|---|
1 | /* Copyright 2022-2024 Jan-Michael Brummer | ||
2 | * | ||
3 | * This program is free software: you can redistribute it and/or modify | ||
4 | * it under the terms of the GNU Lesser General Public License as published by | ||
5 | * the Free Software Foundation, either version 3 of the License, or | ||
6 | * (at your option) any later version. | ||
7 | * | ||
8 | * This program is distributed in the hope that it will be useful, | ||
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
11 | * GNU Lesser General Public License for more details. | ||
12 | * | ||
13 | * You should have received a copy of the GNU Lesser General Public License | ||
14 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
15 | */ | ||
16 | |||
17 | #include <config.h> | ||
18 | |||
19 | #include <string.h> | ||
20 | |||
21 | #include <glib.h> | ||
22 | #include <glib/gi18n.h> | ||
23 | #include <gio/gio.h> | ||
24 | |||
25 | #include <libsoup/soup.h> | ||
26 | |||
27 | #include "msg-input-stream.h" | ||
28 | #include "msg-service.h" | ||
29 | |||
30 | static void msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface); | ||
31 | |||
32 | struct MsgInputStreamPrivate { | ||
33 | char *uri; | ||
34 | MsgService *service; | ||
35 | SoupMessage *msg; | ||
36 | GInputStream *stream; | ||
37 | |||
38 | char *range; | ||
39 | goffset request_offset; | ||
40 | goffset offset; | ||
41 | }; | ||
42 | |||
43 |
5/6✓ Branch 0 taken 4 times.
✓ Branch 1 taken 14 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
✓ Branch 5 taken 14 times.
|
22 | G_DEFINE_TYPE_WITH_CODE (MsgInputStream, msg_input_stream, G_TYPE_INPUT_STREAM, |
44 | G_ADD_PRIVATE (MsgInputStream) | ||
45 | G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE, | ||
46 | msg_input_stream_seekable_iface_init)) | ||
47 | |||
48 | static void | ||
49 | 2 | msg_input_stream_init (MsgInputStream *stream) | |
50 | { | ||
51 | 2 | stream->priv = msg_input_stream_get_instance_private (stream); | |
52 | 2 | } | |
53 | |||
54 | static void | ||
55 | 2 | msg_input_stream_finalize (GObject *object) | |
56 | { | ||
57 | 2 | MsgInputStream *stream = MSG_INPUT_STREAM (object); | |
58 | 2 | MsgInputStreamPrivate *priv = stream->priv; | |
59 | |||
60 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | g_clear_pointer (&priv->uri, g_free); |
61 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | g_clear_object (&priv->service); |
62 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | g_clear_object (&priv->msg); |
63 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | g_clear_object (&priv->stream); |
64 | 2 | g_free (priv->range); | |
65 | |||
66 | 2 | G_OBJECT_CLASS (msg_input_stream_parent_class)->finalize (object); | |
67 | 2 | } | |
68 | |||
69 | /** | ||
70 | * msg_input_stream_new: | ||
71 | * @service: a #MsgService | ||
72 | * @uri: a #GUri | ||
73 | * | ||
74 | * Prepares to send a GET request for @uri on @session, and returns a | ||
75 | * #GInputStream that can be used to read the response. | ||
76 | * | ||
77 | * The request will not be sent until the first read call; if you need | ||
78 | * to look at the status code or response headers before reading the | ||
79 | * body, you can use msg_input_stream_send() or | ||
80 | * msg_input_stream_send_async() to force the message to be | ||
81 | * sent and the response headers read. | ||
82 | * | ||
83 | * Returns: (transfer full): a new #GInputStream. | ||
84 | **/ | ||
85 | GInputStream * | ||
86 | 2 | msg_input_stream_new (MsgService *service, | |
87 | char *uri) | ||
88 | { | ||
89 | MsgInputStream *stream; | ||
90 | MsgInputStreamPrivate *priv; | ||
91 | |||
92 | 2 | stream = g_object_new (MSG_TYPE_INPUT_STREAM, NULL); | |
93 | 2 | priv = stream->priv; | |
94 | |||
95 | 2 | priv->service = g_object_ref (service); | |
96 | 2 | priv->uri = g_strdup (uri); | |
97 | |||
98 | 2 | return G_INPUT_STREAM (stream); | |
99 | } | ||
100 | |||
101 | static void | ||
102 | ✗ | on_restarted (SoupMessage *msg, | |
103 | __attribute__ ((unused)) gpointer user_data) | ||
104 | { | ||
105 | ✗ | SoupMessageHeaders *headers = soup_message_get_request_headers (msg); | |
106 | ✗ | soup_message_headers_remove (headers, "Authorization"); | |
107 | ✗ | } | |
108 | |||
109 | static SoupMessage * | ||
110 | 3 | msg_input_stream_ensure_msg (GInputStream *stream) | |
111 | { | ||
112 | 3 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv; | |
113 | |||
114 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
|
3 | if (!priv->msg){ |
115 | 2 | priv->msg = msg_service_build_message (MSG_SERVICE (priv->service), "GET", priv->uri, NULL, FALSE); | |
116 | 2 | g_signal_connect (G_OBJECT (priv->msg), "restarted", G_CALLBACK (on_restarted), NULL); | |
117 | 2 | msg_authorizer_process_request (msg_service_get_authorizer (priv->service), priv->msg); | |
118 | |||
119 | 2 | priv->offset = 0; | |
120 | } | ||
121 | |||
122 | |||
123 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
|
3 | if (priv->range) |
124 | 2 | soup_message_headers_replace (soup_message_get_request_headers (priv->msg), | |
125 | 2 | "Range", priv->range); | |
126 | |||
127 | 3 | return priv->msg; | |
128 | } | ||
129 | |||
130 | static void | ||
131 | ✗ | read_callback (GObject *object, | |
132 | GAsyncResult *result, | ||
133 | gpointer user_data) | ||
134 | { | ||
135 | ✗ | GTask *task = user_data; | |
136 | ✗ | GInputStream *vfsstream = g_task_get_source_object (task); | |
137 | ✗ | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv; | |
138 | ✗ | GError *error = NULL; | |
139 | gssize nread; | ||
140 | |||
141 | ✗ | nread = g_input_stream_read_finish (G_INPUT_STREAM (object), result, &error); | |
142 | ✗ | if (nread >= 0){ | |
143 | ✗ | priv->offset += nread; | |
144 | ✗ | g_task_return_int (task, nread); | |
145 | } else | ||
146 | ✗ | g_task_return_error (task, error); | |
147 | ✗ | g_object_unref (task); | |
148 | ✗ | } | |
149 | |||
150 | typedef struct { | ||
151 | gpointer buffer; | ||
152 | gsize count; | ||
153 | } ReadAfterSendData; | ||
154 | |||
155 | static void | ||
156 | 1 | read_send_callback (GObject *object, | |
157 | GAsyncResult *result, | ||
158 | gpointer user_data) | ||
159 | { | ||
160 | 1 | GTask *task = user_data; | |
161 | 1 | GInputStream *vfsstream = g_task_get_source_object (task); | |
162 | 1 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (vfsstream)->priv; | |
163 | 1 | ReadAfterSendData *rasd = g_task_get_task_data (task); | |
164 | 1 | GError *error = NULL; | |
165 | |||
166 | 1 | priv->stream = soup_session_send_finish (SOUP_SESSION (object), result, &error); | |
167 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (!priv->stream){ |
168 | ✗ | g_task_return_error (task, error); | |
169 | ✗ | g_object_unref (task); | |
170 | 1 | return; | |
171 | } | ||
172 |
2/4✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | if (!SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (priv->msg))){ |
173 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (soup_message_get_status (priv->msg) == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE){ |
174 | ✗ | g_input_stream_close (priv->stream, NULL, NULL); | |
175 | ✗ | g_task_return_int (task, 0); | |
176 | ✗ | g_clear_object (&priv->stream); | |
177 | ✗ | g_object_unref (task); | |
178 | ✗ | return; | |
179 | } | ||
180 | 2 | g_task_return_new_error (task, | |
181 | G_IO_ERROR, | ||
182 | 1 | soup_message_get_status (priv->msg), | |
183 | 1 | _("HTTP Error: %s"), soup_message_get_reason_phrase (priv->msg)); | |
184 | 1 | g_object_unref (task); | |
185 | 1 | return; | |
186 | } | ||
187 | ✗ | if (priv->range){ | |
188 | gboolean status; | ||
189 | goffset start, end; | ||
190 | |||
191 | ✗ | status = soup_message_headers_get_content_range (soup_message_get_response_headers (priv->msg), | |
192 | &start, &end, NULL); | ||
193 | ✗ | if (!status || start != priv->request_offset){ | |
194 | ✗ | g_task_return_new_error (task, | |
195 | G_IO_ERROR, | ||
196 | G_IO_ERROR_FAILED, | ||
197 | ✗ | _("Error seeking in stream")); | |
198 | ✗ | g_object_unref (task); | |
199 | ✗ | return; | |
200 | } | ||
201 | } | ||
202 | |||
203 | ✗ | g_input_stream_read_async (priv->stream, rasd->buffer, rasd->count, | |
204 | g_task_get_priority (task), | ||
205 | g_task_get_cancellable (task), | ||
206 | read_callback, task); | ||
207 | } | ||
208 | |||
209 | static gssize | ||
210 | 1 | msg_input_stream_read_fn (GInputStream *stream, | |
211 | void *buffer, | ||
212 | gsize count, | ||
213 | GCancellable *cancellable, | ||
214 | GError **error) | ||
215 | { | ||
216 | 1 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv; | |
217 | |||
218 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (!priv->stream) { |
219 | 1 | msg_input_stream_ensure_msg (stream); | |
220 | |||
221 | 1 | retry: | |
222 | 1 | priv->stream = soup_session_send (msg_service_get_session (priv->service), priv->msg, cancellable, error); | |
223 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (msg_service_handle_rate_limiting (priv->msg)) |
224 | ✗ | goto retry; | |
225 | } | ||
226 | |||
227 | 1 | return g_input_stream_read (priv->stream, buffer, count, cancellable, error); | |
228 | } | ||
229 | |||
230 | static void | ||
231 | 1 | msg_input_stream_read_async (GInputStream *stream, | |
232 | void *buffer, | ||
233 | gsize count, | ||
234 | int io_priority, | ||
235 | GCancellable *cancellable, | ||
236 | GAsyncReadyCallback callback, | ||
237 | gpointer user_data) | ||
238 | { | ||
239 | 1 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv; | |
240 | GTask *task; | ||
241 | |||
242 | 1 | task = g_task_new (stream, cancellable, callback, user_data); | |
243 | 1 | g_task_set_priority (task, io_priority); | |
244 | |||
245 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (!priv->stream) { |
246 | ReadAfterSendData *rasd; | ||
247 | |||
248 | 1 | rasd = g_new (ReadAfterSendData, 1); | |
249 | 1 | rasd->buffer = buffer; | |
250 | 1 | rasd->count = count; | |
251 | 1 | g_task_set_task_data (task, rasd, g_free); | |
252 | |||
253 | 1 | msg_input_stream_ensure_msg (stream); | |
254 | 1 | soup_session_send_async (msg_service_get_session (priv->service), priv->msg, G_PRIORITY_DEFAULT, | |
255 | cancellable, read_send_callback, task); | ||
256 | 1 | return; | |
257 | } | ||
258 | |||
259 | ✗ | g_input_stream_read_async (priv->stream, buffer, count, io_priority, | |
260 | cancellable, read_callback, task); | ||
261 | } | ||
262 | |||
263 | static gssize | ||
264 | 1 | msg_input_stream_read_finish (GInputStream *stream, | |
265 | GAsyncResult *result, | ||
266 | GError **error) | ||
267 | { | ||
268 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | g_return_val_if_fail (g_task_is_valid (result, stream), -1); |
269 | |||
270 | 1 | return g_task_propagate_int (G_TASK (result), error); | |
271 | } | ||
272 | |||
273 | static void | ||
274 | 1 | close_callback (GObject *object, | |
275 | GAsyncResult *result, | ||
276 | gpointer user_data) | ||
277 | { | ||
278 | 1 | GTask *task = user_data; | |
279 | 1 | GError *error = NULL; | |
280 | |||
281 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (g_input_stream_close_finish (G_INPUT_STREAM (object), result, &error)) |
282 | 1 | g_task_return_boolean (task, TRUE); | |
283 | else | ||
284 | ✗ | g_task_return_error (task, error); | |
285 | 1 | g_object_unref (task); | |
286 | 1 | } | |
287 | |||
288 | static void | ||
289 | 1 | msg_input_stream_close_async (GInputStream *stream, | |
290 | int io_priority, | ||
291 | GCancellable *cancellable, | ||
292 | GAsyncReadyCallback callback, | ||
293 | gpointer user_data) | ||
294 | { | ||
295 | 1 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv; | |
296 | GTask *task; | ||
297 | |||
298 | 1 | task = g_task_new (stream, cancellable, callback, user_data); | |
299 | 1 | g_task_set_priority (task, io_priority); | |
300 | |||
301 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (priv->stream == NULL){ |
302 | ✗ | g_task_return_boolean (task, TRUE); | |
303 | ✗ | return; | |
304 | } | ||
305 | |||
306 | 1 | g_input_stream_close_async (priv->stream, io_priority, | |
307 | cancellable, close_callback, task); | ||
308 | } | ||
309 | |||
310 | static gboolean | ||
311 | 1 | msg_input_stream_close_finish (GInputStream *stream, | |
312 | GAsyncResult *result, | ||
313 | GError **error) | ||
314 | { | ||
315 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | g_return_val_if_fail (g_task_is_valid (result, stream), -1); |
316 | |||
317 | 1 | return g_task_propagate_boolean (G_TASK (result), error); | |
318 | } | ||
319 | |||
320 | static goffset | ||
321 | 1 | msg_input_stream_tell (GSeekable *seekable) | |
322 | { | ||
323 | 1 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv; | |
324 | |||
325 | 1 | return priv->offset; | |
326 | } | ||
327 | |||
328 | static gboolean | ||
329 | 1 | msg_input_stream_can_seek (__attribute__ ((unused)) GSeekable *seekable) | |
330 | { | ||
331 | 1 | return TRUE; | |
332 | } | ||
333 | |||
334 | static gboolean | ||
335 | 2 | msg_input_stream_seek (GSeekable *seekable, | |
336 | goffset offset, | ||
337 | GSeekType type, | ||
338 | __attribute__ ((unused)) GCancellable *cancellable, | ||
339 | GError **error) | ||
340 | { | ||
341 | 2 | GInputStream *stream = G_INPUT_STREAM (seekable); | |
342 | 2 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (seekable)->priv; | |
343 | |||
344 |
3/4✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
2 | if (type == G_SEEK_END && priv->msg){ |
345 | 1 | goffset content_length = soup_message_headers_get_content_length (soup_message_get_response_headers (priv->msg)); | |
346 | |||
347 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (content_length){ |
348 | 1 | type = G_SEEK_SET; | |
349 | 1 | offset = priv->request_offset + content_length + offset; | |
350 | } | ||
351 | } | ||
352 | |||
353 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (type == G_SEEK_END){ |
354 | /* We could send "bytes=-offset", but since we don't know the | ||
355 | * Content-Length, we wouldn't be able to answer a tell() | ||
356 | * properly after that. We could maybe find the Content-Length | ||
357 | * by doing a HEAD... but that would require blocking. | ||
358 | */ | ||
359 | ✗ | g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, | |
360 | "G_SEEK_END not supported"); | ||
361 | ✗ | return FALSE; | |
362 | } | ||
363 | |||
364 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (!g_input_stream_set_pending (stream, error)) |
365 | ✗ | return FALSE; | |
366 | |||
367 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | if (priv->stream){ |
368 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (!g_input_stream_close (priv->stream, NULL, error)) |
369 | ✗ | return FALSE; | |
370 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | g_clear_object (&priv->stream); |
371 | } | ||
372 | |||
373 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | g_clear_pointer (&priv->range, g_free); |
374 | |||
375 |
2/3✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
2 | switch (type){ |
376 | 1 | case G_SEEK_CUR: | |
377 | 1 | offset += priv->offset; | |
378 | /* fall through */ | ||
379 | |||
380 | 2 | case G_SEEK_SET: | |
381 | 2 | priv->range = g_strdup_printf ("bytes=%"G_GUINT64_FORMAT "-", (guint64)offset); | |
382 | 2 | priv->request_offset = offset; | |
383 | 2 | priv->offset = offset; | |
384 | 2 | break; | |
385 | |||
386 | ✗ | default: | |
387 | ✗ | g_return_val_if_reached (FALSE); | |
388 | } | ||
389 | |||
390 | 2 | g_input_stream_clear_pending (stream); | |
391 | 2 | return TRUE; | |
392 | } | ||
393 | |||
394 | static gboolean | ||
395 | 1 | msg_input_stream_can_truncate (__attribute__ ((unused)) GSeekable *seekable) | |
396 | { | ||
397 | 1 | return FALSE; | |
398 | } | ||
399 | |||
400 | static gboolean | ||
401 | 1 | msg_input_stream_truncate (__attribute__ ((unused)) GSeekable *seekable, | |
402 | __attribute__ ((unused)) goffset offset, | ||
403 | __attribute__ ((unused)) GCancellable *cancellable, | ||
404 | GError **error) | ||
405 | { | ||
406 | 1 | g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, | |
407 | "Truncate not allowed on input stream"); | ||
408 | 1 | return FALSE; | |
409 | } | ||
410 | |||
411 | /** | ||
412 | * msg_input_stream_get_message: | ||
413 | * @stream: a #GInputStream | ||
414 | * | ||
415 | * Gets corresponding SoupMessage | ||
416 | * | ||
417 | * Returns: (transfer full): a #SoupMessage | ||
418 | */ | ||
419 | SoupMessage * | ||
420 | 1 | msg_input_stream_get_message (GInputStream *stream) | |
421 | { | ||
422 | 1 | MsgInputStreamPrivate *priv = MSG_INPUT_STREAM (stream)->priv; | |
423 | |||
424 | 1 | msg_input_stream_ensure_msg (stream); | |
425 | 1 | return g_object_ref (priv->msg); | |
426 | } | ||
427 | |||
428 | static void | ||
429 | 2 | msg_input_stream_class_init (MsgInputStreamClass *klass) | |
430 | { | ||
431 | 2 | GObjectClass *gobject_class = G_OBJECT_CLASS (klass); | |
432 | 2 | GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass); | |
433 | |||
434 | 2 | gobject_class->finalize = msg_input_stream_finalize; | |
435 | |||
436 | 2 | stream_class->read_fn = msg_input_stream_read_fn; | |
437 | 2 | stream_class->read_async = msg_input_stream_read_async; | |
438 | 2 | stream_class->read_finish = msg_input_stream_read_finish; | |
439 | 2 | stream_class->close_async = msg_input_stream_close_async; | |
440 | 2 | stream_class->close_finish = msg_input_stream_close_finish; | |
441 | 2 | } | |
442 | |||
443 | static void | ||
444 | 2 | msg_input_stream_seekable_iface_init (GSeekableIface *seekable_iface) | |
445 | { | ||
446 | 2 | seekable_iface->tell = msg_input_stream_tell; | |
447 | 2 | seekable_iface->can_seek = msg_input_stream_can_seek; | |
448 | 2 | seekable_iface->seek = msg_input_stream_seek; | |
449 | 2 | seekable_iface->can_truncate = msg_input_stream_can_truncate; | |
450 | 2 | seekable_iface->truncate_fn = msg_input_stream_truncate; | |
451 | 2 | } | |
452 |