← Back to writeups

HeroCTF v7 — Revoked (+ Revenge)

Your budget request for the new company personnel index has been declined. Instead, the intern has received a very small bonus in exchange for a homemade solution.

Show them their stinginess could cost them.

The chall maker forgot to remove a debug account... Here is the revenge challenge without this backdoor!

We're given a Python server that looks like this:

py

1import os
2import secrets
3import sqlite3
4import time
5from functools import wraps
6
7import bcrypt
8import jwt
9from dotenv import load_dotenv
10from flask import (
11    Flask,
12    flash,
13    jsonify,
14    make_response,
15    redirect,
16    render_template,
17    request,
18)
19
20app = Flask(__name__)
21app.static_folder = "static"
22load_dotenv()
23app.config["SECRET_KEY"] = "".join(
24    [secrets.choice("abcdef0123456789") for _ in range(32)]
25)
26FLAG = os.getenv("FLAG")
27
28
29def init_db():
30    conn = sqlite3.connect("database.db")
31    cursor = conn.cursor()
32    cursor.execute("""DROP TABLE IF EXISTS employees;""")
33    cursor.execute("""DROP TABLE IF EXISTS revoked_tokens;""")
34    cursor.execute("""DROP TABLE IF EXISTS users;""")
35    cursor.execute("""CREATE TABLE IF NOT EXISTS users (
36                        id INTEGER PRIMARY KEY AUTOINCREMENT,
37                        username TEXT UNIQUE NOT NULL,
38                        is_admin BOOL NOT NULL,
39                        password_hash TEXT NOT NULL)""")
40    cursor.execute("""CREATE TABLE IF NOT EXISTS revoked_tokens (
41                        id INTEGER PRIMARY KEY AUTOINCREMENT,
42                        token TEXT NOT NULL)""")
43    cursor.execute("""CREATE TABLE IF NOT EXISTS employees (
44                        id INTEGER PRIMARY KEY AUTOINCREMENT,
45                        name TEXT NOT NULL,
46                        email TEXT UNIQUE NOT NULL,
47                        position TEXT NOT NULL,
48                        phone TEXT NOT NULL,
49                        location TEXT NOT NULL)""")
50    conn.commit()
51    conn.close()
52
53
54def get_db_connection():
55    conn = sqlite3.connect("database.db")
56    conn.row_factory = sqlite3.Row
57    return conn
58
59
60def token_required(f):
61    @wraps(f)
62    def decorated(*args, **kwargs):
63        token = request.cookies.get("JWT")
64        if not token:
65            flash("Token is missing!", "error")
66            return redirect("/login")
67
68        try:
69            data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
70            username = data["username"]
71
72            conn = get_db_connection()
73            user = conn.execute(
74                "SELECT id,is_admin FROM users WHERE username = ?", (username,)
75            ).fetchone()
76            revoked = conn.execute(
77                "SELECT id FROM revoked_tokens WHERE token = ?", (token,)
78            ).fetchone()
79            conn.close()
80
81            if not user or revoked:
82                flash("Invalid or revoked token!", "error")
83                return redirect("/login")
84
85            request.is_admin = user["is_admin"]
86            request.username = username
87
88        except jwt.InvalidTokenError:
89            flash("Invalid token!", "error")
90            return redirect("/login")
91
92        return f(*args, **kwargs)
93
94    return decorated
95
96
97@app.route("/", methods=["GET"])
98def index():
99    token = request.cookies.get("JWT", None)
100    if token is None:
101        return redirect("/login")
102    else:
103        return redirect("/employees")
104
105
106@app.route("/register", methods=["GET", "POST"])
107def register():
108    if request.method == "GET":
109        return render_template("register.html")
110    elif request.method == "POST":
111        data = request.form
112        username = data.get("username")
113        password = data.get("password")
114
115        if not username or not password:
116            return jsonify({"message": "Username and password required!"}), 400
117
118        password_hash = bcrypt.hashpw(
119            password.encode("utf-8"), bcrypt.gensalt()
120        ).decode("utf-8")
121
122        conn = get_db_connection()
123        try:
124            conn.execute(
125                "INSERT INTO users (username, is_admin, password_hash) VALUES (?, ?, ?)",
126                (username, False, password_hash),
127            )
128            conn.commit()
129        except sqlite3.IntegrityError:
130            flash("User already exists.", "error")
131            return redirect("/register")
132        finally:
133            conn.close()
134
135        flash("User created successfully.", "success")
136        return redirect("/login")
137
138
139@app.route("/login", methods=["GET", "POST"])
140def login():
141    if request.method == "GET":
142        return render_template("login.html")
143    elif request.method == "POST":
144        data = request.form
145        username = data.get("username")
146        password = data.get("password")
147
148        conn = get_db_connection()
149        user = conn.execute(
150            "SELECT * FROM users WHERE username = ?", (username,)
151        ).fetchone()
152        conn.close()
153
154        if user and bcrypt.checkpw(
155            password.encode("utf-8"), user["password_hash"].encode("utf-8")
156        ):
157            token = jwt.encode(
158                {
159                    "username": username,
160                    "is_admin": user["is_admin"],
161                    "issued": time.time(),
162                },
163                app.config["SECRET_KEY"],
164                algorithm="HS256",
165            )
166            resp = make_response(redirect("/employees"))
167            resp.set_cookie("JWT", token)
168            return resp
169
170        flash("Invalid credentials.", "error")
171        return redirect("/login")
172
173
174@app.route("/logout", methods=["GET"])
175def logout():
176    token = request.cookies.get("JWT")
177    if token:
178        conn = get_db_connection()
179        conn.execute("INSERT INTO revoked_tokens (token) VALUES (?)", (token,))
180        conn.commit()
181        conn.close()
182    resp = make_response(redirect("/login"))
183    resp.delete_cookie("JWT")
184    return resp
185
186
187@app.route("/employees", methods=["GET"])
188@token_required
189def employees():
190    query = request.args.get("query", "")
191    conn = get_db_connection()
192    cursor = conn.cursor()
193    cursor.execute(
194        f"SELECT id, name, email, position FROM employees WHERE name LIKE '%{query}%'"
195    )
196    results = cursor.fetchall()
197    conn.close()
198    print(request.username)
199    return render_template("employees.html", username=request.username, employees=results, query=query)
200
201
202@app.route("/employee/<int:employee_id>", methods=["GET"])
203@token_required
204def employee_details(employee_id):
205    conn = get_db_connection()
206    employee = conn.execute(
207        "SELECT * FROM employees WHERE id = ?", (employee_id,)
208    ).fetchone()
209    conn.close()
210    print(employee)
211    if not employee:
212        flash("Employee not found", "error")
213        return redirect("/employees")
214    return render_template("employee_details.html", username=request.username, employee=employee)
215
216
217@app.route("/admin", methods=["GET"])
218@token_required
219def admin():
220    is_admin = getattr(request, "is_admin", None)
221    if is_admin:
222        return render_template("admin.html", username=request.username, flag=FLAG)
223
224    flash("You don't have the permission to access this area", "error")
225    return redirect("/employees")
226
227
228if __name__ == "__main__":
229    init_db()
230    app.run(debug=False, host="0.0.0.0", port=5000)

