Оптимизация кодирования переменной длины
У меня есть случай, когда мне нужно сжать много часто небольших значений. Таким образом, я сжимаю их с байтовой кодировкой переменной длины (ULEB128, чтобы быть конкретным):
size_t
compress_unsigned_int(unsigned int n, char* data)
{
size_t size = 0;
while (n > 127)
{
++size;
*data++ = (n & 127)|128;
n >>= 7;
}
*data++ = n;
return ++size;
}
Есть ли более эффективный способ сделать это (возможно, используя SSE)?
Изменить: после этого сжатия результат сохраняется в data
, беря size
байты. Затем функция сжатия вызывается в следующем неподписанном int.
Ответы
Ответ 1
Если ваши значения unsigned int
ограничены определенным диапазоном - скажем, 32 бита - вы можете развернуть цикл:
size_t
compress_unsigned_int(unsigned int n, char* data)
{
size_t size;
if (n < 0x00000080U) {
size = 1;
goto b1;
}
if (n < 0x00004000U) {
size = 2;
goto b2;
}
if (n < 0x00200000U) {
size = 3;
goto b3;
}
if (n < 0x10000000U) {
size = 4;
goto b4;
}
size = 5;
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b4:
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b3:
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b2:
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b1:
*data = n;
return size;
}
Ответ 2
Первое, что вы хотите сделать, это проверить любое возможное решение против вашего текущего кода.
Я думаю, вы можете попытаться избавиться от зависимостей данных, чтобы процессор мог работать больше в одно и то же время.
Каковы зависимости от данных? Когда данные передаются через вашу функцию, текущее значение n
зависит от предыдущего значения n
, которое зависит от значения до этого... представляет собой длинную цепочку зависимостей данных. В приведенном ниже коде n
никогда не изменяется, поэтому процессор может "пропустить" и делать пару разных вещей одновременно, не дожидаясь вычисления нового n
.
// NOTE: This code is actually incorrect, as caf noted.
// The byte order is reversed.
size_t
compress_unsigned_int(unsigned int n, char *data)
{
if (n < (1U << 14)) {
if (n < (1U << 7)) {
data[0] = n;
return 1;
} else {
data[0] = (n >> 7) | 0x80;
data[1] = n & 0x7f;
return 2;
}
} else if (n < (1U << 28)) {
if (n < (1U << 21)) {
data[0] = (n >> 14) | 0x80;
data[1] = ((n >> 7) & 0x7f) | 0x80;
data[2] = n & 0x7f;
return 3;
} else {
data[0] = (n >> 21) | 0x80;
data[1] = ((n >> 14) & 0x7f) | 0x80;
data[2] = ((n >> 7) & 0x7f) | 0x80;
data[3] = n & 0x7f;
return 4;
}
} else {
data[0] = (n >> 28) | 0x80;
data[1] = ((n >> 21) & 0x7f) | 0x80;
data[2] = ((n >> 14) & 0x7f) | 0x80;
data[3] = ((n >> 7) & 0x7f) | 0x80;
data[4] = n & 0x7f;
return 5;
}
}
Я тестировал производительность, выполняя ее в узком цикле с 0..UINT_MAX. В моей системе время выполнения:
(Lower is better)
Original: 100%
caf unrolled version: 79%
My version: 57%
Некоторая незначительная настройка может привести к лучшим результатам, но я сомневаюсь, что вы получите гораздо больше улучшения, если не пойдете на сборку. Если ваши целые числа имеют определенные диапазоны, то вы можете использовать профилирование, чтобы заставить компилятор добавить правильные предсказания ветвей в каждую ветвь. Это может дать вам несколько дополнительных процентных пунктов скорости. ( EDIT: Я получил 8% от переупорядочивания ветвей, но это извращенная оптимизация, потому что он полагается на то, что каждый номер 0... UINT_MAX появляется с равной частотой. Я не рекомендую это. )
SSE не поможет. SSE предназначен для одновременной работы с несколькими частями данных с одинаковой шириной, что значительно затрудняет получение SIMD для ускорения чего-либо с кодировкой переменной длины. (Это не обязательно невозможно, но это может быть невозможно, и вам нужно быть довольно умным, чтобы понять это.)
Ответ 3
Быстрая реализация в буферах протокола google:
http://code.google.com/p/protobuf/
Посмотрите на методы CodedOutputStream:: WriteVarintXXX.
Первый метод может быть переписан как:
char *start = data;
while (n>=0x80)
{
*data++=(n|0x80);
n>>=7;
}
*data++=n;
return data-start;
Согласно моему тесту, реализация буферов google лучшая, а затем другие реализации. Однако мой тест довольно искусственный, лучше проверить каждый подход в своем приложении и выбрать лучшее. Представленные оптимизации лучше работают с определенными значениями числа.
Вот код моего тестового приложения. (Примечание: я удалил код из compress_unsigned_int_google_buf. Вы можете найти реализацию в следующем файле из протокола буфера google: метод coded_stream.cc CodedOutputStream:: WriteVarint32FallbackToArrayInline)
size_t compress_unsigned_int(unsigned int n, char* data)
{
size_t size = 0;
while (n > 127)
{
++size;
*data++ = (n & 127)|128;
n >>= 7;
}
*data++ = n;
return ++size;
}
size_t compress_unsigned_int_improved(unsigned int n, char* data)
{
size_t size;
if (n < 0x00000080U) {
size = 1;
goto b1;
}
if (n < 0x00004000U) {
size = 2;
goto b2;
}
if (n < 0x00200000U) {
size = 3;
goto b3;
}
if (n < 0x10000000U) {
size = 4;
goto b4;
}
size = 5;
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b4:
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b3:
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b2:
*data++ = (n & 0x7f) | 0x80;
n >>= 7;
b1:
*data = n;
return size;
}
size_t compress_unsigned_int_more_improved(unsigned int n, char *data)
{
if (n < (1U << 14)) {
if (n < (1U << 7)) {
data[0] = n;
return 1;
} else {
data[0] = (n >> 7) | 0x80;
data[1] = n & 0x7f;
return 2;
}
} else if (n < (1U << 28)) {
if (n < (1U << 21)) {
data[0] = (n >> 14) | 0x80;
data[1] = ((n >> 7) & 0x7f) | 0x80;
data[2] = n & 0x7f;
return 3;
} else {
data[0] = (n >> 21) | 0x80;
data[1] = ((n >> 14) & 0x7f) | 0x80;
data[2] = ((n >> 7) & 0x7f) | 0x80;
data[3] = n & 0x7f;
return 4;
}
} else {
data[0] = (n >> 28) | 0x80;
data[1] = ((n >> 21) & 0x7f) | 0x80;
data[2] = ((n >> 14) & 0x7f) | 0x80;
data[3] = ((n >> 7) & 0x7f) | 0x80;
data[4] = n & 0x7f;
return 5;
}
}
size_t compress_unsigned_int_simple(unsigned int n, char *data)
{
char *start = data;
while (n>=0x80)
{
*data++=(n|0x80);
n>>=7;
}
*data++=n;
return data-start;
}
inline size_t compress_unsigned_int_google_buf(unsigned int value, unsigned char* target) {
// This implementation might be found in google protocol buffers
}
#include <iostream>
#include <Windows.h>
using namespace std;
int _tmain(int argc, _TCHAR* argv[])
{
char data[20];
unsigned char udata[20];
size_t size = 0;
__int64 timer;
cout << "Plain copy: ";
timer = GetTickCount64();
size = 0;
for (int i=0; i<536870900; i++)
{
memcpy(data,&i,sizeof(i));
size += sizeof(i);
}
cout << GetTickCount64() - timer << " Size: " << size << endl;
cout << "Original: ";
timer = GetTickCount64();
size = 0;
for (int i=0; i<536870900; i++)
{
size += compress_unsigned_int(i,data);
}
cout << GetTickCount64() - timer << " Size: " << size << endl;
cout << "Improved: ";
timer = GetTickCount64();
size = 0;
for (int i=0; i<536870900; i++)
{
size += compress_unsigned_int_improved(i,data);
}
cout << GetTickCount64() - timer << " Size: " << size << endl;
cout << "More Improved: ";
timer = GetTickCount64();
size = 0;
for (int i=0; i<536870900; i++)
{
size += compress_unsigned_int_more_improved(i,data);
}
cout << GetTickCount64() - timer << " Size: " << size << endl;
cout << "Simple: ";
timer = GetTickCount64();
size = 0;
for (int i=0; i<536870900; i++)
{
size += compress_unsigned_int_simple(i,data);
}
cout << GetTickCount64() - timer << " Size: " << size << endl;
cout << "Google Buffers: ";
timer = GetTickCount64();
size = 0;
for (int i=0; i<536870900; i++)
{
size += compress_unsigned_int_google_buf(i,udata);
}
cout << GetTickCount64() - timer << " Size: " << size << endl;
return 0;
}
На моей машине с компилятором Visual С++ у меня есть следующие результаты:
Обычная копия: 358 мс
Оригинал: 2497 мс
Улучшено: 2215 мс
Улучшено: 2231 мс
Простой: 2059 мс
Буферы Google: 968 мс
Ответ 4
После большего просмотра я нашел еще одну часто используемую реализацию в Sqlite3 (версия кода 3070900):
inline int sqlite3PutVarint(unsigned char *p, unsigned __int64 v){
int i, j, n;
unsigned char buf[10];
if( v & (((unsigned __int64)0xff000000)<<32) ){
p[8] = (unsigned char)v;
v >>= 8;
for(i=7; i>=0; i--){
p[i] = (unsigned char)((v & 0x7f) | 0x80);
v >>= 7;
}
return 9;
}
n = 0;
do{
buf[n++] = (unsigned char)((v & 0x7f) | 0x80);
v >>= 7;
}while( v!=0 );
buf[0] &= 0x7f;
for(i=0, j=n-1; j>=0; j--, i++){
p[i] = buf[j];
}
return n;
}
Существует также небольшая оптимизированная версия для 32-битного int:
int sqlite3PutVarint32(unsigned char *p, unsigned int v){
if( (v & ~0x7f)==0 ){
p[0] = v;
return 1;
}
if( (v & ~0x3fff)==0 ){
p[0] = (unsigned char)((v>>7) | 0x80);
p[1] = (unsigned char)(v & 0x7f);
return 2;
}
return sqlite3PutVarint(p, v);
}
Разочаровывает то, что реализация Sqlite выполняет самое худшее в моем тесте. Поэтому, если вы собираетесь использовать Sqlite, рассмотрите возможность замены реализации по умолчанию на оптимизированную.
Между тем я думаю о дальнейших возможных оптимизации.
Ответ 5
Вы можете сэкономить несколько операций, заменив
size_t size=0;...++size;...;return size++;
с помощью
char* base=data;...;return data-base;
Ответ 6
Здесь моя оптимизация на языке ассемблера x86 (32 бит). Вы можете скомпилировать NASM и ссылку. Я не знаю, было ли это быстро или медленно, я просто получал удовольствие от кодирования:)
global compress_unsigned_int
; bit fields:
; 31 0
; eeeedddddddcccccccbbbbbbbaaaaaaa
compress_unsigned_int:
mov eax, [esp+4] ; n
mov ecx, [esp+8] ; data
cmp eax, 00001111111111111111111111111111b
jbe out4b
shld edx, eax, 11
shl eax, 10
shld edx, eax, 8
shl eax, 7
shld edx, eax, 8
shl eax, 7
shld edx, eax, 8
or edx, 10000000100000001000000010000000b
mov [ecx], edx
mov eax, [esp+4]
shr eax, 28
mov [ecx+4], al
mov eax, 5
jmp exit
out4b:
cmp eax, 00000000000111111111111111111111b
jbe out3b
shld edx, eax, 11
shl eax, 10
shld edx, eax, 8
shl eax, 7
shld edx, eax, 8
shl eax, 7
shld edx, eax, 8
or edx, 00000000100000001000000010000000b
mov [ecx], edx
mov eax, 4
jmp exit
out3b:
cmp eax, 00000000000000000011111111111111b
jbe out2b
shld edx, eax, 25
shl eax, 24
shld edx, eax, 8
mov eax, edx
or edx, 00000000000000001000000010000000b
mov [ecx], dx
shr eax, 15
mov [ecx+2], al
mov eax, 3
jmp exit
out2b:
cmp eax, 00000000000000000000000001111111b
jbe out1b
shld edx, eax, 25
shl eax, 24
shld edx, eax, 8
or edx, 00000000000000000000000010000000b
mov [ecx], dx
mov eax, 2
jmp exit
out1b:
mov [ecx], al
mov eax, 1
exit:
ret