Add ADSBX data option, and multiple enable bools

This commit is contained in:
Jxck-S 2020-08-14 23:58:51 -04:00
parent 688f617c87
commit 54c48dc8d4
6 changed files with 210 additions and 98 deletions

View File

@ -1,9 +1,9 @@
#Github Updated - NotifyBot 10 #Github Updated - NotifyBot 11
#Import Modules #Import Modules
#Setup Geopy
#Clear Terminal #Clear Terminal
import os import os
os.system('cls' if os.name == 'nt' else 'clear') os.system('cls' if os.name == 'nt' else 'clear')
#Setup Geopy
from geopy.geocoders import Nominatim from geopy.geocoders import Nominatim
geolocator = Nominatim(user_agent="OpenSkyBot", timeout=5) geolocator = Nominatim(user_agent="OpenSkyBot", timeout=5)
@ -11,13 +11,27 @@ import json
import time import time
from colorama import Fore, Back, Style from colorama import Fore, Back, Style
import datetime import datetime
from defOpenSky import pullplane from defOpenSky import pullOpenSky
from defMap import getMap from defADSBX import pullADSBX
#Setup Config File #Setup Config File
import configparser import configparser
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read('config.ini') config.read('config.ini')
if config.getboolean('GOOGLE', 'STATICMAP_ENABLE'):
from defMap import getMap
else:
from defSS import getSS
if config.getboolean('DISCORD', 'ENABLE'):
from defDiscord import sendDis
#Setup Tweepy
if config.getboolean('TWITTER', 'ENABLE'):
from defTweet import tweepysetup
tweet_api = tweepysetup()
else:
tweet_api = None
#Setup PushBullet #Setup PushBullet
if config.getboolean('PUSHBULLET', 'ENABLE'): if config.getboolean('PUSHBULLET', 'ENABLE'):
from pushbullet import Pushbullet from pushbullet import Pushbullet
@ -27,16 +41,8 @@ else:
pb_channel = None pb_channel = None
pb = None pb = None
from defSS import getSS
#Setup Tweepy
if config.getboolean('TWITTER', 'ENABLE'):
from defTweet import tweepysetup
tweet_api = tweepysetup()
else:
tweet_api = None
#Set Plane ICAO #Set Plane ICAO
TRACK_PLANE = config.get('PLANE', 'ICAO') TRACK_PLANE = config.get('DATA', 'ICAO')
icao = TRACK_PLANE.upper() icao = TRACK_PLANE.upper()
#Pre Set Non Reseting Variables #Pre Set Non Reseting Variables
geo_alt_ft = None geo_alt_ft = None
@ -66,53 +72,59 @@ while True:
on_ground = None on_ground = None
geo_alt_m = None geo_alt_m = None
#Get API States for Plane #Get API States for Plane
planeData = None plane_Dict = None
try: if config.get('DATA', 'SOURCE') == "OPENS":
planeData = pullplane(TRACK_PLANE) plane_Dict, failed = pullOpenSky(TRACK_PLANE)
except:
print ("Opensky Error")
print (Fore.YELLOW) print (Fore.YELLOW)
print ("OpenSky Debug", planeData) print ("OpenSky Sourced Data: ", plane_Dict)
print(Style.RESET_ALL)
elif config.get('DATA', 'SOURCE') == "ADSBX":
plane_Dict, failed = pullADSBX(TRACK_PLANE)
print (Fore.YELLOW)
print ("ADSBX Sourced Data: ", plane_Dict)
print(Style.RESET_ALL) print(Style.RESET_ALL)
#Pull Variables from planeData
if planeData != None:
for dataStates in planeData.states:
icao = (dataStates.icao24).upper()
callsign = (dataStates.callsign)
longitude = (dataStates.longitude)
latitude = (dataStates.latitude)
on_ground = (dataStates.on_ground)
geo_alt_m = (dataStates.geo_altitude)
if geo_alt_m != None:
geo_alt_ft = geo_alt_m * 3.281
elif geo_alt_m == None and on_ground:
geo_alt_ft = 0
print (Fore.CYAN) print (Fore.CYAN)
print ("ICAO: ", icao) print ("Failed:", failed)
print ("ICAO:", icao)
print(Style.RESET_ALL)
#Pull Variables from plane_Dict
if failed is False:
if plane_Dict == None:
feeding = False
elif plane_Dict != None:
for key, value in plane_Dict.items():
exec(key + '=value')
print (Fore.CYAN)
if config.get('DATA', 'SOURCE') == "ADSBX":
print("Registration: ", reg)
else:
print("Registration: ", "Only shows when using ADSBX!")
print ("Callsign: ", callsign) print ("Callsign: ", callsign)
print ("On Ground: ", on_ground) print ("On Ground: ", on_ground)
print ("Latitude: ", latitude) print ("Latitude: ", latitude)
print ("Longitude: ", longitude) print ("Longitude: ", longitude)
print ("GEO Alitude Ft: ", geo_alt_ft) print ("GEO Alitude Ft: ", geo_alt_ft)
print(Style.RESET_ALL)
#Lookup Location of coordinates #Lookup Location of coordinates
if longitude != None and latitude != None: if longitude != None and latitude != None:
combined = f"{latitude}, {longitude}" combined = f"{latitude}, {longitude}"
try: try:
location = geolocator.reverse(combined) location = geolocator.reverse(combined)
except: except:
print ("Geopy API Error") print ("Geopy API Error")
print (Fore.YELLOW) print (Fore.YELLOW)
print ("Geopy debug: ", location.raw) # print ("Geopy debug: ", location.raw)
print(Style.RESET_ALL) print(Style.RESET_ALL)
feeding = True feeding = True
else: else:
print (Fore.RED + 'Not Feeding') print (Fore.RED + 'No Location')
feeding = False feeding = False
print(Style.RESET_ALL) print(Style.RESET_ALL)
#Figure if valid location, valid being geopy finds a location #Figure if valid location, valid being geopy finds a location
if feeding: if feeding:
try: try:
@ -142,8 +154,6 @@ while True:
city = address.get('city', '') city = address.get('city', '')
town = address.get('town', '') town = address.get('town', '')
hamlet = address.get('hamlet', '') hamlet = address.get('hamlet', '')
# print (Fore.YELLOW) # print (Fore.YELLOW)
# print ("Address Fields debug: ", address) # print ("Address Fields debug: ", address)
# print(Style.RESET_ALL) # print(Style.RESET_ALL)
@ -158,7 +168,7 @@ while True:
print ("County: ", county) print ("County: ", county)
print(Style.RESET_ALL) print(Style.RESET_ALL)
#Check if below desire ft #Check if below desire ft
if geo_alt_ft is None: if geo_alt_ft is None:
below_desired_ft = False below_desired_ft = False
elif geo_alt_ft < 10000: elif geo_alt_ft < 10000:
@ -179,19 +189,27 @@ while True:
if tookoff: if tookoff:
tookoff_message = ("Just took off from" + " " + aera_hierarchy + ", " + state + ", " + country_code) tookoff_message = ("Just took off from" + " " + aera_hierarchy + ", " + state + ", " + country_code)
print (tookoff_message) print (tookoff_message)
#Google Map or tar1090 screenshot
if config.getboolean('GOOGLE', 'STATICMAP_ENABLE'):
getMap(aera_hierarchy + ", " + state + ", " + country_code) getMap(aera_hierarchy + ", " + state + ", " + country_code)
else:
getSS(icao) getSS(icao)
#Discord
if config.getboolean('DISCORD', 'ENABLE'):
dis_message = icao + " " + tookoff_message
sendDis(dis_message)
#PushBullet
if pb != None: if pb != None:
with open("map.png", "rb") as pic: with open("map.png", "rb") as pic:
map_data = pb.upload_file(pic, "Tookoff IMG") map_data = pb.upload_file(pic, "Tookoff IMG")
push = pb_channel.push_note(config.get('PUSHBULLET', 'TITLE'), tookoff_message) push = pb_channel.push_note(config.get('PUSHBULLET', 'TITLE'), tookoff_message)
push = pb_channel.push_file(**map_data) push = pb_channel.push_file(**map_data)
with open("screenshot.png", "rb") as pic: #Twitter
map_data = pb.upload_file(pic, "Tookoff IMG2")
push = pb_channel.push_file(**map_data)
if tweet_api != None: if tweet_api != None:
tweet_api.update_with_media("map.png", status = tookoff_message) tweet_api.update_with_media("map.png", status = tookoff_message)
takeoff_time = time.time() takeoff_time = time.time()
os.remove("map.png")
if landed: if landed:
landed_time_msg = "" landed_time_msg = ""
@ -200,21 +218,28 @@ while True:
landed_time_msg = time.strftime("Apx. flt. time %H Hours : %M Mins ", time.gmtime(landed_time)) landed_time_msg = time.strftime("Apx. flt. time %H Hours : %M Mins ", time.gmtime(landed_time))
landed_message = ("Landed just now in" + " " + aera_hierarchy + ", " + state + ", " + country_code + ". " + landed_time_msg) landed_message = ("Landed just now in" + " " + aera_hierarchy + ", " + state + ", " + country_code + ". " + landed_time_msg)
print (landed_message) print (landed_message)
#Google Map or tar1090 screenshot
if config.getboolean('GOOGLE', 'STATICMAP_ENABLE'):
getMap(aera_hierarchy + ", " + state + ", " + country_code) getMap(aera_hierarchy + ", " + state + ", " + country_code)
else:
getSS(icao) getSS(icao)
#Discord
if config.getboolean('DISCORD', 'ENABLE'):
dis_message = icao + " " + landed_message
sendDis(dis_message)
#PushBullet
if pb != None: if pb != None:
with open("map.png", "rb") as pic: with open("map.png", "rb") as pic:
map_data = pb.upload_file(pic, "Landed IMG") map_data = pb.upload_file(pic, "Landed IMG")
push = pb_channel.push_note(config.get('PUSHBULLET', 'TITLE'), landed_message) push = pb_channel.push_note(config.get('PUSHBULLET', 'TITLE'), landed_message)
push = pb_channel.push_file(**map_data) push = pb_channel.push_file(**map_data)
with open("screenshot.png", "rb") as pic: #Twitter
map_data = pb.upload_file(pic, "Landed IMG2")
push = pb_channel.push_file(**map_data)
if tweet_api != None: if tweet_api != None:
tweet_api.update_with_media("map.png", status = landed_message) tweet_api.update_with_media("map.png", status = landed_message)
takeoff_time = None takeoff_time = None
landed_time = None landed_time = None
time_since_tk = None time_since_tk = None
os.remove("map.png")
#Set Variables to compare to next check #Set Variables to compare to next check
last_feeding = feeding last_feeding = feeding
@ -222,9 +247,9 @@ while True:
last_on_ground = on_ground last_on_ground = on_ground
last_below_desired_ft = below_desired_ft last_below_desired_ft = below_desired_ft
else: elif failed:
print ("Rechecking OpenSky") print ("Failed to connect to data source rechecking in 15s")
planeDataMSG = str(planeData)
if takeoff_time != None: if takeoff_time != None:
elapsed_time = time.time() - takeoff_time elapsed_time = time.time() - takeoff_time
time_since_tk = time.strftime("Time Since Take off %H Hours : %M Mins : %S Secs", time.gmtime(elapsed_time)) time_since_tk = time.strftime("Time Since Take off %H Hours : %M Mins : %S Secs", time.gmtime(elapsed_time))

