我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PostgreSQL 源码解读(89)- 查询语句#74(SeqNext函数#2)

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PostgreSQL 源码解读(89)- 查询语句#74(SeqNext函数#2)

本节是SeqNext函数介绍的第二部分,主要介绍了SeqNext->heap_getnext函数的实现逻辑。

一、数据结构

TupleTableSlot
Tuple Table Slot,用于存储元组相关信息


// Slot used to hold tuple-related information.
//
// Each FIELDNO_* macro records the zero-based position of the member that
// follows it (type is 0, tts_flags is 1, ...), so the member order must stay
// in sync with those constants.
typedef struct TupleTableSlot
{
    NodeTag     type;// node tag
#define FIELDNO_TUPLETABLESLOT_FLAGS 1
    uint16      tts_flags;      // flag bits; their meanings are not shown in this excerpt
#define FIELDNO_TUPLETABLESLOT_NVALID 2
    AttrNumber  tts_nvalid;     // presumably the number of valid entries in tts_values -- TODO confirm
    const TupleTableSlotOps *const tts_ops; // operations table for the slot (member index 3, no FIELDNO macro)
#define FIELDNO_TUPLETABLESLOT_TUPLEDESCRIPTOR 4
    TupleDesc   tts_tupleDescriptor;    // tuple descriptor of the slot
#define FIELDNO_TUPLETABLESLOT_VALUES 5
    Datum      *tts_values;     // attribute values (one Datum per attribute, presumably)
#define FIELDNO_TUPLETABLESLOT_ISNULL 6
    bool       *tts_isnull;     // null flags, presumably parallel to tts_values
    MemoryContext tts_mcxt;     // memory context associated with the slot
} TupleTableSlot;


// Tuple descriptor; TupleDesc is declared here as a pointer to this struct.
typedef struct tupleDesc
{
    int         natts;          // number of attributes (presumably the length of attrs[])
    Oid         tdtypeid;       // type OID of the tuple -- exact semantics not shown here
    int32       tdtypmod;       // type modifier belonging to tdtypeid
    int         tdrefcount;     // reference count; counting rules not shown here
    TupleConstr *constr;        // constraint information (may be absent -- TODO confirm)
    
    // attrs[N] is the descriptor of the (N+1)-th attribute
    FormData_pg_attribute attrs[FLEXIBLE_ARRAY_MEMBER];
}  *TupleDesc;

HeapTuple
HeapTupleData是描述元组的内存数据结构,其中的t_data成员指向元组头部
HeapTuple是指向HeapTupleData的指针


// In-memory handle for one heap tuple.  The scan routines in this file fill
// t_data, t_len and t_self for every tuple they return.
typedef struct HeapTupleData
{
    uint32      t_len;          // length of the tuple (set from ItemIdGetLength)
    ItemPointerData t_self;     // tuple id: block number plus offset within the block
    Oid         t_tableOid;     // OID of the table the tuple belongs to
#define FIELDNO_HEAPTUPLEDATA_DATA 3
    HeapTupleHeader t_data;     // pointer to the tuple header; NULL means "no tuple"
} HeapTupleData;

// HeapTuple is a pointer to HeapTupleData.
typedef HeapTupleData *HeapTuple;

// Size of HeapTupleData rounded up by MAXALIGN.
#define HEAPTUPLESIZE   MAXALIGN(sizeof(HeapTupleData))


HeapScanDesc
HeapScanDesc是指向HeapScanDescData结构体的指针

// State of one heap scan.  HeapScanDesc (declared below) is a pointer to it.
typedef struct HeapScanDescData
{
    // scan parameters
    Relation    rs_rd;          // relation being scanned
    Snapshot    rs_snapshot;    // snapshot used for visibility checks
    int         rs_nkeys;       // number of scan keys
    ScanKey     rs_key;         // scan keys (NULL when there are none)
    bool        rs_bitmapscan;  // presumably true for a bitmap scan -- not used in the code shown
    bool        rs_samplescan;  // presumably true for a sample scan -- not used in the code shown
    bool        rs_pageatatime; // true: page-at-a-time mode (heap_getnext dispatches on this)
    bool        rs_allow_strat; // presumably allows a buffer access strategy -- TODO confirm
    bool        rs_allow_sync;  // presumably allows synchronized scanning -- TODO confirm
    bool        rs_temp_snap;   // meaning not shown in this excerpt

    
    // state set up at initscan time
    BlockNumber rs_nblocks;     // total number of blocks in the relation
    BlockNumber rs_startblock;  // block number the scan starts at
    BlockNumber rs_numblocks;   // maximum number of blocks to scan
    
    // rs_numblocks is usually InvalidBlockNumber, meaning "scan the whole rel"
    
    BufferAccessStrategy rs_strategy;   // strategy passed to ReadBufferExtended
    bool        rs_syncscan;    // when set, forward scans report their position

    
    // current state of the scan
    bool        rs_inited;      // false until the first page has been fetched
    HeapTupleData rs_ctup;      // current tuple, if any
    BlockNumber rs_cblock;      // current block number, or InvalidBlockNumber
    Buffer      rs_cbuf;        // current buffer, or InvalidBuffer
    
    // NB: if rs_cbuf is not InvalidBuffer, a pin is held on that buffer

    ParallelHeapScanDesc rs_parallel;   // non-NULL for a parallel scan

    
    // the fields below are used only in page-at-a-time mode and bitmap scans
    int         rs_cindex;      // index of the current tuple in rs_vistuples
    int         rs_ntuples;     // number of visible tuples on the current page
    OffsetNumber rs_vistuples[MaxHeapTuplesPerPage];    // offsets of the visible tuples
} HeapScanDescData;


