3
(he8                 @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlZd dl	m
Z
 d dlmZ dddgZG d	d de
jZG d
d de
jZG dd de
jZdS )    N)SkipTest)util)base)ProgrammingErrorTestConversion
TestCursorTestBulkInsertsc               @   sL   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dS )r   c             C   s  | j d }|j }|jd zdd"ddddd	j|jtjd
ddtjddddddtjddtjddtj	 f}|jd| |jd |j
 }| jtjd|d  | j|dd |dd  | jtjdd|d jd |d j  |d  | jtj|d# dd  |d$  |jd |jddgd  |jd |j
 }| jtdgd | |jd xXttttfD ]H}|jd |ddg}|jd|f |j }| jd'| |jd qpW W d|jd! X dS )(z test every data type r   zcreate table test_datatypes (b bit, i int, l bigint, f real, s varchar(32), u varchar(32), bb blob, d date, dt datetime, ts timestamp, td time, t time, st datetime)T   l   2}r g@zhello'" worldu
   EspaÃ±olzbinary datai     i           -   9             zeinsert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)z6select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes   
   <   Nzdelete from test_datatypes   z>insert into test_datatypes (i, l) values (2,4), (6,8), (10,12)z5select l from test_datatypes where i in %s order by i      zdrop table test_datatypesr   r   r   )r   r   )connectionscursorexecuteencodecharsetdatetimedate	timedeltatime	localtimefetchoneassertEqualr   int2bytehourminutetuplelistset	frozensetfetchall)selfconncvrZseq_typeseq r7   =/tmp/pip-install-q3hcpn_q/PyMySQL/pymysql/tests/test_basic.pytest_datatypes   s4    

R
0"