View File

@ -1,19 +1,28 @@
#V1 #V2
[PLANE] [DATA]
#Plane #Plane to track, based of ICAO or ICAO24 which is the unique transponder address of a plane.
ICAO = planeicaohere ICAO = planeicaohere
#Source to pull data from
#SHOULD BE ADSBX which is ADS-B Exchange or OPENS which is OpenSky
#By default configured with OpenSky which anyone can use without a login
#ADS-B Exchange has better data but is not avalible unless you feed their network or pay.
SOURCE = OPENS
#Place Opensky credentials here #ADS-B Exchange https://www.adsbexchange.com/data/
[ADSBX]
API_KEY = apikey
#OpenSky https://opensky-network.org/apidoc/index.html
[OPENSKY] [OPENSKY]
USERNAME = None USERNAME = None
PASSWORD = None PASSWORD = None
[GOOGLE] [GOOGLE]
#API KEYS - enable static map images API in GCP and get key #API KEYS
#If static map disabled will load up tar1090 ads-b exchange and take screenshot instead.
STATICMAP_ENABLE = FALSE
STATICMAPKEY = googleapikey STATICMAPKEY = googleapikey
[TWITTER] [TWITTER]
ENABLE = FALSE ENABLE = FALSE
CONSUMER_KEY = ckhere CONSUMER_KEY = ckhere
@ -23,6 +32,11 @@ ACCESS_TOKEN_SECRET = atshere
[PUSHBULLET] [PUSHBULLET]
ENABLE = FALSE ENABLE = FALSE
TITLE = "Title" TITLE = Title Of Pushbullet message
API_KEY = apikey API_KEY = apikey
CHANNEL_TAG = channeltag CHANNEL_TAG = channeltag
[DISCORD]
ENABLE = TRUE
#WEBHOOK URL https://support.discord.com/hc/en-us/articles/228383668-Intro-to-Webhooks
URL = webhookurlhere