// HeapScanDesc is a pointer to HeapScanDescData.
typedef struct HeapScanDescData *HeapScanDesc;

ScanState
ScanState在PlanState的基础上进行了扩展,用于表示对底层关系执行扫描的节点类型。


// Executor state shared by scan nodes: a PlanState followed by the
// scan-specific members.
typedef struct ScanState
{
    PlanState   ps;             // embedded PlanState; kept as the first member
    Relation    ss_currentRelation;// relation being scanned
    HeapScanDesc ss_currentScanDesc;// descriptor of the current heap scan
    TupleTableSlot *ss_ScanTupleSlot;// slot that receives the scanned tuple
} ScanState;


// Executor state of a sequential scan node: a ScanState plus one size field.
typedef struct SeqScanState
{
    ScanState   ss;             // embedded ScanState; kept as the first member
    Size        pscan_len;      // presumably the size of the parallel scan descriptor -- TODO confirm
} SeqScanState;

二、源码解读

heap_getnext函数从数据表中获取下一个tuple.根据ScanDesc->rs_pageatatime的设定,如为T,则调用heapgettup_pagemode函数,使用page-at-a-time模式提取元组,否则调用函数heapgettup使用常规模式提取.


// Return the next tuple of a heap scan, or NULL once the scan is exhausted.
//
// scan      - scan descriptor; scan->rs_pageatatime selects the fetch routine
// direction - direction in which to advance
//
// The returned pointer refers to scan->rs_ctup, i.e. storage owned by the
// scan descriptor.
HeapTuple
heap_getnext(HeapScanDesc scan, ScanDirection direction)
{
    HeapTuple   current = &(scan->rs_ctup);

    // note: no locking manipulations are needed here
    HEAPDEBUG_1;

    if (!scan->rs_pageatatime)
    {
        // regular mode
        heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key);
    }
    else
    {
        // page-at-a-time mode
        heapgettup_pagemode(scan, direction,
                            scan->rs_nkeys, scan->rs_key);
    }

    if (current->t_data != NULL)
    {
        // a tuple was found: count it and hand it back
        HEAPDEBUG_3;

        pgstat_count_heap_getnext(scan->rs_rd);

        return current;
    }

    // the fetch routine left t_data NULL: the scan is finished
    HEAPDEBUG_2;
    return NULL;
}