We can see right off the bat that we have SQL injection in the /employees route:

py

1@app.route("/employees", methods=["GET"])
2@token_required
3def employees():
4    query = request.args.get("query", "")
5    conn = get_db_connection()
6    cursor = conn.cursor()
7    cursor.execute(
8        f"SELECT id, name, email, position FROM employees WHERE name LIKE '%{query}%'"
9    )
10    results = cursor.fetchall()
11    conn.close()
12    print(request.username)
13    return render_template("employees.html", username=request.username, employees=results, query=query)

We can then use UNION SELECT to leak any part of their database. But what's this about revoked tokens?

In their token_required decorator, we can see that our JWT is decoded and properties injected into the request object so long as:

  • The JWT is valid.
  • The username corresponds to an existing user.
  • The token isn't in the revoked_tokens table.

py

1def token_required(f):
2    @wraps(f)
3    def decorated(*args, **kwargs):
4        token = request.cookies.get("JWT")
5        if not token:
6            flash("Token is missing!", "error")
7            return redirect("/login")
8
9        try:
10            data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
11            username = data["username"]
12
13            conn = get_db_connection()
14            user = conn.execute(
15                "SELECT id,is_admin FROM users WHERE username = ?", (username,)
16            ).fetchone()
17            revoked = conn.execute(
18                "SELECT id FROM revoked_tokens WHERE token = ?", (token,)
19            ).fetchone()
20            conn.close()
21
22            if not user or revoked:
23                flash("Invalid or revoked token!", "error")
24                return redirect("/login")
25
26            request.is_admin = user["is_admin"]
27            request.username = username
28
29        except jwt.InvalidTokenError:
30            flash("Invalid token!", "error")
31            return redirect("/login")
32
33        return f(*args, **kwargs)
34
35    return decorated

