Graph Database を読んだ

最近 Graph Database に興味を持ったので Graph Databases: New Opportunities for Connected Data を読んだ。graph db の一つである Neo4j のサイトから PDF をダウンロードできる。

neo4j.com

Neo4j のサイトからダウンロードできることからわかる通り、サンプルは Neo4j を使ったものだったけれど、そこまでサンプルが多いわけでもなかったので、概念を学びつつ実際に軽く試しながら学ぶというのにちょうどよい塩梅だった。

この手の薄い本だと終始ツールの使い方になっていることが多々あると思うものの、この本はそういう感じではなく、Graph Database の内部構造のざっくりとした解説もあり Graph Database は RDBMS とどう違うのかというのがわかってよかった。

fish で "fatal: not a git repository (or any of the parent directories): .git" と出るようになった

2021/8/12 追記 本家の fishline にて修正された

github.com

fish, version 3.3.1

fish を立ち上げると以下のようなエラー文が常に出るようになってしまった。 git で管理されたディレクトリに移るとまた別のエラー文が出る。

fatal: not a git repository (or any of the parent directories): .git
fatal: ambiguous argument '^^': unknown revision or path not in the working tree.
Use '--' to separate paths from revisions, like this:
'git <command> [<revision>...] -- [<file>...]'

私の場合、プロンプトを小綺麗にするために fishline を使っているが、どうやらそこでの標準エラーへのリダイレクト部分でエラーが出ているようにある。 以下の commit のように修正したらエラー文がでなくなった。

change the way to redirect stderr · goropikari/fishline@4f8d706 · GitHub

昔は ^^ でリダイレクトしていたようだが fish のバージョンが上がって文法が変わったようだ。 リリースノートによると 3.3.0 の時点で ^ を使ったリダイレクトはデフォルトで無効になったらしい。 github.com

PostgreSQL 互換の DBMS を自作してみた

WEB+DB PRESS Vol.122

WEB+DB PRESS Vol.122

  • 発売日: 2021/04/24
  • メディア: Kindle

※ この記事では「PostgreSQL 互換」を「PostgreSQL client から接続できる」という意味で使っています。

最近はやりの自作 DBMS というものを私もやってみたので忘れぬうちに感想を書いておきます。

github.com

https://raw.githubusercontent.com/goropikari/psqlittle/main/doc/demo.gif

今回作った DBMS は Go の練習がてら書いたもので製作期間はおよそ10日でした。

SQL で操作できることを主とし、簡単な SELECT, UPDATE, DELETE, INSERT, CREATE TABLE, DROP TABLE に対応することができました。 また PostgreSQL client からも接続できるようにしたので、自分の中ではより DBMS を作った感がありました。 自作 DBMSPostgreSQL client から接続できるようにした人を今の所見たことないので、そこに関しては結構マニアックなことをしたのかなと思います。

PostgreSQL client 対応については別途記事にまとめてあります。

goropikari.hatenablog.com

SQL parser は自作ではなく pg_query_go1 を使用しました。PostgreSQL の文法なら過不足なく parse できるという安心感があることが主な選定理由です。

github.com

テーブルのレコードは slice に slice を突っ込んでいくだけの実装だったため、今回の自作 DBMS の主な作業は pg_query_go が生成する parse tree の評価方法を考えることでした。

以前コンパイラを作ったことがあったのでこの手のことは慣れていると思っていたのですが、SELECT id ... と書かれたときの id の値はレコードごとに違うので、どうやって値を持ってくるかということに悩みました。 最終的に「引数にレコードをとって該当する値を返す関数」を動的に生成することでそれっぽい動きをするものはできましたが、実際の DBMS ではこんなことをしていないと思うのでどうやっているのか気になるところです。

goropikari.hatenablog.com

SQL が3値論理を採用しているせいでプログラミング言語標準の &&|| を使うと NULL の値が入ってきたときにバグるので3値論理用の二項演算を定義したりと地味なところが地味に大変でした。

最終的にテスト等も含めると5000行ほど書いたので Go の使い方を覚えるのにはちょうどよい分量でした。このくらいの機能だったらこのくらいの分量でかけるという肌感もついたので良かったです。 最初は REPL も自作していましたが、もっとリッチな REPL がほしいと思い何故か PostgreSQL client 対応もしましたが、おかげで PostgreSQLプロトコルMySQL に比べるととても読みやすいということがわかったのは収穫でした。再び自作 DBMS に挑戦することになったら、そのときも PostgreSQL 互換に作ろうと思います。


  1. postgres parser である pg_query の Go ラッパー