// Fetch the next tuple of a scan in page-at-a-time mode.
//
// heapgetpage() has already collected the offsets of the visible tuples of
// the current page in scan->rs_vistuples[], so this routine walks that array
// (indexed by lineindex) and takes no buffer lock itself.
//
// On success scan->rs_ctup (t_data, t_len, t_self) describes the tuple and
// scan->rs_cindex remembers its index.  When the scan is exhausted the buffer
// is released, t_data is set to NULL and rs_inited is cleared.
static void
heapgettup_pagemode(HeapScanDesc scan,// scan descriptor
                    ScanDirection dir,// scan direction
                    int nkeys,// number of scan keys
                    ScanKey key)// scan keys (NULL when there are none)
{
    HeapTuple   tuple = &(scan->rs_ctup);// tuple being returned (scan->rs_ctup is a HeapTupleData)
    bool        backward = ScanDirectionIsBackward(dir);// true for a backward scan
    BlockNumber page;// block number of the current page
    bool        finished;// set once every page of the scan has been visited
    Page        dp;// current page
    int         lines;// number of visible tuples on the current page
    int         lineindex;// index into scan->rs_vistuples[]
    OffsetNumber lineoff;// line-pointer offset of the current item
    int         linesleft;// entries still to examine on the current page
    ItemId      lpp;// item id (line pointer) of the current item

    // work out the page and the index to start from
    if (ScanDirectionIsForward(dir))
    {
        // forward scan
        if (!scan->rs_inited)
        {
            // first call for this scan
            // nothing to return when there are no blocks to scan
            if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
            {
                Assert(!BufferIsValid(scan->rs_cbuf));
                tuple->t_data = NULL;
                return;
            }
            // is this a parallel scan?
            if (scan->rs_parallel != NULL)
            {
                // set up the parallel scan and ask for the first page
                heap_parallelscan_startblock_init(scan);

                page = heap_parallelscan_nextpage(scan);

                
                // other processes may already have finished the scan
                if (page == InvalidBlockNumber)
                {
                    Assert(!BufferIsValid(scan->rs_cbuf));
                    tuple->t_data = NULL;
                    return;
                }
            }
            else
                page = scan->rs_startblock; // first page of the scan
            // read the page and collect its visible tuples
            heapgetpage(scan, page);
            // start with the first visible tuple
            lineindex = 0;
            // mark the scan as initialized
            scan->rs_inited = true;
        }
        else
        {
            // scan already initialized
            
            // continue from the page/tuple returned last time
            page = scan->rs_cblock; // current page
            lineindex = scan->rs_cindex + 1;// entry after the previous one
        }
        // get the page held in the current buffer
        dp = BufferGetPage(scan->rs_cbuf);
        // check whether the snapshot is too old
        TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
        lines = scan->rs_ntuples;
        
        // page and lineindex now reference the next visible tid
        linesleft = lines - lineindex;
    }
    else if (backward)
    {
        // backward scan
        
        // backward parallel scans are not supported
        Assert(scan->rs_parallel == NULL);

        if (!scan->rs_inited)
        {
            // nothing to return when there are no blocks to scan
            if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
            {
                Assert(!BufferIsValid(scan->rs_cbuf));
                tuple->t_data = NULL;
                return;
            }

            
            scan->rs_syncscan = false;// no sync-scan position reporting for a backward scan
            
            // start from the last page of the scan
            if (scan->rs_startblock > 0)
                page = scan->rs_startblock - 1;// scan starts mid-relation: the last page is the one before the start block
            else
                page = scan->rs_nblocks - 1;// scan starts at block 0: the last page is the final block
            // read the page and collect its visible tuples
            heapgetpage(scan, page);
        }
        else
        {
            
            // continue on the current page
            page = scan->rs_cblock; 
        }
        // get the page held in the current buffer
        dp = BufferGetPage(scan->rs_cbuf);
        // check whether the snapshot is too old
        TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
        // number of visible tuples on the page
        lines = scan->rs_ntuples;

        if (!scan->rs_inited)
        {
            // first call: start at the last visible tuple of the page
            lineindex = lines - 1;
            scan->rs_inited = true;
        }
        else
        {
            // otherwise step to the entry before the previous one
            lineindex = scan->rs_cindex - 1;
        }
        

        linesleft = lineindex + 1;
    }
    else
    {
        // neither forward nor backward: the scan does not move
        
        if (!scan->rs_inited)
        {
            Assert(!BufferIsValid(scan->rs_cbuf));
            tuple->t_data = NULL;
            return;
        }

        // re-fetch the tuple identified by tuple->t_self
        page = ItemPointerGetBlockNumber(&(tuple->t_self));
        if (page != scan->rs_cblock)
            heapgetpage(scan, page);

        
        dp = BufferGetPage(scan->rs_cbuf);
        TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
        lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
        lpp = PageGetItemId(dp, lineoff);
        Assert(ItemIdIsNormal(lpp));

        tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
        tuple->t_len = ItemIdGetLength(lpp);

        // rs_cindex must still point at this tuple
        Assert(scan->rs_cindex < scan->rs_ntuples);
        Assert(lineoff == scan->rs_vistuples[scan->rs_cindex]);

        return;
    }

    // advance through pages until a qualifying tuple is found or the scan ends
    for (;;)
    {
        while (linesleft > 0)// entries remain on this page
        {
            // offset of the next visible tuple
            lineoff = scan->rs_vistuples[lineindex];
            // its item id
            lpp = PageGetItemId(dp, lineoff);
            Assert(ItemIdIsNormal(lpp));
            // tuple header
            tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
            // tuple length
            tuple->t_len = ItemIdGetLength(lpp);
            // tuple id = (page, lineoff)
            ItemPointerSet(&(tuple->t_self), page, lineoff);

            
            if (key != NULL)
            {
                // scan keys present
                bool        valid;
                // test the tuple against the scan keys
                HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
                            nkeys, key, valid);
                if (valid)
                {
                    // tuple qualifies: remember its index and return
                    scan->rs_cindex = lineindex;
                    return;
                }
            }
            else
            {
                // no scan keys: every visible tuple qualifies
                scan->rs_cindex = lineindex;
                return;
            }

            // tuple did not qualify: move to the next entry
            --linesleft;// one fewer entry left on this page
            if (backward)
                --lineindex;// backward: previous entry
            else
                ++lineindex;// forward: next entry
        }

        // this page is exhausted: pick the next page or detect the end
        if (backward)// backward scan
        {
            finished = (page == scan->rs_startblock) ||
                (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);// done at the start block or when the block budget runs out
            if (page == 0)// wrap around below block 0
                page = scan->rs_nblocks;// continue from the end of the relation
            page--;// step back one page
        }
        else if (scan->rs_parallel != NULL)
        {
            // parallel scan: ask for the next page
            page = heap_parallelscan_nextpage(scan);
            finished = (page == InvalidBlockNumber);
        }
        else
        {
            // forward scan
            page++;// next page
            if (page >= scan->rs_nblocks)
                page = 0;// wrap around past the last block
            finished = (page == scan->rs_startblock) ||
                (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);// done back at the start block or when the block budget runs out

            
            if (scan->rs_syncscan)
                // synchronized scan: report the current position
                ss_report_location(scan->rs_rd, page);
        }

        // end of scan: release the buffer and reset the scan state
        if (finished)
        {
            if (BufferIsValid(scan->rs_cbuf))
                ReleaseBuffer(scan->rs_cbuf);
            scan->rs_cbuf = InvalidBuffer;
            scan->rs_cblock = InvalidBlockNumber;
            tuple->t_data = NULL;
            scan->rs_inited = false;
            return;
        }
        // read the next page and go round the loop again
        heapgetpage(scan, page);
        // same set-up as for the first page
        dp = BufferGetPage(scan->rs_cbuf);
        TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
        lines = scan->rs_ntuples;
        linesleft = lines;
        if (backward)
            lineindex = lines - 1;
        else
            lineindex = 0;// rs_vistuples[] is indexed from 0
    }
}




