Skip to content

Commit

Permalink
Merge pull request #584 from ngardiner/better_refresh
Browse files Browse the repository at this point in the history
Better token expiration handling
  • Loading branch information
MikeBishop authored May 16, 2024
2 parents f307e97 + 23aade9 commit 5639a4d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 42 deletions.
3 changes: 0 additions & 3 deletions lib/TWCManager/Control/HTTPControl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,11 +1265,8 @@ def process_save_settings(self, page="settings"):
carapi = master.getModuleByName("TeslaAPI")
if key == "carApiBearerToken":
carapi.setCarApiBearerToken(self.getFieldValue(key))
# New tokens expire after 8 hours
carapi.setCarApiTokenExpireTime(time.time() + 8 * 60 * 60)
elif key == "carApiRefreshToken":
carapi.setCarApiRefreshToken(self.getFieldValue(key))
carapi.setCarApiTokenExpireTime(time.time() + 45 * 24 * 60 * 60)

else:
# Write setting to dictionary
Expand Down
91 changes: 52 additions & 39 deletions lib/TWCManager/Vehicle/TeslaAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,45 +131,46 @@ def apiRefresh(self):
try:
req = requests.post(self.refreshURL, headers=headers, json=data)
logger.log(logging.INFO2, "Car API request" + str(req))
req.raise_for_status()
apiResponseDict = json.loads(req.text)
except requests.exceptions.RequestException:
logger.log(
logging.INFO2, "Request Exception parsing API Token Refresh Response"
)
pass
except ValueError:
pass
if req.status_code == 401:
logger.log(
logging.INFO2,
"TeslaAPI",
"ERROR: Can't access Tesla car via API. Please supply fresh tokens.",
)
self.setCarApiBearerToken("")
self.setCarApiRefreshToken("")
self.updateCarApiLastErrorTime()
# Instead of just setting carApiLastErrorTime, erase tokens to
# prevent further authorization attempts until user enters password
# on web interface. I feel this is safer than trying to log in every
# ten minutes with a bad token because Tesla might decide to block
# remote access to your car after too many authorization errors.
self.master.queue_background_task({"cmd": "saveSettings"})
return False
except json.decoder.JSONDecodeError:
logger.log(
logging.INFO2, "JSON Decode Error parsing API Token Refresh Response"
)
pass
except ValueError:
pass

try:
logger.log(logging.INFO4, "Car API auth response" + str(apiResponseDict))
self.setCarApiBearerToken(apiResponseDict["access_token"])
self.setCarApiRefreshToken(apiResponseDict["refresh_token"])
self.setCarApiTokenExpireTime(now + apiResponseDict["expires_in"])
self.master.queue_background_task({"cmd": "saveSettings"})
return True

except KeyError:
logger.log(
logging.INFO2,
"TeslaAPI",
"ERROR: Can't access Tesla car via API. Please log in again via web interface.",
)
self.updateCarApiLastErrorTime()
# Instead of just setting carApiLastErrorTime, erase tokens to
# prevent further authorization attempts until user enters password
# on web interface. I feel this is safer than trying to log in every
# ten minutes with a bad token because Tesla might decide to block
# remote access to your car after too many authorization errors.
self.setCarApiBearerToken("")
self.setCarApiRefreshToken("")
self.master.queue_background_task({"cmd": "saveSettings"})
except UnboundLocalError:
except:
pass

return False

def car_api_available(
self, email=None, password=None, charge=None, applyLimit=None
):
Expand Down Expand Up @@ -1124,29 +1125,41 @@ def setCarApiBearerToken(self, token=None):
return False
else:
self.carApiBearerToken = token
if not self.baseURL:
try:
decoded = jwt.decode(
token,
options={
"verify_signature": False,
"verify_aud": False,
"verify_exp": False,
},
)
try:
decoded = jwt.decode(
token,
options={
"verify_signature": False,
"verify_aud": False,
"verify_exp": False,
},
)
if not self.baseURL:
if "owner-api" in "".join(decoded.get("aud", "")):
self.baseURL = self.regionURL["OwnerAPI"]
elif decoded.get("ou_code", "") in self.regionURL:
self.baseURL = self.regionURL[decoded["ou_code"]]
except jwt.exceptions.DecodeError:
# Fallback to owner-api if we get an exception decoding jwt token
self.baseURL = self.regionURL["OwnerAPI"]

if "exp" in decoded:
self.setCarApiTokenExpireTime(int(decoded["exp"]))
else:
self.setCarApiTokenExpireTime(time.time() + 8 * 60 * 60)

except jwt.exceptions.DecodeError:
# Fallback to owner-api if we get an exception decoding jwt token
self.baseURL = self.regionURL["OwnerAPI"]
self.setCarApiTokenExpireTime(time.time() + 8 * 60 * 60)
return True
else:
return False

def setCarApiRefreshToken(self, token):
self.carApiRefreshToken = token
if token and not self.master.tokenSyncEnabled() and (
self.getCarApiBearerToken() == ""
or self.getCarApiTokenExpireTime() - time.time() < 60 * 60
):
return self.apiRefresh()
return True

def setCarApiTokenExpireTime(self, value):
Expand Down Expand Up @@ -1242,8 +1255,8 @@ def wakeVehicle(self, vehicle):
except requests.exceptions.RequestException:
if req.status_code == 401 and "expired" in req.text:
# If the token is expired, refresh it and try again
self.apiRefresh()
return self.wakeVehicle(vehicle)
if self.apiRefresh():
return self.wakeVehicle(vehicle)
elif req.status_code == 429:
# We're explicitly being told to back off
self.errorCount = max(30, self.errorCount)
Expand Down Expand Up @@ -1413,8 +1426,8 @@ def get_car_api(self, url, checkReady=True, provesOnline=True):
except requests.exceptions.RequestException:
if req.status_code == 401 and "expired" in req.text:
# If the token is expired, refresh it and try again
self.apiRefresh()
continue
if self.apiRefresh():
continue
elif req.status_code == 429:
# We're explicitly being told to back off
self.errorCount = max(30, self.errorCount)
Expand Down

0 comments on commit 5639a4d

Please sign in to comment.