PostgreSQL Client から自作 DBMS に接続する

最近、Go の練習がてら書いていた自作 DBMSPostgreSQL client で接続できるようになったので、そのやり方を残しておきます。(これから紹介するサンプルコードはすべて Python ですが)

github.com

psql --version
psql (PostgreSQL) 13.2

pgcon の資料と PostgreSQL の公式 Document、加えて PostgreSQL server と client 間に流れるパケットを眺めると、自作DBMSは client から接続されたときにどういうパケットを返せばいいのかが見えてきます。

下記のパケットは psql で接続し、

create table hoge (id int, name varchar(255));
insert into hoge values (1, 'tanaka'), (2, 'suzuki');
select * from hoge;

を実行、接続を閉じた際に流れたパケットです。 このうち、server (172.18.0.4:5432) -> client (172.18.0.2:34058) の方向に流れたパケットを再現する server を構築できれば、client から接続できるようになります。

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #38
  00 00 00 08 04 d2 16 2f                               ......./        

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #40
  4e                                                    N               

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #42
  00 00 00 59 00 03 00 00    75 73 65 72 00 70 6f 73    ...Y....user.pos
  74 67 72 65 73 00 64 61    74 61 62 61 73 65 00 70    tgres.database.p
  6f 73 74 67 72 65 73 00    61 70 70 6c 69 63 61 74    ostgres.applicat
  69 6f 6e 5f 6e 61 6d 65    00 70 73 71 6c 00 63 6c    ion_name.psql.cl
  69 65 6e 74 5f 65 6e 63    6f 64 69 6e 67 00 53 51    ient_encoding.SQ
  4c 5f 41 53 43 49 49 00    00                         L_ASCII..       

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #44
  52 00 00 00 08 00 00 00    00 53 00 00 00 1a 61 70    R........S....ap
  70 6c 69 63 61 74 69 6f    6e 5f 6e 61 6d 65 00 70    plication_name.p
  73 71 6c 00 53 00 00 00    1e 63 6c 69 65 6e 74 5f    sql.S....client_
  65 6e 63 6f 64 69 6e 67    00 53 51 4c 5f 41 53 43    encoding.SQL_ASC
  49 49 00 53 00 00 00 17    44 61 74 65 53 74 79 6c    II.S....DateStyl
  65 00 49 53 4f 2c 20 4d    44 59 00 53 00 00 00 19    e.ISO, MDY.S....
  69 6e 74 65 67 65 72 5f    64 61 74 65 74 69 6d 65    integer_datetime
  73 00 6f 6e 00 53 00 00    00 1b 49 6e 74 65 72 76    s.on.S....Interv
  61 6c 53 74 79 6c 65 00    70 6f 73 74 67 72 65 73    alStyle.postgres
  00 53 00 00 00 14 69 73    5f 73 75 70 65 72 75 73    .S....is_superus
  65 72 00 6f 6e 00 53 00    00 00 19 73 65 72 76 65    er.on.S....serve
  72 5f 65 6e 63 6f 64 69    6e 67 00 55 54 46 38 00    r_encoding.UTF8.
  53 00 00 00 32 73 65 72    76 65 72 5f 76 65 72 73    S...2server_vers
  69 6f 6e 00 31 32 2e 36    20 28 44 65 62 69 61 6e    ion.12.6 (Debian
  20 31 32 2e 36 2d 31 2e    70 67 64 67 31 30 30 2b     12.6-1.pgdg100+
  31 29 00 53 00 00 00 23    73 65 73 73 69 6f 6e 5f    1).S...#session_
  61 75 74 68 6f 72 69 7a    61 74 69 6f 6e 00 70 6f    authorization.po
  73 74 67 72 65 73 00 53    00 00 00 23 73 74 61 6e    stgres.S...#stan
  64 61 72 64 5f 63 6f 6e    66 6f 72 6d 69 6e 67 5f    dard_conforming_
  73 74 72 69 6e 67 73 00    6f 6e 00 53 00 00 00 15    strings.on.S....
  54 69 6d 65 5a 6f 6e 65    00 45 74 63 2f 55 54 43    TimeZone.Etc/UTC
  00 4b 00 00 00 0c 00 00    00 55 e2 ed f9 8e 5a 00    .K.......U....Z.
  00 00 05 49                                           ...I            

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #46
  51 00 00 00 33 63 72 65    61 74 65 20 74 61 62 6c    Q...3create tabl
  65 20 68 6f 67 65 20 28    69 64 20 69 6e 74 2c 20    e hoge (id int, 
  6e 61 6d 65 20 76 61 72    63 68 61 72 28 32 35 35    name varchar(255
  29 29 3b 00                                           ));.            

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #48
  43 00 00 00 11 43 52 45    41 54 45 20 54 41 42 4c    C....CREATE TABL
  45 00 5a 00 00 00 05 49                               E.Z....I        

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #50
  51 00 00 00 3a 69 6e 73    65 72 74 20 69 6e 74 6f    Q...:insert into
  20 68 6f 67 65 20 76 61    6c 75 65 73 20 28 31 2c     hoge values (1,
  20 27 74 61 6e 61 6b 61    27 29 2c 20 28 32 2c 20     'tanaka'), (2, 
  27 73 75 7a 75 6b 69 27    29 3b 00                   'suzuki');.     

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #52
  43 00 00 00 0f 49 4e 53    45 52 54 20 30 20 32 00    C....INSERT 0 2.
  5a 00 00 00 05 49                                     Z....I          

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #54
  51 00 00 00 18 73 65 6c    65 63 74 20 2a 20 66 72    Q....select * fr
  6f 6d 20 68 6f 67 65 3b    00                         om hoge;.       

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #56
  54 00 00 00 32 00 02 69    64 00 00 00 40 03 00 01    T...2..id...@...
  00 00 00 17 00 04 ff ff    ff ff 00 00 6e 61 6d 65    ............name
  00 00 00 40 03 00 02 00    00 04 13 ff ff 00 00 01    ...@............
  03 00 00 44 00 00 00 15    00 02 00 00 00 01 31 00    ...D..........1.
  00 00 06 74 61 6e 61 6b    61 44 00 00 00 15 00 02    ...tanakaD......
  00 00 00 01 32 00 00 00    06 73 75 7a 75 6b 69 43    ....2....suzukiC
  00 00 00 0d 53 45 4c 45    43 54 20 32 00 5a 00 00    ....SELECT 2.Z..
  00 05 49                                              ..I             

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #58
  58 00 00 00 04                                        X....