34
defADSBX.py Normal file
View File

@ -0,0 +1,34 @@
import requests
import json
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
def pullADSBX(icao):
url = 'https://adsbexchange.com/api/aircraft/icao/' + icao + "/"
headers = {
'api-auth': config.get('ADSBX', 'API_KEY')
}
failed = False
try:
response = requests.get(url, headers = headers)
data = response.text
data = json.loads(data)
except:
print("ADSBX Error")
failed = True
plane_Dict = None
if failed is False:
ac = data['ac']
if ac != None:
ac_dict = ac[0]
plane_Dict = {'icao' : ac_dict['icao'], 'callsign' : ac_dict['call'], 'reg' : ac_dict['reg'], 'latitude' : float(ac_dict['lat']), 'longitude' : float(ac_dict['lon']), 'geo_alt_ft' : int(ac_dict['galt']), 'on_ground' : bool(int(ac_dict["gnd"]))}
if plane_Dict['on_ground']:
plane_Dict['geo_alt_ft'] = 0
else:
plane_Dict = None
return plane_Dict, failed

10
defDiscord.py Normal file
View File

@ -0,0 +1,10 @@
from discord_webhook import DiscordWebhook
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
def sendDis(message):
webhook = DiscordWebhook(url=config.get('DISCORD', 'URL'), content=message, username="plane-notify")
with open("map.png", "rb") as f:
webhook.add_file(file=f.read(), filename='map.png')
response = webhook.execute()

