From: Michael Meskes Date: Tue, 2 Oct 2007 09:50:00 +0000 (+0000) Subject: ITAGAKI Takahiro added thread-safe X-Git-Tag: REL9_0_0~4907 X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=0c2eb200d6e5fb59443ab31c6e9c38db9e2c7a7f;p=pg-rex%2Fsyncrep.git ITAGAKI Takahiro added thread-safe descriptor handling --- diff --git a/src/interfaces/ecpg/ChangeLog b/src/interfaces/ecpg/ChangeLog index b7d33ff48a..96028e23ca 100644 --- a/src/interfaces/ecpg/ChangeLog +++ b/src/interfaces/ecpg/ChangeLog @@ -2247,5 +2247,10 @@ Sun, 30 Sep 2007 13:37:31 +0200 - Applied another patch by ITAGAKI Takahiro to get memory allocation thread-safe. He also did some cleaning up. + +Tue, 02 Oct 2007 11:32:25 +0200 + + - ITAGAKI Takahiro added thread-safe + descriptor handling - Set ecpg library version to 6.0. - Set ecpg version to 4.4. diff --git a/src/interfaces/ecpg/ecpglib/connect.c b/src/interfaces/ecpg/ecpglib/connect.c index 8d4146cfae..f7ebbc8c0c 100644 --- a/src/interfaces/ecpg/ecpglib/connect.c +++ b/src/interfaces/ecpg/ecpglib/connect.c @@ -1,4 +1,4 @@ -/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/connect.c,v 1.44 2007/09/30 11:38:48 meskes Exp $ */ +/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/connect.c,v 1.45 2007/10/02 09:49:59 meskes Exp $ */ #define POSTGRES_ECPG_INTERNAL #include "postgres_fe.h" @@ -447,6 +447,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p this->cache_head = NULL; this->prep_stmts = NULL; + this->descriptors = NULL; if (all_connections == NULL) this->next = NULL; diff --git a/src/interfaces/ecpg/ecpglib/descriptor.c b/src/interfaces/ecpg/ecpglib/descriptor.c index f70aaa42f8..d278a74434 100644 --- a/src/interfaces/ecpg/ecpglib/descriptor.c +++ b/src/interfaces/ecpg/ecpglib/descriptor.c @@ -1,12 +1,13 @@ /* dynamic SQL support routines * - * $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/descriptor.c,v 1.23 2007/08/14 10:01:52 meskes Exp $ + * $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/descriptor.c,v 1.24 2007/10/02 09:49:59 meskes Exp $ */ #define POSTGRES_ECPG_INTERNAL #include "postgres_fe.h" #include "pg_type.h" +#include "ecpg-pthread-win32.h" #include "ecpgtype.h" #include "ecpglib.h" #include "ecpgerrno.h" @@ -14,18 +15,55 @@ #include "sqlca.h" #include "sql3types.h" -struct descriptor *all_descriptors = NULL; +static void descriptor_free(struct descriptor *desc); +static void descriptor_deallocate_all(struct descriptor *list); + +/* We manage descriptors separately for each thread. */ +#ifdef ENABLE_THREAD_SAFETY +static pthread_key_t descriptor_key; +#ifndef WIN32 +static pthread_once_t descriptor_once = PTHREAD_ONCE_INIT; +#endif + +static void +descriptor_destructor(void *arg) +{ + descriptor_deallocate_all(arg); +} + +NON_EXEC_STATIC void +descriptor_key_init(void) +{ + pthread_key_create(&descriptor_key, descriptor_destructor); +} + +static struct descriptor * +get_descriptors(void) +{ + pthread_once(&descriptor_once, descriptor_key_init); + return (struct descriptor *) pthread_getspecific(descriptor_key); +} + +static void +set_descriptors(struct descriptor *value) +{ + pthread_setspecific(descriptor_key, value); +} + +#else +static struct descriptor *all_descriptors = NULL; +#define get_descriptors() (all_descriptors) +#define set_descriptors(value) do { all_descriptors = (value); } while(0) +#endif /* old internal convenience function that might go away later */ -static PGresult - * +static PGresult * ECPGresultByDescriptor(int line, const char *name) { - PGresult **resultpp = ECPGdescriptor_lvalue(line, name); - - if (resultpp) - return *resultpp; - return NULL; + struct descriptor *desc = ECPGfind_desc(line, name); + if (desc == NULL) + return NULL; + return desc->result; } static unsigned int @@ -445,20 +483,9 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...) bool ECPGset_desc_header(int lineno, const char *desc_name, int count) { - struct descriptor *desc; - - for (desc = all_descriptors; desc; desc = desc->next) - { - if (strcmp(desc_name, desc->name) == 0) - break; - } - + struct descriptor *desc = ECPGfind_desc(lineno, desc_name); if (desc == NULL) - { - ECPGraise(lineno, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, desc_name); return false; - } - desc->count = count; return true; } @@ -471,17 +498,9 @@ ECPGset_desc(int lineno, const char *desc_name, int index,...) struct descriptor_item *desc_item; struct variable *var; - for (desc = all_descriptors; desc; desc = desc->next) - { - if (strcmp(desc_name, desc->name) == 0) - break; - } - + desc = ECPGfind_desc(lineno, desc_name); if (desc == NULL) - { - ECPGraise(lineno, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, desc_name); return false; - } for (desc_item = desc->items; desc_item; desc_item = desc_item->next) { @@ -506,7 +525,7 @@ ECPGset_desc(int lineno, const char *desc_name, int index,...) va_start(args, index); - do + for (;;) { enum ECPGdtype itemtype; const char *tobeinserted = NULL; @@ -585,40 +604,50 @@ ECPGset_desc(int lineno, const char *desc_name, int index,...) return false; } } - } while (true); + } ECPGfree(var); return true; } +/* Free the descriptor and items in it. */ +static void +descriptor_free(struct descriptor *desc) +{ + struct descriptor_item *desc_item; + + for (desc_item = desc->items; desc_item;) + { + struct descriptor_item *di; + + ECPGfree(desc_item->data); + di = desc_item; + desc_item = desc_item->next; + ECPGfree(di); + } + + ECPGfree(desc->name); + PQclear(desc->result); + ECPGfree(desc); +} + bool ECPGdeallocate_desc(int line, const char *name) { struct descriptor *desc; - struct descriptor **lastptr = &all_descriptors; + struct descriptor *prev; struct sqlca_t *sqlca = ECPGget_sqlca(); ECPGinit_sqlca(sqlca); - for (desc = all_descriptors; desc; lastptr = &desc->next, desc = desc->next) + for (desc = get_descriptors(), prev = NULL; desc; prev = desc, desc = desc->next) { if (!strcmp(name, desc->name)) { - struct descriptor_item *desc_item; - - for (desc_item = desc->items; desc_item;) - { - struct descriptor_item *di; - - ECPGfree(desc_item->data); - di = desc_item; - desc_item = desc_item->next; - ECPGfree(di); - } - - *lastptr = desc->next; - ECPGfree(desc->name); - PQclear(desc->result); - ECPGfree(desc); + if (prev) + prev->next = desc->next; + else + set_descriptors(desc->next); + descriptor_free(desc); return true; } } @@ -626,6 +655,18 @@ ECPGdeallocate_desc(int line, const char *name) return false; } +/* Deallocate all descriptors in the list */ +static void +descriptor_deallocate_all(struct descriptor *list) +{ + while (list) + { + struct descriptor *next = list->next; + descriptor_free(list); + list = next; + } +} + bool ECPGallocate_desc(int line, const char *name) { @@ -636,7 +677,7 @@ ECPGallocate_desc(int line, const char *name) new = (struct descriptor *) ECPGalloc(sizeof(struct descriptor), line); if (!new) return false; - new->next = all_descriptors; + new->next = get_descriptors(); new->name = ECPGalloc(strlen(name) + 1, line); if (!new->name) { @@ -654,23 +695,24 @@ ECPGallocate_desc(int line, const char *name) return false; } strcpy(new->name, name); - all_descriptors = new; + set_descriptors(new); return true; } -PGresult ** -ECPGdescriptor_lvalue(int line, const char *descriptor) +/* Find descriptor with name in the connection. */ +struct descriptor * +ECPGfind_desc(int line, const char *name) { - struct descriptor *i; + struct descriptor *desc; - for (i = all_descriptors; i != NULL; i = i->next) + for (desc = get_descriptors(); desc; desc = desc->next) { - if (!strcmp(descriptor, i->name)) - return &i->result; + if (strcmp(name, desc->name) == 0) + return desc; } - ECPGraise(line, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, (char *) descriptor); - return NULL; + ECPGraise(line, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, name); + return NULL; /* not found */ } bool diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c index c5ae1dadf3..946f6811ff 100644 --- a/src/interfaces/ecpg/ecpglib/execute.c +++ b/src/interfaces/ecpg/ecpglib/execute.c @@ -1,4 +1,4 @@ -/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/execute.c,v 1.70 2007/09/26 10:57:00 meskes Exp $ */ +/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/execute.c,v 1.71 2007/10/02 09:49:59 meskes Exp $ */ /* * The aim is to get a simpler inteface to the database routines. @@ -1088,17 +1088,9 @@ ECPGexecute(struct statement * stmt) struct descriptor *desc; struct descriptor_item *desc_item; - for (desc = all_descriptors; desc; desc = desc->next) - { - if (strcmp(var->pointer, desc->name) == 0) - break; - } - + desc = ECPGfind_desc(stmt->lineno, var->pointer); if (desc == NULL) - { - ECPGraise(stmt->lineno, ECPG_UNKNOWN_DESCRIPTOR, ECPG_SQLSTATE_INVALID_SQL_DESCRIPTOR_NAME, var->pointer); return false; - } desc_counter++; for (desc_item = desc->items; desc_item; desc_item = desc_item->next) @@ -1334,16 +1326,15 @@ ECPGexecute(struct statement * stmt) if (var != NULL && var->type == ECPGt_descriptor) { - PGresult **resultpp = ECPGdescriptor_lvalue(stmt->lineno, (const char *) var->pointer); - - if (resultpp == NULL) + struct descriptor *desc = ECPGfind_desc(stmt->lineno, var->pointer); + if (desc == NULL) status = false; else { - if (*resultpp) - PQclear(*resultpp); - *resultpp = results; - clear_result = FALSE; + if (desc->result) + PQclear(desc->result); + desc->result = results; + clear_result = false; ECPGlog("ECPGexecute putting result (%d tuples) into descriptor '%s'\n", PQntuples(results), (const char *) var->pointer); } var = var->next; diff --git a/src/interfaces/ecpg/ecpglib/extern.h b/src/interfaces/ecpg/ecpglib/extern.h index f7e91d3127..0df6506b0b 100644 --- a/src/interfaces/ecpg/ecpglib/extern.h +++ b/src/interfaces/ecpg/ecpglib/extern.h @@ -1,4 +1,4 @@ -/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/extern.h,v 1.28 2007/09/30 11:38:48 meskes Exp $ */ +/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/extern.h,v 1.29 2007/10/02 09:49:59 meskes Exp $ */ #ifndef _ECPG_LIB_EXTERN_H #define _ECPG_LIB_EXTERN_H @@ -36,6 +36,8 @@ bool ECPGget_data(const PGresult *, int, int, int, enum ECPGttype type, #ifdef ENABLE_THREAD_SAFETY void ecpg_pthreads_init(void); +#else +#define ecpg_pthreads_init() ((void)0) #endif struct connection *ECPGget_connection(const char *); char *ECPGalloc(long, int); @@ -92,6 +94,7 @@ struct connection int autocommit; struct ECPGtype_information_cache *cache_head; struct prepared_statement *prep_stmts; + struct descriptor *descriptors; struct connection *next; }; @@ -105,8 +108,6 @@ struct descriptor struct descriptor_item *items; }; -extern struct descriptor *all_descriptors; - struct descriptor_item { int num; @@ -136,7 +137,7 @@ struct variable struct variable *next; }; -PGresult **ECPGdescriptor_lvalue(int line, const char *descriptor); +struct descriptor *ECPGfind_desc(int line, const char *name); bool ECPGstore_result(const PGresult *results, int act_field, const struct statement * stmt, struct variable * var); diff --git a/src/interfaces/ecpg/ecpglib/misc.c b/src/interfaces/ecpg/ecpglib/misc.c index 4dfa8cab54..51bda5a7de 100644 --- a/src/interfaces/ecpg/ecpglib/misc.c +++ b/src/interfaces/ecpg/ecpglib/misc.c @@ -1,4 +1,4 @@ -/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/misc.c,v 1.37 2007/09/30 11:38:48 meskes Exp $ */ +/* $PostgreSQL: pgsql/src/interfaces/ecpg/ecpglib/misc.c,v 1.38 2007/10/02 09:49:59 meskes Exp $ */ #define POSTGRES_ECPG_INTERNAL #include "postgres_fe.h" @@ -431,6 +431,7 @@ DllMain(HANDLE module, DWORD reason, LPVOID reserved) auto_mem_key_init(); ecpg_actual_connection_init(); ecpg_sqlca_key_init(); + descriptor_key_init(); } return TRUE; } diff --git a/src/interfaces/ecpg/include/ecpg-pthread-win32.h b/src/interfaces/ecpg/include/ecpg-pthread-win32.h index e0bbc93f39..369cefb754 100644 --- a/src/interfaces/ecpg/include/ecpg-pthread-win32.h +++ b/src/interfaces/ecpg/include/ecpg-pthread-win32.h @@ -1,4 +1,4 @@ -/* $PostgreSQL: pgsql/src/interfaces/ecpg/include/ecpg-pthread-win32.h,v 1.2 2007/09/30 11:38:48 meskes Exp $ */ +/* $PostgreSQL: pgsql/src/interfaces/ecpg/include/ecpg-pthread-win32.h,v 1.3 2007/10/02 09:49:59 meskes Exp $ */ /* * pthread mapping macros for win32 native thread implementation */ @@ -47,6 +47,7 @@ extern pthread_mutex_t debug_init_mutex; extern void auto_mem_key_init(void); extern void ecpg_actual_connection_init(void); extern void ecpg_sqlca_key_init(void); +extern void descriptor_key_init(void); extern BOOL WINAPI DllMain(HANDLE module, DWORD reason, LPVOID reserved); #endif /* WIN32 */ diff --git a/src/interfaces/ecpg/test/ecpg_schedule b/src/interfaces/ecpg/test/ecpg_schedule index 6067db2666..c478ed126b 100644 --- a/src/interfaces/ecpg/test/ecpg_schedule +++ b/src/interfaces/ecpg/test/ecpg_schedule @@ -43,3 +43,4 @@ test: thread/thread test: thread/thread_implicit test: thread/prep test: thread/alloc +test: thread/descriptor diff --git a/src/interfaces/ecpg/test/ecpg_schedule_tcp b/src/interfaces/ecpg/test/ecpg_schedule_tcp index 20e322bbde..5dbca9dd16 100644 --- a/src/interfaces/ecpg/test/ecpg_schedule_tcp +++ b/src/interfaces/ecpg/test/ecpg_schedule_tcp @@ -43,5 +43,6 @@ test: thread/thread test: thread/thread_implicit test: thread/prep test: thread/alloc +test: thread/descriptor test: connect/test1 diff --git a/src/interfaces/ecpg/test/expected/thread-descriptor.c b/src/interfaces/ecpg/test/expected/thread-descriptor.c new file mode 100644 index 0000000000..9aa66d6366 --- /dev/null +++ b/src/interfaces/ecpg/test/expected/thread-descriptor.c @@ -0,0 +1,155 @@ +/* Processed by ecpg (regression mode) */ +/* These include files are added by the preprocessor */ +#include +#include +#include +#include +/* End of automatic include section */ +#define ECPGdebug(X,Y) ECPGdebug((X)+100,(Y)) + +#line 1 "descriptor.pgc" +#ifdef WIN32 +#define WIN32_LEAN_AND_MEAN +#include +#include +#else +#include +#endif +#include + +#define THREADS 16 +#define REPEATS 50000 + + +#line 1 "sqlca.h" +#ifndef POSTGRES_SQLCA_H +#define POSTGRES_SQLCA_H + +#ifndef PGDLLIMPORT +#if defined(WIN32) || defined(__CYGWIN__) +#define PGDLLIMPORT __declspec (dllimport) +#else +#define PGDLLIMPORT +#endif /* __CYGWIN__ */ +#endif /* PGDLLIMPORT */ + +#define SQLERRMC_LEN 150 + +#ifdef __cplusplus +extern "C" +{ +#endif + +struct sqlca_t +{ + char sqlcaid[8]; + long sqlabc; + long sqlcode; + struct + { + int sqlerrml; + char sqlerrmc[SQLERRMC_LEN]; + } sqlerrm; + char sqlerrp[8]; + long sqlerrd[6]; + /* Element 0: empty */ + /* 1: OID of processed tuple if applicable */ + /* 2: number of rows processed */ + /* after an INSERT, UPDATE or */ + /* DELETE statement */ + /* 3: empty */ + /* 4: empty */ + /* 5: empty */ + char sqlwarn[8]; + /* Element 0: set to 'W' if at least one other is 'W' */ + /* 1: if 'W' at least one character string */ + /* value was truncated when it was */ + /* stored into a host variable. */ + + /* + * 2: if 'W' a (hopefully) non-fatal notice occurred + */ /* 3: empty */ + /* 4: empty */ + /* 5: empty */ + /* 6: empty */ + /* 7: empty */ + + char sqlstate[5]; +}; + +struct sqlca_t *ECPGget_sqlca(void); + +#ifndef POSTGRES_ECPG_INTERNAL +#define sqlca (*ECPGget_sqlca()) +#endif + +#ifdef __cplusplus +} +#endif + +#endif + +#line 13 "descriptor.pgc" + +/* exec sql whenever sqlerror sqlprint ; */ +#line 14 "descriptor.pgc" + +/* exec sql whenever not found sqlprint ; */ +#line 15 "descriptor.pgc" + + +#ifdef WIN32 +static unsigned STDCALL fn(void* arg) +#else +void* fn(void* arg) +#endif +{ + int i; + + for (i = 1; i <= REPEATS; ++i) + { + ECPGallocate_desc(__LINE__, "mydesc"); +#line 27 "descriptor.pgc" + +if (sqlca.sqlcode < 0) sqlprint(); +#line 27 "descriptor.pgc" + + ECPGdeallocate_desc(__LINE__, "mydesc"); +#line 28 "descriptor.pgc" + +if (sqlca.sqlcode < 0) sqlprint(); +#line 28 "descriptor.pgc" + + } + + return 0; +} + +int main (int argc, char** argv) +{ + int i; +#ifdef WIN32 + HANDLE threads[THREADS]; +#else + pthread_t threads[THREADS]; +#endif + +#ifdef WIN32 + for (i = 0; i < THREADS; ++i) + { + unsigned id; + threads[i] = (HANDLE)_beginthreadex(NULL, 0, fn, NULL, 0, &id); + } + + WaitForMultipleObjects(THREADS, threads, TRUE, INFINITE); + for (i = 0; i < THREADS; ++i) + CloseHandle(threads[i]); +#else + for (i = 0; i < THREADS; ++i) + pthread_create(&threads[i], NULL, fn, NULL); + for (i = 0; i < THREADS; ++i) + pthread_join(threads[i], NULL); +#endif + + return 0; +} diff --git a/src/interfaces/ecpg/test/expected/thread-descriptor.stderr b/src/interfaces/ecpg/test/expected/thread-descriptor.stderr new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/interfaces/ecpg/test/expected/thread-descriptor.stdout b/src/interfaces/ecpg/test/expected/thread-descriptor.stdout new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/interfaces/ecpg/test/expected/thread-descriptor.stdout.diff b/src/interfaces/ecpg/test/expected/thread-descriptor.stdout.diff new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/interfaces/ecpg/test/thread/Makefile b/src/interfaces/ecpg/test/thread/Makefile index e4df0d3cda..99591e7c98 100644 --- a/src/interfaces/ecpg/test/thread/Makefile +++ b/src/interfaces/ecpg/test/thread/Makefile @@ -7,6 +7,7 @@ include $(top_srcdir)/$(subdir)/../Makefile.regress TESTS = thread_implicit thread_implicit.c \ thread thread.c \ prep prep.c \ + descriptor descriptor.c \ alloc alloc.c all: $(TESTS) diff --git a/src/interfaces/ecpg/test/thread/descriptor.pgc b/src/interfaces/ecpg/test/thread/descriptor.pgc new file mode 100644 index 0000000000..d42d1857f7 --- /dev/null +++ b/src/interfaces/ecpg/test/thread/descriptor.pgc @@ -0,0 +1,61 @@ +#ifdef WIN32 +#define WIN32_LEAN_AND_MEAN +#include +#include +#else +#include +#endif +#include + +#define THREADS 16 +#define REPEATS 50000 + +EXEC SQL include sqlca; +EXEC SQL whenever sqlerror sqlprint; +EXEC SQL whenever not found sqlprint; + +#ifdef WIN32 +static unsigned STDCALL fn(void* arg) +#else +void* fn(void* arg) +#endif +{ + int i; + + for (i = 1; i <= REPEATS; ++i) + { + EXEC SQL ALLOCATE DESCRIPTOR mydesc; + EXEC SQL DEALLOCATE DESCRIPTOR mydesc; + } + + return 0; +} + +int main (int argc, char** argv) +{ + int i; +#ifdef WIN32 + HANDLE threads[THREADS]; +#else + pthread_t threads[THREADS]; +#endif + +#ifdef WIN32 + for (i = 0; i < THREADS; ++i) + { + unsigned id; + threads[i] = (HANDLE)_beginthreadex(NULL, 0, fn, NULL, 0, &id); + } + + WaitForMultipleObjects(THREADS, threads, TRUE, INFINITE); + for (i = 0; i < THREADS; ++i) + CloseHandle(threads[i]); +#else + for (i = 0; i < THREADS; ++i) + pthread_create(&threads[i], NULL, fn, NULL); + for (i = 0; i < THREADS; ++i) + pthread_join(threads[i], NULL); +#endif + + return 0; +}