パケットを追う

SSL request

まずはじめに飛んできたパケットを見てみます。

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #38
  00 00 00 08 04 d2 16 2f                               ......./        

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #40
  4e                                                    N         

最初に流れていたパケットは SSL request です。 00 00 00 08 の部分がパケットの長さ(自身を含む。BigEndian)を表し、04 d2 16 2fssl code です。

postgres のソースコードを見ると #define NEGOTIATE_SSL_CODE PG_PROTOCOL(1234,5679) という定義がありますが、1234,5679 の部分を BigEndian にすると 04d2162f となり、飛んできた packet と一致します。

#!/usr/bin/env python3

import struct
print(struct.pack('>hh', 1234, 5679).hex())  #=> 04d2162f

server は SSL に対応していない場合 N を返します。

参考

startup

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #42
  00 00 00 59 00 03 00 00    75 73 65 72 00 70 6f 73    ...Y....user.pos
  74 67 72 65 73 00 64 61    74 61 62 61 73 65 00 70    tgres.database.p
  6f 73 74 67 72 65 73 00    61 70 70 6c 69 63 61 74    ostgres.applicat
  69 6f 6e 5f 6e 61 6d 65    00 70 73 71 6c 00 63 6c    ion_name.psql.cl
  69 65 6e 74 5f 65 6e 63    6f 64 69 6e 67 00 53 51    ient_encoding.SQ
  4c 5f 41 53 43 49 49 00    00                         L_ASCII..       

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #44
  52 00 00 00 08 00 00 00    00 53 00 00 00 1a 61 70    R........S....ap
  70 6c 69 63 61 74 69 6f    6e 5f 6e 61 6d 65 00 70    plication_name.p
  73 71 6c 00 53 00 00 00    1e 63 6c 69 65 6e 74 5f    sql.S....client_
  65 6e 63 6f 64 69 6e 67    00 53 51 4c 5f 41 53 43    encoding.SQL_ASC
  49 49 00 53 00 00 00 17    44 61 74 65 53 74 79 6c    II.S....DateStyl
  65 00 49 53 4f 2c 20 4d    44 59 00 53 00 00 00 19    e.ISO, MDY.S....
  69 6e 74 65 67 65 72 5f    64 61 74 65 74 69 6d 65    integer_datetime
  73 00 6f 6e 00 53 00 00    00 1b 49 6e 74 65 72 76    s.on.S....Interv
  61 6c 53 74 79 6c 65 00    70 6f 73 74 67 72 65 73    alStyle.postgres
  00 53 00 00 00 14 69 73    5f 73 75 70 65 72 75 73    .S....is_superus
  65 72 00 6f 6e 00 53 00    00 00 19 73 65 72 76 65    er.on.S....serve
  72 5f 65 6e 63 6f 64 69    6e 67 00 55 54 46 38 00    r_encoding.UTF8.
  53 00 00 00 32 73 65 72    76 65 72 5f 76 65 72 73    S...2server_vers
  69 6f 6e 00 31 32 2e 36    20 28 44 65 62 69 61 6e    ion.12.6 (Debian
  20 31 32 2e 36 2d 31 2e    70 67 64 67 31 30 30 2b     12.6-1.pgdg100+
  31 29 00 53 00 00 00 23    73 65 73 73 69 6f 6e 5f    1).S...#session_
  61 75 74 68 6f 72 69 7a    61 74 69 6f 6e 00 70 6f    authorization.po
  73 74 67 72 65 73 00 53    00 00 00 23 73 74 61 6e    stgres.S...#stan
  64 61 72 64 5f 63 6f 6e    66 6f 72 6d 69 6e 67 5f    dard_conforming_
  73 74 72 69 6e 67 73 00    6f 6e 00 53 00 00 00 15    strings.on.S....
  54 69 6d 65 5a 6f 6e 65    00 45 74 63 2f 55 54 43    TimeZone.Etc/UTC
  00 4b 00 00 00 0c 00 00    00 55 e2 ed f9 8e 5a 00    .K.......U....Z.
  00 00 05 49                                           ...I            

