#Github Updated - NotifyBot 11 #Import Modules #Clear Terminal print("\033[H\033[J") #Ability to Remove old Map import os #Setup Geopy from geopy.geocoders import Nominatim geolocator = Nominatim(user_agent="OpenSkyBot", timeout=5) import time from colorama import Fore, Back, Style from defOpenSky import pullOpenSky from defADSBX import pullADSBX #Setup Config File import configparser config = configparser.ConfigParser() config.read('config.ini') if config.getboolean('GOOGLE', 'STATICMAP_ENABLE'): from defMap import getMap else: from defSS import getSS if config.getboolean('DISCORD', 'ENABLE'): from defDiscord import sendDis #Setup Tweepy if config.getboolean('TWITTER', 'ENABLE'): from defTweet import tweepysetup tweet_api = tweepysetup() else: tweet_api = None #Setup PushBullet if config.getboolean('PUSHBULLET', 'ENABLE'): from pushbullet import Pushbullet pb = Pushbullet(config['PUSHBULLET']['API_KEY']) pb_channel = pb.get_channel(config.get('PUSHBULLET', 'CHANNEL_TAG')) else: pb_channel = None pb = None #Set Plane ICAO TRACK_PLANE = config.get('DATA', 'ICAO') icao = TRACK_PLANE.upper() #Pre Set Non Reseting Variables geo_alt_ft = None last_geo_alt_ft = None last_below_desired_ft = None feeding = None last_feeding = None last_on_ground = None on_ground = None invalid_Location = None longitude = None latitude = None geo_alt_m = None running_Count = 0 callsign = None takeoff_time = None #Begin Looping program while True: running_Count += 1 start_time = time.time() print (Back.MAGENTA, "--------", running_Count, "-------------------------------------------------------------", Style.RESET_ALL) #Reset Variables below_desired_ft = None geo_alt_ft = None longitude = None latitude = None on_ground = None geo_alt_m = None #Get API States for Plane plane_Dict = None if config.get('DATA', 'SOURCE') == "OPENS": plane_Dict, failed = pullOpenSky(TRACK_PLANE) print (Fore.YELLOW) print ("OpenSky Sourced Data: ", plane_Dict) print(Style.RESET_ALL) elif config.get('DATA', 'SOURCE') == "ADSBX": plane_Dict, failed = pullADSBX(TRACK_PLANE) print (Fore.YELLOW) print ("ADSBX Sourced Data: ", plane_Dict) print(Style.RESET_ALL) print (Fore.CYAN) print ("Failed:", failed) print ("ICAO:", icao) print(Style.RESET_ALL) #Pull Variables from plane_Dict if failed is False: if plane_Dict == None: feeding = False elif plane_Dict != None: locals().update(plane_Dict) print (Fore.CYAN) if config.get('DATA', 'SOURCE') == "ADSBX": print("Registration: ", reg) else: print("Registration: ", "Only shows when using ADSBX!") print ("Callsign: ", callsign) print ("On Ground: ", on_ground) print ("Latitude: ", latitude) print ("Longitude: ", longitude) print ("GEO Alitude Ft: ", geo_alt_ft) print(Style.RESET_ALL) #Lookup Location of coordinates if longitude != None and latitude != None: combined = f"{latitude}, {longitude}" try: location = geolocator.reverse(combined) except: print ("Geopy API Error") print (Fore.YELLOW) # print ("Geopy debug: ", location.raw) print(Style.RESET_ALL) feeding = True else: print (Fore.RED + 'No Location') feeding = False print(Style.RESET_ALL) #Figure if valid location, valid being geopy finds a location if feeding: try: geoError = location.raw['error'] except KeyError: invalid_Location = False geoError = None else: invalid_Location = True print ("Invalid Location: ", invalid_Location) if invalid_Location: print (Fore.RED) print (geoError) print ("Likely Over Water or Invalid Location") print(Style.RESET_ALL) #Convert Full address to sep variables only if Valid Location elif invalid_Location is False: address = location.raw['address'] country = address.get('country', '') country_code = address.get('country_code', '').upper() state = address.get('state', '') county = address.get('county', '') city = address.get('city', '') town = address.get('town', '') hamlet = address.get('hamlet', '') # print (Fore.YELLOW) # print ("Address Fields debug: ", address) # print(Style.RESET_ALL) print (Fore.GREEN) print("Entire Address: ", location.address) print ("Country Code: ", country_code) print ("Country: ", country) print ("State: ", state) print ("City: ", city) print ("Town: ", town) print ("Hamlet: ", hamlet) print ("County: ", county) print(Style.RESET_ALL) #Check if below desire ft if geo_alt_ft is None: below_desired_ft = False elif geo_alt_ft < 10000: below_desired_ft = True #Check if tookoff tookoff = bool(invalid_Location is False and below_desired_ft and on_ground is False and ((last_feeding is False and feeding) or (last_on_ground))) print ("Tookoff Just Now:", tookoff) #Check if Landed landed = bool(last_below_desired_ft and invalid_Location is False and ((last_feeding and feeding is False and last_on_ground is False) or (on_ground and last_on_ground is False))) print ("Landed Just Now:", landed) #Chose city town county or hamlet for location as not all are always avalible. if feeding and invalid_Location is False: aera_hierarchy = city or town or county or hamlet #Takeoff Notifcation and Landed if tookoff: tookoff_message = ("Just took off from" + " " + aera_hierarchy + ", " + state + ", " + country_code) print (tookoff_message) #Google Map or tar1090 screenshot if config.getboolean('GOOGLE', 'STATICMAP_ENABLE'): getMap(aera_hierarchy + ", " + state + ", " + country_code) else: getSS(icao) #Discord if config.getboolean('DISCORD', 'ENABLE'): dis_message = icao + " " + tookoff_message sendDis(dis_message) #PushBullet if pb != None: with open("map.png", "rb") as pic: map_data = pb.upload_file(pic, "Tookoff IMG") push = pb_channel.push_note(config.get('PUSHBULLET', 'TITLE'), tookoff_message) push = pb_channel.push_file(**map_data) #Twitter if tweet_api != None: tweet_api.update_with_media("map.png", status = tookoff_message) takeoff_time = time.time() os.remove("map.png") if landed: landed_time_msg = "" if takeoff_time != None: landed_time = time.time() - takeoff_time landed_time_msg = time.strftime("Apx. flt. time %H Hours : %M Mins ", time.gmtime(landed_time)) landed_message = ("Landed just now in" + " " + aera_hierarchy + ", " + state + ", " + country_code + ". " + landed_time_msg) print (landed_message) #Google Map or tar1090 screenshot if config.getboolean('GOOGLE', 'STATICMAP_ENABLE'): getMap(aera_hierarchy + ", " + state + ", " + country_code) else: getSS(icao) #Discord if config.getboolean('DISCORD', 'ENABLE'): dis_message = icao + " " + landed_message sendDis(dis_message) #PushBullet if pb != None: with open("map.png", "rb") as pic: map_data = pb.upload_file(pic, "Landed IMG") push = pb_channel.push_note(config.get('PUSHBULLET', 'TITLE'), landed_message) push = pb_channel.push_file(**map_data) #Twitter if tweet_api != None: tweet_api.update_with_media("map.png", status = landed_message) takeoff_time = None landed_time = None time_since_tk = None os.remove("map.png") #Set Variables to compare to next check last_feeding = feeding last_geo_alt_ft = geo_alt_ft last_on_ground = on_ground last_below_desired_ft = below_desired_ft elif failed: print ("Failed to connect to data source rechecking in 15s") if takeoff_time != None: elapsed_time = time.time() - takeoff_time time_since_tk = time.strftime("Time Since Take off %H Hours : %M Mins : %S Secs", time.gmtime(elapsed_time)) print(time_since_tk) elapsed_calc_time = time.time() - start_time print (Back.MAGENTA, "--------", running_Count, "------------------------Elapsed Time- ", elapsed_calc_time, "-------------------------------------", Style.RESET_ALL) print ("") time.sleep(15)