|
1 | 1 | import array
|
| 2 | +import os |
| 3 | +import struct |
| 4 | +import threading |
2 | 5 | import unittest
|
3 | 6 | from test.support import get_attribute
|
| 7 | +from test.support import threading_helper |
4 | 8 | from test.support.import_helper import import_module
|
5 |
| -import os, struct |
6 | 9 | fcntl = import_module('fcntl')
|
7 | 10 | termios = import_module('termios')
|
8 |
| -get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature |
9 |
| - |
10 |
| -try: |
11 |
| - tty = open("/dev/tty", "rb") |
12 |
| -except OSError: |
13 |
| - raise unittest.SkipTest("Unable to open /dev/tty") |
14 |
| -else: |
15 |
| - with tty: |
16 |
| - # Skip if another process is in foreground |
17 |
| - r = fcntl.ioctl(tty, termios.TIOCGPGRP, struct.pack("i", 0)) |
18 |
| - rpgrp = struct.unpack("i", r)[0] |
19 |
| - if rpgrp not in (os.getpgrp(), os.getsid(0)): |
20 |
| - raise unittest.SkipTest("Neither the process group nor the session " |
21 |
| - "are attached to /dev/tty") |
22 |
| - del tty, r, rpgrp |
23 | 11 |
|
24 | 12 | try:
|
25 | 13 | import pty
|
26 | 14 | except ImportError:
|
27 | 15 | pty = None
|
28 | 16 |
|
29 |
| -class IoctlTests(unittest.TestCase): |
| 17 | +class IoctlTestsTty(unittest.TestCase): |
| 18 | + @classmethod |
| 19 | + def setUpClass(cls): |
| 20 | + TIOCGPGRP = get_attribute(termios, 'TIOCGPGRP') |
| 21 | + try: |
| 22 | + tty = open("/dev/tty", "rb") |
| 23 | + except OSError: |
| 24 | + raise unittest.SkipTest("Unable to open /dev/tty") |
| 25 | + with tty: |
| 26 | + # Skip if another process is in foreground |
| 27 | + r = fcntl.ioctl(tty, TIOCGPGRP, struct.pack("i", 0)) |
| 28 | + rpgrp = struct.unpack("i", r)[0] |
| 29 | + if rpgrp not in (os.getpgrp(), os.getsid(0)): |
| 30 | + raise unittest.SkipTest("Neither the process group nor the session " |
| 31 | + "are attached to /dev/tty") |
| 32 | + |
30 | 33 | def test_ioctl_immutable_buf(self):
|
31 | 34 | # If this process has been put into the background, TIOCGPGRP returns
|
32 | 35 | # the session ID instead of the process group id.
|
@@ -132,23 +135,27 @@ def test_ioctl_mutate_2048(self):
|
132 | 135 | self._check_ioctl_mutate_len(2048)
|
133 | 136 | self.assertRaises(ValueError, self._check_ioctl_not_mutate_len, 2048)
|
134 | 137 |
|
| 138 | + |
| 139 | +@unittest.skipIf(pty is None, 'pty module required') |
| 140 | +class IoctlTestsPty(unittest.TestCase): |
| 141 | + def setUp(self): |
| 142 | + self.master_fd, self.slave_fd = pty.openpty() |
| 143 | + self.addCleanup(os.close, self.slave_fd) |
| 144 | + self.addCleanup(os.close, self.master_fd) |
| 145 | + |
| 146 | + @unittest.skipUnless(hasattr(termios, 'TCFLSH'), 'requires termios.TCFLSH') |
135 | 147 | def test_ioctl_tcflush(self):
|
136 |
| - with open("/dev/tty", "rb") as tty: |
137 |
| - r = fcntl.ioctl(tty, termios.TCFLSH, termios.TCIFLUSH) |
138 |
| - self.assertEqual(r, 0) |
| 148 | + r = fcntl.ioctl(self.slave_fd, termios.TCFLSH, termios.TCIFLUSH) |
| 149 | + self.assertEqual(r, 0) |
| 150 | + r = fcntl.ioctl(self.slave_fd, termios.TCFLSH, termios.TCOFLUSH) |
| 151 | + self.assertEqual(r, 0) |
139 | 152 |
|
140 |
| - @unittest.skipIf(pty is None, 'pty module required') |
141 | 153 | def test_ioctl_set_window_size(self):
|
142 |
| - mfd, sfd = pty.openpty() |
143 |
| - try: |
144 |
| - # (rows, columns, xpixel, ypixel) |
145 |
| - our_winsz = struct.pack("HHHH", 20, 40, 0, 0) |
146 |
| - result = fcntl.ioctl(mfd, termios.TIOCSWINSZ, our_winsz) |
147 |
| - new_winsz = struct.unpack("HHHH", result) |
148 |
| - self.assertEqual(new_winsz[:2], (20, 40)) |
149 |
| - finally: |
150 |
| - os.close(mfd) |
151 |
| - os.close(sfd) |
| 154 | + # (rows, columns, xpixel, ypixel) |
| 155 | + our_winsz = struct.pack("HHHH", 20, 40, 0, 0) |
| 156 | + result = fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, our_winsz) |
| 157 | + new_winsz = struct.unpack("HHHH", result) |
| 158 | + self.assertEqual(new_winsz[:2], (20, 40)) |
152 | 159 |
|
153 | 160 |
|
154 | 161 | if __name__ == "__main__":
|
|
0 commit comments