次にくるのは startup msg です。ASCII からもなんとなく読み取れると思いますが、user は誰で database は何を使いたいといった情報が client から server に流れています。 先頭4バイトは packet の長さが BigEndian で入っています。

単に自作 DBMS に postgres client から接続させるためだけだったら user 情報などはいらないので client から送られた情報はすべて無視して構いません。

次の server から client に送るパケットからパケットの構造が変わります。 先頭1バイトは msg の tag を表し、続く4バイトがパケットのサイズ、最後にパケットのデータ部分になります。

最初の 52 00 00 00 08 00 00 00 00AuthenticationOk パケットです。 このとき、postgres server をPOSTGRES_HOST_AUTH_METHOD=trust で起動しており password なしでログインできる状態だったため、無条件で AuthenticationOk が飛んでいます。 自作 DBMS でも難しいことは避けるために同じ方法を使います。

続くパケットでは S....hoge.piyo といった形式のものがたくさん見られますが、これらは ParameterStatus です。見てわかるとおり、encoding は何を使うや、timezone は何かといった情報が含まれています。単に接続させるためにはこれらの情報はなくてもいいので無視します。(python の psycopg から接続する場合は client_encoding を返してあげる必要があります。)

途中の接続するだけだったらいらない部分は飛ばして最後の 5a 00 00 00 05 49ReadyForQuery パケットです。これを送ると client からクエリを投げられる状態になります。

まとめると

  • client からパケットを受け取る (内容はすべて無視)
  • 'N' を送る
  • client からパケットを受け取る (内容はすべて無視)
  • 52 00 00 00 08 00 00 00 00 5a 00 00 00 05 49 を client に送る

とすると、client からクエリを受け取る準備ができます。

python でのコード例

import socket
import struct

addr, port = "127.0.0.1", 15432
PACKET_LENGTH = 4

def read_packet(client_socket):
    size = struct.unpack(">I", client_socket.recv(PACKET_LENGTH))[0]
    payload = client_socket.recv(size-PACKET_LENGTH)
    return payload

def startup(client_socket):
    read_packet(client_socket)
    client_socket.sendall(b"N")

    read_packet(client_socket)
    client_socket.sendall(b"\x52\x00\x00\x00\x08\x00\x00\x00\x00")
    client_socket.sendall(b"\x5a\x00\x00\x00\x05\x49")


if __name__ == '__main__':
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    server_socket.bind((addr, port))
    server_socket.listen()
    client_socket, (client_address, client_port) = server_socket.accept()

    startup(client_socket)

query packet

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #46
  51 00 00 00 33 63 72 65    61 74 65 20 74 61 62 6c    Q...3create tabl
  65 20 68 6f 67 65 20 28    69 64 20 69 6e 74 2c 20    e hoge (id int, 
  6e 61 6d 65 20 76 61 72    63 68 61 72 28 32 35 35    name varchar(255
  29 29 3b 00                                           ));.            

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #48
  43 00 00 00 11 43 52 45    41 54 45 20 54 41 42 4c    C....CREATE TABL
  45 00 5a 00 00 00 05 49                               E.Z....I        et

