HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP Thu Apr 17 19:10:24 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.26
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //opt/osm/venv/lib/python3.10/site-packages/psutil/tests/__pycache__/test_testutils.cpython-310.pyc
o

=idI�@s�dZddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddl
mZddl
Z
ddlZ
ddl
mZddl
mZddl
mZddlmZddlmZdd	lmZdd
lmZddlmZddlmZdd
lmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z ddlm!Z!ddlm"Z"ddlm#Z#ddlm$Z$ddlm%Z%ddlm&Z&ddlm'Z'ddlm(Z(ddlm)Z)ddlm*Z*ddlm+Z+dd lm,Z,dd!lm-Z-dd"lm.Z.dd#lm/Z/dd$lm0Z0dd%lm1Z1dd&lm2Z2dd'lm3Z3Gd(d)�d)e�Z4Gd*d+�d+e�Z5Gd,d-�d-e�Z6Gd.d/�d/e�Z7Gd0d1�d1e�Z8e(j9j:d2d3�Gd4d5�d5e��Z;Gd6d7�d7e�Z<Gd8d9�d9e�Z=Gd:d;�d;e�Z>dS)<z1Tests for testing utils (psutil.tests namespace).�N)�mock)�FREEBSD)�NETBSD)�POSIX)�open_binary)�	open_text)�
supports_ipv6)�
CI_TESTING)�COVERAGE)�HAS_NET_CONNECTIONS_UNIX)�HERE)�
PYTHON_EXE)�PYTHON_EXE_ENV)�PsutilTestCase)�TestMemoryLeak)�bind_socket)�bind_unix_socket��
call_until)�chdir)�create_sockets)�fake_pytest)�filter_proc_net_connections)�
get_free_port)�
is_namedtuple)�process_namespace)�pytest)�
reap_children)�retry)�retry_on_failure)�
safe_mkdir)�safe_rmpath)�system_namespace)�tcp_socketpair)�	terminate)�unix_socketpair)�
wait_for_file)�wait_for_pidc@sxeZdZe�d�dd��Ze�d�dd��Ze�d�dd��Ze�d�dd	��Ze�d�d
d��Z	e�d�dd
��Z
dS)�TestRetryDecoratorz
time.sleepcsFtdddd��fdd��}ttd���|�dksJ�|jdks!J�dS)N����retries�intervalZlogfunc��r���dd�sdS�Nr*r��pop���queuer2�I/opt/osm/venv/lib/python3.10/site-packages/psutil/tests/test_testutils.py�fooG�
�z2TestRetryDecorator.test_retry_success.<locals>.foo�)r�list�range�
call_count��self�sleepr6r2r3r5�test_retry_successCs
z%TestRetryDecorator.test_retry_successcshtdddd��fdd��}ttd���t�t��|�Wd�n1s&wY|jdks2J�dS)Nr)r*r+cr.r/r0r2r3r2r5r6Ur7z2TestRetryDecorator.test_retry_failure.<locals>.foo�)rr9r:r�raises�ZeroDivisionErrorr;r<r2r3r5�test_retry_failureRs�z%TestRetryDecorator.test_retry_failurecCsVttdd�dd��}t�t��|�Wd�n1swY|jdks)J�dS)Nr*)�	exceptionr-cSst��N)�	TypeErrorr2r2r2r5r6c�z2TestRetryDecorator.test_exception_arg.<locals>.foor)r�
ValueErrorrrArFr;r<r2r2r5�test_exception_argas