zTestConversion.test_datatypesc             C   s`   | j d }|j }|jd z2|jddddd |jd | jd|j  W d	|jd
 X d	S )z test dict escaping r   z8create table test_dict (a integer, b integer, c integer)z:insert into test_dict (a,b,c) values (%(a)s, %(b)s, %(c)s)r   r
   r	   )abr3   zselect a,b,c from test_dictNzdrop table test_dict)r   r
   r	   )r   r   r   r(   r'   )r1   r2   r3   r7   r7   r8   	test_dict:   s    


zTestConversion.test_dictc             C   s^   | j d }|j }|jd d}z,|jd| |jd | j|f|j  W d |jd X d S )Nr   zcreate table test_dict (a text)zI am a test stringz%insert into test_dict (a) values (%s)zselect a from test_dictzdrop table test_dict)r   r   r   r(   r'   )r1   r2   r3   
test_valuer7   r7   r8   test_stringF   s    


zTestConversion.test_stringc             C   s^   | j d }|j }|jd d}z,|jd| |jd | j|f|j  W d |jd X d S )Nr   z"create table test_dict (a integer)i90  z%insert into test_dict (a) values (%s)zselect a from test_dictzdrop table test_dict)r   r   r   r(   r'   )r1   r2   r3   r=   r7   r7   r8   test_integerR   s    


zTestConversion.test_integerc             C   sp   t ttdd }| jd }| j|dd |j 2}|jd|f |jd | j||j d  W dQ R X dS )	ztest binary data   r   r   	test_blobzcreate table test_blob (b blob)z%insert into test_blob (b) values (%s)zselect b from test_blobN)	bytes	bytearrayranger   safe_create_tabler   r   r(   r'   )r1   datar2   r3   r7   r7   r8   rA   ^   s    



zTestConversion.test_blobc             C   sJ   | j d }|j }|jd | jd|j  |jd | jd|j  dS )z' test conversion of null, empty string r   zselect null,''N zselect '',null)NrG   )rG   N)r   r   r   r(   r'   )r1   r2   r3   r7   r7   r8   test_untypedj   s    


zTestConversion.test_untypedc          
   C   s   | j d }|j }|jd | jtjddtjddtjdddtjdd tjdd tjddd tjdd f|j  dS )z test timedelta conversion r   zselect time('12:30'), time('23:12:59'), time('23:12:59.05100'), time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), time('-00:30')iȯ  i{F i8  i  N)r   r   r   r(   r"   r$   r'   )r1   r2   r3   r7   r7   r8   test_timedeltas   s    


zTestConversion.test_timedeltac             C   s   | j d }| j|dstd|j }tjdddd	d	d	d
}|jd z0|jdd|f |jd | j|f|j  W d|jd X dS )z( test datetime conversion w microsecondsr   r   r   r   z,target backend does not support microsecondsi     r   	   i: z3create table test_datetime (id int, ts datetime(6))z)insert into test_datetime values (%s, %s)r   zselect ts from test_datetimeNzdrop table test_datetime)r   r   r   )r   mysql_server_isr   r   r"   r   r(   r'   )r1   r2   r3   dtr7   r7   r8   test_datetime_microseconds   s    



z)TestConversion.test_datetime_microsecondsN)__name__
__module____qualname__r9   r<   r>   r?   rA   rH   rI   rN   r7   r7   r7   r8   r      s   '	c               @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
r   c             C   sT   | j d }|j }|jd z&d}|jd|f | jd|j  W d|jd X dS )z  test a fetchone() with no rows r   z$create table test_nr (b varchar(32))pymysqlz#insert into test_nr (b) values (%s)Nzdrop table test_nr)r   r   r   r(   r'   )r1   r2   r3   rF   r7   r7   r8   test_fetch_no_result   s    

zTestCursor.test_fetch_no_resultc             C   s~   | j d }|j }zZ|jd x tddD ]}|jd|f q*W |jd |j \}| jttdd| W d|jd X dS )z test aggregate functions r   z(create table test_aggregates (i integer)r   z+insert into test_aggregates (i) values (%s)z"select sum(i) from test_aggregatesNzdrop table test_aggregates)r   r   r   rD   r'   r(   sum)r1   r2   r3   ir5   r7   r7   r8   test_aggregates   s    



zTestCursor.test_aggregatesc             C   sb   | j d }|j }| j|dd |jd |jd |jdd
 | jdgt|j  |j  dS )z test a single tuple r   Zmystuffz-create table mystuff (id integer primary key)z#insert into mystuff (id) values (1)z#insert into mystuff (id) values (2)z%select id from mystuff where id in %sr   Nr   )rW   )r   )r   r   rE   r   r(   r-   r0   close)r1   r2   r3   r7   r7   r8   test_single_tuple   s    


zTestCursor.test_single_tuplec             C   s   | j d j }d|d< tjf |}| j|ds6td| j|dd |j }d	}|jd
|f |jd |j	 d }| j
tj|tj| |jd|f |j	 d }| j
tj|tj| d S )Nr   Zutf8mb4r!   r   r   z*JSON type is not supported on MySQL <= 5.6	test_jsonz]create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);u   {"hello": "こんにちは"}z2INSERT INTO test_json (id, `json`) values (42, %s)z,SELECT `json` from `test_json` WHERE `id`=42zSELECT CAST(%s AS JSON) AS x)r   r   r   )Z	databasescopyrR   connectrL   r   rE   r   r   r'   r(   jsonloads)r1   argsr2   curZjson_strresr7   r7   r8   rZ      s     
zTestCursor.test_jsonN)rO   rP   rQ   rS   rV   rY   rZ   r7   r7   r7   r8   r      s   6c                   sT   e Zd ZejjZ fddZdd Zdd Z	dd Z
d	d
 Zdd Zdd Z  ZS )r   c                s<   t t| j  | jd  | _}|j| j}| j|dd d S )Nr   Z
bulkinsertz]CREATE TABLE bulkinsert
(
id int(11),
name char(20),
age int,
height int,
PRIMARY KEY (id)
)
)superr   setUpr   r2   r   cursor_typerE   )r1   r2   r3   )	__class__r7   r8   rc     s
    	zTestBulkInserts.setUpc             C   s<   | j d }|j }|jd |j }| jt|t| d S )Nr   z,SELECT id, name, age, height from bulkinsert)r   r   r   r0   r(   sorted)r1   rF   r2   r   resultr7   r7   r8   _verify_records!  s
    

zTestBulkInserts._verify_recordsc             C   sR   | j d }|j }dddg}|jd| | j|jtd |jd | j| d S )Nr   bob   {   r   jim8   r   r
   fredd      zCinsert into bulkinsert (id, name, age, height) values (%s,%s,%s,%s)si   insert into bulkinsert (id, name, age, height) values (0,'bob',21,123),(1,'jim',56,45),(2,'fred',100,180)commit)r   ri   rj   rk   )r   rl   rm   r   )r
   rn   ro   rp   )r   r   executemanyr(   _last_executedrC   r   rh   )r1   r2   r   rF   r7   r7   r8   test_bulk_insert(  s    


z TestBulkInserts.test_bulk_insertc             C   sV   | j d }|j }dddg}|jd| | j|jj td |jd | j| d S )Nr   ri   rj   rk   r   rl   rm   r   r
   rn   ro   rp   zJinsert
into bulkinsert (id, name,
age, height)
values (%s,
%s , %s,
%s )
 sx   insert
into bulkinsert (id, name,
age, height)
values (0,
'bob' , 21,
123 ),(1,
'jim' , 56,
45 ),(2,
'fred' , 100,
180 )rq   )r   ri   rj   rk   )r   rl   rm   r   )r
   rn   ro   rp   )	r   r   rr   r(   rs   striprC   r   rh   )r1   r2   r   rF   r7   r7   r8   $test_bulk_insert_multiline_statement6  s    

	
z4TestBulkInserts.test_bulk_insert_multiline_statementc             C   s<   | j d }|j }dg}|jd| |jd | j| d S )Nr   ri   rj   rk   zCinsert into bulkinsert (id, name, age, height) values (%s,%s,%s,%s)rq   )r   ri   rj   rk   )r   r   rr   r   rh   )r1   r2   r   rF   r7   r7   r8   test_bulk_insert_single_recordN  s    

z.TestBulkInserts.test_bulk_insert_single_recordc             C   sV   | j d }|j }dddg}|jd| | j|jj td |jd | j| dS )z4executemany should work with "insert ... on update" r   ri   rj   rk   r   rl   rm   r   r
   rn   ro   rp   ztinsert
into bulkinsert (id, name,
age, height)
values (%s,
%s , %s,
%s ) on duplicate key update
age = values(age)
 s   insert
into bulkinsert (id, name,
age, height)
values (0,
'bob' , 21,
123 ),(1,
'jim' , 56,
45 ),(2,
'fred' , 100,
180 ) on duplicate key update
age = values(age)rq   N)r   ri   rj   rk   )r   rl   rm   r   )r
   rn   ro   rp   )	r   r   rr   r(   rs   ru   rC   r   rh   )r1   r2   r   rF   r7   r7   r8   test_issue_288W  s    



zTestBulkInserts.test_issue_288c             C   s   | j d }|j }tjdd}tjd |jd W d Q R X | jt|d | j|d jt	j
 dt|d jkr| jdt|d jf  d S )	Nr   T)recordalwaysz$drop table if exists no_exists_tabler   Zno_exists_tablez'no_exists_table' not in %s)r   r   warningscatch_warningssimplefilterr   r(   lencategoryrR   Warningstrmessagefail)r1   conr`   wsr7   r7   r8   test_warningsr  s    

zTestBulkInserts.test_warnings)rO   rP   rQ   rR   ZcursorsZ
DictCursorrd   rc   rh   rt   rv   rw   rx   r   __classcell__r7   r7   )re   r8   r     s   	)r"   r]   r%   r{   Z	unittest2r   rR   r   Zpymysql.cursorsZpymysql.testsr   Zpymysql.errr   __all__ZPyMySQLTestCaser   r   r   r7   r7   r7   r8   <module>   s   
 w