So the main idea is this: we can leak the revoked_tokens table and (assumedly) find an admin token there. If we can slightly perturb this token such that it is no longer exactly the same token as before (but the token still decodes correctly), we can become admin and win!

py

1@app.route("/admin", methods=["GET"])
2@token_required
3def admin():
4    is_admin = getattr(request, "is_admin", None)
5    if is_admin:
6        return render_template("admin.html", username=request.username, flag=FLAG)
7
8    flash("You don't have the permission to access this area", "error")
9    return redirect("/employees")

We can use our simple SQL injection to leak the revoked_tokens table,

image

Code (sql)

1a%'UNION SELECT id,token,token,token FROM revoked_tokens;--

and just check tokens until we find one corresponding to an admin (in this case,

Code

1eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwiaXNfYWRtaW4iOjEsImlzc3VlZCI6MTc2NDY0NjgzOC4zODE5NzgzfQ.5xEmNRYdgbWg77FWf3kPs28Ulcsqm_JimpCYymoCCCk

image

So how do we change this slightly to bypass the revoked_tokens check? What if we simply add an = to the end of it? (after all, = is a padding character in base64 each part of the JWT is base64 encoded, right?)

Setting our JWT cookie to

Code

1eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwiaXNfYWRtaW4iOjEsImlzc3VlZCI6MTc2NDY0NjgzOC4zODE5NzgzfQ.5xEmNRYdgbWg77FWf3kPs28Ulcsqm_JimpCYymoCCCk=

we get the flag:

image

image

(this same solve works on both Revoked and Revoked Revenge, as we didn't rely on any shenanigans with any "debug accounts").

Why does this payload actually work? Indeed, if we tried to add an = to the JWT on jwt.io, we'd get the following error message:

image

The caveat is this: JWT segments aren't base64 encoded, they are base64url encoded. And seemingly from reading the MDN documentation,

A common variant of this definition allows only characters that are safe to use in filenames and URL values. This version, defined in RFC 4648, section 5, omits the padding and replaces + and / with - and _.

So if base64url omits the padding = characters, why did Python's jwt decode it anyways? I decided to read the PyJWT source to find out:

py

1    def decode(
2        self,
3        jwt: str | bytes,
4        key: AllowedPublicKeys | PyJWK | str | bytes = "",
5        algorithms: Sequence[str] | None = None,
6        options: Options | None = None,
7        # deprecated arg, remove in pyjwt3
8        verify: bool | None = None,
9        # could be used as passthrough to api_jws, consider removal in pyjwt3
10        detached_payload: bytes | None = None,
11        # passthrough arguments to _validate_claims
12        # consider putting in options
13        audience: str | Iterable[str] | None = None,
14        subject: str | None = None,
15        issuer: str | Container[str] | None = None,
16        leeway: float | timedelta = 0,
17        # kwargs
18        **kwargs: Any,
19    ) -> dict[str, Any]:
20        """
21        ...
22        """
23        if kwargs:
24            warnings.warn(
25                "passing additional kwargs to decode() is deprecated "
26                "and will be removed in pyjwt version 3. "
27                f"Unsupported kwargs: {tuple(kwargs.keys())}",
28                RemovedInPyjwt3Warning,
29                stacklevel=2,
30            )
31        decoded = self.decode_complete(
32            jwt,
33            key,
34            algorithms,
35            options,
36            verify=verify,
37            detached_payload=detached_payload,
38            audience=audience,
39            subject=subject,
40            issuer=issuer,
41            leeway=leeway,
42        )
43        return decoded["payload"]

Seemingly, decode(...) just calls decode_complete(...):

py

1    def decode_complete(
2        self,
3        jwt: str | bytes,
4        key: AllowedPublicKeyTypes = "",
5        algorithms: Sequence[str] | None = None,
6        options: Options | None = None,
7        # deprecated arg, remove in pyjwt3
8        verify: bool | None = None,
9        # could be used as passthrough to api_jws, consider removal in pyjwt3
10        detached_payload: bytes | None = None,
11        # passthrough arguments to _validate_claims
12        # consider putting in options
13        audience: str | Iterable[str] | None = None,
14        issuer: str | Container[str] | None = None,
15        subject: str | None = None,
16        leeway: float | timedelta = 0,
17        # kwargs
18        **kwargs: Any,
19    ) -> dict[str, Any]:
20        """
21        ...
22        """
23        if kwargs:
24            warnings.warn(
25                "passing additional kwargs to decode_complete() is deprecated "
26                "and will be removed in pyjwt version 3. "
27                f"Unsupported kwargs: {tuple(kwargs.keys())}",
28                RemovedInPyjwt3Warning,
29                stacklevel=2,
30            )
31
32        if options is None:
33            verify_signature = True
34        else:
35            verify_signature = options.get("verify_signature", True)
36
37        # If the user has set the legacy `verify` argument, and it doesn't match
38        # what the relevant `options` entry for the argument is, inform the user
39        # that they're likely making a mistake.
40        if verify is not None and verify != verify_signature:
41            warnings.warn(
42                "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. "
43                "The equivalent is setting `verify_signature` to False in the `options` dictionary. "
44                "This invocation has a mismatch between the kwarg and the option entry.",
45                category=DeprecationWarning,
46                stacklevel=2,
47            )
48
49        sig_options: SigOptions = {"verify_signature": verify_signature}
50        decoded = api_jws.decode_complete(
51            jwt,
52            key=key,
53            algorithms=algorithms,
54            options=sig_options,
55            detached_payload=detached_payload,
56        )
57
58        payload = self._decode_payload(decoded)
59
60        merged_options = self._merge_options(options)
61        self._validate_claims(
62            payload,
63            merged_options,
64            audience=audience,
65            issuer=issuer,
66            leeway=leeway,
67            subject=subject,
68        )
69
70        decoded["payload"] = payload
71        return decoded

and in decode_complete(), the decoding of the JWT string is handled by api_jws.decode_complete(...)

py

1    def decode_complete(
2        self,
3        jwt: str | bytes,
4        key: AllowedPublicKeys | PyJWK | str | bytes = "",
5        algorithms: Sequence[str] | None = None,
6        options: SigOptions | None = None,
7        detached_payload: bytes | None = None,
8        **kwargs: dict[str, Any],
9    ) -> dict[str, Any]:
10        if kwargs:
11            warnings.warn(
12                "passing additional kwargs to decode_complete() is deprecated "
13                "and will be removed in pyjwt version 3. "
14                f"Unsupported kwargs: {tuple(kwargs.keys())}",
15                RemovedInPyjwt3Warning,
16                stacklevel=2,
17            )
18        merged_options: SigOptions
19        if options is None:
20            merged_options = self.options
21        else:
22            merged_options = {**self.options, **options}
23
24        verify_signature = merged_options["verify_signature"]
25
26        if verify_signature and not algorithms and not isinstance(key, PyJWK):
27            raise DecodeError(
28                'It is required that you pass in a value for the "algorithms" argument when calling decode().'
29            )
30
31        payload, signing_input, header, signature = self._load(jwt)
32
33        if header.get("b64", True) is False:
34            if detached_payload is None:
35                raise DecodeError(
36                    'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
37                )
38            payload = detached_payload
39            signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])
40
41        if verify_signature:
42            self._verify_signature(signing_input, header, signature, key, algorithms)
43
44        return {
45            "payload": payload,
46            "header": header,
47            "signature": signature,
48        }

