Python Notes: Difference between revisions

From Federal Burro of Information
Jump to navigationJump to search
 
(8 intermediate revisions by the same user not shown)
Line 9: Line 9:
== package management ==
== package management ==


use pip.
Use pip... this just in ... IT'S COMPLICATED
 
https://stackoverflow.com/questions/75608323/how-do-i-solve-error-externally-managed-environment-every-time-i-use-pip-3


got yourself in a bind and get use network enabled pip ( cause you pooched your ssl libs? ).. fetch and "pip install -e ." -> https://pypi.org/simple/
got yourself in a bind and get use network enabled pip ( cause you pooched your ssl libs? ).. fetch and "pip install -e ." -> https://pypi.org/simple/
Line 248: Line 250:
pprint.pprint(d)
pprint.pprint(d)
</pre>
</pre>
== Fetch else Null ==
you have a cmplicate data strcutre from some crazy json, say an elastic search hit.
You some values from that data strcutre, but some of them might exist nad some might now.
Rather than walking that _specific"_ data striucter, you want to ask for some nexted key and get back the value or null.
YOu could do some nexts "if x in y: else null" tree for _this_ shape of data strcuture.
Or you can use this function and let it to the walking for you:
<pre>
def fetchelsenull (json,stringpath):
    print("START fetchelsenull")
    path = stringpath.split(".")
    print("Path: {}".format(path))
    head = path.pop(0)
    print("Head: {}".format(head))
    if head in json:
        print("Len of path is {}".format(len(path)))
        # if this is the end of the tree return the value.
        if len(path) == 0:
            print("value is {}".format(json[head]))
            return json[head]
        else:
            return fetchelsenull(json[head],".".join(path))
</pre>
so now instead of this:
<pre>
if '_source' in hit:
    if 'place' in hit['_source']:
        if 'State' in hit['_source']['place']:
            if 'Message' in hit['_source']['place']['State']:
                print("Message [}".format(hit['_source']['place']['State']['Message']))
            else:
                print("Message is null")
            if 'Exception' in hit['_source']['place']['State']:
                print("Exception [}".format(hit['_source']['place']['State']['Message']))
            else:
                print("Exception is null")
</pre>
you can do this:
<pre>
print("Message is {}".foramt(fetchelsenull(hit,"_source.place.State.Message")))
print("Exception is {}".foramt(fetchelsenull(hit,"_source.place.State.Exception")))
</pre>
You provide a "path" dot delimited and the function walks the path and does that validation as needed.
This is not very efficient:
# it walks the whole thing over and over again.
# it could copy around alot of data.


== Join to lists ==
== Join to lists ==
Line 357: Line 416:
print("read {} write {}".format(read,write))
print("read {} write {}".format(read,write))
</pre>
</pre>
== schedule ==
<pre>
import schedule
import time
print("{}".format(time.time()))
def run_thing(name):
  print("Thing {}".format(name))
schedule.every(4).seconds.do(run_thing,name='alice')
schedule.every(2).seconds.do(run_thing,name='bob')
while 1:
  schedule.run_pending()
  time.sleep(1)
</pre>
== async ==
good example and compare:
https://www.velotio.com/engineering-blog/async-features-in-python


== tuples ==
== tuples ==
Line 529: Line 614:
for x in perhour:
for x in perhour:
     print("{}: {}".format(x,perhour[x]))
     print("{}: {}".format(x,perhour[x]))
</pre>
== Logging ==
<pre>
import logging
logging.info(Elasticsearch.__version__)
logging.error(pp.pformat(client_info))
</pre>
</pre>


Line 535: Line 632:
=== urllib ===
=== urllib ===


https://stackoverflow.com/questions/3238925/python-urllib-urllib2-post
aka requests
 
at first I used:
 
https://stackoverflow.com/questions/3238925/python-urllib-urllib2-post
 
then I used:


come dave, get with the times:
https://urllib3.readthedocs.io/en/latest/


https://urllib3.readthedocs.io/en/latest/
then i used:
 
https://docs.python-requests.org/en/master/


=== datetime ===
=== datetime ===
Line 557: Line 662:


=== Pandas ===
=== Pandas ===
get your stuff in to data frame , you will thank me later.
lots of stuf you want to do you can do easily in pandas dataframe.