Q から始まるパケットは Query パケットです。 | tag | length | string | から構成されます。このとき、string は null 末端です。

クエリが実行されると server は CommandComplete パケット(tag ’C’) と ReadyForQuery パケットを client に返しています。

CommandComplete パケットは Query パケット同様 | tag | length | string | というレイアウトになっています。string として何でも入れられます。

クエリを投げるフェーズに入ってからの基本的な流れは Query パケットもらって、CommandComplete, ReadyForQuery パケットを返すの繰り返しになります。

あとで説明しますが SELECT の場合はこれに加えてレコードのデータも送ることになります。

CommandComplete の string には何でも入れられると書きましたが、試しに入力したものをそのまま返す echo server を作ってみました。

import socket
import struct

addr, port = "127.0.0.1", 15432
PACKET_LENGTH = 4

ReadyForQuery = b"\x5a\x00\x00\x00\x05\x49"

def read_packet(client_socket):
    size = struct.unpack(">I", client_socket.recv(PACKET_LENGTH))[0]
    payload = client_socket.recv(size-PACKET_LENGTH)
    return payload

def startup(client_socket):
    read_packet(client_socket)
    client_socket.sendall(b"N")

    read_packet(client_socket)
    client_socket.sendall(b"\x52\x00\x00\x00\x08\x00\x00\x00\x00")
    client_socket.sendall(ReadyForQuery)

def read_regular_packet(client_socket):
    tag = client_socket.recv(1)
    payload = read_packet(client_socket)
    return payload

def send_packet(client_socket, data):
    size = struct.pack(">I", len(data)+PACKET_LENGTH)
    packet = b"C" + size + data
    client_socket.sendall(packet)
    client_socket.sendall(ReadyForQuery)


if __name__ == '__main__':
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    server_socket.bind((addr, port))
    server_socket.listen()
    client_socket, (client_address, client_port) = server_socket.accept()

    startup(client_socket)

    while True:
        query = read_regular_packet(client_socket)
        send_packet(client_socket, query)
$ psql -h 127.0.0.1 -p 15432

psql (13.2, server 0.0.0)
Type "help" for help.

arch=> select * from hoge;
select * from hoge;
arch=> hoge ;
hoge ;
arch=> piyo
arch-> fuga;
piyo
fuga;

psql で打ったものがそのまま返ってくるというなんとも不思議な体験をすることができます。

自作DBMSで使うときは飛んできた Query を作った DBMS に投げてよしなに使ってください。

SELECT

次に SELECT の結果を表示する部分のパケットを見ていきます。

postgres=# select * from hoge;
 id |  name  
----+--------
  1 | tanaka
  2 | suzuki
(2 rows)

上の

 id |  name  
----+--------
  1 | tanaka
  2 | suzuki

の部分です。(2 rows) の部分は psql がよしなに作ってくれます。

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #54
  51 00 00 00 18 73 65 6c    65 63 74 20 2a 20 66 72    Q....select * fr
  6f 6d 20 68 6f 67 65 3b    00                         om hoge;.       

T 172.18.0.4:5432 -> 172.18.0.2:34058 [AP] #56
  54 00 00 00 32 00 02 69    64 00 00 00 40 03 00 01    T...2..id...@...
  00 00 00 17 00 04 ff ff    ff ff 00 00 6e 61 6d 65    ............name
  00 00 00 40 03 00 02 00    00 04 13 ff ff 00 00 01    ...@............
  03 00 00 44 00 00 00 15    00 02 00 00 00 01 31 00    ...D..........1.
  00 00 06 74 61 6e 61 6b    61 44 00 00 00 15 00 02    ...tanakaD......
  00 00 00 01 32 00 00 00    06 73 75 7a 75 6b 69 43    ....2....suzukiC
  00 00 00 0d 53 45 4c 45    43 54 20 32 00 5a 00 00    ....SELECT 2.Z..
  00 05 49                                              ..I

最初の SELECT の部分は CREATE TABLE のときと同じなので説明は飛ばします。

SELECT の結果はカラム名や型の情報が書かれた tag TRowDescription パケットと、実際のデータが入っている tag DDataRow パケットからなります。

まずは RowDescription パケットから見ていきます。

  54 00 00 00 32 00 02 69    64 00 00 00 40 03 00 01    T...2..id...@...
  00 00 00 17 00 04 ff ff    ff ff 00 00 6e 61 6d 65    ............name
  00 00 00 40 03 00 02 00    00 04 13 ff ff 00 00 01    ...@............
  03 00 00                                              ...

