OSDN Git Service

Remove support for PG9.2
[pgdbmsstats/pg_dbms_stats.git] / import.c
1 /*
2  * import.c
3  *
4  * Copyright (c) 2012-2017, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
5  */
6 #include "postgres.h"
7
8 #include "access/xact.h"
9 #include "catalog/namespace.h"
10 #include "catalog/pg_type.h"
11 #include "commands/copy.h"
12 #include "executor/spi.h"
13 #include "libpq/pqformat.h"
14 #include "mb/pg_wchar.h"
15 #include "tcop/tcopprot.h"
16 #include "utils/builtins.h"
17 #include "utils/lsyscache.h"
18 #include "utils/syscache.h"
19 #include "catalog/pg_class.h"
20 #if PG_VERSION_NUM >= 90300
21 #include "access/htup_details.h"
22 #endif
23
24 #include "pg_dbms_stats.h"
25
26 #define RELATION_PARAM_NUM      9
27
28 extern PGDLLIMPORT bool standard_conforming_strings;
29
30 PG_FUNCTION_INFO_V1(dbms_stats_import);
31
32 Datum   dbms_stats_import(PG_FUNCTION_ARGS);
33
34 static void get_args(FunctionCallInfo fcinfo, char **nspname, char **relname,
35                                 char **attname, char **filename);
36 static void spi_exec_utility(const char *query);
37 static void spi_exec_query(const char *query, int nargs, Oid *argtypes,
38                                 SPIPlanPtr *plan, Datum *values, const char *nulls, int result);
39 static void import_stats_from_file(char *filename, char *nspname, char *relname,
40                                 char *attname);
41
42 /*
43  * dbms_stats_import
44  *   Import exported statistics from stdin or a file.
45  *
46  *   Order of arguments:
47  *     1) schema name
48  *     2) relation oid
49  *     3) attribute name
50  *     4) absolute path of source file, or 'stdin' (case insensitive)
51  */
52 Datum
53 dbms_stats_import(PG_FUNCTION_ARGS)
54 {
55         char               *nspname;
56         char               *relname;
57         char               *attname;
58         char               *filename;   /* filename, or NULL for STDIN */
59         int                             ret;
60         int                             i;
61         uint32                  r_num;
62         HeapTuple          *r_tups;
63         TupleDesc               r_tupdesc;
64         SPIPlanPtr              r_upd_plan = NULL;
65         SPIPlanPtr              r_ins_plan = NULL;
66         SPIPlanPtr              c_sel_plan = NULL;
67         SPIPlanPtr              c_del_plan = NULL;
68         SPIPlanPtr              c_ins_plan = NULL;
69
70         /* get validated arguments */
71         get_args(fcinfo, &nspname, &relname, &attname, &filename);
72
73         /* for debug use */
74         elog(DEBUG3, "%s() f=%s n=%s r=%s a=%s", __FUNCTION__,
75                  filename ? filename : "(null)",
76                  nspname ? nspname : "(null)",
77                  relname ? relname : "(null)",
78                  attname ? attname : "(null)");
79
80         /* connect to SPI */
81         ret = SPI_connect();
82         if (ret != SPI_OK_CONNECT)
83                 elog(ERROR, "pg_dbms_stats: SPI_connect => %d", ret);
84
85         /* lock dummy statistics tables. */
86         spi_exec_utility("LOCK dbms_stats.relation_stats_locked"
87                                                 " IN SHARE UPDATE EXCLUSIVE MODE");
88         spi_exec_utility("LOCK dbms_stats.column_stats_locked"
89                                                 " IN SHARE UPDATE EXCLUSIVE MODE");
90
91         /*
92          * Create a temp table to save the statistics to import.
93          * This table should fit with the content of export files.
94          */
95         spi_exec_utility("CREATE TEMP TABLE dbms_stats_work_stats ("
96                                          "nspname          name   NOT NULL,"
97                                          "relname          name   NOT NULL,"
98                                          "relpages         int4   NOT NULL,"
99                                          "reltuples        float4 NOT NULL,"
100                                          "relallvisible    int4   NOT NULL,"
101                                          "curpages         int4   NOT NULL,"
102                                          "last_analyze     timestamp with time zone,"
103                                          "last_autoanalyze timestamp with time zone,"
104                                          "attname          name,"
105                                          "nspname_of_typename name,"
106                                          "typname name,"
107                                          "atttypmod int4,"
108                                          "stainherit       bool,"
109                                          "stanullfrac      float4,"
110                                          "stawidth         int4,"
111                                          "stadistinct      float4,"
112                                          "stakind1         int2,"
113                                          "stakind2         int2,"
114                                          "stakind3         int2,"
115                                          "stakind4         int2,"
116                                          "stakind5         int2,"
117                                          "staop1           oid,"
118                                          "staop2           oid,"
119                                          "staop3           oid,"
120                                          "staop4           oid,"
121                                          "staop5           oid,"
122                                          "stanumbers1      float4[],"
123                                          "stanumbers2      float4[],"
124                                          "stanumbers3      float4[],"
125                                          "stanumbers4      float4[],"
126                                          "stanumbers5      float4[],"
127                                          "stavalues1       dbms_stats.anyarray,"
128                                          "stavalues2       dbms_stats.anyarray,"
129                                          "stavalues3       dbms_stats.anyarray,"
130                                          "stavalues4       dbms_stats.anyarray"
131                                         ",stavalues5       dbms_stats.anyarray"
132                                          ")");
133
134         /* load the statistics from export file to the temp table */
135         import_stats_from_file(filename, nspname, relname, attname);
136
137         /* Determine the Oid of local table from the tablename and schemaname. */
138         ret = SPI_execute("SELECT DISTINCT w.nspname, w.relname, c.oid, "
139                                                          "w.relpages, w.reltuples, "
140                                                          "w.curpages, w.last_analyze, w.last_autoanalyze "
141                                                          ",w.relallvisible "
142                                                 "FROM pg_catalog.pg_class c "
143                                                 "JOIN pg_catalog.pg_namespace n "
144                                                   "ON (c.relnamespace = n.oid) "
145                                            "RIGHT JOIN dbms_stats_work_stats w "
146                                                   "ON (w.relname = c.relname AND w.nspname = n.nspname) "
147                                            "ORDER BY 1, 2", false, 0);
148         if (ret != SPI_OK_SELECT)
149                 elog(ERROR, "pg_dbms_stats: SPI_execute => %d", ret);
150
151         /*
152          * If there is no record in the staging table after loading source and
153          * deleting unnecessary records, we treat it as an error.
154          */
155         if (SPI_processed == 0)
156                 elog(ERROR, "no per-table statistic data to be imported");
157
158         /* */
159         r_num = SPI_processed;
160         r_tups = SPI_tuptable->vals;
161         r_tupdesc = SPI_tuptable->tupdesc;
162         for (i = 0; i < r_num; i++)
163         {
164                 bool    isnull;
165                 Datum   w_nspname;
166                 Datum   w_relname;
167                 Datum   w_relid;
168                 Datum   values[9];
169                 char    nulls[9] = {'t', 't', 't', 't', 't', 't', 't', 't', 't'};
170                 Oid             r_types[9] = {NAMEOID, NAMEOID, INT4OID, FLOAT4OID, INT4OID,
171                                                           TIMESTAMPTZOID, TIMESTAMPTZOID, OIDOID, INT4OID};
172                 Oid             c_types[5] = {OIDOID, INT2OID, NAMEOID, NAMEOID,
173                                                           NAMEOID};
174                 uint32          c_num;
175                 TupleDesc       c_tupdesc;
176                 HeapTuple  *c_tups;
177                 int                     j;
178
179                 values[0] = w_nspname = SPI_getbinval(r_tups[i], r_tupdesc, 1, &isnull);
180                 values[1] = w_relname = SPI_getbinval(r_tups[i], r_tupdesc, 2, &isnull);
181                 values[7] = w_relid = SPI_getbinval(r_tups[i], r_tupdesc, 3, &isnull);
182                 if (isnull)
183                 {
184                         elog(WARNING, "relation \"%s.%s\" does not exist",
185                                         DatumGetName(w_nspname)->data,
186                                         DatumGetName(w_relname)->data);
187                         continue;
188                 }
189
190                 values[2] = SPI_getbinval(r_tups[i], r_tupdesc, 4, &isnull);
191                 values[3] = SPI_getbinval(r_tups[i], r_tupdesc, 5, &isnull);
192                 values[4] = SPI_getbinval(r_tups[i], r_tupdesc, 6, &isnull);
193                 values[5] = SPI_getbinval(r_tups[i], r_tupdesc, 7, &isnull);
194                 nulls[5] = isnull ? 'n' : 't';
195                 values[6] = SPI_getbinval(r_tups[i], r_tupdesc, 8, &isnull);
196                 nulls[6] = isnull ? 'n' : 't';
197                 values[8] = SPI_getbinval(r_tups[i], r_tupdesc, 9, &isnull);
198
199                 /*
200                  * First we try UPDATE with the oid.  When no record matched, try
201                  * INSERT.  We can't use DELETE-then-INSERT method because we have FK
202                  * on relation_stats_locked so DELETE would delete child records in
203                  * column_stats_locked undesirably.
204                  */
205                 spi_exec_query("UPDATE dbms_stats.relation_stats_locked SET "
206                                 "relname = quote_ident($1) || '.' || quote_ident($2), "
207                                 "relpages = $3, reltuples = $4, relallvisible = $9, "
208                                 "curpages = $5, last_analyze = $6, last_autoanalyze = $7 "
209                                 "WHERE relid = $8",
210                                 RELATION_PARAM_NUM, r_types, &r_upd_plan, values, nulls,
211                                 SPI_OK_UPDATE);
212                 if (SPI_processed == 0)
213                 {
214                         spi_exec_query("INSERT INTO dbms_stats.relation_stats_locked "
215                                         "(relname, relpages, reltuples, curpages, "
216                                         "last_analyze, last_autoanalyze, relid, relallvisible"
217                                         ") VALUES (quote_ident($1) || '.' || quote_ident($2), "
218                                         "$3, $4, $5, $6, $7, $8, $9)",
219                                         RELATION_PARAM_NUM, r_types, &r_ins_plan, values, nulls,
220                                         SPI_OK_INSERT);
221                         /*  If we failed to insert, we can't proceed. */
222                         if (SPI_processed != 1)
223                                 elog(ERROR, "failed to insert import data");
224                 }
225
226                 elog(DEBUG2, "\"%s.%s\" relation statistic import",
227                         DatumGetName(w_nspname)->data, DatumGetName(w_relname)->data);
228
229                 /*
230                  * Determine the attnum of the attribute with given name, and load
231                  * statistics from temp table into dbms.column_stats_locked.
232                  */
233                 spi_exec_query("SELECT w.stainherit, w.attname, a.attnum, "
234                                                           "w.nspname_of_typename, tn.nspname, "
235                                                           "w.typname, t.typname, w.atttypmod, a.atttypmod "
236                                                  "FROM pg_catalog.pg_class c "
237                                                  "JOIN pg_catalog.pg_namespace cn "
238                                                    "ON (cn.oid = c.relnamespace) "
239                                                  "JOIN pg_catalog.pg_attribute a "
240                                                    "ON (a.attrelid = c.oid) "
241                                                  "JOIN pg_catalog.pg_type t "
242                                                    "ON (t.oid = a.atttypid) "
243                                                  "JOIN pg_catalog.pg_namespace tn "
244                                                    "ON (tn.oid = t.typnamespace) "
245                                                 "RIGHT JOIN dbms_stats_work_stats w "
246                                                    "ON (w.nspname = cn.nspname AND w.relname = c.relname "
247                                                            "AND (w.attname = a.attname OR w.attname = '')) "
248                                                 "WHERE w.nspname = $1 AND w.relname = $2 "
249                                                   "AND a.attnum > 0"
250                                                 "ORDER BY 1, 3, 2",
251                                 2, r_types, &c_sel_plan, values, NULL, SPI_OK_SELECT);
252
253                 /* This query ought to return at least one record. */
254                 if (SPI_processed == 0)
255                         elog(ERROR, "no per-column statistic data to be imported");
256
257                 values[0] = w_relid;
258                 values[2] = w_nspname;
259                 values[3] = w_relname;
260
261                 c_num = SPI_processed;
262                 c_tups = SPI_tuptable->vals;
263                 c_tupdesc = SPI_tuptable->tupdesc;
264                 for (j = 0; j < c_num; j++)
265                 {
266                         char   *w_typnamespace;
267                         char   *a_typnamespace;
268                         char   *w_typname;
269                         char   *a_typname;
270                         int             w_typmod;
271                         int             a_typmod;
272
273                         /*
274                          * If we have only per-relation statistics in source, all of
275                          * column_stats_effective for per-column statistics are NULL.
276                          */
277                         (void) SPI_getbinval(c_tups[j], c_tupdesc, 1, &isnull);
278                         if (isnull)
279                                 continue;
280
281                         /*
282                          * If there is no column with given name, we skip the rest of
283                          * import process.
284                          */
285                         values[4] = SPI_getbinval(c_tups[j], c_tupdesc, 2, &isnull);
286                         values[1] = SPI_getbinval(c_tups[j], c_tupdesc, 3, &isnull);
287                         if (isnull)
288                         {
289                                 elog(WARNING, "column \"%s\" of \"%s.%s\" does not exist",
290                                         DatumGetName(values[4])->data,
291                                                 DatumGetName(w_nspname)->data,
292                                                 DatumGetName(w_relname)->data);
293                                 continue;
294                         }
295
296                         /*
297                          * If the destination column has different data type from source
298                          * column, we stop importing to avoid corrupted statistics.
299                          */
300                         w_typnamespace = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 4,
301                                                 &isnull))->data;
302                         a_typnamespace = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 5,
303                                                 &isnull))->data;
304                         w_typname = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 6,
305                                                 &isnull))->data;
306                         a_typname = DatumGetName(SPI_getbinval(c_tups[j], c_tupdesc, 7,
307                                                 &isnull))->data;
308                         if (strcmp(w_typnamespace, a_typnamespace) != 0 ||
309                                 strcmp(w_typname, a_typname) != 0)
310                         {
311                                 ereport(WARNING,
312                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
313                                                  errmsg("column \"%s\" is of type \"%s.%s\""
314                                                                 " but import data is of type \"%s.%s\"",
315                                                                 DatumGetName(values[4])->data,
316                                                                 a_typnamespace, a_typname,
317                                                                 w_typnamespace, w_typname)));
318                                 continue;
319                         }
320
321                         /*
322                          * If the atttypmod of the destination column is different from the
323                          * one of source, column, we stop importing to avoid corrupted
324                          * statistics.
325                          */
326                         w_typmod = DatumGetInt32(SPI_getbinval(c_tups[j], c_tupdesc, 8,
327                                                 &isnull));
328                         a_typmod = DatumGetInt32(SPI_getbinval(c_tups[j], c_tupdesc, 9,
329                                                 &isnull));
330                         if (w_typmod != a_typmod)
331                         {
332                                 ereport(WARNING,
333                                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
334                                                  errmsg("column \"%s\" is of atttypmod %d"
335                                                                 " but import data is of atttypmod %d",
336                                                                 DatumGetName(values[4])->data,
337                                                                 a_typmod, a_typmod)));
338                                 continue;
339                         }
340
341                         /*
342                          * First delete old dummy statistics, and import new one.  We use
343                          * DELETE-then-INSERT method here to simplify codes.
344                          */
345                         spi_exec_query("DELETE FROM dbms_stats.column_stats_locked "
346                                         "WHERE starelid = $1 AND staattnum = $2", 2, c_types,
347                                         &c_del_plan, values, NULL, SPI_OK_DELETE);
348
349                         spi_exec_query("INSERT INTO dbms_stats.column_stats_locked "
350                                 "SELECT $1, $2, "
351                                 "stainherit, stanullfrac, stawidth, stadistinct, "
352                                 "stakind1, stakind2, stakind3, stakind4, stakind5, "
353                                 "staop1, staop2, staop3, staop4, staop5, "
354                                 "stanumbers1, stanumbers2, stanumbers3, stanumbers4, "
355                                 "stanumbers5, "
356                                 "stavalues1, stavalues2, stavalues3, stavalues4 , stavalues5 "
357                                 "FROM dbms_stats_work_stats "
358                                 "WHERE nspname = $3 AND relname = $4 "
359                                 "AND attname = $5 "
360                                 "ORDER BY 3",
361                                 5, c_types, &c_ins_plan, values, NULL, SPI_OK_INSERT);
362
363                         elog(DEBUG2, "\"%s.%s.%s\" column statistic import",
364                                 DatumGetName(w_nspname)->data,
365                                 DatumGetName(w_relname)->data, DatumGetName(values[4])->data);
366                 }
367
368                 if (c_num == 0)
369                         elog(DEBUG2, "\"%s.%s\" column statistic no data",
370                                 DatumGetName(w_nspname)->data, DatumGetName(w_relname)->data);
371         }
372
373         /* release the cached plan */
374         SPI_freeplan(r_upd_plan);
375         SPI_freeplan(r_ins_plan);
376         SPI_freeplan(c_sel_plan);
377         SPI_freeplan(c_del_plan);
378         SPI_freeplan(c_ins_plan);
379
380         /* delete the temp table */
381         spi_exec_utility("DROP TABLE dbms_stats_work_stats");
382
383         /* disconnect SPI */
384         ret = SPI_finish();
385         if (ret != SPI_OK_FINISH)
386                 elog(ERROR, "pg_dbms_stats: SPI_finish => %d", ret);
387
388         /*
389          * Recover the protocol state because it has been invalidated by our
390          * COPY-from-stdin.
391          */
392         if (filename == NULL)
393                 pq_puttextmessage('C', "dbms_stats_import");
394
395         PG_RETURN_VOID();
396 }
397
398 /*
399  * spi_exec_utility
400  *   Execute given utility command via SPI.
401  */
402 static void
403 spi_exec_utility(const char *query)
404 {
405         int     ret;
406
407         ret = SPI_exec(query, 0);
408         if (ret != SPI_OK_UTILITY)
409                 elog(ERROR, "pg_dbms_stats: SPI_exec => %d", ret);
410 }
411
412 /*
413  * spi_exec_query
414  *   Execute given SQL command via SPI.
415  *   The plan will be cached by SPI_prepare if it hasn't been.
416  */
417 static void
418 spi_exec_query(const char *query, int nargs, Oid *argtypes, SPIPlanPtr *plan,
419                                 Datum *values, const char *nulls, int result)
420 {
421         int     ret;
422
423         if (*plan == NULL)
424                 *plan = SPI_prepare(query, nargs, argtypes);
425
426         ret = SPI_execute_plan(*plan, values, nulls, false, 0);
427         if (ret != result)
428                 elog(ERROR, "pg_dbms_stats: SPI_execute_plan => %d", ret);
429 }
430
431 static char *
432 get_text_arg(FunctionCallInfo fcinfo, int n, bool is_name)
433 {
434         text   *arg;
435         char   *s;
436         int             len;
437         char   *result;
438
439         arg = PG_GETARG_TEXT_PP(n);
440         s = text_to_cstring(arg);
441         PG_FREE_IF_COPY(arg, n);
442
443         if (!is_name)
444                 return s;
445
446         len = strlen(s);
447
448         /* Truncate oversize input */
449         if (len >= NAMEDATALEN)
450                 len = pg_mbcliplen(s, len, NAMEDATALEN - 1);
451
452         /* We use palloc0 here to ensure result is zero-padded */
453         result = (char *) palloc0(NAMEDATALEN);
454         memcpy(result, s, len);
455         pfree(s);
456
457         return result;
458 }
459
460 /*
461  * get_args
462  *   Retrieve arguments from FunctionCallInfo and validate them.  We assume
463  *   that order of arguments is:
464  *     1) schema name
465  *     2) relation oid
466  *     3) attribute name
467  *     4) absolute path of source file, or 'stdin' (case insensitive)
468  */
469 static void
470 get_args(FunctionCallInfo fcinfo, char **nspname, char **relname,
471                 char **attname, char **filename)
472 {
473         Oid                             nspid;
474         Oid                             relid;
475         AttrNumber              attnum;
476         HeapTuple               tp;
477         Form_pg_class   reltup;
478         char                    relkind;
479
480         *nspname = *relname = *attname = *filename = NULL;
481
482         /*
483          * First of all, check whether combination of arguments is consistent.
484          *
485          * 1) relid and attname can't be used with schemaname.
486          * 2) relid is required when attname is given.
487          */
488         if (!PG_ARGISNULL(0) && (!PG_ARGISNULL(1) || !PG_ARGISNULL(2)))
489                 elog(ERROR, "relid and attnum can not be used with schemaname");
490         else if (PG_ARGISNULL(1) && !PG_ARGISNULL(2))
491                 elog(ERROR, "relation is required");
492
493         /* filepath validation */
494         if (!PG_ARGISNULL(3))
495         {
496                 *filename = get_text_arg(fcinfo, 3, false);
497
498                 /*
499                  * If given filepath is "stdin", clear filename to tell caller to
500                  * import from standard input.  Note that we accept only absolute path
501                  * for security reason.
502                  */
503                 if (pg_strcasecmp(*filename, "stdin") == 0)
504                         *filename = NULL;
505                 else if (!is_absolute_path(*filename))
506                         ereport(ERROR,
507                                         (errcode(ERRCODE_INVALID_NAME),
508                                          errmsg("relative path not allowed for dbms_stats_export"
509                                                         " to file")));
510         }
511
512         /* schemaname validation */
513         if (!PG_ARGISNULL(0))
514         {
515                 *nspname = get_text_arg(fcinfo, 0, true);
516
517                 /* check that a schema with given name exists */
518                 get_namespace_oid(*nspname, false);
519
520                 /* check that given schema is not one of system schemas */
521                 if (dbms_stats_is_system_schema_internal(*nspname))
522                         elog(ERROR, "\"%s\" is a system catalog", *nspname);
523         }
524
525         /* table oid validation */
526         if (!PG_ARGISNULL(1))
527         {
528                 relid = PG_GETARG_OID(1);
529                 tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
530                 if (!HeapTupleIsValid(tp))
531                         elog(ERROR, "relid %d does not exist", relid);
532
533                 /* check that the target is an ordinary table or an index */
534                 reltup = (Form_pg_class) GETSTRUCT(tp);
535                 *relname = pstrdup(reltup->relname.data);
536                 relkind = reltup->relkind;
537                 nspid = reltup->relnamespace;
538                 ReleaseSysCache(tp);
539
540                 if (relkind != RELKIND_RELATION && relkind != RELKIND_INDEX
541                         && relkind != RELKIND_FOREIGN_TABLE
542 #if PG_VERSION_NUM >= 90300
543                         && relkind != RELKIND_MATVIEW
544 #endif
545                 )
546                         elog(ERROR, "relkind of \"%s\" is \"%c\", can not import",
547                                 get_rel_name(relid), relkind);
548
549                 /* check that the relation is not in one of system schemas */
550                 *nspname = get_namespace_name(nspid);
551                 if (dbms_stats_is_system_schema_internal(*nspname))
552                         elog(ERROR, "\"%s\" is a system catalog", *nspname);
553
554                 /* attribute name validation */
555                 if (!PG_ARGISNULL(2))
556                 {
557                         *attname = get_text_arg(fcinfo, 2, true);
558                         attnum = get_attnum(relid, *attname);
559                         if (!AttributeNumberIsValid(attnum))
560                                 elog(ERROR, "column \"%s\" of \"%s.%s\" does not exist", *attname, *nspname, *relname);
561                 }
562         }
563 }
564
565 /*
566  * appendLiteral - Format a string as a SQL literal, append to buf
567  *
568  * This function was copied from simple_quote_literal() in
569  * src/backend/utils/adt/ruleutils.c
570  */
571 static void
572 appendLiteral(StringInfo buf, const char *val)
573 {
574         const char *valptr;
575
576         /*
577          * We form the string literal according to the prevailing setting of
578          * standard_conforming_strings; we never use E''. User is responsible for
579          * making sure result is used correctly.
580          */
581         appendStringInfoChar(buf, '\'');
582         for (valptr = val; *valptr; valptr++)
583         {
584                 char            ch = *valptr;
585
586                 if (SQL_STR_DOUBLE(ch, !standard_conforming_strings))
587                         appendStringInfoChar(buf, ch);
588                 appendStringInfoChar(buf, ch);
589         }
590         appendStringInfoChar(buf, '\'');
591 }
592
593 /*
594  * import_stats_from_file
595  *       load data from file or stdin into work table, and delete unnecessary
596  *       records.
597  */
598 static void
599 import_stats_from_file(char *filename, char *nspname, char *relname,
600         char *attname)
601 {
602         StringInfoData  buf;
603         List               *parsetree_list;
604         uint64                  processed;
605         Datum                   values[3];
606         Oid                             argtypes[3] = { CSTRINGOID, CSTRINGOID, CSTRINGOID };
607         char                    nulls[3] = { 'n', 'n', 'n' };
608         int                             nargs;
609         int                             ret;
610
611         /* for debug use */
612         elog(DEBUG3, "%s() f=%s n=%s r=%s a=%s", __FUNCTION__,
613                  filename ? filename : "(null)",
614                  nspname ? nspname : "(null)",
615                  relname ? relname : "(null)",
616                  attname ? attname : "(null)");
617
618         /*
619          * Construct COPY statement.  NULL for filename indicates that source is
620          * stdin.
621          */
622         initStringInfo(&buf);
623         appendStringInfoString(&buf, "COPY dbms_stats_work_stats FROM ");
624         if (filename == NULL)
625                 appendStringInfoString(&buf, "stdin");
626         else
627                 appendLiteral(&buf, filename);
628
629         appendStringInfoString(&buf, " (FORMAT 'binary')");
630
631         /* Execute COPY FROM command. */
632         parsetree_list = pg_parse_query(buf.data);
633
634 #if PG_VERSION_NUM >= 100000
635         {
636                 /* 
637                  * parsetree_list is a list with one RawStmt since Pg10. Extract
638                  * CopyStmt to feed to DoCopy.
639                  */
640                 ParseState      *pstate = make_parsestate(NULL);
641                 RawStmt *rstmt = (RawStmt *)linitial (parsetree_list);
642                 CopyStmt *stmt = (CopyStmt *)rstmt->stmt;
643
644                 Assert(IsA(stmt, CopyStmt));
645
646                 pstate->p_sourcetext = pstrdup(buf.data);
647                 DoCopy(pstate, stmt, rstmt->stmt_location, rstmt->stmt_len, &processed);
648                 free_parsestate(pstate);
649         }
650 #elif PG_VERSION_NUM >= 90300
651         DoCopy((CopyStmt *)linitial(parsetree_list), buf.data, &processed);
652 #else
653         processed = DoCopy((CopyStmt *)linitial(parsetree_list), buf.data);
654 #endif
655
656         if (processed == 0)
657                 elog(ERROR, "no data to be imported");
658
659         /*
660          * Delete the statistics other than the specified object's statistic from
661          * the temp table.  We can skip DELETEing staging data when schemaname is
662          * NULL, because it means database-wise import.
663          */
664         if (nspname == NULL)
665                 return;
666
667         resetStringInfo(&buf);
668         appendStringInfoString(&buf,
669                                                    "DELETE FROM dbms_stats_work_stats "
670                                                    " WHERE nspname <> $1::text ");
671         values[0] = CStringGetDatum(nspname);
672         nulls[0] = 't';
673         nargs = 1;
674
675         if (relname != NULL)
676         {
677                 values[1] = CStringGetDatum(relname);
678                 nulls[1] = 't';
679                 nargs++;
680                 appendStringInfoString(&buf, " OR (relname <> $2::text) ");
681
682                 if (attname != NULL)
683                 {
684                         values[2] = CStringGetDatum(attname);
685                         nulls[2] = 't';
686                         nargs++;
687                         appendStringInfoString(&buf, " OR (attname <> $3::text) ");
688                 }
689         }
690
691         ret = SPI_execute_with_args(buf.data, nargs, argtypes, values, nulls,
692                                                                  false, 0);
693         if (ret != SPI_OK_DELETE)
694                 elog(ERROR, "pg_dbms_stats: SPI_execute_with_args => %d", ret);
695 }
696
697 #ifdef UNIT_TEST
/*
 * Unit-test declarations.  test_import() is the only entry with external
 * linkage; the per-function tests are private to this file.
 */