// Fetch the next tuple of a scan in regular (tuple-at-a-time) mode.
//
// Unlike heapgettup_pagemode, this routine walks the page's line pointers
// directly and checks visibility tuple by tuple, so it holds a share lock on
// the buffer while it examines a page.  The lock is dropped before returning
// a tuple and before moving on to another page.
//
// On success scan->rs_ctup (t_data, t_len, t_self) describes the tuple.  When
// the scan is exhausted the buffer is released, t_data is set to NULL and
// rs_inited is cleared.
static void
heapgettup(HeapScanDesc scan,// scan descriptor
           ScanDirection dir,// scan direction
           int nkeys,// number of scan keys
           ScanKey key)// scan keys (NULL when there are none)
{
    HeapTuple   tuple = &(scan->rs_ctup);// tuple being returned
    Snapshot    snapshot = scan->rs_snapshot;// snapshot used for visibility checks
    bool        backward = ScanDirectionIsBackward(dir);// true for a backward scan
    BlockNumber page;// block number of the current page
    bool        finished;// set once every page of the scan has been visited
    Page        dp;// current page
    int         lines;// highest line-pointer offset on the current page
    OffsetNumber lineoff;// offset of the item being examined
    int         linesleft;// items still to examine on the current page
    ItemId      lpp;// item id (line pointer) being examined

    // work out the page and the offset to start from
    if (ScanDirectionIsForward(dir))
    {
        // forward scan; same set-up as in heapgettup_pagemode
        if (!scan->rs_inited)
        {
            // nothing to return when there are no blocks to scan
            if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
            {
                Assert(!BufferIsValid(scan->rs_cbuf));
                tuple->t_data = NULL;
                return;
            }
            if (scan->rs_parallel != NULL)
            {
                heap_parallelscan_startblock_init(scan);

                page = heap_parallelscan_nextpage(scan);

                // other processes may already have finished the scan
                if (page == InvalidBlockNumber)
                {
                    Assert(!BufferIsValid(scan->rs_cbuf));
                    tuple->t_data = NULL;
                    return;
                }
            }
            else
                page = scan->rs_startblock; // first page of the scan
            heapgetpage(scan, page);
            lineoff = FirstOffsetNumber;    // start at the first offset
            scan->rs_inited = true;
        }
        else
        {
            // continue from the tuple returned last time
            page = scan->rs_cblock; // current page
            lineoff =           // offset following the previous tuple
                OffsetNumberNext(ItemPointerGetOffsetNumber(&(tuple->t_self)));
        }

        // share-lock the buffer while the page is examined
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);

        dp = BufferGetPage(scan->rs_cbuf);
        TestForOldSnapshot(snapshot, scan->rs_rd, dp);
        lines = PageGetMaxOffsetNumber(dp);
        
        // items from lineoff up to lines (inclusive) remain
        linesleft = lines - lineoff + 1;
    }
    else if (backward)
    {
        // backward scan; same set-up as in heapgettup_pagemode
        
        // backward parallel scans are not supported
        Assert(scan->rs_parallel == NULL);

        if (!scan->rs_inited)
        {
            // nothing to return when there are no blocks to scan
            if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
            {
                Assert(!BufferIsValid(scan->rs_cbuf));
                tuple->t_data = NULL;
                return;
            }

            // no sync-scan position reporting for a backward scan
            scan->rs_syncscan = false;
            // start from the last page of the scan
            if (scan->rs_startblock > 0)
                page = scan->rs_startblock - 1;
            else
                page = scan->rs_nblocks - 1;
            heapgetpage(scan, page);
        }
        else
        {
            // continue on the current page
            page = scan->rs_cblock; 
        }
        // share-lock the buffer;
        // unlike page-at-a-time mode, this mode examines the page under the lock
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
        // get the page held in the current buffer
        dp = BufferGetPage(scan->rs_cbuf);
        TestForOldSnapshot(snapshot, scan->rs_rd, dp);
        // highest offset on the page
        lines = PageGetMaxOffsetNumber(dp);

        if (!scan->rs_inited)
        {
            lineoff = lines;    // first call: start at the last offset
            scan->rs_inited = true;
        }
        else
        {
            lineoff =           // offset preceding the previous tuple
                OffsetNumberPrev(ItemPointerGetOffsetNumber(&(tuple->t_self)));
        }
        
        // items from lineoff down to the first offset remain
        linesleft = lineoff;
    }
    else
    {
        // neither forward nor backward: the scan does not move
        if (!scan->rs_inited)
        {
            Assert(!BufferIsValid(scan->rs_cbuf));
            tuple->t_data = NULL;
            return;
        }

        // re-fetch the tuple identified by tuple->t_self
        page = ItemPointerGetBlockNumber(&(tuple->t_self));
        if (page != scan->rs_cblock)
            heapgetpage(scan, page);

        // note: this branch does not lock the buffer
        dp = BufferGetPage(scan->rs_cbuf);
        TestForOldSnapshot(snapshot, scan->rs_rd, dp);
        lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
        lpp = PageGetItemId(dp, lineoff);
        Assert(ItemIdIsNormal(lpp));

        tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
        tuple->t_len = ItemIdGetLength(lpp);

        return;
    }

    // advance through items and pages until a qualifying tuple is found
    lpp = PageGetItemId(dp, lineoff);
    for (;;)
    {
        while (linesleft > 0)
        {
            if (ItemIdIsNormal(lpp))
            {
                bool        valid;

                tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
                tuple->t_len = ItemIdGetLength(lpp);
                ItemPointerSet(&(tuple->t_self), page, lineoff);

                
                // is the tuple visible to this snapshot (MVCC)?
                valid = HeapTupleSatisfiesVisibility(tuple,
                                                     snapshot,
                                                     scan->rs_cbuf);
                // check for a serializable conflict
                CheckForSerializableConflictOut(valid, scan->rs_rd, tuple,
                                                scan->rs_cbuf, snapshot);

                if (valid && key != NULL)
                    HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
                                nkeys, key, valid);// test against the scan keys

                if (valid)
                {
                    // tuple qualifies: unlock the buffer and return
                    LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
                    return;
                }
            }

            // move on to the adjacent item
            --linesleft;// one fewer item left on this page
            if (backward)
            {
                --lpp;          // previous line pointer
                --lineoff;
            }
            else
            {
                ++lpp;          // next line pointer
                ++lineoff;
            }
        }

        
        // this page is exhausted: unlock it
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);

        // pick the next page or detect the end of the scan
        if (backward)
        {
            finished = (page == scan->rs_startblock) ||
                (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
            if (page == 0)
                page = scan->rs_nblocks;
            page--;
        }
        else if (scan->rs_parallel != NULL)
        {
            page = heap_parallelscan_nextpage(scan);
            finished = (page == InvalidBlockNumber);
        }
        else
        {
            page++;
            if (page >= scan->rs_nblocks)
                page = 0;
            finished = (page == scan->rs_startblock) ||
                (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);

            // synchronized scan: report the current position
            if (scan->rs_syncscan)
                ss_report_location(scan->rs_rd, page);
        }

        // end of scan: release the buffer and reset the scan state
        if (finished)
        {
            if (BufferIsValid(scan->rs_cbuf))
                ReleaseBuffer(scan->rs_cbuf);
            scan->rs_cbuf = InvalidBuffer;
            scan->rs_cblock = InvalidBlockNumber;
            tuple->t_data = NULL;
            scan->rs_inited = false;
            return;
        }

        heapgetpage(scan, page);
        // share-lock the new buffer
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);

        dp = BufferGetPage(scan->rs_cbuf);
        TestForOldSnapshot(snapshot, scan->rs_rd, dp);
        lines = PageGetMaxOffsetNumber((Page) dp);
        linesleft = lines;
        if (backward)
        {
            lineoff = lines;
            lpp = PageGetItemId(dp, lines);
        }
        else
        {
            lineoff = FirstOffsetNumber;
            lpp = PageGetItemId(dp, FirstOffsetNumber);
        }
    }
}