�z%TestRetryDecorator.test_exception_argcCsXtdddd�dd��}t�t��|�Wd�n1swY|jdks*J�dS)Nr)r+cS�dddSr/r2r2r2r2r5r6o�z4TestRetryDecorator.test_no_interval_arg.<locals>.foor�rrrArBr;r<r2r2r5�test_no_interval_argks
�z'TestRetryDecorator.test_no_interval_argcCsXtdddd�dd��}t�t��|�Wd�n1swY|jdks*J�dS)Nr)r*r+cSrJr/r2r2r2r2r5r6yrKz0TestRetryDecorator.test_retries_arg.<locals>.foorLr<r2r2r5�test_retries_argws
�z#TestRetryDecorator.test_retries_argcCs<t�t��tddd�Wd�dS1swYdS)Nr)r*)r,�timeout)rrArHr)r=r>r2r2r5�test_retries_and_timeout_args�s"�z0TestRetryDecorator.test_retries_and_timeout_argsN)�__name__�
__module__�__qualname__r�patchr?rCrIrMrNrPr2r2r2r5r(Bs


	

	r(c@s<eZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
S)�TestSyncTestUtilsc	Cs�tt���tt���d}tjdtdg�d��+t	�
tj��t|�Wd�n1s.wYWd�dSWd�dS1sFwYdS)Ni���psutil.tests.retry.__iter__r��return_value)r'�os�getpid�max�psutilZpidsrrT�iterrrAZ
NoSuchProcess)r=Znopidr2r2r5�test_wait_for_pid�s
��"�z#TestSyncTestUtils.test_wait_for_pidcCsX|��}t|d��
}|�d�Wd�n1swYt|�tj�|�r*J�dS)N�wr6��
get_testfn�open�writer&rY�path�exists�r=�testfn�fr2r2r5�test_wait_for_file�s�z$TestSyncTestUtils.test_wait_for_filecCsR|��}t|d��Wd�n1swYt|dd�tj�|�r'J�dS)Nr_T)�empty)rarbr&rYrdre�r=rgr2r2r5�test_wait_for_file_empty�s�z*TestSyncTestUtils.test_wait_for_file_emptyc	Cs�|��}tjdtdg�d��*t�t��t|�Wd�n1s#wYWd�dSWd�dS1s;wYdS)NrVrrW)rarrTr]rrA�OSErrorr&rkr2r2r5�test_wait_for_file_no_file�s
��"�z,TestSyncTestUtils.test_wait_for_file_no_filecCs\|��}t|d��
}|�d�Wd�n1swYt|dd�tj�|�s,J�dS)Nr_r6F)�deleter`rfr2r2r5�test_wait_for_file_no_delete�s�z.TestSyncTestUtils.test_wait_for_file_no_deletecCstdd��dS)NcS�dS�Nr*r2r2r2r2r5�<lambda>��z3TestSyncTestUtils.test_call_until.<locals>.<lambda>r�r=r2r2r5�test_call_until�sz!TestSyncTestUtils.test_call_untilN)	rQrRrSr^rirlrnrprvr2r2r2r5rU�srUc@s4eZdZdd�Zdd�Zdd�Zdd�Zd	d
�ZdS)�TestFSTestUtilscC�<tt��}|jdksJ�Wd�dS1swYdS)N�r)r�__file__�mode�r=rhr2r2r5�test_open_text��
"�zTestFSTestUtils.test_open_textcCrx)N�rb)rrzr{r|r2r2r5�test_open_binary�r~z TestFSTestUtils.test_open_binarycCs<|��}t|�tj�|�sJ�t|�tj�|�sJ�dSrE)rar rYrd�isdirrkr2r2r5�test_safe_mkdir�s
zTestFSTestUtils.test_safe_mkdirc	Cs�|��}t|d���t|�tj�|�rJ�t|�t�|�t|�tj�|�r,J�tj	dt
tjd�d��'}t
�t
��t|�Wd�n1sLwY|jsVJ�Wd�dS1sawYdS)Nr_zpsutil.tests.os.stat�)Zside_effect)rarb�closer!rYrdre�mkdirrrTrm�errno�EINVALrrAZcalled)r=rg�mr2r2r5�test_safe_rmpath�s"
�
�"�z TestFSTestUtils.test_safe_rmpathcCsp|��}t��}t�|�t|��t��tj�||�ksJ�Wd�n1s)wYt��|ks6J�dSrE)rarY�getcwdr�rrd�join)r=rg�baser2r2r5�
test_chdir�s

�zTestFSTestUtils.test_chdirN)rQrRrSr}r�r�r�r�r2r2r2r5rw�srwc@s>eZdZdd�Zdd�Zejjedd�dd��Z	d	d
�Z
dS)�TestProcessUtilscCsN|��}t�|j�}|��sJ�t�|��rJ�tjjrJ�tjjr%J�dSrE)	�
spawn_subprocr\�Process�pid�
is_runningr�testsZ
_pids_startedZ_subprocesses_started)r=Zsubp�pr2r2r5�test_reap_children�sz#TestProcessUtils.test_reap_childrencCs�|��\}}|j|jksJ�|��sJ�|��sJ�t����}||gks'J�t��jdd�}t|�dks7J�||vs=J�||vsCJ�|��t�	�ksMJ�|��|jksVJ�t
|�|��r`J�|��sfJ�t
|�|��rpJ�dS)NT)�	recursive�)Zspawn_children_pairr�r�r\r��children�len�ppidrYrZr$)r=�childZ
grandchildr�r2r2r5�test_spawn_children_pair�s"z)TestProcessUtils.test_spawn_children_pair�
POSIX only��reasoncCs"|��\}}|��tjksJ�dSrE)�spawn_zombie�statusr\Z
STATUS_ZOMBIE)r=Z_parent�zombier2r2r5�test_spawn_zombie�sz"TestProcessUtils.test_spawn_zombiecCs�|��}t|�|�|j�t|�t�|��j�}t|�|�|j�t|�tddg}tj|tj	tj	t
d�}t|�|�|j�t|�|��j}t|�|�|j�t|�trw|��\}}t|�t|�|�|j�|�|j�dSdS)Nz-cz3import time; [time.sleep(0.1) for x in range(100)];)�stdout�stderr�env)
r�r$Zassert_pid_goner�r\r�r
�Popen�
subprocess�PIPErrr�)r=r��cmdr��parentr�r2r2r5�test_terminates@��
�zTestProcessUtils.test_terminateN)rQrRrSr�r�r�mark�skipifrr�r�r2r2r2r5r��s	
r�c@s~eZdZdd�Zejjedd�dd��Zdd�Z	ejjedd�ejje
p'ed	d�ejjed
d�dd����Z
d
d�ZdS)�TestNetUtilscCsNt�}td|fd��}|��d|ksJ�Wd�dS1s wYdS)Nr���addrr*)rr�getsockname)r=�port�sr2r2r5r)s"�zTestNetUtils.bind_socketr�r�cCs�|��}t|��3}|jtjksJ�|jtjksJ�|��|ks!J�tj	�
|�s)J�t�t�|�j
�s4J�Wd�n1s>wY|��}t|tjd��}|jtjksWJ�Wd�dS1sbwYdS)N)�type)rar�family�socket�AF_UNIXr��SOCK_STREAMr�rYrdre�stat�S_ISSOCK�st_mode�
SOCK_DGRAM)r=�name�sockr2r2r5�test_bind_unix_socket.s
�"�z"TestNetUtils.test_bind_unix_socketc	Cs�dt�f}ttj|d�\}}|�;|� |��|ksJ�|��|ks$J�|��|ks,J�Wd�n1s6wYWd�dSWd�dS1sNwYdS)Nz	127.0.0.1r�)rr#r��AF_INETr��getpeername)r=r��server�clientr2r2r5�test_tcp_socketpair<s
P�z TestNetUtils.test_tcp_socketpairz*/var/run/log UNIX socket opened by defaultzcan't list UNIX socketscCs�t��}|��}t|jdd��rJ�|��}t|�\}}zFtj�	|�s%J�t
�t�
|�j�s0J�|��|dks:J�t
t|jdd���dksHJ�|��|ksPJ�|��|ksXJ�W|��|��dS|��|��w)N�unix)�kindr�)r\r��num_fdsrZnet_connectionsrar%rYrdrer�r�r�r�r�r�r�)r=r�r�r�r�r�r2r2r5�test_unix_socketpairEs(���
z!TestNetUtils.test_unix_socketpaircCs�t��c}t�t�}t�t�}|D]}||jd7<||�tjtj�d7<q|tj	dks3J�t
�r?|tjdks?J�trLt
rL|tjdksLJ�|tjdksUJ�|tjdks^J�Wd�dS1siwYdS)Nr*r�)r�collections�defaultdict�intr��
getsockoptr��
SOL_SOCKET�SO_TYPEr�r�AF_INET6rrr�r�r�)r=�socksZfams�typesr�r2r2r5�test_create_socketsbs