static void test_appendLiteral(int *passed, int *total);
static void test_spi_exec_utility(int *passed, int *total);
static void test_spi_exec_query(int *passed, int *total);
void test_import(int *passed, int *total);

/* Evaluates to 1 when both C strings have identical contents, otherwise 0. */
#define StringEq(actual, expected)	\
	(strcmp((actual), (expected)) == 0)
705
706 /*
707  * Test appendLiteral function
708  */
709 static void
710 test_appendLiteral(int *passed, int *total)
711 {
712         bool                    org_standard_conforming_strings;
713         int                             caseno = 0;
714         StringInfoData  buf;
715
716         /* Backup current GUC parameters */
717         NewGUCNestLevel();
718         org_standard_conforming_strings = standard_conforming_strings;
719
720         /* Initialize resources for tests */
721         initStringInfo(&buf);
722
723         /*
724          * *-*-1:
725          *   - no special char
726          */
727         caseno++;
728         resetStringInfo(&buf);
729         appendStringInfoString(&buf, "BEFORE");
730         appendLiteral(&buf, "\"abc 123\tあいう\n\"");
731         if (StringEq(buf.data, "BEFORE'\"abc 123\tあいう\n\"'"))
732         {
733                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
734                 (*passed)++;
735         }
736         else
737         {
738                 elog(WARNING, "%s-%d failed: [%s]", __FUNCTION__, caseno, buf.data);
739         }
740
741         /*
742          * *-*-2:
743          *   - contains special chars (single quote, back slash),
744          *   - standard_conforming_strings is true
745          */
746         caseno++;
747         resetStringInfo(&buf);
748         appendStringInfoString(&buf, "BEFORE");
749         standard_conforming_strings = true;
750         appendLiteral(&buf, "'abc 123\tあいう\n\\");
751         if (StringEq(buf.data, "BEFORE'''abc 123\tあいう\n\\'"))
752         {
753                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
754                 (*passed)++;
755         }
756         else
757         {
758                 elog(WARNING, "%s-%d failed: [%s]", __FUNCTION__, caseno, buf.data);
759         }
760
761         /*
762          * *-*-3:
763          *   - contains special chars (single quote, back slash),
764          *   - standard_conforming_strings is false
765          */
766         caseno++;
767         resetStringInfo(&buf);
768         appendStringInfoString(&buf, "BEFORE");
769         standard_conforming_strings = false;
770         appendLiteral(&buf, "'abc 123\tあいう\n\\");
771         if (StringEq(buf.data, "BEFORE'''abc 123\tあいう\n\\\\'"))
772         {
773                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
774                 (*passed)++;
775         }
776         else
777         {
778                 elog(WARNING, "%s-%d failed: [%s]", __FUNCTION__, caseno, buf.data);
779         }
780
781         /*
782          * *-*-4:
783          *   - empty string
784          */
785         caseno++;
786         resetStringInfo(&buf);
787         appendStringInfoString(&buf, "BEFORE");
788         appendLiteral(&buf, "");
789         if (StringEq(buf.data, "BEFORE''"))
790         {
791                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
792                 (*passed)++;
793         }
794         else
795         {
796                 elog(WARNING, "%s-%d failed: [%s]", __FUNCTION__, caseno, buf.data);
797         }
798
799         /* report # of tests */
800         *total += caseno;
801
802         /* Restore GUC parameters */
803         standard_conforming_strings = org_standard_conforming_strings;
804 }
805
806 static void
807 test_spi_exec_query(int *passed, int *total)
808 {
809         int                             rc;
810         volatile int    caseno = 0;
811         SPIPlanPtr              ptr = NULL;
812         SPIPlanPtr              org_ptr;
813
814         /* Initialize */
815         rc = SPI_connect();
816         if (rc != SPI_OK_CONNECT)
817                 elog(ERROR, "could not connect SPI: %s", SPI_result_code_string(rc));
818
819         /*
820          * *-*-1
821          *   - plan is not cached
822          */
823         caseno++;
824         BeginInternalSubTransaction("test");
825         PG_TRY();
826         {
827                 spi_exec_query("SELECT 1", 0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
828                 if (ptr != NULL && SPI_processed == 1)
829                 {
830                         elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
831                         (*passed)++;
832                 }
833                 ReleaseCurrentSubTransaction();
834         }
835         PG_CATCH();
836         {
837                 elog(WARNING, "*-*-%d failed", caseno);
838                 RollbackAndReleaseCurrentSubTransaction();
839                 SPI_restore_connection();
840         }
841         PG_END_TRY();
842
843         /*
844          * *-*-2
845          *   - plan is cached
846          */
847         caseno++;
848         BeginInternalSubTransaction("test");
849         PG_TRY();
850         {
851                 org_ptr = ptr;
852                 spi_exec_query(NULL, 0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
853                 if (ptr == org_ptr && SPI_processed == 1)
854                 {
855                         elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
856                         (*passed)++;
857                 }
858                 ReleaseCurrentSubTransaction();
859         }
860         PG_CATCH();
861         {
862                 elog(WARNING, "*-*-%d failed", caseno);
863                 RollbackAndReleaseCurrentSubTransaction();
864                 FlushErrorState();
865                 SPI_restore_connection();
866         }
867         PG_END_TRY();
868         SPI_freeplan(ptr);
869         ptr = NULL;
870
871         /*
872          * *-*-3
873          *   - query error
874          */
875         caseno++;
876         BeginInternalSubTransaction("test");
877         PG_TRY();
878         {
879                 spi_exec_query("SELECT 1 / 0",
880                                            0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
881                 elog(WARNING, "*-*-%d failed", caseno);
882                 ReleaseCurrentSubTransaction();
883         }
884         PG_CATCH();
885         {
886                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
887                 (*passed)++;
888                 RollbackAndReleaseCurrentSubTransaction();
889                 FlushErrorState();
890                 SPI_restore_connection();
891         }
892         PG_END_TRY();
893         SPI_freeplan(ptr);
894         ptr = NULL;
895
896         /*
897          * *-*-4
898          *   - query success
899          */
900         caseno++;
901         BeginInternalSubTransaction("test");
902         PG_TRY();
903         {
904                 spi_exec_query("SELECT 1", 0, NULL, &ptr, NULL, NULL, SPI_OK_SELECT);
905                 if (ptr != NULL && SPI_processed == 1)
906                 {
907                         elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
908                         (*passed)++;
909                 }
910                 ReleaseCurrentSubTransaction();
911         }
912         PG_CATCH();
913         {
914                 elog(WARNING, "*-*-%d failed", caseno);
915                 PG_RE_THROW();
916                 RollbackAndReleaseCurrentSubTransaction();
917                 SPI_restore_connection();
918         }
919         PG_END_TRY();
920         SPI_freeplan(ptr);
921         ptr = NULL;
922
923         /* report # of tests */
924         (*total) += caseno;
925
926         /* Cleanup */
927         rc = SPI_finish();
928         if (rc != SPI_OK_FINISH && rc != SPI_ERROR_UNCONNECTED)
929                 elog(ERROR, "could not finish SPI: %s", SPI_result_code_string(rc));
930 }
931
932 static void
933 test_spi_exec_utility(int *passed, int *total)
934 {
935         int                             rc;
936         volatile int    caseno = 0;
937
938         /* Initialize */
939         rc = SPI_connect();
940         if (rc != SPI_OK_CONNECT)
941                 elog(ERROR, "could not connect SPI: %s", SPI_result_code_string(rc));
942
943         /*
944          * *-*-1
945          *   - query error
946          */
947         caseno++;
948         BeginInternalSubTransaction("test");
949         PG_TRY();
950         {
951                 spi_exec_utility("RESET dummy_parameter");
952                 elog(WARNING, "*-*-%d failed", caseno);
953                 ReleaseCurrentSubTransaction();
954         }
955         PG_CATCH();
956         {
957                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
958                 (*passed)++;
959                 RollbackAndReleaseCurrentSubTransaction();
960                 FlushErrorState();
961                 SPI_restore_connection();
962         }
963         PG_END_TRY();
964
965         /*
966          * *-*-2
967          *   - query success
968          */
969         caseno++;
970         BeginInternalSubTransaction("test");
971         PG_TRY();
972         {
973                 spi_exec_utility("RESET client_min_messages");
974                 elog(WARNING, "%s-%d ok", __FUNCTION__, caseno);
975                 (*passed)++;
976                 ReleaseCurrentSubTransaction();
977         }
978         PG_CATCH();
979         {
980                 elog(WARNING, "*-*-%d failed", caseno);
981                 RollbackAndReleaseCurrentSubTransaction();
982                 SPI_restore_connection();
983         }
984         PG_END_TRY();
985
986         /* report # of tests */
987         (*total) += caseno;
988
989         /* Cleanup */
990         rc = SPI_finish();
991         if (rc != SPI_OK_FINISH && rc != SPI_ERROR_UNCONNECTED)
992                 elog(ERROR, "could not finish SPI: %s", SPI_result_code_string(rc));
993 }
994
995 /*
996  * Unit test entry point for import.c.  This will be called by PG_init()
997  * function, after initialization for this extension is completed .
998  * This funciton should add the numbers of tests passed and the total number of
999  * tests to parameter passed and total respectively.
1000  */
1001 void
1002 test_import(int *passed, int *total)
1003 {
1004         int local_passed = 0;
1005         int local_total = 0;
1006
1007         elog(WARNING, "==========");
1008
1009         /* Do tests here */
1010         test_appendLiteral(&local_passed, &local_total);
1011         test_spi_exec_query(&local_passed, &local_total);
1012         test_spi_exec_utility(&local_passed, &local_total);
1013
1014         elog(WARNING, "%s %d/%d passed", __FUNCTION__, local_passed, local_total);
1015         *passed += local_passed;
1016         *total += local_total;
1017 }
1018
1019 #endif