//--------------------------------------------------- heapgetpage


// Read block `page` of the scanned relation into scan->rs_cbuf.
//
// Any buffer held from the previous page is released first, and
// scan->rs_cblock is set to `page`.  In regular mode that is all.  In
// page-at-a-time mode the page is additionally share-locked and every normal
// item is checked for visibility; the offsets of the visible ones are stored
// in scan->rs_vistuples[] and their count in scan->rs_ntuples.
//
// scan - scan descriptor
// page - block number to read; must be below scan->rs_nblocks
void
heapgetpage(HeapScanDesc scan, BlockNumber page)
{
    Buffer      buffer;
    Snapshot    snapshot;
    Page        dp;
    int         lines;
    int         ntup;
    OffsetNumber lineoff;
    ItemId      lpp;
    bool        all_visible;

    Assert(page < scan->rs_nblocks);

    
    // release the buffer used for the previous page
    if (BufferIsValid(scan->rs_cbuf))
    {
        ReleaseBuffer(scan->rs_cbuf);
        scan->rs_cbuf = InvalidBuffer;
    }

    // allow interrupts to be serviced between pages
    CHECK_FOR_INTERRUPTS();

    
    // read the page using the selected strategy;
    // this sets rs_cbuf and rs_cblock
    scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
                                       RBM_NORMAL, scan->rs_strategy);
    scan->rs_cblock = page;
    // nothing more to do unless in page-at-a-time mode
    if (!scan->rs_pageatatime)
        return;

    // page-at-a-time mode
    buffer = scan->rs_cbuf;
    snapshot = scan->rs_snapshot;

    // give page pruning a chance to run before the page is examined
    heap_page_prune_opt(scan->rs_rd, buffer);

    
    // take a share lock on the buffer
    LockBuffer(buffer, BUFFER_LOCK_SHARE);
    // get the page
    dp = BufferGetPage(buffer);
    // check whether the snapshot is too old
    TestForOldSnapshot(snapshot, scan->rs_rd, dp);
    // highest line-pointer offset on the page
    lines = PageGetMaxOffsetNumber(dp);
    // no visible tuples collected yet
    ntup = 0;

    
    // per-tuple visibility checks can be skipped when the page is marked
    // all-visible and the snapshot was not taken during recovery
    all_visible = PageIsAllVisible(dp) && !snapshot->takenDuringRecovery;
    // walk every line pointer of the page
    for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
         lineoff <= lines;
         lineoff++, lpp++)
    {
        if (ItemIdIsNormal(lpp))
        {
            HeapTupleData loctup;
            bool        valid;

            // build a temporary tuple handle for the visibility check
            loctup.t_tableOid = RelationGetRelid(scan->rs_rd);
            loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
            loctup.t_len = ItemIdGetLength(lpp);
            ItemPointerSet(&(loctup.t_self), page, lineoff);

            if (all_visible)
                valid = true;
            else
                valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);

            CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
                                            buffer, snapshot);

            // remember the offset of each visible tuple
            if (valid)
                scan->rs_vistuples[ntup++] = lineoff;
        }
    }
    // done: release the share lock
    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

    Assert(ntup <= MaxHeapTuplesPerPage);
    scan->rs_ntuples = ntup;
}