View File

@ -1,8 +1,37 @@
def pullplane(TRACK_PLANE): def pullOpenSky(TRACK_PLANE):
import configparser import configparser
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read('config.ini') config.read('config.ini')
from opensky_api import OpenSkyApi from opensky_api import OpenSkyApi
planeData = None
opens_api = OpenSkyApi(config.get('OPENSKY', 'USERNAME'), config.get('OPENSKY', 'PASSWORD')) opens_api = OpenSkyApi(config.get('OPENSKY', 'USERNAME'), config.get('OPENSKY', 'PASSWORD'))
failed = False
try:
planeData = opens_api.get_states(time_secs=0, icao24=TRACK_PLANE.lower()) planeData = opens_api.get_states(time_secs=0, icao24=TRACK_PLANE.lower())
return planeData except:
print ("OpenSky Error")
failed = True
if failed is False and planeData != None:
plane_Dict = {}
geo_alt_m = None
for dataStates in planeData.states:
plane_Dict['icao'] = (dataStates.icao24).upper()
plane_Dict['callsign'] = (dataStates.callsign)
plane_Dict['longitude'] = (dataStates.longitude)
plane_Dict['latitude'] = (dataStates.latitude)
plane_Dict['on_ground'] = (dataStates.on_ground)
geo_alt_m = (dataStates.geo_altitude)
try:
if geo_alt_m != None:
plane_Dict['geo_alt_ft'] = geo_alt_m * 3.281
elif plane_Dict['on_ground']:
plane_Dict['geo_alt_ft'] = 0
except KeyError:
pass
if plane_Dict == {}:
plane_Dict = None
else:
plane_Dict = None
return plane_Dict, failed

View File

@ -11,10 +11,10 @@ def getSS(icao):
chrome_options = webdriver.ChromeOptions() chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless') chrome_options.add_argument('--headless')
chrome_options.add_argument('window-size=800,800') chrome_options.add_argument('window-size=800,800')
# chrome_options.add_argument('--no-sandbox') # required when running as root user. otherwise you would get no sandbox errors. # chrome_options.add_argument('--no-sandbox') # required when running as root user. otherwise you would get no sandbox errors.
browser = webdriver.Chrome(options=chrome_options) browser = webdriver.Chrome(options=chrome_options)
url = "https://globe.adsbexchange.com/?largeMode=2&hideButtons&hideSidebar&zoom=9&icao=" + icao url = "https://globe.adsbexchange.com/?largeMode=2&hideButtons&hideSidebar&mapDim=0&zoom=9&icao=" + icao
browser.get(url) browser.get(url)
time.sleep(10) time.sleep(30)
browser.save_screenshot("screenshot.png") browser.save_screenshot("map.png")
browser.quit() browser.quit()