Branch data Line data Source code
1 : : /* GLib testing framework examples and tests
2 : : * Copyright (C) 2008 Red Hat, Inc
3 : : *
4 : : * SPDX-License-Identifier: LicenseRef-old-glib-tests
5 : : *
6 : : * This work is provided "as is"; redistribution and modification
7 : : * in whole or in part, in any medium, physical or electronic is
8 : : * permitted without restriction.
9 : : *
10 : : * This work is distributed in the hope that it will be useful,
11 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 : : *
14 : : * In no event shall the authors or contributors be liable for any
15 : : * direct, indirect, incidental, special, exemplary, or consequential
16 : : * damages (including, but not limited to, procurement of substitute
17 : : * goods or services; loss of use, data, or profits; or business
18 : : * interruption) however caused and on any theory of liability, whether
19 : : * in contract, strict liability, or tort (including negligence or
20 : : * otherwise) arising in any way out of the use of this software, even
21 : : * if advised of the possibility of such damage.
22 : : */
23 : :
24 : : #include <gio/gio.h>
25 : : #include <gio/gunixinputstream.h>
26 : : #include <gio/gunixoutputstream.h>
27 : : #include <glib.h>
28 : : #include <glib/glib-unix.h>
29 : : #include <signal.h>
30 : : #include <stdlib.h>
31 : : #include <string.h>
32 : : #include <unistd.h>
33 : : #include <fcntl.h>
34 : :
35 : : /* sizeof(DATA) will give the number of bytes in the array, plus the terminating nul */
36 : : static const gchar DATA[] = "abcdefghijklmnopqrstuvwxyz";
37 : :
38 : : int writer_pipe[2], reader_pipe[2];
39 : : GCancellable *writer_cancel, *reader_cancel, *main_cancel;
40 : : GMainLoop *loop;
41 : :
42 : :
43 : : static gpointer
44 : 2 : writer_thread (gpointer user_data)
45 : : {
46 : : GOutputStream *out;
47 : : gssize nwrote, offset;
48 : 2 : GError *err = NULL;
49 : :
50 : 2 : out = g_unix_output_stream_new (writer_pipe[1], TRUE);
51 : :
52 : : do
53 : : {
54 : 13954 : g_usleep (10);
55 : :
56 : 13954 : offset = 0;
57 : 27906 : while (offset < (gssize) sizeof (DATA))
58 : : {
59 : 13954 : nwrote = g_output_stream_write (out, DATA + offset,
60 : : sizeof (DATA) - offset,
61 : : writer_cancel, &err);
62 : 13954 : if (nwrote <= 0 || err != NULL)
63 : : break;
64 : 13952 : offset += nwrote;
65 : : }
66 : :
67 : 13954 : g_assert_true (nwrote > 0 || err != NULL);
68 : : }
69 : 13954 : while (err == NULL);
70 : :
71 : 2 : if (g_cancellable_is_cancelled (writer_cancel))
72 : : {
73 : 2 : g_clear_error (&err);
74 : 2 : g_cancellable_cancel (main_cancel);
75 : 2 : g_object_unref (out);
76 : 2 : return NULL;
77 : : }
78 : :
79 : 0 : g_warning ("writer: %s", err->message);
80 : : g_assert_not_reached ();
81 : : }
82 : :
83 : : static gpointer
84 : 2 : reader_thread (gpointer user_data)
85 : : {
86 : : GInputStream *in;
87 : 2 : gssize nread = 0, total;
88 : 2 : GError *err = NULL;
89 : : char buf[sizeof (DATA)];
90 : :
91 : 2 : in = g_unix_input_stream_new (reader_pipe[0], TRUE);
92 : :
93 : : do
94 : : {
95 : 6977 : total = 0;
96 : 13952 : while (total < (gssize) sizeof (DATA))
97 : : {
98 : 6977 : nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
99 : : reader_cancel, &err);
100 : 6977 : if (nread <= 0 || err != NULL)
101 : : break;
102 : 6975 : total += nread;
103 : : }
104 : :
105 : 6977 : if (err)
106 : 0 : break;
107 : :
108 : 6977 : if (nread == 0)
109 : : {
110 : 2 : g_assert_no_error (err);
111 : : /* pipe closed */
112 : 2 : g_object_unref (in);
113 : 2 : return NULL;
114 : : }
115 : :
116 : 6975 : g_assert_cmpstr (buf, ==, DATA);
117 : 6975 : g_assert_false (g_cancellable_is_cancelled (reader_cancel));
118 : : }
119 : 6975 : while (err == NULL);
120 : :
121 : 0 : g_warning ("reader: %s", err->message);
122 : : g_assert_not_reached ();
123 : : }
124 : :
125 : : static char main_buf[sizeof (DATA)];
126 : : static gssize main_len, main_offset;
127 : :
128 : : static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
129 : : static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
130 : : static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
131 : :
132 : : static void
133 : 2 : do_main_cancel (GOutputStream *out)
134 : : {
135 : 2 : g_output_stream_close (out, NULL, NULL);
136 : 2 : g_main_loop_quit (loop);
137 : 2 : }
138 : :
139 : : static void
140 : 6977 : main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
141 : : {
142 : 6977 : GInputStream *in = G_INPUT_STREAM (source);
143 : 6977 : GOutputStream *out = user_data;
144 : 6977 : GError *err = NULL;
145 : : gssize nskipped;
146 : :
147 : 6977 : nskipped = g_input_stream_skip_finish (in, res, &err);
148 : :
149 : 6977 : if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
150 : : {
151 : 2 : g_assert_true (g_cancellable_is_cancelled (main_cancel));
152 : 2 : do_main_cancel (out);
153 : 2 : g_clear_error (&err);
154 : 2 : return;
155 : : }
156 : :
157 : 6975 : g_assert_no_error (err);
158 : :
159 : 6975 : main_offset += nskipped;
160 : 6975 : if (main_offset == main_len)
161 : : {
162 : 6975 : main_offset = 0;
163 : 6975 : g_output_stream_write_async (out, main_buf, main_len,
164 : : G_PRIORITY_DEFAULT, main_cancel,
165 : : main_thread_wrote, in);
166 : : }
167 : : else
168 : : {
169 : 0 : g_input_stream_skip_async (in, main_len - main_offset,
170 : : G_PRIORITY_DEFAULT, main_cancel,
171 : : main_thread_skipped, out);
172 : : }
173 : : }
174 : :
175 : : static void
176 : 6977 : main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
177 : : {
178 : 6977 : GInputStream *in = G_INPUT_STREAM (source);
179 : 6977 : GOutputStream *out = user_data;
180 : 6977 : GError *err = NULL;
181 : : gssize nread;
182 : :
183 : 6977 : nread = g_input_stream_read_finish (in, res, &err);
184 : :
185 : 6977 : if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
186 : : {
187 : 0 : g_assert_true (g_cancellable_is_cancelled (main_cancel));
188 : 0 : do_main_cancel (out);
189 : 0 : g_clear_error (&err);
190 : 0 : return;
191 : : }
192 : :
193 : 6977 : g_assert_no_error (err);
194 : :
195 : 6977 : main_offset += nread;
196 : 6977 : if (main_offset == sizeof (DATA))
197 : : {
198 : 6977 : main_len = main_offset;
199 : 6977 : main_offset = 0;
200 : : /* Now skip the same amount */
201 : 6977 : g_input_stream_skip_async (in, main_len,
202 : : G_PRIORITY_DEFAULT, main_cancel,
203 : : main_thread_skipped, out);
204 : : }
205 : : else
206 : : {
207 : 0 : g_input_stream_read_async (in, main_buf, sizeof (main_buf),
208 : : G_PRIORITY_DEFAULT, main_cancel,
209 : : main_thread_read, out);
210 : : }
211 : : }
212 : :
213 : : static void
214 : 6975 : main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
215 : : {
216 : 6975 : GOutputStream *out = G_OUTPUT_STREAM (source);
217 : 6975 : GInputStream *in = user_data;
218 : 6975 : GError *err = NULL;
219 : : gssize nwrote;
220 : :
221 : 6975 : nwrote = g_output_stream_write_finish (out, res, &err);
222 : :
223 : 6975 : if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
224 : : {
225 : 0 : g_assert_true (g_cancellable_is_cancelled (main_cancel));
226 : 0 : do_main_cancel (out);
227 : 0 : g_clear_error (&err);
228 : 0 : return;
229 : : }
230 : :
231 : 6975 : g_assert_no_error (err);
232 : 6975 : g_assert_cmpint (nwrote, <=, main_len - main_offset);
233 : :
234 : 6975 : main_offset += nwrote;
235 : 6975 : if (main_offset == main_len)
236 : : {
237 : 6975 : main_offset = 0;
238 : 6975 : g_input_stream_read_async (in, main_buf, sizeof (main_buf),
239 : : G_PRIORITY_DEFAULT, main_cancel,
240 : : main_thread_read, out);
241 : : }
242 : : else
243 : : {
244 : 0 : g_output_stream_write_async (out, main_buf + main_offset,
245 : 0 : main_len - main_offset,
246 : : G_PRIORITY_DEFAULT, main_cancel,
247 : : main_thread_wrote, in);
248 : : }
249 : : }
250 : :
251 : : static gboolean
252 : 2 : timeout (gpointer cancellable)
253 : : {
254 : 2 : g_cancellable_cancel (cancellable);
255 : 2 : return FALSE;
256 : : }
257 : :
258 : : static void
259 : 2 : test_pipe_io (gconstpointer nonblocking)
260 : : {
261 : : GThread *writer, *reader;
262 : : GInputStream *in;
263 : : GOutputStream *out;
264 : :
265 : : /* Split off two (additional) threads, a reader and a writer. From
266 : : * the writer thread, write data synchronously in small chunks,
267 : : * which gets alternately read and skipped asynchronously by the
268 : : * main thread and then (if not skipped) written asynchronously to
269 : : * the reader thread, which reads it synchronously. Eventually a
270 : : * timeout in the main thread will cause it to cancel the writer
271 : : * thread, which will in turn cancel the read op in the main thread,
272 : : * which will then close the pipe to the reader thread, causing the
273 : : * read op to fail.
274 : : */
275 : :
276 : 2 : g_assert_true (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
277 : :
278 : 2 : if (nonblocking)
279 : : {
280 : 1 : GError *error = NULL;
281 : :
282 : 1 : g_unix_set_fd_nonblocking (writer_pipe[0], TRUE, &error);
283 : 1 : g_assert_no_error (error);
284 : 1 : g_unix_set_fd_nonblocking (writer_pipe[1], TRUE, &error);
285 : 1 : g_assert_no_error (error);
286 : 1 : g_unix_set_fd_nonblocking (reader_pipe[0], TRUE, &error);
287 : 1 : g_assert_no_error (error);
288 : 1 : g_unix_set_fd_nonblocking (reader_pipe[1], TRUE, &error);
289 : 1 : g_assert_no_error (error);
290 : : }
291 : :
292 : 2 : writer_cancel = g_cancellable_new ();
293 : 2 : reader_cancel = g_cancellable_new ();
294 : 2 : main_cancel = g_cancellable_new ();
295 : :
296 : 2 : writer = g_thread_new ("writer", writer_thread, NULL);
297 : 2 : reader = g_thread_new ("reader", reader_thread, NULL);
298 : :
299 : 2 : in = g_unix_input_stream_new (writer_pipe[0], TRUE);
300 : 2 : out = g_unix_output_stream_new (reader_pipe[1], TRUE);
301 : :
302 : 2 : g_input_stream_read_async (in, main_buf, sizeof (main_buf),
303 : : G_PRIORITY_DEFAULT, main_cancel,
304 : : main_thread_read, out);
305 : :
306 : 2 : g_timeout_add (500, timeout, writer_cancel);
307 : :
308 : 2 : loop = g_main_loop_new (NULL, TRUE);
309 : 2 : g_main_loop_run (loop);
310 : 2 : g_main_loop_unref (loop);
311 : :
312 : 2 : g_thread_join (reader);
313 : 2 : g_thread_join (writer);
314 : :
315 : 2 : g_object_unref (main_cancel);
316 : 2 : g_object_unref (reader_cancel);
317 : 2 : g_object_unref (writer_cancel);
318 : 2 : g_object_unref (in);
319 : 2 : g_object_unref (out);
320 : 2 : }
321 : :
322 : : static void
323 : 1 : test_basic (void)
324 : : {
325 : : GUnixInputStream *is;
326 : : GUnixOutputStream *os;
327 : : gint fd;
328 : : gboolean close_fd;
329 : :
330 : 1 : is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE));
331 : 1 : g_object_get (is,
332 : : "fd", &fd,
333 : : "close-fd", &close_fd,
334 : : NULL);
335 : 1 : g_assert_cmpint (fd, ==, 0);
336 : 1 : g_assert_true (close_fd);
337 : :
338 : 1 : g_unix_input_stream_set_close_fd (is, FALSE);
339 : 1 : g_assert_false (g_unix_input_stream_get_close_fd (is));
340 : 1 : g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
341 : :
342 : 1 : g_assert_false (g_input_stream_has_pending (G_INPUT_STREAM (is)));
343 : :
344 : 1 : g_object_unref (is);
345 : :
346 : 1 : os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
347 : 1 : g_object_get (os,
348 : : "fd", &fd,
349 : : "close-fd", &close_fd,
350 : : NULL);
351 : 1 : g_assert_cmpint (fd, ==, 1);
352 : 1 : g_assert_true (close_fd);
353 : :
354 : 1 : g_unix_output_stream_set_close_fd (os, FALSE);
355 : 1 : g_assert_false (g_unix_output_stream_get_close_fd (os));
356 : 1 : g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
357 : :
358 : 1 : g_assert_false (g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
359 : :
360 : 1 : g_object_unref (os);
361 : 1 : }
362 : :
363 : : typedef struct {
364 : : GInputStream *is;
365 : : GOutputStream *os;
366 : : const guint8 *write_data;
367 : : guint8 *read_data;
368 : : } TestReadWriteData;
369 : :
370 : : static gpointer
371 : 1 : test_read_write_write_thread (gpointer user_data)
372 : : {
373 : 1 : TestReadWriteData *data = user_data;
374 : : gsize bytes_written;
375 : 1 : GError *error = NULL;
376 : : gboolean res;
377 : :
378 : 1 : res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
379 : 1 : g_assert_true (res);
380 : 1 : g_assert_no_error (error);
381 : 1 : g_assert_cmpuint (bytes_written, ==, 1024);
382 : :
383 : 1 : return NULL;
384 : : }
385 : :
386 : : static gpointer
387 : 2 : test_read_write_read_thread (gpointer user_data)
388 : : {
389 : 2 : TestReadWriteData *data = user_data;
390 : : gsize bytes_read;
391 : 2 : GError *error = NULL;
392 : : gboolean res;
393 : :
394 : 2 : res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
395 : 2 : g_assert_true (res);
396 : 2 : g_assert_no_error (error);
397 : 2 : g_assert_cmpuint (bytes_read, ==, 1024);
398 : :
399 : 2 : return NULL;
400 : : }
401 : :
402 : : static gpointer
403 : 1 : test_read_write_writev_thread (gpointer user_data)
404 : : {
405 : 1 : TestReadWriteData *data = user_data;
406 : : gsize bytes_written;
407 : 1 : GError *error = NULL;
408 : : gboolean res;
409 : : GOutputVector vectors[3];
410 : :
411 : 1 : vectors[0].buffer = data->write_data;
412 : 1 : vectors[0].size = 256;
413 : 1 : vectors[1].buffer = data->write_data + 256;
414 : 1 : vectors[1].size = 256;
415 : 1 : vectors[2].buffer = data->write_data + 512;
416 : 1 : vectors[2].size = 512;
417 : :
418 : 1 : res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
419 : 1 : g_assert_true (res);
420 : 1 : g_assert_no_error (error);
421 : 1 : g_assert_cmpuint (bytes_written, ==, 1024);
422 : :
423 : 1 : return NULL;
424 : : }
425 : :
426 : : /* test if normal writing/reading from a pipe works */
427 : : static void
428 : 2 : test_read_write (gconstpointer user_data)
429 : : {
430 : 2 : gboolean writev = GPOINTER_TO_INT (user_data);
431 : : GUnixInputStream *is;
432 : : GUnixOutputStream *os;
433 : : gint fd[2];
434 : : guint8 data_write[1024], data_read[1024];
435 : : guint i;
436 : : GThread *write_thread, *read_thread;
437 : : TestReadWriteData data;
438 : :
439 : 2050 : for (i = 0; i < sizeof (data_write); i++)
440 : 2048 : data_write[i] = i;
441 : :
442 : 2 : g_assert_cmpint (pipe (fd), ==, 0);
443 : :
444 : 2 : is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
445 : 2 : os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
446 : :
447 : 2 : data.is = G_INPUT_STREAM (is);
448 : 2 : data.os = G_OUTPUT_STREAM (os);
449 : 2 : data.read_data = data_read;
450 : 2 : data.write_data = data_write;
451 : :
452 : 2 : if (writev)
453 : 1 : write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
454 : : else
455 : 1 : write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
456 : 2 : read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
457 : :
458 : 2 : g_thread_join (write_thread);
459 : 2 : g_thread_join (read_thread);
460 : :
461 : 2 : g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
462 : :
463 : 2 : g_object_unref (os);
464 : 2 : g_object_unref (is);
465 : 2 : }
466 : :
467 : : /* test if g_pollable_output_stream_write_nonblocking() and
468 : : * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
469 : : * and correctly reset their status afterwards again, and all data that is
470 : : * written can also be read again.
471 : : */
472 : : static void
473 : 1 : test_write_wouldblock (void)
474 : : {
475 : : #ifndef F_GETPIPE_SZ
476 : : g_test_skip ("F_GETPIPE_SZ not defined");
477 : : #else /* if F_GETPIPE_SZ */
478 : : GUnixInputStream *is;
479 : : GUnixOutputStream *os;
480 : : gint fd[2];
481 : 1 : GError *err = NULL;
482 : : guint8 data_write[1024], data_read[1024];
483 : : gsize i;
484 : : int retval;
485 : : gsize pipe_capacity;
486 : :
487 : 1025 : for (i = 0; i < sizeof (data_write); i++)
488 : 1024 : data_write[i] = i;
489 : :
490 : 1 : g_assert_cmpint (pipe (fd), ==, 0);
491 : :
492 : 1 : g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
493 : 1 : retval = fcntl (fd[0], F_GETPIPE_SZ);
494 : 1 : g_assert_cmpint (retval, >=, 0);
495 : 1 : pipe_capacity = (gsize) retval;
496 : 1 : g_assert_cmpint (pipe_capacity, >=, 4096);
497 : 1 : g_assert_cmpint (pipe_capacity % 1024, >=, 0);
498 : :
499 : 1 : is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
500 : 1 : os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
501 : :
502 : : /* Run the whole thing three times to make sure that the streams
503 : : * reset the writability/readability state again */
504 : 4 : for (i = 0; i < 3; i++) {
505 : 3 : gssize written = 0, written_complete = 0;
506 : 3 : gssize read = 0, read_complete = 0;
507 : :
508 : : do
509 : : {
510 : 6 : written_complete += written;
511 : 6 : written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
512 : : data_write,
513 : : sizeof (data_write),
514 : : NULL,
515 : : &err);
516 : : }
517 : 6 : while (written > 0);
518 : :
519 : 3 : g_assert_cmpuint (written_complete, >, 0);
520 : 3 : g_assert_nonnull (err);
521 : 3 : g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
522 : 3 : g_clear_error (&err);
523 : :
524 : : do
525 : : {
526 : 6 : read_complete += read;
527 : 6 : read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
528 : : data_read,
529 : : sizeof (data_read),
530 : : NULL,
531 : : &err);
532 : 6 : if (read > 0)
533 : 3 : g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
534 : : }
535 : 6 : while (read > 0);
536 : :
537 : 3 : g_assert_cmpuint (read_complete, ==, written_complete);
538 : 3 : g_assert_nonnull (err);
539 : 3 : g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
540 : 3 : g_clear_error (&err);
541 : : }
542 : :
543 : 1 : g_object_unref (os);
544 : 1 : g_object_unref (is);
545 : : #endif /* if F_GETPIPE_SZ */
546 : 1 : }
547 : :
548 : : /* test if g_pollable_output_stream_writev_nonblocking() and
549 : : * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
550 : : * and correctly reset their status afterwards again, and all data that is
551 : : * written can also be read again.
552 : : */
553 : : static void
554 : 1 : test_writev_wouldblock (void)
555 : : {
556 : : #ifndef F_GETPIPE_SZ
557 : : g_test_skip ("F_GETPIPE_SZ not defined");
558 : : #else /* if F_GETPIPE_SZ */
559 : : GUnixInputStream *is;
560 : : GUnixOutputStream *os;
561 : : gint fd[2];
562 : 1 : GError *err = NULL;
563 : : guint8 data_write[1024], data_read[1024];
564 : : gsize i;
565 : : int retval;
566 : : gsize pipe_capacity;
567 : : GOutputVector vectors[4];
568 : : GPollableReturn res;
569 : :
570 : 1025 : for (i = 0; i < sizeof (data_write); i++)
571 : 1024 : data_write[i] = i;
572 : :
573 : 1 : g_assert_cmpint (pipe (fd), ==, 0);
574 : :
575 : 1 : g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
576 : 1 : retval = fcntl (fd[0], F_GETPIPE_SZ);
577 : 1 : g_assert_cmpint (retval, >=, 0);
578 : 1 : pipe_capacity = (gsize) retval;
579 : 1 : g_assert_cmpint (pipe_capacity, >=, 4096);
580 : 1 : g_assert_cmpint (pipe_capacity % 1024, >=, 0);
581 : :
582 : 1 : is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
583 : 1 : os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
584 : :
585 : : /* Run the whole thing three times to make sure that the streams
586 : : * reset the writability/readability state again */
587 : 4 : for (i = 0; i < 3; i++) {
588 : 3 : gsize written = 0, written_complete = 0;
589 : 3 : gssize read = 0, read_complete = 0;
590 : :
591 : : do
592 : : {
593 : 6 : written_complete += written;
594 : :
595 : 6 : vectors[0].buffer = data_write;
596 : 6 : vectors[0].size = 256;
597 : 6 : vectors[1].buffer = data_write + 256;
598 : 6 : vectors[1].size = 256;
599 : 6 : vectors[2].buffer = data_write + 512;
600 : 6 : vectors[2].size = 256;
601 : 6 : vectors[3].buffer = data_write + 768;
602 : 6 : vectors[3].size = 256;
603 : :
604 : 6 : res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
605 : : vectors,
606 : : G_N_ELEMENTS (vectors),
607 : : &written,
608 : : NULL,
609 : : &err);
610 : : }
611 : 6 : while (res == G_POLLABLE_RETURN_OK);
612 : :
613 : 3 : g_assert_cmpuint (written_complete, >, 0);
614 : 3 : g_assert_null (err);
615 : 3 : g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
616 : : /* writev() on UNIX streams either succeeds fully or not at all */
617 : 3 : g_assert_cmpuint (written, ==, 0);
618 : :
619 : : do
620 : : {
621 : 6 : read_complete += read;
622 : 6 : read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
623 : : data_read,
624 : : sizeof (data_read),
625 : : NULL,
626 : : &err);
627 : 6 : if (read > 0)
628 : 3 : g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
629 : : }
630 : 6 : while (read > 0);
631 : :
632 : 3 : g_assert_cmpuint (read_complete, ==, written_complete);
633 : 3 : g_assert_nonnull (err);
634 : 3 : g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
635 : 3 : g_clear_error (&err);
636 : : }
637 : :
638 : 1 : g_object_unref (os);
639 : 1 : g_object_unref (is);
640 : : #endif /* if F_GETPIPE_SZ */
641 : 1 : }
642 : :
643 : : #ifdef F_GETPIPE_SZ
644 : : static void
645 : 1 : write_async_wouldblock_cb (GUnixOutputStream *os,
646 : : GAsyncResult *result,
647 : : gpointer user_data)
648 : : {
649 : 1 : gsize *bytes_written = user_data;
650 : 1 : GError *err = NULL;
651 : :
652 : 1 : g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
653 : 1 : g_assert_no_error (err);
654 : 1 : }
655 : :
656 : : static void
657 : 2 : read_async_wouldblock_cb (GUnixInputStream *is,
658 : : GAsyncResult *result,
659 : : gpointer user_data)
660 : : {
661 : 2 : gsize *bytes_read = user_data;
662 : 2 : GError *err = NULL;
663 : :
664 : 2 : g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, &err);
665 : 2 : g_assert_no_error (err);
666 : 2 : }
667 : : #endif /* if F_GETPIPE_SZ */
668 : :
669 : : /* test if the async implementation of write_all() and read_all() in G*Stream
670 : : * around the GPollable*Stream API is working correctly.
671 : : */
672 : : static void
673 : 1 : test_write_async_wouldblock (void)
674 : : {
675 : : #ifndef F_GETPIPE_SZ
676 : : g_test_skip ("F_GETPIPE_SZ not defined");
677 : : #else /* if F_GETPIPE_SZ */
678 : : GUnixInputStream *is;
679 : : GUnixOutputStream *os;
680 : : gint fd[2];
681 : : guint8 *data, *data_read;
682 : : gsize i;
683 : : int retval;
684 : : gsize pipe_capacity;
685 : 1 : gsize bytes_written = 0, bytes_read = 0;
686 : :
687 : 1 : g_assert_cmpint (pipe (fd), ==, 0);
688 : :
689 : : /* FIXME: These should not be needed but otherwise
690 : : * g_unix_output_stream_write() will block because
691 : : * a) the fd is writable
692 : : * b) writing 4x capacity will block because writes are atomic
693 : : * c) the fd is blocking
694 : : *
695 : : * See https://gitlab.gnome.org/GNOME/glib/issues/1654
696 : : */
697 : 1 : g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
698 : 1 : g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
699 : :
700 : 1 : g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
701 : 1 : retval = fcntl (fd[0], F_GETPIPE_SZ);
702 : 1 : g_assert_cmpint (retval, >=, 0);
703 : 1 : pipe_capacity = (gsize) retval;
704 : 1 : g_assert_cmpint (pipe_capacity, >=, 4096);
705 : :
706 : 1 : data = g_new (guint8, 4 * pipe_capacity);
707 : 16385 : for (i = 0; i < 4 * pipe_capacity; i++)
708 : 16384 : data[i] = i;
709 : 1 : data_read = g_new (guint8, 4 * pipe_capacity);
710 : :
711 : 1 : is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
712 : 1 : os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
713 : :
714 : 1 : g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
715 : : data,
716 : : 4 * pipe_capacity,
717 : : G_PRIORITY_DEFAULT,
718 : : NULL,
719 : : (GAsyncReadyCallback) write_async_wouldblock_cb,
720 : : &bytes_written);
721 : :
722 : 1 : g_input_stream_read_all_async (G_INPUT_STREAM (is),
723 : : data_read,
724 : : 4 * pipe_capacity,
725 : : G_PRIORITY_DEFAULT,
726 : : NULL,
727 : : (GAsyncReadyCallback) read_async_wouldblock_cb,
728 : : &bytes_read);
729 : :
730 : 5 : while (bytes_written == 0 && bytes_read == 0)
731 : 4 : g_main_context_iteration (NULL, TRUE);
732 : :
733 : 1 : g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
734 : 1 : g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
735 : 1 : g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
736 : :
737 : 1 : g_free (data);
738 : 1 : g_free (data_read);
739 : :
740 : 1 : g_object_unref (os);
741 : 1 : g_object_unref (is);
742 : : #endif /* if F_GETPIPE_SZ */
743 : 1 : }
744 : :
745 : : #ifdef F_GETPIPE_SZ
746 : : static void
747 : 1 : writev_async_wouldblock_cb (GUnixOutputStream *os,
748 : : GAsyncResult *result,
749 : : gpointer user_data)
750 : : {
751 : 1 : gsize *bytes_written = user_data;
752 : 1 : GError *err = NULL;
753 : :
754 : 1 : g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
755 : 1 : g_assert_no_error (err);
756 : 1 : }
757 : : #endif /* if F_GETPIPE_SZ */
758 : :
759 : : /* test if the async implementation of writev_all() and read_all() in G*Stream
760 : : * around the GPollable*Stream API is working correctly.
761 : : */
762 : : static void
763 : 1 : test_writev_async_wouldblock (void)
764 : : {
765 : : #ifndef F_GETPIPE_SZ
766 : : g_test_skip ("F_GETPIPE_SZ not defined");
767 : : #else /* if F_GETPIPE_SZ */
768 : : GUnixInputStream *is;
769 : : GUnixOutputStream *os;
770 : : gint fd[2];
771 : : guint8 *data, *data_read;
772 : : gsize i;
773 : : int retval;
774 : : gsize pipe_capacity;
775 : 1 : gsize bytes_written = 0, bytes_read = 0;
776 : : GOutputVector vectors[4];
777 : :
778 : 1 : g_assert_cmpint (pipe (fd), ==, 0);
779 : :
780 : : /* FIXME: These should not be needed but otherwise
781 : : * g_unix_output_stream_writev() will block because
782 : : * a) the fd is writable
783 : : * b) writing 4x capacity will block because writes are atomic
784 : : * c) the fd is blocking
785 : : *
786 : : * See https://gitlab.gnome.org/GNOME/glib/issues/1654
787 : : */
788 : 1 : g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
789 : 1 : g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
790 : :
791 : 1 : g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
792 : 1 : retval = fcntl (fd[0], F_GETPIPE_SZ);
793 : 1 : g_assert_cmpint (retval, >=, 0);
794 : 1 : pipe_capacity = (gsize) retval;
795 : 1 : g_assert_cmpint (pipe_capacity, >=, 4096);
796 : :
797 : 1 : data = g_new (guint8, 4 * pipe_capacity);
798 : 16385 : for (i = 0; i < 4 * pipe_capacity; i++)
799 : 16384 : data[i] = i;
800 : 1 : data_read = g_new (guint8, 4 * pipe_capacity);
801 : :
802 : 1 : vectors[0].buffer = data;
803 : 1 : vectors[0].size = 1024;
804 : 1 : vectors[1].buffer = data + 1024;
805 : 1 : vectors[1].size = 1024;
806 : 1 : vectors[2].buffer = data + 2048;
807 : 1 : vectors[2].size = 1024;
808 : 1 : vectors[3].buffer = data + 3072;
809 : 1 : vectors[3].size = 4 * pipe_capacity - 3072;
810 : :
811 : 1 : is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
812 : 1 : os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
813 : :
814 : 1 : g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
815 : : vectors,
816 : : G_N_ELEMENTS (vectors),
817 : : G_PRIORITY_DEFAULT,
818 : : NULL,
819 : : (GAsyncReadyCallback) writev_async_wouldblock_cb,
820 : : &bytes_written);
821 : :
822 : 1 : g_input_stream_read_all_async (G_INPUT_STREAM (is),
823 : : data_read,
824 : : 4 * pipe_capacity,
825 : : G_PRIORITY_DEFAULT,
826 : : NULL,
827 : : (GAsyncReadyCallback) read_async_wouldblock_cb,
828 : : &bytes_read);
829 : :
830 : 5 : while (bytes_written == 0 && bytes_read == 0)
831 : 4 : g_main_context_iteration (NULL, TRUE);
832 : :
833 : 1 : g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
834 : 1 : g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
835 : 1 : g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
836 : :
837 : 1 : g_free (data);
838 : 1 : g_free (data_read);
839 : :
840 : 1 : g_object_unref (os);
841 : 1 : g_object_unref (is);
842 : : #endif /* F_GETPIPE_SZ */
843 : 1 : }
844 : :
845 : : int
846 : 1 : main (int argc,
847 : : char *argv[])
848 : : {
849 : 1 : g_test_init (&argc, &argv, NULL);
850 : :
851 : 1 : g_test_add_func ("/unix-streams/basic", test_basic);
852 : 1 : g_test_add_data_func ("/unix-streams/pipe-io-test",
853 : : GINT_TO_POINTER (FALSE),
854 : : test_pipe_io);
855 : 1 : g_test_add_data_func ("/unix-streams/nonblocking-io-test",
856 : : GINT_TO_POINTER (TRUE),
857 : : test_pipe_io);
858 : :
859 : 1 : g_test_add_data_func ("/unix-streams/read_write",
860 : : GINT_TO_POINTER (FALSE),
861 : : test_read_write);
862 : :
863 : 1 : g_test_add_data_func ("/unix-streams/read_writev",
864 : : GINT_TO_POINTER (TRUE),
865 : : test_read_write);
866 : :
867 : 1 : g_test_add_func ("/unix-streams/write-wouldblock",
868 : : test_write_wouldblock);
869 : 1 : g_test_add_func ("/unix-streams/writev-wouldblock",
870 : : test_writev_wouldblock);
871 : :
872 : 1 : g_test_add_func ("/unix-streams/write-async-wouldblock",
873 : : test_write_async_wouldblock);
874 : 1 : g_test_add_func ("/unix-streams/writev-async-wouldblock",
875 : : test_writev_async_wouldblock);
876 : :
877 : 1 : return g_test_run();
878 : : }
|