编写具有即时更改数据库功能的简单ORM

图片



哈Ha!由于在holivar文章下发表了粗心的评论,因此Karma耗尽了精力,这意味着您需要写一篇有趣的(我希望)帖子并进行自我恢复。



多年来,我一直在php中使用电报服务器客户端。和许多用户一样-厌倦了内存消耗的不断增长。某些会话可能占用1到8 GB的RAM!承诺支持数据库已有很长时间,但是在这个方向上没有任何进展。我必须自己解决问题:)开源项目的普及对请求请求提出了有趣的要求:



  1. 向后兼容所有现有会话都应在新版本中继续工作(会话是文件中应用程序的序列化实例);
  2. 数据库选择的自由由于用户具有不同的环境配置,因此可以随时更改存储类型而不会丢失数据的能力;
  3. 可扩展性易于添加新型数据库;
  4. 保存界面处理数据的应用程序代码不应更改;
  5. 异步性该项目使用amphp,因此所有数据库操作都必须是非阻塞的。


有关详细信息,我邀请所有人陪伴。



我们将转移什么



MadelineProto的大部分内存都被聊天,用户和文件占用。例如,在对等缓存中,我有2万多个条目。这些是该帐户曾经见过的所有用户(包括所有组的成员),以及渠道,漫游器和组。帐户越老越活跃,内存中就会存储更多数据。它们是数十和数百兆字节,并且大多数未使用。但是您无法清除整个缓存,因为在多次尝试接收相同数据时,电报会立即严重限制该帐户。例如,在我的公共演示服务器上重新创建会话后,一周之内的电报会以FLOOD_WAIT错误回答大多数请求,但实际上没有任何作用。缓存预热后,一切恢复正常。



从代码的角度来看,此数据作为数组存储在一对类的属性中。



建筑



根据需求,制定了一个方案:



  • 所有“重”数组都将替换为实现ArrayAccess的对象。
  • 对于每种类型的数据库,我们都创建自己的继承基类的类。
  • 在__consrtuct和__awake期间创建对象并将其写入属性。
  • 抽象工厂根据应用程序设置中选择的数据库为对象选择所需的类。
  • 如果应用程序已经具有另一种类型的存储,则我们从那里读取所有数据,并将阵列写入新的存储。


异步世界问题



我要做的第一件事是创建接口和一个用于在内存中存储数组的类。这是默认设置,其行为与程序的旧版本相同。在第一个晚上,我对原型的成功感到非常兴奋。该代码非常简单。到目前为止,尚未发现不可能在Iterator接口的方法内部以及负责unset和isset的方法内部使用生成器。



这里需要澄清的是,amphp使用生成器语法在php中实现异步。产量变得类似于异步...等待js。如果方法使用异步,则为了从中获取结果,您需要在代码中使用yield来等待该结果。例如:



<?php

include 'vendor/autoload.php';

$MadelineProto = new \danog\MadelineProto\API('session.madeline');
$MadelineProto->async(true);

$MadelineProto->loop(function() use($MadelineProto) {
    $myAsyncFunction = function() use($MadelineProto): \Generator {
        $me = yield $MadelineProto->start();
        yield $MadelineProto->echo(json_encode($me, JSON_PRETTY_PRINT|JSON_UNESCAPED_UNICODE));
    };

    yield $myAsyncFunction();
});


如果来自字符串
yield $myAsyncFunction();
删除yield,然后在执行此代码之前终止应用程序。我们不会得到结果。



在调用方法和函数之前添加yield并不是很困难。但是,由于使用了ArrayAccess接口,因此不会直接调用这些方法。例如,unset()调用offsetUnset(),而isset()调用offsetIsset()。使用Iterator接口时,情况与foreach迭代器相似。



在内置方法之前增加yield会产生错误,因为这些方法并非设计用于生成器。评论中的内容:此处此处



我不得不妥协并重写代码以使用自己的方法。幸运的是,这样的地方很少。在大多数情况下,数组用于按键读取或写入。此功能与生成器成为了好朋友。



结果界面为:



<?php

use Amp\Producer;
use Amp\Promise;

interface DbArray extends DbType, \ArrayAccess, \Countable
{
    public function getArrayCopy(): Promise;
    public function isset($key): Promise;
    public function offsetGet($offset): Promise;
    public function offsetSet($offset, $value);
    public function offsetUnset($offset): Promise;
    public function count(): Promise;
    public function getIterator(): Producer;

