|
31 | 31 | import asyncio
|
32 | 32 | import os
|
33 | 33 | import platform
|
| 34 | +import ssl |
34 | 35 | import unittest
|
35 | 36 |
|
36 | 37 | from contextlib import nullcontext
|
|
41 | 42 | import mysql.connector
|
42 | 43 | import mysql.connector.aio
|
43 | 44 |
|
44 |
| -from mysql.connector.errors import NotSupportedError |
| 45 | +from mysql.connector.errors import InterfaceError, NotSupportedError |
45 | 46 | from mysql.connector.tls_ciphers import (
|
46 | 47 | APPROVED_TLS_CIPHERSUITES,
|
47 | 48 | DEPRECATED_TLS_CIPHERSUITES,
|
@@ -82,7 +83,22 @@ def setUpModule() -> None:
|
82 | 83 | "No SSL support.",
|
83 | 84 | )
|
84 | 85 | class CipherTests(tests.MySQLConnectorTests):
|
85 |
| - """Testing cipher and cipher-suite lists for synchronous/asynchronous connection.""" |
| 86 | + """Testing cipher and cipher-suite lists for synchronous/asynchronous connection. |
| 87 | +
|
| 88 | + These tests verify the following about tls versions and ciphers: |
| 89 | +
|
| 90 | + Ciphers |
| 91 | + ------- |
| 92 | + * Mandatory and approved ciphers are allowed. |
| 93 | + * Deprecated ciphers are allowed, however a warning is raised. |
| 94 | + * Unacceptable ciphers are forbidden; an error is raised. |
| 95 | +
|
| 96 | + TLS Versions |
| 97 | + ------------ |
| 98 | + * Approved TLS versions are allowed. |
| 99 | + * Deprecated TLS versions are allowed, however a warning is raised. |
| 100 | + * Unacceptable TLS versions are forbidden; an error is raised. |
| 101 | + """ |
86 | 102 |
|
87 | 103 | # when more than one approved TLS version is defined,
|
88 | 104 | # the latest available version is enforced.
|
@@ -368,3 +384,185 @@ def test_tls_ciphersuites_7(self):
|
368 | 384 | @tests.foreach_cnx()
|
369 | 385 | def test_tls_ciphersuites_8(self):
|
370 | 386 | self._test_tls_ciphersuites(test_case_id="8")
|
| 387 | + |
| 388 | + |
| 389 | +@unittest.skipIf( |
| 390 | + tests.MYSQL_EXTERNAL_SERVER, |
| 391 | + "Test not available for external MySQL servers", |
| 392 | +) |
| 393 | +@unittest.skipIf( |
| 394 | + not tests.SSL_AVAILABLE, |
| 395 | + "No SSL support.", |
| 396 | +) |
| 397 | +@unittest.skipIf( |
| 398 | + tests.MYSQL_VERSION < (9, 0, 0), |
| 399 | + "MySQL Server should be 9.0 or newer.", |
| 400 | +) |
| 401 | +class CiphersAndTlsTests(tests.MySQLConnectorTests): |
| 402 | + """Testing cipher and cipher-suite lists for synchronous/asynchronous connection. |
| 403 | +
|
| 404 | + These tests verify Connector/Python does support the TLS versions v1.2 and v1.3 |
| 405 | + when specifying valid ciphers. |
| 406 | +
|
| 407 | + When only caring about a specific TLS version, users can use the connection option |
| 408 | + `tls_versions`. The ciphers to be used will be determined by the MySQL Server |
| 409 | + during TLS negotiation. |
| 410 | +
|
| 411 | + On the other hand, when caring about specific ciphers, you should specify the |
| 412 | + connection option `tls_ciphersuites`. Additionally, we recommend to also specify |
| 413 | + the option `tls_versions`. If this latter option is skipped, Connector/Python will |
| 414 | + use the latest supported TLS version. This might lead to cipher discrepancies; |
| 415 | + assuming you provided TLSv1.2-related ciphers but didn't specify the TLS version, |
| 416 | + these ciphers will be ignored as TLSv1.3 will be used, and the actual cipher in |
| 417 | + use will be determined by the MySQL Server. |
| 418 | +
|
| 419 | + NOTE: the pure-python implementation does not support cipher selection |
| 420 | + for TLSv1.3. The ultimate cipher to be used will be determined by the MySQL Server |
| 421 | + during TLS negotiation. This limitation is because TLSv1.3 ciphers cannot be |
| 422 | + disabled with `SSLContext.set_ciphers(...)`. |
| 423 | + See https://docs.python.org/3/library/ssl.html#ssl.SSLContext.set_ciphers. |
| 424 | + """ |
| 425 | + |
| 426 | + conf_ssl = { |
| 427 | + "ssl_ca": os.path.abspath(os.path.join(tests.SSL_DIR, "tests_CA_cert.pem")), |
| 428 | + "ssl_cert": os.path.abspath( |
| 429 | + os.path.join(tests.SSL_DIR, "tests_client_cert.pem") |
| 430 | + ), |
| 431 | + "ssl_key": os.path.abspath(os.path.join(tests.SSL_DIR, "tests_client_key.pem")), |
| 432 | + } |
| 433 | + |
| 434 | + # SHOW STATUS LIKE 'ssl_cipher_list'; |
| 435 | + ssl_v13_ciphers_server = ( |
| 436 | + "TLS_AES_128_GCM_SHA256:TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:" |
| 437 | + "TLS_AES_128_CCM_SHA256" |
| 438 | + ) |
| 439 | + ssl_v12_ciphers_server = ( |
| 440 | + "ECDHE-RSA-AES128-GCM-SHA256:" |
| 441 | + "ECDHE-RSA-AES256-GCM-SHA384:" |
| 442 | + "ECDHE-RSA-CHACHA20-POLY1305:" |
| 443 | + "DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-CCM:" |
| 444 | + "DHE-RSA-AES128-CCM:DHE-RSA-CHACHA20-POLY1305" |
| 445 | + ) |
| 446 | + |
| 447 | + def setUp(self) -> None: |
| 448 | + # tls_cases[i] is a 2-tuple: (expected error/warning if any, cipher to be used) |
| 449 | + self.tls_v12_cases = [ |
| 450 | + (None, [cipher]) for cipher in self.ssl_v12_ciphers_server.split(":") |
| 451 | + ] |
| 452 | + self.tls_v13_cases = [ |
| 453 | + (None, [cipher]) for cipher in self.ssl_v13_ciphers_server.split(":") |
| 454 | + ] |
| 455 | + |
| 456 | + async def _check_async_tls_ciphersuites( |
| 457 | + self, settings, exp_ssl_version: Optional[str] |
| 458 | + ) -> Tuple[Optional[str], Optional[str]]: |
| 459 | + ssl_version, ssl_cipher = None, None |
| 460 | + async with await mysql.connector.aio.connect(**settings) as cnx: |
| 461 | + async with await cnx.cursor() as cur: |
| 462 | + if exp_ssl_version: |
| 463 | + await cur.execute("SHOW STATUS LIKE 'Ssl_version'") |
| 464 | + res = await cur.fetchone() |
| 465 | + ssl_version = res[-1] |
| 466 | + |
| 467 | + await cur.execute("SHOW STATUS LIKE 'Ssl_cipher'") |
| 468 | + res = await cur.fetchone() |
| 469 | + ssl_cipher = res[-1] |
| 470 | + |
| 471 | + return ssl_version, ssl_cipher |
| 472 | + |
| 473 | + def _check_sync_tls_ciphersuites( |
| 474 | + self, settings, exp_ssl_version: Optional[str] |
| 475 | + ) -> Tuple[Optional[str], Optional[str]]: |
| 476 | + ssl_version, ssl_cipher = None, None |
| 477 | + with mysql.connector.connect(**settings) as cnx: |
| 478 | + with cnx.cursor() as cur: |
| 479 | + if exp_ssl_version: |
| 480 | + cur.execute("SHOW STATUS LIKE 'Ssl_version'") |
| 481 | + res = cur.fetchone() |
| 482 | + ssl_version = res[-1] |
| 483 | + |
| 484 | + cur.execute("SHOW STATUS LIKE 'Ssl_cipher'") |
| 485 | + res = cur.fetchone() |
| 486 | + ssl_cipher = res[-1] |
| 487 | + |
| 488 | + return ssl_version, ssl_cipher |
| 489 | + |
| 490 | + def _test_tls_ciphersuites( |
| 491 | + self, tls_versions: Optional[list[str]], test_case: tuple, verify: bool = False |
| 492 | + ): |
| 493 | + exp_event, tls_ciphersuites = test_case |
| 494 | + |
| 495 | + conf = {**tests.get_mysql_config(), **self.conf_ssl} |
| 496 | + conf["use_pure"] = isinstance(self.cnx, mysql.connector.MySQLConnection) |
| 497 | + conf["unix_socket"] = None |
| 498 | + conf["tls_ciphersuites"] = tls_ciphersuites |
| 499 | + |
| 500 | + if tls_versions is not None: |
| 501 | + conf["tls_versions"] = tls_versions |
| 502 | + |
| 503 | + event_handler = ( |
| 504 | + self.assertWarns if exp_event == DeprecationWarning else self.assertRaises |
| 505 | + ) |
| 506 | + |
| 507 | + exp_ssl_version = tls_versions[0] if tls_versions else "TLSv1.3" |
| 508 | + with nullcontext() if exp_event is None else event_handler(exp_event): |
| 509 | + ssl_version, ssl_cipher = self._check_sync_tls_ciphersuites( |
| 510 | + conf, exp_ssl_version |
| 511 | + ) |
| 512 | + self.assertEqual(ssl_version, exp_ssl_version) |
| 513 | + if verify: |
| 514 | + self.assertEqual(ssl_cipher, tls_ciphersuites[0]) |
| 515 | + |
| 516 | + # C-ext implementation isn't supported yet for aio. |
| 517 | + if (CEXT_SUPPORT_FOR_AIO and not conf["use_pure"]) or conf["use_pure"]: |
| 518 | + with nullcontext() if exp_event is None else event_handler(exp_event): |
| 519 | + ssl_version, ssl_cipher = asyncio.run( |
| 520 | + self._check_async_tls_ciphersuites(conf, exp_ssl_version) |
| 521 | + ) |
| 522 | + self.assertEqual(ssl_version, exp_ssl_version) |
| 523 | + if verify: |
| 524 | + self.assertEqual(ssl_cipher, tls_ciphersuites[0]) |
| 525 | + |
| 526 | + @tests.foreach_cnx(mysql.connector.MySQLConnection) |
| 527 | + def test_tls_v12_ciphers(self): |
| 528 | + # verify=True means the test checks the selected cipher matches |
| 529 | + # with the one returned by the server. |
| 530 | + for test_case in self.tls_v12_cases: |
| 531 | + self._test_tls_ciphersuites( |
| 532 | + tls_versions=["TLSv1.2"], test_case=test_case, verify=True |
| 533 | + ) |
| 534 | + |
| 535 | + @tests.foreach_cnx(mysql.connector.MySQLConnection) |
| 536 | + def test_tls_v13_ciphers(self): |
| 537 | + # verify=True means the test checks the selected cipher matches |
| 538 | + # with the one returned by the server. |
| 539 | + for test_case in self.tls_v13_cases: |
| 540 | + # verification should be False since cipher selection |
| 541 | + # for TLSv1.3 isn't supported. |
| 542 | + self._test_tls_ciphersuites( |
| 543 | + tls_versions=["TLSv1.3"], test_case=test_case, verify=False |
| 544 | + ) |
| 545 | + self._test_tls_ciphersuites( |
| 546 | + tls_versions=None, test_case=test_case, verify=False |
| 547 | + ) |
| 548 | + |
| 549 | + @tests.foreach_cnx(mysql.connector.CMySQLConnection) |
| 550 | + def test_tls_v12_ciphers_cext(self): |
| 551 | + # verify=True means the test checks the selected cipher matches |
| 552 | + # with the one returned by the server. |
| 553 | + for test_case in self.tls_v12_cases: |
| 554 | + self._test_tls_ciphersuites( |
| 555 | + tls_versions=["TLSv1.2"], test_case=test_case, verify=True |
| 556 | + ) |
| 557 | + |
| 558 | + @tests.foreach_cnx(mysql.connector.CMySQLConnection) |
| 559 | + def test_tls_v13_ciphers_cext(self): |
| 560 | + # verify=True means the test checks the selected cipher matches |
| 561 | + # with the one returned by the server. |
| 562 | + for test_case in self.tls_v13_cases: |
| 563 | + self._test_tls_ciphersuites( |
| 564 | + tls_versions=["TLSv1.3"], test_case=test_case, verify=True |
| 565 | + ) |
| 566 | + self._test_tls_ciphersuites( |
| 567 | + tls_versions=None, test_case=test_case, verify=True |
| 568 | + ) |
0 commit comments