公式 doc の RowDescription のところを見ながらパケットを分解すると以下のようになります。

tag Byte1('T') 54
message length int32 00 00 00 32 packet の長さ
# field int16 00 02 カラム数
field name string 69 64 00 (id\0) 6e 61 6d 65 00 (name\0) カラム名
object id of the table int32 00 00 40 03 00 00 40 03 テーブルの object id. pg_class テーブル の oid を BigEndian で表示したもの
cols id int16 00 01 00 02 カラムのID
object id of datatype int32 00 00 00 17 00 00 04 13 pg_type テーブルに書かれている oid に対応
data type size int16 00 04 ff ff pg_type テーブルに書かれている typlen に対応
type modifier int32 ff ff ff ff 00 00 01 03 pg_attribute テーブルに書かれている atttypmod に対応
format code int16 00 00 00 00

テーブルの object id のところは自作 DBMS にとってはどうでもいい値なので適当な数字を入れていればokです。

object id of datatype / data type size / type modifier は client に server から飛んできた値が何の型になるのかを教えるためにありますが、単に psql で表示させたいだけならば、varchar を表す 00 00 04 13 / ff ff / 00 00 01 03 を常に送っていれば大丈夫です。

次に DataRow パケットを見ていきましょう。

                 44 00 00 00 15    00 02 00 00 00 01 31 00       D..........1.
  00 00 06 74 61 6e 61 6b    61 44 00 00 00 15 00 02    ...tanakaD......
  00 00 00 01 32 00 00 00    06 73 75 7a 75 6b 69         ....2....suzuki

ASCII を見ればわかるように D という文字が2つ見えるので、この中には DataRow が2つ含まれています。 1つ目の DataRow を分解すると以下のようになります。

tag Bytel('D') 44
length of message Int32 00 00 00 15
# cols Int16 00 02
length of column value Int32 00 00 00 01 00 00 00 06
column value Byten 31 (ASCII で 1) 74 61 6e 61 6b 61 (ASCII で tanaka)

見たままで送りたいデータを文字列化し、byte に変換すれば良いだけです。 2つ目の DataRow に関しても同様です。 このとき NULL を送りたいときは length of column value の部分が ff ff ff ff になり、column value の部分には何も入れません。

最後に CommandComplete と ReadyForQuery を送れば SELECT の内容を送ることができます。

terminate

最後が terminate です。client が切断してきたら socket をclose しましょう。

T 172.18.0.2:34058 -> 172.18.0.4:5432 [AP] #58
  58 00 00 00 04                                        X....

まとめ

最終的にまとめると以下のようになります。

# main.py
import socket
import struct

addr, port = "127.0.0.1", 15432
PACKET_LENGTH = 4

ReadyForQuery = b"\x5a\x00\x00\x00\x05\x49"

def read_packet(client_socket):
    size = struct.unpack(">I", client_socket.recv(PACKET_LENGTH))[0]
    payload = client_socket.recv(size-PACKET_LENGTH)
    return payload

def startup(client_socket):
    read_packet(client_socket)
    client_socket.sendall(b"N")

    read_packet(client_socket)
    client_socket.sendall(b"\x52\x00\x00\x00\x08\x00\x00\x00\x00")
    client_socket.sendall(ReadyForQuery)

def read_regular_packet(client_socket):
    tag = client_socket.recv(1)
    payload = read_packet(client_socket)
    return tag, payload

def send_packet(client_socket, data):
    data = data.encode() + b"\x00"
    size = struct.pack(">I", len(data)+PACKET_LENGTH)
    packet = b"C" + size + data
    client_socket.sendall(packet)
    client_socket.sendall(ReadyForQuery)

def do_something(query):
    query = query.decode().lower()
    if 'select' in query:
        return 'select', [['id', 'name'], [[1, 'tanaka'], [2, 'suzuki']]]

    return 'other', None

def send_select(client_socket, result):
    names = result[1][0]
    rows = result[1][1]

    # column description
    payload = b""
    l = len(names)
    payload += struct.pack('>h', l)
    for i, name in enumerate(names):
        payload += name.encode() + b"\x00"
        payload += b"\x00\x00\x00\x00" # table oid
        payload += struct.pack('>h', i+1) # col id
        payload += b"\x00\x00\x04\x13" # oid of datatype
        payload += b"\xff\xff"  # data type size
        payload += b"\x00\x00\x01\x03"  # type modifier
        payload += b"\x00\x00"  # format code

    desc_size = struct.pack(">I", len(payload) + PACKET_LENGTH)
    client_socket.sendall(b"T" + desc_size + payload)

    # data row
    for row in rows:
        payload = struct.pack('>h', l)
        for val in row:
            s = str(val).encode()
            ls = len(s)
            ss = struct.pack('>I', ls)
            payload += ss + s

        client_socket.sendall(b"D" + struct.pack('>I', len(payload) + PACKET_LENGTH) + payload)

    send_packet(client_socket, f"SELECT {len(rows)}")