on chopping up data:
on chopping up data:
Line 658: Line 767:
print(result)
print(result)
</pre>
</pre>
https://pandas.pydata.org/docs/getting_started/intro_tutorials/05_add_columns.html


=== Matplotlib ===
=== Matplotlib ===
Line 678: Line 789:
</pre>
</pre>


colour and cateogries:
https://kanoki.org/2020/08/30/matplotlib-scatter-plot-color-by-category-in-python/


== Scripts ==
== Scripts ==


[[/yaml2xml.py]]
[[/yaml2xml.py]]

Latest revision as of 01:42, 19 November 2024

Start

Docs:

package management

Use pip... this just in ... IT'S COMPLICATED

https://stackoverflow.com/questions/75608323/how-do-i-solve-error-externally-managed-environment-every-time-i-use-pip-3

got yourself in a bind and get use network enabled pip ( cause you pooched your ssl libs? ).. fetch and "pip install -e ." -> https://pypi.org/simple/

testing

in visual studio code et al
https://code.visualstudio.com/docs/python/unit-testing

Basics

string encoding

at the top of the file ( second line ) :

(2.7.11 on freenas 9 this worked)

# coding=utf_8

more: https://www.python.org/dev/peps/pep-0263/


Encoding

python 3

>>> b'0376'
b'0376'
>>> b'376'
b'376'
>>> str(b'0376')
"b'0376'"
>>> str(b'\376')
"b'\\xfe'"
>>> str(b'\0376')
"b'\\x1f6'"
>>> str(b'\76')
"b'>'"
>>> str(b'\276')
"b'\\xbe'"
>>> str(b'\176')
"b'~'"
>>> 
(edited)
8:35
>>> str('\376')
'þ'
>>> str(b'\376')
"b'\\xfe'"
>>> 

wc(1) - word count , useful, counts lines, words, and _bytes_.

$ echo 'þ' | wc 
       1       1       3
  • 1 line
  • 1 word
  • 3 bytes!

od(1) - Octal dump.

$ echo 'þ' | od
0000000    137303  000012                                                
0000003

oops I meant -c for "ascii representation":

$ echo 'þ' | od -c
0000000    þ  **  \n                                                    
0000003
$ 

from od(1):

