По-видимому, можно атомарно увеличивать два целых числа с помощью команд сравнения и свопинга. Этот разговор утверждает, что такой алгоритм существует, но он не детализирует, как он выглядит.
(Обратите внимание, что очевидное решение по приращению целых чисел один за другим не является атомарным. Кроме того, наложение нескольких целых чисел на одно машинное слово не учитывается, потому что оно ограничивает возможный диапазон.)
Ответ 2
Если sse2 доступен, вы можете использовать paddq для добавления 2 64-битных целых чисел в два других 64-битных целых числа в одной инструкции.
#include "emmintrin.h"
//initialize your values somewhere:
//const __m128i ones = _mm_set1_epi64x(1);
//volatile register __m128i vars =
// _mm_set_epi64x(24,7);
static inline __m128i inc_both(__m128i vars, __m128i ones){
return _mm_add_epi64(vars,ones);
}
Это должно скомпилировать
paddq %xmm0, %xmm1
Так как он является статическим, он может использовать другие регистры xmm. Если имеется значительное давление в регистре, то один из операндов может стать одним (℅rip)
Примечание: это может использоваться для добавления значений, отличных от 1, и для большинства других команд математики, побитового и сравнения требуется аналогичные операции, если они вам понадобятся.
Таким образом, вы можете использовать префикс блокировки и превратить его в встроенный макрос asm
#define inc64x2(vars) asm volatile( \
"paddq %0, %1\n":"+x"(vars):"x"(ones) \
);
Неонный эквивалент руки - это что-то вроде: vaddq_s64 (...), но есть хорошая статья об эквивалентах arm/x86 здесь.
Ответ 3
У меня есть решение, которое я тестировал. Содержащийся здесь суп - орехи, подтверждающие концептуальную программу.
Алгоритм - это "использование логического элемента потока данных CAS" как 3-е целое число. Я смотрел видео разговора дважды, и я считаю, что это подходит. Возможно, это не тот алгоритм, о котором думал ведущий, но он работает.
Значения X и Y могут быть в любом месте в памяти, и программа помещает их достаточно далеко друг от друга, чтобы они находились в разных строках кэша. Это не имеет большого значения.
Краткое описание алгоритма:
Каждый поток имеет unique id number
или tid
(ненулевой), взятый из одного из любимых источников: pthead_t
, getpid
, gettid
, make one up by whatever means you want
. В программе он просто назначает их последовательно, начиная с 1.
Каждый поток будет вызывать функцию приращения с этим номером.
Функция приращения будет вращаться по глобальной переменной gate
, используя CAS со старым значением 0 и новым значением tid
.
Когда CAS преуспевает, поток теперь "владеет" вещами. Другими словами, если gate
равно нулю, это для захватов. Не равным нулю значением является tid
владельца, а gate
заблокировано.
Теперь владелец может увеличивать значения X
и Y
с помощью простых x += 1
и y += 1
.
После этого функция increment освобождается, делая значение 0 в gate
.
Вот диагностическая/доказательная концепция со всем. Сам алгоритм не имеет ограничений, но я закодировал его для своей машины.
Некоторые оговорки:
- Он принимает
gcc
/clang
- Он предполагает 64-битную арку
x86_64
.
- Это было закодировано с использованием только встроенного asm и не нуждается в [и не использует каких-либо] компиляторов
atomic
для ясности, простоты и прозрачности.
- Это было построено под Linux, но должно работать на любой "разумной" машине x86/OS (например, BSD, OSX должно быть хорошо, возможно, cygwin и возможно mingw)
- Другие арки в порядке, если они поддерживают CAS, я просто не кодировал их (например,
arm
может работать, если вы кодируете CAS с парами ldex/stex
)
- Достаточно абстрактных примитивов, чтобы это было/должно быть легким.
- Никаких попыток совместимости с Windows [если вы этого хотите, сделайте свой собственный порт, но не посылайте мне никаких слез или комментариев: -)].
- В make файле и программе по умолчанию установлены лучшие значения
- Некоторым процессорам x86, возможно, потребуется использовать разные значения по умолчанию (например, инструкции о необходимости забора). См. Make файл.
В любом случае, вот оно:
// caslock -- prove cas lock algorithm
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>
#define systls __thread
// repeat the madness only once
#ifdef __clang__
#define inline_common inline
#else
#define inline_common static inline
#endif
#define inline_always inline_common __attribute__((__always_inline__))
#define inline_never __attribute__((__noinline__))
// WARNING: inline CAS fails for gcc but works for clang!
#if _USE_CASINLINE_
#define inline_cas inline_always
#else
#define inline_cas inline_never
#endif
typedef unsigned int u32;
typedef unsigned long long u64;
#ifndef LOOPMAX
#define LOOPMAX 1000000
#endif
#ifndef TIDMAX
#define TIDMAX 20
#endif
#if _USE_VPTR_
typedef volatile u32 *xptr32_p;
typedef volatile u64 *xptr64_p;
#else
typedef u32 *xptr32_p;
typedef u64 *xptr64_p;
#endif
#if _USE_TID64_
typedef u64 tid_t;
#define tidload(_xptr) loadu64(_xptr)
#define tidcas(_xptr,_oval,_nval) casu64(_xptr,_oval,_nval)
#define tidstore(_xptr,_nval) storeu64(_xptr,_nval)
#else
typedef u32 tid_t;
#define tidload(_xptr) loadu32(_xptr)
#define tidcas(_xptr,_oval,_nval) casu32(_xptr,_oval,_nval)
#define tidstore(_xptr,_nval) storeu32(_xptr,_nval)
#endif
tid_t tidgate; // gate control
tid_t readycnt; // number of threads ready
tid_t donecnt; // number of threads complete
// ensure that the variables are nowhere near each other
u64 ary[100];
#define kickoff ary[32] // sync to fire threads
#define xval ary[31] // the X value
#define yval ary[87] // the Y value
int inctype; // increment algorithm to use
tid_t tidmax; // maximum number of tasks
u64 loopmax; // loop maximum for each task
// task control
struct tsk {
tid_t tsk_tid; // task id
u32 tsk_casmiss; // cas miss count
};
typedef struct tsk tsk_t;
tsk_t *tsklist; // task list
systls tsk_t *tskcur; // current task block
// show progress
#define PGR(_pgr) \
do { \
fputs(_pgr,stdout); \
fflush(stdout); \
} while (0)
// NOTE: some x86 arches need fence instructions
// 0 -- no fence instructions
// 1 -- use mfence
// 2 -- use lfence/sfence
#if _USE_BARRIER_ == 0
#define BARRIER_RELEASE ""
#define BARRIER_ACQUIRE ""
#define BARRIER_ALL ""
#elif _USE_BARRIER_ == 1
#define BARRIER_ACQUIRE "\tmfence\n"
#define BARRIER_RELEASE "\tmfence\n"
#define BARRIER_ALL "\tmfence\n"
#elif _USE_BARRIER_ == 2
#define BARRIER_ACQUIRE "\tlfence\n"
#define BARRIER_RELEASE "\tsfence\n"
#define BARRIER_ALL "\tmfence\n"
#else
#error caslock: unknown barrier type
#endif
// barrier_acquire -- acquire barrier
inline_always void
barrier_acquire(void)
{
__asm__ __volatile__ (
BARRIER_ACQUIRE
:
:
: "memory");
}
// barrier_release -- release barrier
inline_always void
barrier_release(void)
{
__asm__ __volatile__ (
BARRIER_RELEASE
:
:
: "memory");
}
// barrier -- barrier
inline_always void
barrier(void)
{
__asm__ __volatile__ (
BARRIER_ALL
:
:
: "memory");
}
// casu32 -- compare and exchange four bytes
// RETURNS: 1=ok, 0=fail
inline_cas int
casu32(xptr32_p xptr,u32 oldval,u32 newval)
{
char ok;
__asm__ __volatile__ (
" lock\n"
" cmpxchg %[newval],%[xptr]\n"
" sete %[ok]\n"
: [ok] "=r" (ok),
[xptr] "=m" (*xptr)
: "a" (oldval),
[newval] "r" (newval)
: "memory");
return ok;
}
// casu64 -- compare and exchange eight bytes
// RETURNS: 1=ok, 0=fail
inline_cas int
casu64(xptr64_p xptr,u64 oldval,u64 newval)
{
char ok;
__asm__ __volatile__ (
" lock\n"
" cmpxchg %[newval],%[xptr]\n"
" sete %[ok]\n"
: [ok] "=r" (ok),
[xptr] "=m" (*xptr)
: "a" (oldval),
[newval] "r" (newval)
: "memory");
return ok;
}
// loadu32 -- load value with barrier
// RETURNS: loaded value
inline_always u32
loadu32(const xptr32_p xptr)
{
u32 val;
barrier_acquire();
val = *xptr;
return val;
}
// loadu64 -- load value with barrier
// RETURNS: loaded value
inline_always u64
loadu64(const xptr64_p xptr)
{
u64 val;
barrier_acquire();
val = *xptr;
return val;
}
// storeu32 -- store value with barrier
inline_always void
storeu32(xptr32_p xptr,u32 val)
{
*xptr = val;
barrier_release();
}
// storeu64 -- store value with barrier
inline_always void
storeu64(xptr64_p xptr,u64 val)
{
*xptr = val;
barrier_release();
}
// qsleep -- do a quick sleep
inline_always void
qsleep(int bigflg)
{
struct timespec ts;
if (bigflg) {
ts.tv_sec = 1;
ts.tv_nsec = 0;
}
else {
ts.tv_sec = 0;
ts.tv_nsec = 1000;
}
nanosleep(&ts,NULL);
}
// incby_tidgate -- increment by using thread id gate
void
incby_tidgate(tid_t tid)
// tid -- unique id for accessing entity (e.g. thread id)
{
tid_t *gptr;
tid_t oval;
gptr = &tidgate;
// acquire the gate
while (1) {
oval = 0;
// test mode -- just do a nop instead of CAS to prove diagnostic
#if _USE_CASOFF_
*gptr = oval;
break;
#else
if (tidcas(gptr,oval,tid))
break;
#endif
++tskcur->tsk_casmiss;
}
#if _USE_INCBARRIER_
barrier_acquire();
#endif
// increment the values
xval += 1;
yval += 1;
#if _USE_INCBARRIER_
barrier_release();
#endif
// release the gate
// NOTE: CAS will always provide a barrier
#if _USE_CASPOST_ && (_USE_CASOFF_ == 0)
oval = tidcas(gptr,tid,0);
#else
tidstore(gptr,0);
#endif
}
// tskcld -- child task
void *
tskcld(void *arg)
{
tid_t tid;
tid_t oval;
u64 loopcur;
tskcur = arg;
tid = tskcur->tsk_tid;
// tell master thread that we're fully ready
while (1) {
oval = tidload(&readycnt);
if (tidcas(&readycnt,oval,oval + 1))
break;
}
// wait until we're given the starting gun
while (1) {
if (loadu64(&kickoff))
break;
qsleep(0);
}
// do the increments
for (loopcur = loopmax; loopcur > 0; --loopcur)
incby_tidgate(tid);
barrier();
// tell master thread that we're fully complete
while (1) {
oval = tidload(&donecnt);
if (tidcas(&donecnt,oval,oval + 1))
break;
}
return (void *) 0;
}
// tskstart -- start a child task
void
tskstart(tid_t tid)
{
pthread_attr_t attr;
pthread_t thr;
int err;
tsk_t *tsk;
tsk = tsklist + tid;
tsk->tsk_tid = tid;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,1);
err = pthread_create(&thr,&attr,tskcld,tsk);
pthread_attr_destroy(&attr);
if (err)
printf("tskstart: error -- err=%d\n",err);
}
// tskall -- run a single test
void
tskall(void)
{
tid_t tidcur;
tsk_t *tsk;
u64 incmax;
u64 val;
int err;
xval = 0;
yval = 0;
kickoff = 0;
readycnt = 0;
donecnt = 0;
tidgate = 0;
// prealloc the task blocks
tsklist = calloc(tidmax + 1,sizeof(tsk_t));
// start all tasks
PGR(" St");
for (tidcur = 1; tidcur <= tidmax; ++tidcur)
tskstart(tidcur);
// wait for all tasks to be fully ready
PGR(" Sw");
while (1) {
if (tidload(&readycnt) == tidmax)
break;
qsleep(1);
}
// the starting gun -- all tasks are waiting for this
PGR(" Ko");
storeu64(&kickoff,1);
// wait for all tasks to be fully done
PGR(" Wd");
while (1) {
if (tidload(&donecnt) == tidmax)
break;
qsleep(1);
}
PGR(" Done\n");
// check the final count
incmax = loopmax * tidmax;
// show per-task statistics
for (tidcur = 1; tidcur <= tidmax; ++tidcur) {
tsk = tsklist + tidcur;
printf("tskall: tsk=%llu tsk_casmiss=%d (%.3f%%)\n",
(u64) tidcur,tsk->tsk_casmiss,(double) tsk->tsk_casmiss / loopmax);
}
err = 0;
// check for failure
val = loadu64(&xval);
if (val != incmax) {
printf("tskall: xval fault -- xval=%lld incmax=%lld\n",val,incmax);
err = 1;
}
// check for failure
val = loadu64(&yval);
if (val != incmax) {
printf("tskall: yval fault -- yval=%lld incmax=%lld\n",val,incmax);
err = 1;
}
if (! err)
printf("tskall: SUCCESS\n");
free(tsklist);
}
// main -- master control
int
main(void)
{
loopmax = LOOPMAX;
tidmax = TIDMAX;
inctype = 0;
tskall();
return 0;
}
Вот Makefile. Извините за дополнительный шаблон:
# caslock/Makefile -- make file for caslock
#
# options:
# LOOPMAX -- maximum loops / thread
#
# TIDMAX -- maximum number of threads
#
# BARRIER -- generate fence/barrier instructions
# 0 -- none
# 1 -- use mfence everywhere
# 2 -- use lfence for acquire, sfence for release
#
# CASOFF -- disable CAS to prove diagnostic works
# 0 -- normal mode
# 1 -- inhibit CAS during X/Y increment
#
# CASINLINE -- inline the CAS functions
# 0 -- do _not_ inline
# 1 -- inline them (WARNING: this fails for gcc but works for clang!)
#
# CASPOST -- increment gate release mode
# 0 -- use fenced store
# 1 -- use CAS store (NOTE: not really required)
#
# INCBARRIER -- use extra barriers around increments
# 0 -- rely on CAS for barrier
# 1 -- add extra safety barriers immediately before increment of X/Y
#
# TID64 -- use 64 bit thread "id"s
# 0 -- use 32 bit
# 1 -- use 64 bit
#
# VPTR -- use volatile pointers in function definitions
# 0 -- use ordinary pointers
# 1 -- use volatile pointers (NOTE: not really required)
ifndef _CASLOCK_MK_
_CASLOCK_MK_ = 1
OLIST += caslock.o
ifndef LOOPMAX
LOOPMAX = 1000000
endif
ifndef TIDMAX
TIDMAX = 20
endif
ifndef BARRIER
BARRIER = 0
endif
ifndef CASINLINE
CASINLINE = 0
endif
ifndef CASOFF
CASOFF = 0
endif
ifndef CASPOST
CASPOST = 0
endif
ifndef INCBARRIER
INCBARRIER = 0
endif
ifndef TID64
TID64 = 0
endif
ifndef VPTR
VPTR = 0
endif
CFLAGS += -DLOOPMAX=$(LOOPMAX)
CFLAGS += -DTIDMAX=$(TIDMAX)
CFLAGS += -D_USE_BARRIER_=$(BARRIER)
CFLAGS += -D_USE_CASINLINE_=$(CASINLINE)
CFLAGS += -D_USE_CASOFF_=$(CASOFF)
CFLAGS += -D_USE_CASPOST_=$(CASPOST)
CFLAGS += -D_USE_INCBARRIER_=$(INCBARRIER)
CFLAGS += -D_USE_TID64_=$(TID64)
CFLAGS += -D_USE_VPTR_=$(VPTR)
STDLIB += -lpthread
ALL += caslock
CLEAN += caslock
OVRPUB := 1
ifndef OVRTOP
OVRTOP := $(shell pwd)
OVRTOP := $(dir $(OVRTOP))
endif
endif
# ovrlib/rules.mk -- rules control
#
# options:
# GDB -- enable debug symbols
# 0 -- normal
# 1 -- use -O0 and define _USE_GDB_=1
#
# CLANG -- use clang instead of gcc
# 0 -- use gcc
# 1 -- use clang
#
# BNC -- enable benchmarks
# 0 -- normal mode
# 1 -- enable benchmarks for function enter/exit pairs
ifdef OVRPUB
ifndef SDIR
SDIR := $(shell pwd)
STAIL := $(notdir $(SDIR))
endif
ifndef GENTOP
GENTOP := $(dir $(SDIR))
endif
ifndef GENDIR
GENDIR := $(GENTOP)/$(STAIL)
endif
ifndef ODIR
ODIR := $(GENDIR)
endif
PROTOLST := true
PROTOGEN := @true
endif
ifndef SDIR
$(error rules: SDIR not defined)
endif
ifndef ODIR
$(error rules: ODIR not defined)
endif
ifndef GENDIR
$(error rules: GENDIR not defined)
endif
ifndef GENTOP
$(error rules: GENTOP not defined)
endif
ifndef _RULES_MK_
_RULES_MK_ = 1
CLEAN += *.proto
CLEAN += *.a
CLEAN += *.o
CLEAN += *.i
CLEAN += *.dis
CLEAN += *.TMP
QPROTO := $(shell $(PROTOLST) -i -l -O$(GENTOP) $(SDIR)/*.c $(CPROTO))
HDEP += $(QPROTO)
###VPATH += $(GENDIR)
###VPATH += $(SDIR)
ifdef INCLUDE_MK
-include $(INCLUDE_MK)
endif
ifdef GSYM
CFLAGS += -gdwarf-2
endif
ifdef GDB
CFLAGS += -gdwarf-2
DFLAGS += -D_USE_GDB_
else
CFLAGS += -O2
endif
ifndef ZPRT
DFLAGS += -D_USE_ZPRT_=0
endif
ifdef BNC
DFLAGS += -D_USE_BNC_=1
endif
ifdef CLANG
CC := clang
endif
DFLAGS += -I$(GENTOP)
DFLAGS += -I$(OVRTOP)
CFLAGS += -Wall -Werror
CFLAGS += -Wno-unknown-pragmas
CFLAGS += -Wempty-body
CFLAGS += -fno-diagnostics-color
# NOTE: we now need this to prevent inlining (enabled at -O2)
ifndef CLANG
CFLAGS += -fno-inline-small-functions
endif
# NOTE: we now need this to prevent inlining (enabled at -O3)
CFLAGS += -fno-inline-functions
CFLAGS += $(DFLAGS)
endif
all: $(PREP) proto $(ALL)
%.o: %.c $(HDEP)
$(CC) $(CFLAGS) -c -o $*.o $<
%.i: %.c
cpp $(DFLAGS) -P $*.c > $*.i
%.s: %.c
$(CC) $(CFLAGS) -S -o $*.s $<
# build a library (type (2) build)
$(LIBNAME):: $(OLIST)
ar rv [email protected] $(OLIST)
.PHONY: proto
proto::
$(PROTOGEN) -i -v -O$(GENTOP) $(SDIR)/*.c $(CPROTO)
.PHONY: clean
clean::
rm -f $(CLEAN)
.PHONY: help
help::
egrep '^#' Makefile
caslock:: $(OLIST) $(LIBLIST) $(STDLIB)
$(CC) $(CFLAGS) -o caslock $(OLIST) $(LIBLIST) $(STDLIB)
ПРИМЕЧАНИЕ. Я, возможно, взорвал некоторые ограничения asm, потому что, выполняя функцию CAS как встроенную, компиляция с gcc
приводит к неправильным результатам. Однако clang
отлично работает с inline. Таким образом, по умолчанию функция CAS не является встроенной. Для согласованности я не использовал разные значения по умолчанию для gcc/clang, хотя мог.
Здесь разбор соответствующей функции с встроенной строкой, созданной gcc
(это не удается):
00000000004009c0 <incby_tidgate>:
4009c0: 31 c0 xor %eax,%eax
4009c2: f0 0f b1 3d 3a 1a 20 lock cmpxchg %edi,0x201a3a(%rip) # 602404 <tidgate>
4009c9: 00
4009ca: 0f 94 c2 sete %dl
4009cd: 84 d2 test %dl,%dl
4009cf: 75 23 jne 4009f4 <L01>
4009d1: 0f 1f 80 00 00 00 00 nopl 0x0(%rax)
4009d8:L00 64 48 8b 14 25 f8 ff mov %fs:0xfffffffffffffff8,%rdx
4009df: ff ff
4009e1: 83 42 04 01 addl $0x1,0x4(%rdx)
4009e5: f0 0f b1 3d 17 1a 20 lock cmpxchg %edi,0x201a17(%rip) # 602404 <tidgate>
4009ec: 00
4009ed: 0f 94 c2 sete %dl
4009f0: 84 d2 test %dl,%dl
4009f2: 74 e4 je 4009d8 <L00>
4009f4:L01 48 83 05 dc 17 20 00 addq $0x1,0x2017dc(%rip) # 6021d8 <ary+0xf8>
4009fb: 01
4009fc: 48 83 05 94 19 20 00 addq $0x1,0x201994(%rip) # 602398 <ary+0x2b8>
400a03: 01
400a04: c7 05 f6 19 20 00 00 movl $0x0,0x2019f6(%rip) # 602404 <tidgate>
400a0b: 00 00 00
400a0e: c3 retq
Здесь разбор соответствующей функции с встроенной строкой, построенной с помощью clang
(это выполняется):
0000000000400990 <incby_tidgate>:
400990: 31 c0 xor %eax,%eax
400992: f0 0f b1 3d 3a 1a 20 lock cmpxchg %edi,0x201a3a(%rip) # 6023d4 <tidgate>
400999: 00
40099a: 0f 94 c0 sete %al
40099d: eb 1a jmp 4009b9 <L01>
40099f: 90 nop
4009a0:L00 64 48 8b 04 25 f8 ff mov %fs:0xfffffffffffffff8,%rax
4009a7: ff ff
4009a9: ff 40 04 incl 0x4(%rax)
4009ac: 31 c0 xor %eax,%eax
4009ae: f0 0f b1 3d 1e 1a 20 lock cmpxchg %edi,0x201a1e(%rip) # 6023d4 <tidgate>
4009b5: 00
4009b6: 0f 94 c0 sete %al
4009b9:L01 84 c0 test %al,%al
4009bb: 74 e3 je 4009a0 <L00>
4009bd: 48 ff 05 e4 17 20 00 incq 0x2017e4(%rip) # 6021a8 <ary+0xf8>
4009c4: 48 ff 05 9d 19 20 00 incq 0x20199d(%rip) # 602368 <ary+0x2b8>
4009cb: c7 05 ff 19 20 00 00 movl $0x0,0x2019ff(%rip) # 6023d4 <tidgate>
4009d2: 00 00 00
4009d5: c3 retq
4009d6: 66 2e 0f 1f 84 00 00 nopw %cs:0x0(%rax,%rax,1)
4009dd: 00 00 00