//--------------------------------------------------- PageGetMaxOffsetNumber


// Number of line pointers on a page, derived from pd_lower: 0 when pd_lower
// does not extend past the page header, otherwise the space between the
// header and pd_lower divided by the size of one ItemIdData.  The scan code
// in this file uses the result as the highest offset to visit.
// NOTE: `page` is evaluated more than once, so it must have no side effects.
#define PageGetMaxOffsetNumber(page) \
    (((PageHeader) (page))->pd_lower <= SizeOfPageHeaderData ? 0 : \
     ((((PageHeader) (page))->pd_lower - SizeOfPageHeaderData) \
      / sizeof(ItemIdData)))


//--------------------------------------------------- TestForOldSnapshot

// Cheap pre-check for the "snapshot too old" test.
//
// TestForOldSnapshot_impl() is called only when all of these hold, tested in
// this order: the old-snapshot threshold is enabled (>= 0), a snapshot is
// supplied, it is an MVCC or toast snapshot, its lsn is valid, and the page's
// LSN is newer than the snapshot's lsn.
static inline void
TestForOldSnapshot(Snapshot snapshot, Relation relation, Page page)
{
    Assert(relation != NULL);

    // feature disabled
    if (old_snapshot_threshold < 0)
        return;

    // no snapshot to test
    if (snapshot == NULL)
        return;

    // only MVCC and toast snapshots are subject to the test
    if (snapshot->satisfies != HeapTupleSatisfiesMVCC &&
        snapshot->satisfies != HeapTupleSatisfiesToast)
        return;

    // the snapshot carries no usable lsn
    if (XLogRecPtrIsInvalid(snapshot->lsn))
        return;

    // the page was not modified after the snapshot's lsn
    if (PageGetLSN(page) <= snapshot->lsn)
        return;

    TestForOldSnapshot_impl(snapshot, relation);
}

//--------------------------------------------------- TestForOldSnapshot_impl


// Slow path of the "snapshot too old" test.
//
// Raises ERRCODE_SNAPSHOT_TOO_OLD when the relation allows early pruning and
// the snapshot was taken before the current old-snapshot threshold timestamp.
void
TestForOldSnapshot_impl(Snapshot snapshot, Relation relation)
{
    if (RelationAllowsEarlyPruning(relation))
    {
        // the threshold timestamp is fetched only for relations that qualify
        if (snapshot->whenTaken < GetOldSnapshotThresholdTimestamp())
        {
            ereport(ERROR,
                    (errcode(ERRCODE_SNAPSHOT_TOO_OLD),
                     errmsg("snapshot too old")));
        }
    }
}


//--------------------------------------------------- HeapKeyTest


#define HeapKeyTest(tuple, \
                    tupdesc, \
                    nkeys, \
                    keys, \
                    result) \
do \
{ \
     \
     \
    int         __cur_nkeys = (nkeys); \
    ScanKey     __cur_keys = (keys); \
 \
    (result) = true;  \
    for (; __cur_nkeys--; __cur_keys++) \
    { \
        Datum   __atp; \
        bool    __isnull; \
        Datum   __test; \
 \
        if (__cur_keys->sk_flags & SK_ISNULL) \
        { \
            (result) = false; \
            break; \
        } \
 \
        __atp = heap_getattr((tuple), \
                             __cur_keys->sk_attno, \
                             (tupdesc), \
                             &__isnull); \
 \
        if (__isnull) \
        { \
            (result) = false; \
            break; \
        } \
 \
        __test = FunctionCall2Coll(&__cur_keys->sk_func, \
                                   __cur_keys->sk_collation, \
                                   __atp, __cur_keys->sk_argument); \
 \
        if (!DatumGetBool(__test)) \
        { \
            (result) = false; \
            break; \
        } \
    } \
} while (0)

三、跟踪分析

测试脚本如下