which calls _load(...)

py

1    def _load(self, jwt: str | bytes) -> tuple[bytes, bytes, dict[str, Any], bytes]:
2        if isinstance(jwt, str):
3            jwt = jwt.encode("utf-8")
4
5        if not isinstance(jwt, bytes):
6            raise DecodeError(f"Invalid token type. Token must be a {bytes}")
7
8        try:
9            signing_input, crypto_segment = jwt.rsplit(b".", 1)
10            header_segment, payload_segment = signing_input.split(b".", 1)
11        except ValueError as err:
12            raise DecodeError("Not enough segments") from err
13
14        try:
15            header_data = base64url_decode(header_segment)
16        except (TypeError, binascii.Error) as err:
17            raise DecodeError("Invalid header padding") from err
18
19        try:
20            header: dict[str, Any] = json.loads(header_data)
21        except ValueError as e:
22            raise DecodeError(f"Invalid header string: {e}") from e
23
24        if not isinstance(header, dict):
25            raise DecodeError("Invalid header string: must be a json object")
26
27        try:
28            payload = base64url_decode(payload_segment)
29        except (TypeError, binascii.Error) as err:
30            raise DecodeError("Invalid payload padding") from err
31
32        try:
33            signature = base64url_decode(crypto_segment)
34        except (TypeError, binascii.Error) as err:
35            raise DecodeError("Invalid crypto padding") from err
36
37        return (payload, signing_input, header, signature)