"�z TestNetUtils.test_create_socketsN)rQrRrSrrr�r�rr�r�rrrr�r�r2r2r2r5r�(s

	��r��serial�r�c@sheZdZe�dd��Zdd�Ze�ejje	dd�ejje
dd�dd	����Zd
d�Zdd
�Z
dd�ZdS)�TestMemLeakClasscs8�fdd�}ddi�|j|ddd��ddksJ�dS)	Ncs�dd7<dS)N�cntr*r2r2�r�r2r5�funwsz(TestMemLeakClass.test_times.<locals>.funr�r�
�)�times�warmup_times�)�execute)r=r�r2r�r5�
test_timesuszTestMemLeakClass.test_timescCs2t�t��|jdd�dd�Wd�n1swYt�t��|jdd�dd�Wd�n1s7wYt�t��|jdd�dd�Wd�n1sUwYt�t��|jd	d�dd
�Wd�n1sswYt�t��|jdd�dd�Wd�dS1s�wYdS)
NcSrq�Nrr2r2r2r2r5rs�rtz1TestMemLeakClass.test_param_err.<locals>.<lambda>r�r�cSrqr�r2r2r2r2r5rs�rt���cSrqr�r2r2r2r2r5rs�rt)r�cSrqr�r2r2r2r2r5rs�rt)�	tolerancecSrqr�r2r2r2r2r5rs�rt)r,)rrArHr�rur2r2r5�test_param_err~s����"�zTestMemLeakClass.test_param_errz
skipped on CIr�zskipped during test coveragec
Cs�g}|fdd�}z_tjtjjdd��Jt�t����&t�t����|j	|dd�Wd�n1s4wYWd�n1sCwYWd�W~dSWd�W~dS1s_wYW~dS~w)NcSs|�dd�dS)NZ�xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx���append��lsr2r2r5r���z+TestMemLeakClass.test_leak_mem.<locals>.funz	extra-mem��match�dr�)
rrA�fail�	Exception�
contextlib�redirect_stdout�io�StringIO�redirect_stderrr�)r=r�r�r2r2r5�
test_leak_mem�s(��������
zTestMemLeakClass.test_leak_memcsd��fdd�}g�tr
dnd}tjtjjd|d����|�Wd�dS1s+wYdS)Ncs"tt�}��|j���|�dSrE)rbrzZ
addCleanupr�r�)rh��boxr=r2r5r��sz1TestMemLeakClass.test_unclosed_files.<locals>.fun�fd�handlez	unclosed r�)rrrAr�r�r�)r=r�r�r2r�r5�test_unclosed_files�s"�z$TestMemLeakClass.test_unclosed_filescs>�fdd�}g�d}|j||ddd�t��|dksJ�dS)Ncs��dd�dS)NZxxxxxxxxxxxxxxxxxxxxxxxxr�r�r2r�r2r5r��r�z,TestMemLeakClass.test_tolerance.<locals>.funr�ri�)r�r�r�r*)r�r�)r=r�r�r2r�r5�test_tolerance�s�zTestMemLeakClass.test_tolerancecCs�dd�}|�t|�t�t��|�t|�Wd�n1s wYdd�}t�tjj��|�t|�Wd�dS1sBwYdS)NcSrJr/r2r2r2r2r5�fun_1�sz2TestMemLeakClass.test_execute_w_exc.<locals>.fun_1cSsdSrEr2r2r2r2r5�fun_2��z2TestMemLeakClass.test_execute_w_exc.<locals>.fun_2)Z
execute_w_excrBrrArmr�r�)r=r�r�r2r2r5�test_execute_w_exc�s�"�z#TestMemLeakClass.test_execute_w_excN)rQrRrSrr�r�rr�r�r	r
r�r�r�rr2r2r2r5r�ss
r�c@sLeZdZdd�Zdd�Zdd�Zdd�Zd	d
�Zdd�Zd
d�Z	dd�Z
dS)�TestFakePytestcCs0t��}|�|�tjt��d�}|�|�}|S)N)�stream)�unittestZ	TestSuiteZaddTestZTextTestRunnerr�r��run)r=�klass�suite�runner�resultr2r2r5�run_test_class�s