testdb=# explain select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je 
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je 
testdb(#                         from t_grxx gr inner join t_jfxx jf 
testdb(#                                        on gr.dwbh = dw.dwbh 
testdb(#                                           and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Sort  (cost=20070.93..20320.93 rows=100000 width=47)
   Sort Key: dw.dwbh
   ->  Hash Join  (cost=3754.00..8689.61 rows=100000 width=47)
         Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
         ->  Hash Join  (cost=3465.00..8138.00 rows=100000 width=31)
               Hash Cond: ((jf.grbh)::text = (gr.grbh)::text)
               ->  Seq Scan on t_jfxx jf  (cost=0.00..1637.00 rows=100000 width=20)
               ->  Hash  (cost=1726.00..1726.00 rows=100000 width=16)
                     ->  Seq Scan on t_grxx gr  (cost=0.00..1726.00 rows=100000 width=16)
         ->  Hash  (cost=164.00..164.00 rows=10000 width=20)
               ->  Seq Scan on t_dwxx dw  (cost=0.00..164.00 rows=10000 width=20)
(11 rows)

启动gdb,设置断点,进入heap_getnext

(gdb) b heap_getnext
Breakpoint 1 at 0x4de01f: file heapam.c, line 1841.
(gdb) c
Continuing.

Breakpoint 1, heap_getnext (scan=0x2aadc18, direction=ForwardScanDirection) at heapam.c:1841
1841        if (scan->rs_pageatatime)

查看输入参数,注意rs_pageatatime = true,使用page-at-a-time模式查询

(gdb) p *scan
$1 = {rs_rd = 0x7efdb8f2dfd8, rs_snapshot = 0x2a2a6d0, rs_nkeys = 0, rs_key = 0x0, rs_bitmapscan = false, 
  rs_samplescan = false, rs_pageatatime = true, rs_allow_strat = true, rs_allow_sync = true, rs_temp_snap = false, 
  rs_nblocks = 726, rs_startblock = 0, rs_numblocks = 4294967295, rs_strategy = 0x0, rs_syncscan = false, 
  rs_inited = false, rs_ctup = {t_len = 2139062143, t_self = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, 
    t_tableOid = 16742, t_data = 0x0}, rs_cblock = 4294967295, rs_cbuf = 0, rs_parallel = 0x0, rs_cindex = 2139062143, 
  rs_ntuples = 2139062143, rs_vistuples = {32639 <repeats 291 times>}}

进入heapgettup_pagemode函数

(gdb) n
1842            heapgettup_pagemode(scan, direction,
(gdb) step
heapgettup_pagemode (scan=0x2aadc18, dir=ForwardScanDirection, nkeys=0, key=0x0) at heapam.c:794
794     HeapTuple   tuple = &(scan->rs_ctup);
(gdb) 

heapgettup_pagemode->变量赋值。注意tuple指向scan->rs_ctup,并非"野"指针,只是其内容尚未填充(t_data为NULL,t_len等仍是未初始化的值);此时扫描尚未初始化(scan->rs_inited = false)

794     HeapTuple   tuple = &(scan->rs_ctup);
(gdb) n
795     bool        backward = ScanDirectionIsBackward(dir);
(gdb) p *tuple
$2 = {t_len = 2139062143, t_self = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, t_tableOid = 16742, 
  t_data = 0x0}
(gdb) n
808     if (ScanDirectionIsForward(dir))
(gdb) p scan->rs_inited
$3 = false

heapgettup_pagemode->非并行扫描,page = scan->rs_startblock(即page = 0)

(gdb) n
815             if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
(gdb) n
821             if (scan->rs_parallel != NULL)
(gdb) 
836                 page = scan->rs_startblock; 
(gdb) 

进入heapgetpage

(gdb) n
837             heapgetpage(scan, page);
(gdb) step
heapgetpage (scan=0x2aadc18, page=0) at heapam.c:362
362     Assert(page < scan->rs_nblocks);

heapgetpage->检查验证&读取page

362     Assert(page < scan->rs_nblocks);
(gdb) n
365     if (BufferIsValid(scan->rs_cbuf))
(gdb) p scan->rs_cbuf
$4 = 0
(gdb) n
376     CHECK_FOR_INTERRUPTS();
(gdb) 
379     scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
(gdb) 
381     scan->rs_cblock = page;
(gdb) 
383     if (!scan->rs_pageatatime)

heapgetpage->rs_cbuf为346/rs_cblock为0

(gdb) p scan->rs_cbuf
$5 = 346
(gdb) p scan->rs_cblock
$6 = 0

heapgetpage->page-at-a-time模式读取,变量赋值,锁定缓冲区

(gdb) n
386     buffer = scan->rs_cbuf;
(gdb) n
386     buffer = scan->rs_cbuf;
(gdb) 
387     snapshot = scan->rs_snapshot;
(gdb) 
392     heap_page_prune_opt(scan->rs_rd, buffer);
(gdb) 
399     LockBuffer(buffer, BUFFER_LOCK_SHARE);
(gdb) p buffer
$7 = 346

heapgetpage->获取page,检查快照是否过旧,获取行数

(gdb) n
401     dp = BufferGetPage(buffer);
(gdb) 
402     TestForOldSnapshot(snapshot, scan->rs_rd, dp);
(gdb) p dp
$8 = (Page) 0x7efda4b7ac00 "\001"
(gdb) n
403     lines = PageGetMaxOffsetNumber(dp);
(gdb) 
404     ntup = 0;
(gdb) p lines
$11 = 158

heapgetpage->验证可见性

(gdb) n
426     all_visible = PageIsAllVisible(dp) && !snapshot->takenDuringRecovery;
(gdb) 
428     for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
(gdb) p all_visible
$12 = false
(gdb) n
429          lineoff <= lines;
(gdb) 
428     for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
(gdb) p lineoff
$13 = 1
(gdb) n
432         if (ItemIdIsNormal(lpp))
(gdb) 
437             loctup.t_tableOid = RelationGetRelid(scan->rs_rd);
(gdb) 
438             loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
(gdb) 
439             loctup.t_len = ItemIdGetLength(lpp);
(gdb) 
440             ItemPointerSet(&(loctup.t_self), page, lineoff);
(gdb) 
442             if (all_visible)
(gdb) 
445                 valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
(gdb) 
447             CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
(gdb) 
450             if (valid)
(gdb) 
451                 scan->rs_vistuples[ntup++] = lineoff;
(gdb) 
430          lineoff++, lpp++)
(gdb) 
...

heapgettup_pagemode->退出heapgetpage,回到heapgettup_pagemode,初始化lineindex为0,设置rs_inited为T

(gdb) finish
Run till exit from #0  heapgetpage (scan=0x2aadc18, page=0) at heapam.c:430
heapgettup_pagemode (scan=0x2aadc18, dir=ForwardScanDirection, nkeys=0, key=0x0) at heapam.c:838
838             lineindex = 0;
(gdb) n
839             scan->rs_inited = true;
(gdb) 
848         dp = BufferGetPage(scan->rs_cbuf);

heapgettup_pagemode->获取page,验证快照是否过旧

(gdb) n
849         TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
(gdb) p dp
$18 = (Page) 0x7efda4b7ac00 "\001"

heapgettup_pagemode->计算Item数,开始循环

(gdb) n
850         lines = scan->rs_ntuples;
(gdb) 
853         linesleft = lines - lineindex;
(gdb) 
948         while (linesleft > 0)
(gdb) p lines
$19 = 158
(gdb) p linesleft
$20 = 158
(gdb) 

heapgettup_pagemode->获取Item偏移(lineoff)和ItemId

(gdb) p lineoff
$21 = 1
(gdb) p lpp
$22 = (ItemId) 0x7efda4b7ac18
(gdb) p *lpp
$23 = {lp_off = 8152, lp_flags = 1, lp_len = 40}

heapgettup_pagemode->给tuple中的变量赋值,ItemPointer是ItemPointerData结构体指针

(gdb) n
954             tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
(gdb) 
955             tuple->t_len = ItemIdGetLength(lpp);
(gdb) 
956             ItemPointerSet(&(tuple->t_self), page, lineoff);
(gdb) 
961             if (key != NULL)
(gdb) p *tuple->t_data
$26 = {t_choice = {t_heap = {t_xmin = 862, t_xmax = 0, t_field3 = {t_cid = 0, t_xvac = 0}}, t_datum = {datum_len_ = 862, 
      datum_typmod = 0, datum_typeid = 0}}, t_ctid = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 1}, t_infomask2 = 5, 
  t_infomask = 2306, t_hoff = 24 '\030', t_bits = 0x7efda4b7cbef ""}
(gdb) p tuple->t_len
$28 = 40
(gdb) p tuple->t_self
$29 = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 1}

heapgettup_pagemode->设置scan->rs_cindex,返回

(gdb) n
975                 scan->rs_cindex = lineindex;
(gdb) n
976                 return;
(gdb) p scan->rs_cindex 
$30 = 0

回到heap_getnext

(gdb) 
heap_getnext (scan=0x2aadc18, direction=ForwardScanDirection) at heapam.c:1847
1847        if (scan->rs_ctup.t_data == NULL)

返回获得的tuple

1847        if (scan->rs_ctup.t_data == NULL)
(gdb) n
1859        pgstat_count_heap_getnext(scan->rs_rd);
(gdb) 
1861        return &(scan->rs_ctup);
(gdb) p scan->rs_ctup
$31 = {t_len = 40, t_self = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 1}, t_tableOid = 16742, t_data = 0x7efda4b7cbd8}