    /**
     * @deprecated
     * @internal
     * @see DbArray::isset();
     *
     * @param mixed $offset
     *
     * @return bool
     */
    public function offsetExists($offset);
}


处理数据的例子



<?php
...
//
$existingChat = yield $this->chats[$user['id']];

//. 
yield $this->chats[$user['id']] = $user;
//   yield,           .
$this->chats[$user['id']] = $user;


//unset
yield $this->chats->offsetUnset($id);

//foreach
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
    [$key, $value] = $iterator->getCurrent();
    //  
}


数据存储



存储数据的最简单方法是序列化。为了支持对象,我不得不放弃使用json。该表有两个主要列:键和值。



创建表的示例sql查询:



            CREATE TABLE IF NOT EXISTS `{$this->table}`
            (
                `key` VARCHAR(255) NOT NULL,
                `value` MEDIUMBLOB NULL,
                `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                PRIMARY KEY (`key`)
            )
            ENGINE = InnoDB
            CHARACTER SET 'utf8mb4' 
            COLLATE 'utf8mb4_general_ci'


每次应用程序启动时,我们都会尝试为每个属性创建一个表。不建议电报客户端每隔几个小时重新启动一次,因此,我们每秒将没有几个创建表的请求:)



由于主键不会自动递增,因此可以使用一个查询完成数据插入和更新,就像在常规数组中那样:



INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value


将为每个变量创建一个名称格式为%account_id%_%class%_%variable_name%的表。但是,当您第一次启动该应用程序时,还没有帐户。在这种情况下,您必须生成带有tmp前缀的随机临时ID。每次启动时,每个变量的类都会检查帐户ID是否已出现。如果存在id,则表将被重命名。



指标



数据库的结构尽可能简单,以便将来会自动添加新属性。没有连接。仅使用PRIMARY键索引。但是在某些情况下,您需要在其他字段中进行搜索。



例如,有一个数组/表聊天。其中的关键是聊天ID。但通常您必须按用户名进行搜索。当应用程序将数据存储在数组中时,按常规通过在foreach中遍历数组来执行按用户名的搜索。该搜索在内存中以可接受的速度工作,但在数据库中没有。因此,创建了另一个表/数组,并在类中创建了相应的属性。密钥是用户名,值是聊天ID。这种方法的唯一缺点是,您必须编写其他代码来同步两个表。



快取



本地mysql速度很快,但是一点缓存也不会带来任何麻烦。尤其是如果连续多次使用相同的值。例如,首先我们检查数据库中是否存在聊天,然后从中获取一些数据。



编写了一个简单的自行车特征。



<?php

namespace danog\MadelineProto\Db;

use Amp\Loop;
use danog\MadelineProto\Logger;

trait ArrayCacheTrait
{
    /**
     * Values stored in this format:
     * [
     *      [
     *          'value' => mixed,
     *          'ttl' => int
     *      ],
     *      ...
     * ].
     * @var array
     */
    protected array $cache = [];
    protected string $ttl = '+5 minutes';
    private string $ttlCheckInterval = '+1 minute';

    protected function getCache(string $key, $default = null)
    {
        $cacheItem = $this->cache[$key] ?? null;
        $result = $default;

        if (\is_array($cacheItem)) {
            $result = $cacheItem['value'];
            $this->cache[$key]['ttl'] = \strtotime($this->ttl);
        }

        return $result;
    }

    /**
     * Save item in cache.
     *
     * @param string $key
     * @param $value
     */
    protected function setCache(string $key, $value): void
    {
        $this->cache[$key] = [
            'value' => $value,
            'ttl' => \strtotime($this->ttl),
        ];
    }

    /**
     * Remove key from cache.
     *
     * @param string $key
     */
    protected function unsetCache(string $key): void
    {
        unset($this->cache[$key]);
    }

    protected function startCacheCleanupLoop(): void
    {
        Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
    }

    /**
     * Remove all keys from cache.
     */
    protected function cleanupCache(): void
    {
        $now = \time();
        $oldKeys = [];
        foreach ($this->cache as $cacheKey => $cacheValue) {
            if ($cacheValue['ttl'] < $now) {
                $oldKeys[] = $cacheKey;
            }
        }
        foreach ($oldKeys as $oldKey) {
            $this->unsetCache($oldKey);
        }

        Logger::log(
            \sprintf(
                "cache for table:%s; keys left: %s; keys removed: %s",
                $this->table,
                \count($this->cache),
                \count($oldKeys)
            ),
            Logger::VERBOSE
        );
    }
}


我要特别注意startCacheCleanupLoop。多亏了amphp的魔力,使缓存无效的过程变得尽可能简单。回调从指定的时间间隔开始,循环遍历所有值并查看ts字段,该字段存储了对该元素的最后一次调用的时间戳。如果呼叫时间超过5分钟(可在设置中配置),则该元素将被删除。使用amphp从redis或memcache实现ttl模拟非常容易。所有这些都在后台发生,并且不会阻塞主线程。



借助高速缓存和异步,不仅可以加快读取速度,而且还可以加快写入速度。



这是将数据写入数据库的方法的源代码。



/**
     * Set value for an offset.
     *
     * @link https://php.net/manual/en/arrayiterator.offsetset.php
     *
     * @param string $index <p>
     * The index to set for.
     * </p>
     * @param $value
     *
     * @throws \Throwable
     */

    public function offsetSet($index, $value): Promise
    {
        if ($this->getCache($index) === $value) {
            return call(fn () =>null);
        }

        $this->setCache($index, $value);

        $request = $this->request(
            "
            INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value
        ",
            [
                'index' => $index,
                'value' => \serialize($value),
            ]
        );

        //Ensure that cache is synced with latest insert in case of concurrent requests.
        $request->onResolve(fn () => $this->setCache($index, $value));

        return $request;
    }


$ this->请求创建一个Promise,该Promise异步写入数据。并且与缓存的操作同步发生。也就是说,您不必等待对数据库的写入,并且同时确保读取操作将立即开始返回新数据。



事实证明,amphp的onResolve方法非常有用。插入完成后,数据将再次写入高速缓存。如果某些写操作延迟,并且缓存和基址开始不同,则将使用最后写入基数的值来更新缓存。那些。我们的缓存将再次与基准保持一致。



资源



链接到拉取请求



就像另一个用户添加了postgre支持一样。只需5分钟即可编写说明



通过将重复的方法移到通用抽象类SqlArray中,可以减少代码量。



还有一件事



注意到从电报下载媒体文件时,标准的垃圾收集器php无法处理工作,并且文件的一部分保留在内存中。通常,泄漏与文件大小相同。可能的原因:累积10,000个链接时,垃圾收集器会自动触发。在我们的例子中,链接很少(几十个),但是每个链接都可以引用内存中的兆字节数据。使用mtproto实现研究数千行代码是非常懒惰的。为什么不先使用\ gc_collect_cycles();尝试优雅的拐杖?



令人惊讶的是,它解决了这个问题。这意味着配置定期清洁就足够了。幸运的是,amphp为指定间隔的后台执行提供了简单的工具。



每秒清除内存似乎太容易了,而且效果不佳。我确定了一种算法,用于检查自上次清理以来的内存增益。如果增益大于阈值,则会发生清除。



<?php

namespace danog\MadelineProto\MTProtoTools;

use Amp\Loop;
use danog\MadelineProto\Logger;

class GarbageCollector
{
    /**
     * Ensure only one instance of GarbageCollector
     * 		when multiple instances of MadelineProto running.
     * @var bool
     */
    public static bool $lock = false;

    /**
     * How often will check memory.
     * @var int
     */
    public static int $checkIntervalMs = 1000;

    /**
     * Next cleanup will be triggered when memory consumption will increase by this amount.
     * @var int
     */
    public static int $memoryDiffMb = 1;

    /**
     * Memory consumption after last cleanup.
     * @var int
     */
    private static int $memoryConsumption = 0;

    public static function start(): void
    {
        if (static::$lock) {
            return;
        }
        static::$lock = true;

        Loop::repeat(static::$checkIntervalMs, static function () {
            $currentMemory = static::getMemoryConsumption();
            if ($currentMemory > static::$memoryConsumption + static::$memoryDiffMb) {
                \gc_collect_cycles();
                static::$memoryConsumption = static::getMemoryConsumption();
                $cleanedMemory = $currentMemory - static::$memoryConsumption;
                Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
            }
        });
    }

    private static function getMemoryConsumption(): int
    {
        $memory = \round(\memory_get_usage()/1024/1024, 1);
        Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
        return (int) $memory;
    }
}



All Articles