zTestFakePytest.run_test_classc
Cs�t�t��}ddWd�n1swYt|jt�s!J�tjtdd��}td��1s0wYztjtdd��}td��1sEwYWntyf}zt|�dks[J�WYd}~dSd}~wwt�	d�S)Nr*rr6r��barz"foo" does not match "bar"�exception not raised)
rrArB�
isinstance�valuerH�AssertionError�strrr�)r=�cm�errr2r2r5�test_raises�s 
�����
zTestFakePytest.test_raisescCsVtjjdd�dd��}|�dksJ�tjjdd�Gdd�d��}|���dks)J�dS)Nr�r�cSrqrrr2r2r2r2r5r6�rGz%TestFakePytest.test_mark.<locals>.foor*c@�eZdZdd�ZdS)z%TestFakePytest.test_mark.<locals>.FoocSrqrrr2rur2r2r5r
�r�z)TestFakePytest.test_mark.<locals>.Foo.barN)rQrRrSr
r2r2r2r5�Foo�sr)rr��xdist_groupr
)r=r6rr2r2r5�	test_mark�s
zTestFakePytest.test_markcCs�Gdd�dtj�}|�|d��}|��sJ�t|j�dksJ�|jdddks*J�Gdd�dtj�}|�|d��}|��s@J�t|j�dksIJ�dS)Nc@�$eZdZejjddd�dd��ZdS)�,TestFakePytest.test_skipif.<locals>.TestCaseTr�r�cS�ddksJ�dSrrr2rur2r2r5r6���0TestFakePytest.test_skipif.<locals>.TestCase.fooN�rQrRrSrr�r�r6r2r2r2r5�TestCase��rr6r*rr�c@r)rFr�r�cSrrrr2rur2r2r5r6�rrNrr2r2r2r5r�r�rrr	Z
wasSuccessfulr�Zskipped�r=rrr2r2r5�test_skipif�szTestFakePytest.test_skipifcCsXGdd�dtj�}|�|d��}|��sJ�t|j�dksJ�|jdddks*J�dS)Nc@r)z*TestFakePytest.test_skip.<locals>.TestCasecSst�d�ddksJ�dS)Nr�r*r)r�skiprur2r2r5r6�s
z.TestFakePytest.test_skip.<locals>.TestCase.fooN)rQrRrSr6r2r2r2r5r��rr6r*rr�rr r2r2r5�	test_skip�s
zTestFakePytest.test_skipc	Cs|jtd�}t�|�ttj�|d�d��Wd�n1s wYttj�|d�d��}|�t�	d��
��Wd�n1sDwYtj�
tjd|��,t�t����t��}Wd�n1siwY|��dksvJ�Wd�dS1s�wYdS)N)�dirz__init__.pyr_ztest_file.pyz�                import unittest

                class TestCase(unittest.TestCase):
                    def test_passed(self):
                        pass
                rr*)rarrYr�rbrdr�rc�textwrap�dedent�lstriprrT�objectr\r�r�r�r�r�r�mainZcountTestCases)r=Ztmpdirrhrr2r2r5�	test_mains 