Multi-byte characters are displayed in the area corresponding to the first byte of the character. The remaining bytes are shown as `**'.

þ is 2 bytes + \n is 1 = 3 bytes.

>>> int.from_bytes(b'\x00\xfe', byteorder='big')
254
>>> int.from_bytes(b'\xfe', byteorder='big')
254
>>> 

old way in python3:

>>> print('%(x)o' % {'x':254} )
376
>>> 
  • "o" - octal format

new way in python 3:

>>> print('{!r}'.format(254) )
254
>>> print('{!a}'.format(254) )
254
>>> print('{!s}'.format(254) )
254
>>> print('{}'.format(oct(254)) )
0o376
>>>
  • r - repr()
  • a - ascii()
  • s - str()

__debug__

Use __debug__ in your code:

if __debug__:

   print 'Debug ON'

else:

   print 'Debug OFF'

Create a script abc.py with the above code and then

Run with python -O abc.py Run with python abc.py Observe the difference.


formatting

https://pyformat.info/

'{:>10}'.format('test') # padding
'{} {}'.format('one', 'two')
'{0!s} {0!r}'.format(Data()) # forms
'{:.5}'.format('xylophone') # truncate
'{:4d}'.format(42)

localization

import locale
locale.setlocale(locale.LC_ALL, )  # Use  for auto, or force e.g. to 'en_US.UTF-8'
f'{value:n}'  # For Python ≥3.6

Functions


def functionname(argument1,argument2):
    """"Docstring can be
    Multiline """
    dostuff()
    return(stuff)

ghetto command line argv

#!/usr/bin/python

import sys

print 'Number of arguments:', len(sys.argv), 'arguments.'
print 'Argument List:', str(sys.argv)

Less ghetto argparse

import argparse
parser = argparse.ArgumentParser()
parser.add_argument('access_key', help='Access Key');
parser.add_argument('secret_key', help='Secret Key');
args = parser.parse_args()
global access_key
global secret_key
access_key = args.access_key
secret_key = args.secret_key

make bcrypt password

import bcrypt
bcrypt.hashpw('myPlainPassword', bcrypt.gensalt())

what's in that object?

https://download.tuxfamily.org/jeremyblog/diveintopython-5.4/py/apihelper.py

/apihelper.py

/apihelper.py - short

then use it like this:

#!/usr/bin/env python

from apihelper import info

mything

info(mything)


AutoVivification

import pprint

class Vividict(dict):
    def __missing__(self, key):
        value = self[key] = type(self)()
        return value

d = Vividict()

d['foo']['bar']
d['foo']['baz']
d['fizz']['buzz']
d['primary']['secondary']['tertiary']['quaternary']
pprint.pprint(d)

Fetch else Null

you have a cmplicate data strcutre from some crazy json, say an elastic search hit.

You some values from that data strcutre, but some of them might exist nad some might now. Rather than walking that _specific"_ data striucter, you want to ask for some nexted key and get back the value or null. YOu could do some nexts "if x in y: else null" tree for _this_ shape of data strcuture.

Or you can use this function and let it to the walking for you:

def fetchelsenull (json,stringpath):
    print("START fetchelsenull")
    path = stringpath.split(".")
    print("Path: {}".format(path))
    head = path.pop(0)
    print("Head: {}".format(head))
    if head in json:
        print("Len of path is {}".format(len(path)))
        # if this is the end of the tree return the value.
        if len(path) == 0:
            print("value is {}".format(json[head]))
            return json[head]
        else:
            return fetchelsenull(json[head],".".join(path))

so now instead of this:

if '_source' in hit:
    if 'place' in hit['_source']:
        if 'State' in hit['_source']['place']:
            if 'Message' in hit['_source']['place']['State']:
                print("Message [}".format(hit['_source']['place']['State']['Message']))
            else:
                print("Message is null")
            if 'Exception' in hit['_source']['place']['State']:
                print("Exception [}".format(hit['_source']['place']['State']['Message']))
            else:
                print("Exception is null")

you can do this:

print("Message is {}".foramt(fetchelsenull(hit,"_source.place.State.Message")))
print("Exception is {}".foramt(fetchelsenull(hit,"_source.place.State.Exception")))

You provide a "path" dot delimited and the function walks the path and does that validation as needed.

This is not very efficient:

  1. it walks the whole thing over and over again.
  2. it could copy around alot of data.

Join to lists

on is the keys, the other is the values:

use zip:

keys = ['a', 'b', 'c']
values = [1, 2, 3]
dictionary = dict(zip(keys, values))
print(dictionary)
{'a': 1, 'b': 2, 'c': 3}

vim tabs

in file:

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

in ~/.vimrc:

set tabstop=8
set expandtab
set shiftwidth=4
set softtabstop=4

parse json

ref: https://docs.python.org/2/library/json.html

python -m json.tool my_json.json

parse yaml

pip3 install pyyaml
python3 -c 'import yaml,sys;yaml.safe_load_all(sys.stdin)' < file.yml

parse yaml

python -c "from yaml import load, Loader; load(open('filename.yml'), Loader=Loader)"

parse lines

in the context of scapy packets:

from scapy.all import *
from scapy.layers import http

myhttp = pkt.getlayer(http.HTTPRequest).fields
headers = myhttp['Headers'].splitlines()
for header in headers:
  ( k, v )  = header.split(": ")

extract by character

useful for files with very long lines

first find the line that has _many_ characters and put it in it's own file:

sed -n '459p' postgresql-9.3-main.log > tmp

Then read the characters one by one..

#!/usr/local/bin/env python
import sys

with open('tmp') as f:
  for x in range(600):
    c = f.read(1)
    if not c:
      print "End of file"
      break
    sys.stdout.write(c)
  print

another example.

rump is a rdis sync tool that writes one line of "r" or "w" for each read or write operation.

Here is a python to count those operations:

file: status.py

#!/usr/bin/env python3

import sys

filename="mylog"
write=0
read=0
with open(sys.argv[1]) as f:
  while True:
    c = f.read(1)
    if c =="w":
      write = write + 1
    elif c =="r":
      read = read + 1
    else:
      break

print("read {} write {}".format(read,write))

schedule

import schedule
import time

print("{}".format(time.time()))

def run_thing(name):
  print("Thing {}".format(name))

schedule.every(4).seconds.do(run_thing,name='alice')
schedule.every(2).seconds.do(run_thing,name='bob')

while 1:
  schedule.run_pending()
  time.sleep(1)

async

good example and compare:

https://www.velotio.com/engineering-blog/async-features-in-python

tuples

>>> x = [(1,2), (3,4), (5,6)]
>>> for item in x:
...     print "A tuple", item
A tuple (1, 2)
A tuple (3, 4)
A tuple (5, 6)
>>> for a, b in x:
...     print "First", a, "then", b
First 1 then 2
First 3 then 4
First 5 then 6

Decorators

https://python-3-patterns-idioms-test.readthedocs.io/en/latest/PythonDecorators.html

https://realpython.com/primer-on-python-decorators/

Exception handling

try:
  # rv, data = M.search(None, "ALL")
  # rv, data = M.search(None, 'SENTSINCE 1-Jan-2017 SENTBEFORE 31-Dec-2017')
  rv, data = M.search(None, 'SENTSINCE 1 Jan 2017')
except Exception, e:
  print "M.search failed"
  print "Error %s" % M.error.message
  print "Exception is %s" % str(e)

also:

try:
    pass
except Exception as e:
    # Just print(e) is cleaner and more likely what you want,
    # but if you insist on printing message specifically whenever possible...
    if hasattr(e, 'message'):
        print(e.message)
    else:
        print(e)

tempfile

temp file

f = NamedTemporaryFile(delete=False)
f
# <open file '<fdopen>', mode 'w+b' at 0x384698>
f.name
# '/var/folders/5q/5qTPn6xq2RaWqk+1Ytw3-U+++TI/-Tmp-/tmpG7V1Y0'
f.write("Hello World!\n")
f.close()
os.unlink(f.name)
os.path.exists(f.name)
# False

Pretty Printer

aka Object Dump

import pprint
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(stuff)

List Comprehension

[ <output expression> for <iterator variable> in <iterable> if <predicate expression> ].

Converting your brain:

new_things = []
for ITEM in old_things:
   if condition_based_on(ITEM):
   new_things.append("something with " + ITEM)

You can rewrite the above for loop as a list comprehension like this:

new_things = ["something with " + ITEM for ITEM in old_things if condition_based_on(ITEM)]

unconditionally:

doubled_numbers = []
for n in numbers:
    doubled_numbers.append(n * 2)

That same code written as a comprehension:

doubled_numbers = [n * 2 for n in numbers]

reference: http://treyhunner.com/2015/12/python-list-comprehensions-now-in-color/

Tag extract

I have no idea why this works:

you get an instance from boto3 aws describe_instances and you want _one_ tag:

        retention_days = [
            int(t.get('Value')) for t in instance['Tags']
            if t['Key'] == 'snapshot_retention'][0]

Log file loop + re

#!/usr/bin/env python3
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
import sys
import re
from dateutil import parser
myfile="/root/selenoid.log.Jan6"
perhour = {}

# Dec 22 09:44:10 chrome01 xvfb-run[34607]: 2019/12/22 09:44:10 [8157622] [TERMINATED_PROCESS] [41686] [0.02s]
#                          mon    day    hh  :min  :sec   hostname print pid
line_regex = re.compile(r"(\D*)\s(\d*)\s(\d*):(\d*):(\d*)\s(\w)\s(\w)\[(\d*)\]")
line_regex = re.compile(r"(\D*)\s(\d*)\s(\d*):(\d*):(\d*)\s(\S*)\s(\S*)\s(\S*)\s(\S*)\s\[(\S*)\]\s\[(\S*)\]\s")

with open(myfile) as f:
    for line in f:
        try:
            # print ("line {}".format(line),)
            m = line_regex.search(line)
            if (m):
                # print (".",end='')
                #print ("zero {}".format(m.group(0)))
                #print ("month  {}".format(m.group(1)))
                #print ("date  {}".format(m.group(2)))
                #print ("hour  {}".format(m.group(3)))
                #print ("min  {}".format(m.group(4)))
                #print ("sec  {}".format(m.group(5)))
                #print ("hostname  {}".format(m.group(6)))
                #print ("processname  {}".format(m.group(7)))
                #print ("date2  {}".format(m.group(8)))
                #print ("time2  {}".format(m.group(9)))
                #print ("session  {}".format(m.group(10)))
                #print ("verb  {}".format(m.group(11)))
                if ( m.group(11) == "LOCATING_SERVICE" ) :
                    key = m.group(1)+m.group(2)+m.group(3)
                    if key in perhour:
                        perhour[key] = perhour[key] + 1
                    else:
                        perhour[key] = 1
            #else:
                # print("x",end='')
        except StopIteration:
            print("StopIteration")
            sys.exit()
    print("end of file")
    parser.parse("Jan 01 00:00:25")

print("per hour")
for x in perhour:
    print("{}: {}".format(x,perhour[x]))

Logging

import logging

logging.info(Elasticsearch.__version__)
logging.error(pp.pformat(client_info))


Libs of note

urllib

aka requests

at first I used:

https://stackoverflow.com/questions/3238925/python-urllib-urllib2-post

then I used:

https://urllib3.readthedocs.io/en/latest/

then i used:

https://docs.python-requests.org/en/master/

datetime

stuff

def last_day_of_month(any_day):
    next_month = any_day.replace(day=28) + datetime.timedelta(days=4)  # this will never fail
    return next_month - datetime.timedelta(days=next_month.day)


t0 = datetime(1, 1, 1)
now = datetime.utcnow()
seconds = (now - t0).total_seconds()

Pandas

get your stuff in to data frame , you will thank me later.

lots of stuf you want to do you can do easily in pandas dataframe.

on chopping up data:

coloumns and rows.

https://medium.com/dunder-data/selecting-subsets-of-data-in-pandas-6fcd0170be9c

connecting pandas to mysql:

import MySQLdb
mysql_cn= MySQLdb.connect(host='myhost', 
                port=3306,user='myusername', passwd='mypassword', 
                db='information_schema')
df_mysql = pd.read_sql('select * from VIEWS;', con=mysql_cn)    
print 'loaded dataframe from MySQL. records:', len(df_mysql)
mysql_cn.close()

dialects in SQLAlchemy:

from sqlalchemy import create_engine
import pandas as pd
engine = create_engine('dialect://user:pass@host:port/schema', echo=False)
f = pd.read_sql_query('SELECT * FROM mytable', engine, index_col = 'ID')

( reference: https://stackoverflow.com/questions/10065051/python-pandas-and-databases-like-mysql )


from sqlalchemy import create_engine
engine = create_engine('postgresql://user@localhost:5432/mydb')


Series

datacamp exercises:

# Import pandas
import pandas as pd

# Import Twitter data as DataFrame: df
df = pd.read_csv('tweets.csv')

# Initialize an empty dictionary: langs_count
langs_count = {}

# Extract column from DataFrame: col
col = df['lang']

# Iterate over lang column in DataFrame
for entry in col:

    # If the language is in langs_count, add 1
    if entry in langs_count.keys():
        langs_count[entry] = langs_count[entry] + 1
    # Else add the language to langs_count, set the value to 1
    else:
        langs_count[entry] = 1

# Print the populated dictionary
print(langs_count)

now as a fucntion

# Define count_entries()
def count_entries(df, col_name):
    """Return a dictionary with counts of 
    occurrences as value for each key."""

    # Initialize an empty dictionary: langs_count
    langs_count = {}
    
    # Extract column from DataFrame: col
    col = df[col_name]
    
    # Iterate over lang column in DataFrame
    for entry in col:

        # If the language is in langs_count, add 1
        if entry in langs_count.keys():
            langs_count[entry] = langs_count[entry] + 1
        # Else add the language to langs_count, set the value to 1
        else:
            langs_count[entry] = 1

    # Return the langs_count dictionary
    return(langs_count)

# Call count_entries(): result
result = count_entries(tweets_df,'lang')

# Print the result
print(result)

https://pandas.pydata.org/docs/getting_started/intro_tutorials/05_add_columns.html

Matplotlib

Categorization

Ref: https://datascience.stackexchange.com/questions/14039/tool-to-label-images-for-classification

I just hacked together a very basic helper in python it requires that all images are stored in a python list allImages.

import matplotlib.pyplot as plt
category=[]
plt.ion()

for i,image in enumerate(allImages):
    plt.imshow(image)
    plt.pause(0.05)
    category.append(raw_input('category: '))

colour and cateogries:

https://kanoki.org/2020/08/30/matplotlib-scatter-plot-color-by-category-in-python/

Scripts

/yaml2xml.py