Skip to content

Commit f249764

Browse files
authored
fix: add impersonated SA via local ADC support for fetch_id_token (#1740)
* Add `from_impersonated_account_info` method to ImperstonatedCredentials. * id_token.py uses impersonated_credentials.from_impersonated_account_info. * Add token_id unit test for impersonated service account credentials. * _default uses impersonated_credentials method.
1 parent 6fd04d5 commit f249764

File tree

5 files changed

+143
-36
lines changed

5 files changed

+143
-36
lines changed

google/auth/_default.py

+2-36
Original file line numberDiff line numberDiff line change
@@ -484,42 +484,8 @@ def _get_impersonated_service_account_credentials(filename, info, scopes):
484484
from google.auth import impersonated_credentials
485485

486486
try:
487-
source_credentials_info = info.get("source_credentials")
488-
source_credentials_type = source_credentials_info.get("type")
489-
if source_credentials_type == _AUTHORIZED_USER_TYPE:
490-
source_credentials, _ = _get_authorized_user_credentials(
491-
filename, source_credentials_info
492-
)
493-
elif source_credentials_type == _SERVICE_ACCOUNT_TYPE:
494-
source_credentials, _ = _get_service_account_credentials(
495-
filename, source_credentials_info
496-
)
497-
elif source_credentials_type == _EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE:
498-
source_credentials, _ = _get_external_account_authorized_user_credentials(
499-
filename, source_credentials_info
500-
)
501-
else:
502-
raise exceptions.InvalidType(
503-
"source credential of type {} is not supported.".format(
504-
source_credentials_type
505-
)
506-
)
507-
impersonation_url = info.get("service_account_impersonation_url")
508-
start_index = impersonation_url.rfind("/")
509-
end_index = impersonation_url.find(":generateAccessToken")
510-
if start_index == -1 or end_index == -1 or start_index > end_index:
511-
raise exceptions.InvalidValue(
512-
"Cannot extract target principal from {}".format(impersonation_url)
513-
)
514-
target_principal = impersonation_url[start_index + 1 : end_index]
515-
delegates = info.get("delegates")
516-
quota_project_id = info.get("quota_project_id")
517-
credentials = impersonated_credentials.Credentials(
518-
source_credentials,
519-
target_principal,
520-
scopes,
521-
delegates,
522-
quota_project_id=quota_project_id,
487+
credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info(
488+
info, scopes=scopes
523489
)
524490
except ValueError as caught_exc:
525491
msg = "Failed to load impersonated service account credentials from {}".format(

google/auth/impersonated_credentials.py

+75
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@
4747

4848
_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
4949

50+
_SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user"
51+
_SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE = "service_account"
52+
_SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE = (
53+
"external_account_authorized_user"
54+
)
55+
5056

5157
def _make_iam_token_request(
5258
request,
@@ -410,6 +416,75 @@ def with_scopes(self, scopes, default_scopes=None):
410416
cred._target_scopes = scopes or default_scopes
411417
return cred
412418

419+
@classmethod
420+
def from_impersonated_service_account_info(cls, info, scopes=None):
421+
"""Creates a Credentials instance from parsed impersonated service account credentials info.
422+
423+
Args:
424+
info (Mapping[str, str]): The impersonated service account credentials info in Google
425+
format.
426+
scopes (Sequence[str]): Optional list of scopes to include in the
427+
credentials.
428+
429+
Returns:
430+
google.oauth2.credentials.Credentials: The constructed
431+
credentials.
432+
433+
Raises:
434+
InvalidType: If the info["source_credentials"] are not a supported impersonation type
435+
InvalidValue: If the info["service_account_impersonation_url"] is not in the expected format.
436+
ValueError: If the info is not in the expected format.
437+
"""
438+
439+
source_credentials_info = info.get("source_credentials")
440+
source_credentials_type = source_credentials_info.get("type")
441+
if source_credentials_type == _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE:
442+
from google.oauth2 import credentials
443+
444+
source_credentials = credentials.Credentials.from_authorized_user_info(
445+
source_credentials_info
446+
)
447+
elif source_credentials_type == _SOURCE_CREDENTIAL_SERVICE_ACCOUNT_TYPE:
448+
from google.oauth2 import service_account
449+
450+
source_credentials = service_account.Credentials.from_service_account_info(
451+
source_credentials_info
452+
)
453+
elif (
454+
source_credentials_type
455+
== _SOURCE_CREDENTIAL_EXTERNAL_ACCOUNT_AUTHORIZED_USER_TYPE
456+
):
457+
from google.auth import external_account_authorized_user
458+
459+
source_credentials = external_account_authorized_user.Credentials.from_info(
460+
source_credentials_info
461+
)
462+
else:
463+
raise exceptions.InvalidType(
464+
"source credential of type {} is not supported.".format(
465+
source_credentials_type
466+
)
467+
)
468+
469+
impersonation_url = info.get("service_account_impersonation_url")
470+
start_index = impersonation_url.rfind("/")
471+
end_index = impersonation_url.find(":generateAccessToken")
472+
if start_index == -1 or end_index == -1 or start_index > end_index:
473+
raise exceptions.InvalidValue(
474+
"Cannot extract target principal from {}".format(impersonation_url)
475+
)
476+
target_principal = impersonation_url[start_index + 1 : end_index]
477+
delegates = info.get("delegates")
478+
quota_project_id = info.get("quota_project_id")
479+
480+
return cls(
481+
source_credentials,
482+
target_principal,
483+
scopes,
484+
delegates,
485+
quota_project_id=quota_project_id,
486+
)
487+
413488

414489
class IDTokenCredentials(credentials.CredentialsWithQuotaProject):
415490
"""Open ID Connect ID Token-based service account credentials.

google/oauth2/id_token.py

+12
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,18 @@ def fetch_id_token_credentials(audience, request=None):
284284
return service_account.IDTokenCredentials.from_service_account_info(
285285
info, target_audience=audience
286286
)
287+
elif info.get("type") == "impersonated_service_account":
288+
from google.auth import impersonated_credentials
289+
290+
target_credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info(
291+
info
292+
)
293+
294+
return impersonated_credentials.IDTokenCredentials(
295+
target_credentials=target_credentials,
296+
target_audience=audience,
297+
include_email=True,
298+
)
287299
except ValueError as caught_exc:
288300
new_exc = exceptions.DefaultCredentialsError(
289301
"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.",

tests/oauth2/test_id_token.py

+15
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@
2020

2121
from google.auth import environment_vars
2222
from google.auth import exceptions
23+
from google.auth import impersonated_credentials
2324
from google.auth import transport
2425
from google.oauth2 import id_token
2526
from google.oauth2 import service_account
2627

2728
SERVICE_ACCOUNT_FILE = os.path.join(
2829
os.path.dirname(__file__), "../data/service_account.json"
2930
)
31+
32+
IMPERSONATED_SERVICE_ACCOUNT_FILE = os.path.join(
33+
os.path.dirname(__file__),
34+
"../data/impersonated_service_account_authorized_user_source.json",
35+
)
36+
3037
ID_TOKEN_AUDIENCE = "https://pubsub.googleapis.com"
3138

3239

@@ -262,6 +269,14 @@ def test_fetch_id_token_credentials_from_explicit_cred_json_file(monkeypatch):
262269
assert cred._target_audience == ID_TOKEN_AUDIENCE
263270

264271

272+
def test_fetch_id_token_credentials_from_impersonated_cred_json_file(monkeypatch):
273+
monkeypatch.setenv(environment_vars.CREDENTIALS, IMPERSONATED_SERVICE_ACCOUNT_FILE)
274+
275+
cred = id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
276+
assert isinstance(cred, impersonated_credentials.IDTokenCredentials)
277+
assert cred._target_audience == ID_TOKEN_AUDIENCE
278+
279+
265280
def test_fetch_id_token_credentials_no_cred_exists(monkeypatch):
266281
monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)
267282

tests/test_impersonated_credentials.py

+39
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import copy
1516
import datetime
1617
import http.client as http_client
1718
import json
@@ -35,6 +36,9 @@
3536
PRIVATE_KEY_BYTES = fh.read()
3637

3738
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
39+
IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE = os.path.join(
40+
DATA_DIR, "impersonated_service_account_authorized_user_source.json"
41+
)
3842

3943
ID_TOKEN_DATA = (
4044
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
@@ -49,6 +53,9 @@
4953
with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
5054
SERVICE_ACCOUNT_INFO = json.load(fh)
5155

56+
with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE, "rb") as fh:
57+
IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO = json.load(fh)
58+
5259
SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
5360
TOKEN_URI = "https://example.com/oauth2/token"
5461

@@ -148,6 +155,38 @@ def make_credentials(
148155
iam_endpoint_override=iam_endpoint_override,
149156
)
150157

158+
def test_from_impersonated_service_account_info(self):
159+
credentials = impersonated_credentials.Credentials.from_impersonated_service_account_info(
160+
IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO
161+
)
162+
assert isinstance(credentials, impersonated_credentials.Credentials)
163+
164+
def test_from_impersonated_service_account_info_with_invalid_source_credentials_type(
165+
self
166+
):
167+
info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)
168+
assert "source_credentials" in info
169+
# Set the source_credentials to an invalid type
170+
info["source_credentials"]["type"] = "invalid_type"
171+
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
172+
impersonated_credentials.Credentials.from_impersonated_service_account_info(
173+
info
174+
)
175+
assert excinfo.match(
176+
"source credential of type {} is not supported".format("invalid_type")
177+
)
178+
179+
def test_from_impersonated_service_account_info_with_invalid_impersonation_url(
180+
self
181+
):
182+
info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)
183+
info["service_account_impersonation_url"] = "invalid_url"
184+
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
185+
impersonated_credentials.Credentials.from_impersonated_service_account_info(
186+
info
187+
)
188+
assert excinfo.match(r"Cannot extract target principal from")
189+
151190
def test_get_cred_info(self):
152191
credentials = self.make_credentials()
153192
assert not credentials.get_cred_info()

0 commit comments

Comments
 (0)