���
�"�zTestFakePytest.test_maincCs6t�t��tjdtdd�Wd�n1swYzt�t��tjdtdd�Wd�n1s6wYWn	tyEYnwt�d�Stjtdd��tjdtdd�Wd�n1sewYz!tjtdd��tjdtdd�Wd�n1s�wYWn
ty�YdSwt�d�S)Nr6r*)�
stacklevelrr�r
)	rZwarns�UserWarning�warnings�warn�DeprecationWarningrrr�rur2r2r5�
test_warnss0����
����
zTestFakePytest.test_warnscCs.t�tjj��t�d��1swYdS)Nr�)rrAr�r�rur2r2r5�	test_fail5s
�zTestFakePytest.test_failN)rQrRrSr	rrr!r$r+r1r2r2r2r2r5r�src@seZdZdd�Zdd�ZdS)�TestTestingUtilscCsLt��}t|�}|��tdd�|�|j�D��d}|�|��ks$J�dS)Ncs� �|]}|ddkr|VqdS)r*r�Nr2��.0�xr2r2r5�	<genexpr>?��z:TestTestingUtils.test_process_namespace.<locals>.<genexpr>r)r\r�r�test�nextr]�gettersr�)r=r��nsr�r2r2r5�test_process_namespace;s
z'TestTestingUtils.test_process_namespacecCs:t�}tdd�|�|j�D��d}|�t��ksJ�dS)Ncsr4)r*�net_if_addrsNr2r5r2r2r5r8Dr9z9TestTestingUtils.test_system_namespace.<locals>.<genexpr>r)r"r;r]r<r\r?)r=r=r�r2r2r5�test_system_namespaceBsz&TestTestingUtils.test_system_namespaceN)rQrRrSr>r@r2r2r2r5r3:sr3c@r)�TestOtherUtilscCs.tt�dd�ddd��sJ�tt��rJ�dS)Nr6za b cr*r�r8)rr��
namedtuple�tuplerur2r2r5�test_is_namedtupleIsz!TestOtherUtils.test_is_namedtupleN)rQrRrSrDr2r2r2r5rAHr#rA)?�__doc__r�r�r�r�rYr�r�r�r&rr.rr\Zpsutil.testsrrrZpsutil._commonrrrr	r
rrr
rrrrrrrrrrrrrrrrrr r!r"r#r$r%r&r'r(rUrwr�r�r�rr�rr3rAr2r2r2r5�<module>szE(-LKNx