if __name__ == '__main__':
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    server_socket.bind((addr, port))
    server_socket.listen()
    client_socket, (client_address, client_port) = server_socket.accept()

    startup(client_socket)

    while True:
        tag, query = read_regular_packet(client_socket)
        if tag == b"\x58":
            break

        result = do_something(query)
        if result[0] == 'select':
            send_select(client_socket, result)
        else:
            send_packet(client_socket, "OK")

    client_socket.close()
    server_socket.close()
# terminal 1
$ python main.py

# terminal 2
$ psql -h 127.0.0.1 -p 15432 -U postgres
psql (13.2, server 0.0.0)
Type "help" for help.

postgres=> hoge;
OK
postgres=> select * from hoge;
 id |  name  
----+--------
 1  | tanaka
 2  | suzuki
(2 rows)

postgres=> \q

自作DBMSと連携させるときは do_something の部分をよしなに DBMS とつないでください。

PostgreSQL: Documentation: 13: 52.7. Message Formats

AWS Lambda Runtime Interface Clients / Emulator とはなにか

以前の記事を先に読むと理解が進むかも(?) goropikari.hatenablog.com

AWS Lambda がコンテナイメージをサポートしたのと時を同じくして Runtime Interface Clients (RIC) と Runtime Interface Emulator (RIE) というツールが公開されました。

RIC / RIE は AWS が配布している各言語用のイメージの中にはすでに含まれており、また暗黙的に動いているので普通に Lambda を使う分にはこれらのツールについて意識することはおそらくありません。

ではどんなときに意識するとよいのかというと次のような場合が考えられると思います

  • AmazonLinux 以外の OS イメージを使いたい
    • Debian が使いたいー
    • Alpine が使いたいー
    • etc...
  • 既存のコンテナイメージを Lambda 用に改造したい
  • Lambda の挙動をローカルでテストしたい

これらのことを簡単に実現させますよというのが RIC とRIE です。

Lambda はどう動いているのか

RIC/RIE を説明する前に Lambda はどのような流れで実行されるのかを確認しておきます。

Lambda

AWS Lambda execution environment

  1. API Gateway や S3 event などで Lambda が着火すると、まず Lambda Service は Execution Environment というものを作成します。
  2. その上で runtime などの初期化がなされます。
  3. その後 runtime は Runtime API を介して Lambda Service と通信し、event、context を取得、Function (Handler) にそれらを渡します
  4. Function の返り値を Runtime API を通して Lambda Service に渡します。
  5. Invoke がされなくなるまで 3~4 を繰り返します
  6. 一定時間の間 invoke がされなかったとき、Lambda Service は runtime をシャットダウンし、Execution Environment を削除します。

Runtime Interface Clients

先に説明したとおり runtime は Lambda Service と通信する必要があります。 ですが、世にあるコンテナイメージは Lambda Service と通信する機能を用意していません。 そんなときに Lambda Service とおしゃべりする仕組みを提供してくれるのが Runtime Interface Clients (RIC)です。 RIC を使わず Lambda Service と通信しようとすると前回の記事の bootstrap のようなものを自分で書かねばならず面倒です。

RIC は各言語用にパッケージとして提供されており、現在提供されているのは Lambda が標準でサポートしている以下の6種類です。

ref: Runtime support for Lambda container images

それ以外の言語は公式には用意されていないので自分で作りましょう!

一例として RIC を使って素の Python イメージを Lambda 用に変えてみます。

Dockerfile

FROM python:3.9-slim

WORKDIR /var/task
COPY main.py .

RUN pip install awslambdaric

ENTRYPOINT ["/usr/local/bin/python", "-m", "awslambdaric"]
CMD ["main.handler"]

main.py

def handler(event, context):
    return f"Hello {event}"
$ docker build ...
$ docker push ...
$ aws lambda update-funciton-code ...
$ aws lambda invoke ... --payload '"John"'
Hello John

前回の記事の ENTRYPOINT ["/var/runtime/bootstrap"] の部分が ENTRYPOINT ["/usr/local/bin/python", "-m", "awslambdaric"] に置き換わった感じです。