结束第一次调用,再次进入该函数

(gdb) c
Continuing.

Breakpoint 1, heap_getnext (scan=0x2aadc18, direction=ForwardScanDirection) at heapam.c:1841
1841        if (scan->rs_pageatatime)
(gdb) n
1842            heapgettup_pagemode(scan, direction,
(gdb) step
heapgettup_pagemode (scan=0x2aadc18, dir=ForwardScanDirection, nkeys=0, key=0x0) at heapam.c:794
794     HeapTuple   tuple = &(scan->rs_ctup);
(gdb) n
795     bool        backward = ScanDirectionIsBackward(dir);
(gdb) 
808     if (ScanDirectionIsForward(dir))
(gdb) 
810         if (!scan->rs_inited)
(gdb) 
844             page = scan->rs_cblock; 

查看输入参数scan,与上一次有所不同,存储了上一次调用返回的一些信息,如rs_vistuples等

(gdb) p *scan
$32 = {rs_rd = 0x7efdb8f2dfd8, rs_snapshot = 0x2a2a6d0, rs_nkeys = 0, rs_key = 0x0, rs_bitmapscan = false, 
  rs_samplescan = false, rs_pageatatime = true, rs_allow_strat = true, rs_allow_sync = true, rs_temp_snap = false, 
  rs_nblocks = 726, rs_startblock = 0, rs_numblocks = 4294967295, rs_strategy = 0x0, rs_syncscan = false, rs_inited = true, 
  rs_ctup = {t_len = 40, t_self = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 1}, t_tableOid = 16742, 
    t_data = 0x7efda4b7cbd8}, rs_cblock = 0, rs_cbuf = 346, rs_parallel = 0x0, rs_cindex = 0, rs_ntuples = 158, 
  rs_vistuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 
    29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 
    59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 
    89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 
    115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 
    139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 
    32639 <repeats 133 times>}}

DONE!

四、参考资料

PostgreSQL Page页结构解析(1)-基础
PostgreSQL Page页结构解析(2)- 页头和行数据指针
PostgreSQL Page页结构解析(3)- 行数据

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PostgreSQL 源码解读(89)- 查询语句#74(SeqNext函数#2)

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

PostgreSQL 源码解读(71)- 查询语句#56(make_one_rel函数#21-...

本节大体介绍了动态规划算法实现(standard_join_search)中的join_search_one_level->make_join_rel->populate_joinrel_with_paths->add_paths_to_j
2022-11-30

编程热搜

目录