which finally calls base64url_decode(...):

py

1def base64url_decode(input: Union[bytes, str]) -> bytes:
2    input_bytes = force_bytes(input)
3
4    rem = len(input_bytes) % 4
5
6    if rem > 0:
7        input_bytes += b"=" * (4 - rem)
8
9    return base64.urlsafe_b64decode(input_bytes)
10
11
12def base64url_encode(input: bytes) -> bytes:
13    return base64.urlsafe_b64encode(input).replace(b"=", b"")

which, apart from actually adding padding to our input, just hooks into the Python standard library urlsafe_b64 method. Surprisingly, base64.urlsafe_b64decode() makes no mention of omitting padding characters!

image

Finally, what does the RFC say? According to the linked RFC 4648 section 5,

image

so padding characters can be skipped? Sometimes?

And the padding issue seemingly isn't just an MDN misunderstanding either; if you google "base64 URL encoder", many sites (including the somewhat well-known base64encode.org, but also the third result on google, this npm library, and this random Medium article I came across) will omit padding characters when encoding base64url:

image

But others, including the first result that happened to come up on google, don't.

py

1>>> base64.urlsafe_b64encode(b'aaaaaaa')
2b'YWFhYWFhYQ=='

Hell, even the C# standard library for ASP.NET seems to drop padding characters when encoding:

Code (cs)

1    public static int Base64UrlEncode(byte[] input, int offset, char[] output, int outputOffset, int count)
2    {
3        ArgumentNullThrowHelper.ThrowIfNull(input);
4        ArgumentNullThrowHelper.ThrowIfNull(output);
5
6        ValidateParameters(input.Length, nameof(input), offset, count);
7        ArgumentOutOfRangeThrowHelper.ThrowIfNegative(outputOffset);
8
9        var arraySizeRequired = GetArraySizeRequiredToEncode(count);
10        if (output.Length - outputOffset < arraySizeRequired)
11        {
12            throw new ArgumentException(
13                string.Format(
14                    CultureInfo.CurrentCulture,
15                    EncoderResources.WebEncoders_InvalidCountOffsetOrLength,
16                    nameof(count),
17                    nameof(outputOffset),
18                    nameof(output)),
19                nameof(count));
20        }
21
22#if NETCOREAPP
23        return Base64UrlEncode(input.AsSpan(offset, count), output.AsSpan(outputOffset));
24#else
25        // Special-case empty input.
26        if (count == 0)
27        {
28            return 0;
29        }
30
31        // Use base64url encoding with no padding characters. See RFC 4648, Sec. 5.
32
33        // Start with default Base64 encoding.
34        var numBase64Chars = Convert.ToBase64CharArray(input, offset, count, output, outputOffset);
35
36        // Fix up '+' -> '-' and '/' -> '_'. Drop padding characters.
37        for (var i = outputOffset; i - outputOffset < numBase64Chars; i++)
38        {
39            var ch = output[i];
40            if (ch == '+')
41            {
42                output[i] = '-';
43            }
44            else if (ch == '/')
45            {
46                output[i] = '_';
47            }
48            else if (ch == '=')
49            {
50                // We've reached a padding character; truncate the remainder.
51                return i - outputOffset;
52            }
53        }
54
55        return numBase64Chars;
56#endif
57    }

(in C#'s Base64UrlDecode, a comment says to assume that the input contains no padding characters, though at first glance the method appears to work even if padding exists.)

All this to say, I think it's a bit strange that there's so much differing behavior around this spec.