他の言語でも似たような感じで使うことができます。

既存のイメージを Lambda 用に改造したい場合は、handler 関数作って、RIC 入れて、ENTRYPOINT, CMD を上記のような感じで書けば Lambda で動かせるようになります。ね、簡単でしょ?

Runtime Interface Emulator

RIC によって Lambda Service と通信する手段を獲得しましたが、我々のローカル環境には Lambda Service はいないので作ったプログラムをテストできません。困りました。 そんなときに Lambda Service の代わりをしてくれるのが Runtime Interface Emulator (RIE) です。こちらは RIC と違い言語縛りはありません。現時点では Linux x86-64 用のバイナリのみ配布されています。とはいえ Go 製のツールなのでソースコードから自分でビルドすれば Mac でも動くと思います(試していませんが)。コンテナイメージ内で使われることを想定しているっぽいので今後も Linux 以外の OS はサポートしないと思います。

使い方は以下のように ENTRYPOINT, CMD で書いた内容をそのまま引数に取ります。

./aws-lambda-rie /usr/local/bin/python -m awslambdaric main.handler

このとき注意点としては python への PATH は絶対 PATH で書きます。

RIE を使うと 8080, 9001 port で Listen している HTTP server が立ち上がります。9001 が Runtime API 用で、8080 が Lambda 関数を invoke するための口です。

$ curl -XPOST "http://localhost:8080/2015-03-31/functions/function/invocations" -d '"John"'
"Hello John"

POST すると実行時間などが出力されますが、本物の Lambda でやった場合と比べるとかけ離れた数値を返すので参考になりません。

START RequestId: 762d8fc3-3fe8-4d08-8690-212e727fbbcc Version: $LATEST
END RequestId: 762d8fc3-3fe8-4d08-8690-212e727fbbcc
REPORT RequestId: 762d8fc3-3fe8-4d08-8690-212e727fbbcc  Init Duration: 0.28 ms  Duration: 64.73 ms  Billed Duration: 100 ms Memory Size: 3008 MB    Max Memory Used: 3008 MB    
START RequestId: ccacb48a-a5e2-4200-a7b6-51559ea2332e Version: $LATEST
END RequestId: ccacb48a-a5e2-4200-a7b6-51559ea2332e
REPORT RequestId: ccacb48a-a5e2-4200-a7b6-51559ea2332e  Duration: 1.31 ms   Billed Duration: 100 ms Memory Size: 3008 MB    Max Memory Used: 3008 MB

自作 Lambda Service

RIE の動きを見て、とりあえず HTTP server を2つ建てておけば良さそうな雰囲気を察したので雑にオレオレ Lambda Service を作ってみました。

server.py

# https://stackoverflow.com/a/60753

import json
import re
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from queue import Queue
from socketserver import ThreadingMixIn
from threading import Thread

inputs = Queue()
results = Queue()


class Lambda(BaseHTTPRequestHandler):
    def do_POST(self):
        if self.path == '/2015-03-31/functions/function/invocations':
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            inputs.put(post_data)
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(results.get())


class RuntimeAPI(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/2018-06-01/runtime/invocation/next':
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.send_header("Lambda-Runtime-Aws-Request-Id",
                             str(uuid.uuid4()))
            self.end_headers()
            self.wfile.write(inputs.get())

    def do_POST(self):
        if re.match(r"/2018-06-01/runtime/invocation/.*/response", self.path):
            self.send_response(200)
            self.end_headers()
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            results.put(post_data)


class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    daemon_threads = True


def serve_on_port(port, handler):
    server = ThreadingHTTPServer(("localhost", port), handler)
    server.serve_forever()


if __name__ == '__main__':
    Thread(target=serve_on_port, args=[8080, Lambda]).start()
    serve_on_port(9001, RuntimeAPI)

RIC が AWS_LAMBDA_RUNTIME_API という環境変数を使うのでそれだけセットして実行します。

# terminal 1
$ python server.py

# terminal 2
$ export AWS_LAMBDA_RUNTIME_API=localhost:9001
$ /usr/local/bin/python -m awslambdaric main.handler

# terminal 3
$ curl -XPOST "http://localhost:8080/2015-03-31/functions/function/invocations" -d '"John"'
"Hello John"

無事にそれっぽい動きをしてくれました。

機能を絞ってしまえば、案外自分でも書けるものですね。

現時点で RIE は Logs API に対応しておらず Lambda extension の動作検証ができないので、案外早くオレオレ Lambda Service を使う日が来るかも(?)